feat: added multiple connection listener and connection packet

Added multiple connection listener
Added Control Packet and Connection Packet
Added Backtrace
This commit is contained in:
brenodetomini
2024-01-24 11:54:31 -03:00
parent 91e34f07dc
commit 6c4f723de7
22 changed files with 457 additions and 64 deletions

View File

@@ -5,8 +5,8 @@ set(ENV{TARGET} "x86_64-linux-gnu")
set(CMAKE_CXX_FLAGS "-Wall -Wextra")
set(CMAKE_CXX_FLAGS_DEBUG "-g -O0")
set(CMAKE_CXX_FLAGS_RELEASE "-O3")
set(CMAKE_CXX_FLAGS_DEBUG "-g2 -O0")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -g2")
list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake")
enable_testing()

View File

@@ -1,7 +1,8 @@
include(GetUnwind)
include(GetSpdlog)
add_subdirectory(libmqttd)
add_subdirectory(core)
add_executable(mqttd mqttd.cpp)
target_link_libraries(mqttd PRIVATE libmqttd libunwind::static spdlog)
target_link_libraries(mqttd PRIVATE libcore libmqttd spdlog)
target_include_directories(mqttd PUBLIC "${PROJECT_BINARY_DIR}")

15
src/core/CMakeLists.txt Normal file
View File

@@ -0,0 +1,15 @@
add_library(libcore "")
add_subdirectory(backtrace)
add_subdirectory(broker)
FILE(GLOB CPP_FILES CONFIGURE_DEPENDS *.cpp)
FILE(GLOB HPP_FILES CONFIGURE_DEPENDS *.hpp)
target_sources(libcore
PRIVATE ${CPP_FILES}
PUBLIC ${HPP_FILES}
)
target_include_directories(libcore PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

View File

@@ -0,0 +1,14 @@
FILE(GLOB CPP_FILES CONFIGURE_DEPENDS *.cpp)
FILE(GLOB HPP_FILES CONFIGURE_DEPENDS *.hpp)
include(GetUnwind)
target_sources(libcore
PRIVATE ${CPP_FILES}
PUBLIC ${HPP_FILES}
)
target_link_libraries(libcore PRIVATE libunwind::static)
target_include_directories(libcore PUBLIC $<TARGET_PROPERTY:libunwind::static,INTERFACE_INCLUDE_DIRECTORIES> ${CMAKE_CURRENT_SOURCE_DIR})

View File

@@ -0,0 +1,34 @@
#include "backtrace.hpp"
void backtrace::backtrace() {
unw_cursor_t cursor;
unw_context_t context;
unw_getcontext(&context);
unw_init_local(&cursor, &context);
int n = 0;
while (unw_step(&cursor)) {
unw_word_t ip, sp, off;
unw_get_reg(&cursor, UNW_REG_IP, &ip);
unw_get_reg(&cursor, UNW_REG_SP, &sp);
char symbol[256] = {"<unknown>"};
char *name = symbol;
if (!unw_get_proc_name(&cursor, symbol, sizeof(symbol), &off)) {
int status;
if ((name = abi::__cxa_demangle(symbol, NULL, NULL, &status)) == 0)
name = symbol;
}
printf("#%-2d 0x%016" PRIxPTR " sp=0x%016" PRIxPTR " %s + 0x%" PRIxPTR "\n",
++n, static_cast<uintptr_t>(ip), static_cast<uintptr_t>(sp), name,
static_cast<uintptr_t>(off));
if (name != symbol)
free(name);
}
}

View File

@@ -0,0 +1,22 @@
#ifndef BACKTRACE_H
#define BACKTRACE_H
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#define UNW_LOCAL_ONLY
#include <libunwind.h>
#include <cxxabi.h>
#include <stdio.h>
#include <stdlib.h>
#include <dlfcn.h>
#include <string.h>
namespace backtrace {
void backtrace();
}
#endif /* end of include guard: BACKTRACE_H */

View File

@@ -0,0 +1,12 @@
FILE(GLOB CPP_FILES CONFIGURE_DEPENDS *.cpp)
FILE(GLOB HPP_FILES CONFIGURE_DEPENDS *.hpp)
include(GetSpdlog)
target_sources(libcore
PRIVATE ${CPP_FILES}
PUBLIC ${HPP_FILES}
)
target_include_directories(libcore PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(libcore PRIVATE spdlog)

View File

@@ -0,0 +1,60 @@
#include <connection_listener.hpp>
ConnectionListener::ConnectionListener(int port) {
this->connection_listener_port = port;
}
ConnectionListener::~ConnectionListener() {
this->stop();
}
void ConnectionListener::start() {
this->listener = std::thread{&ConnectionListener::listen, this};
}
void ConnectionListener::stop() {
this->is_listening = false;
shutdown(this->connection_listener_socket, SHUT_RDWR);
close(this->connection_listener_socket);
this->join();
}
void ConnectionListener::join() {
if(this->listener.joinable())
this->listener.join();
}
void ConnectionListener::listen() {
struct sockaddr_in serverAddr;
struct sockaddr_storage serverStorage;
socklen_t addr_size;
connection_listener_socket = socket(AF_INET, SOCK_STREAM, 0);
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(this->connection_listener_port);
int rc = bind(connection_listener_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if(rc != 0) {
int errno_tmp = errno;
spdlog::error("Failed to bind socket: " + std::string(strerror(errno_tmp)));
exit(ExitCode::BIND_ERROR);
}
rc = ::listen(connection_listener_socket, 50);
if (rc != 0) {
int errno_tmp = errno;
spdlog::error("Failed to listen on socket: " + std::string(strerror(errno_tmp)));
exit(ExitCode::LISTEN_ERROR);
}
this->is_listening = true;
spdlog::info("Listening on port " + std::to_string(this->connection_listener_port));
while (this->is_listening) {
addr_size = sizeof(serverStorage);
int new_connection = accept(connection_listener_socket, (struct sockaddr *)&serverStorage, &addr_size);
}
}

View File

@@ -0,0 +1,30 @@
#ifndef INCLUDE_BROKER_CONNECTION_LISTENER_HPP_
#define INCLUDE_BROKER_CONNECTION_LISTENER_HPP_
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/un.h>
#include <spdlog/spdlog.h>
#include <exit_codes.hpp>
class ConnectionListener{
public:
ConnectionListener(int port = 1883);
~ConnectionListener();
void start();
void stop();
void join();
private:
bool is_listening;
int connection_listener_socket;
int connection_listener_port;
std::thread listener;
void listen();
};
#endif // INCLUDE_BROKER_CONNECTION_LISTENER_HPP_

11
src/core/exit_codes.hpp Normal file
View File

@@ -0,0 +1,11 @@
#ifndef INCLUDE_CORE_EXIT_CODES_HPP_
#define INCLUDE_CORE_EXIT_CODES_HPP_
enum ExitCode {
SUCCESS = 0,
ERROR=1,
BIND_ERROR=101,
LISTEN_ERROR=102
};
#endif // INCLUDE_CORE_EXIT_CODES_HPP_

View File

@@ -1 +1,6 @@
add_library(libmqttd "")
target_include_directories(libmqttd PUBLIC "${PROJECT_BINARY_DIR}")
add_subdirectory(types)
add_subdirectory(network)
add_subdirectory(protocol)

View File

@@ -0,0 +1,9 @@
FILE(GLOB CPP_FILES_LOCAL CONFIGURE_DEPENDS *.cpp)
FILE(GLOB HPP_FILES_LOCAL CONFIGURE_DEPENDS *.hpp)
target_sources(libmqttd
PRIVATE ${CPP_FILES_LOCAL}
PUBLIC ${HPP_FILES_LOCAL}
)
target_include_directories(libmqttd PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

View File

@@ -0,0 +1,9 @@
#include <connect_packet.hpp>
ConnectPacket::ConnectPacket() : ControlPacket() {
return;
}
ConnectPacket::~ConnectPacket() {
return;
}

View File

@@ -0,0 +1,71 @@
#ifndef INCLUDE_NETWORK_CONNECT_PACKET_HPP_
#define INCLUDE_NETWORK_CONNECT_PACKET_HPP_
#include <control_packet.hpp>
#include <cstdint>
#include <variable_byte_int.hpp>
class ConnectPacket : public ControlPacket {
public:
ConnectPacket();
~ConnectPacket();
inline void set_protocol_name(std::string value) {
this->variable_header.protocol_name = value;
};
inline void set_protocol_version(std::byte value) {
this->variable_header.protocol_version = value;
};
inline bool get_clean_start() const {
return this->variable_header.connect_flags[1];
};
inline void set_clean_start(bool value) {
this->variable_header.connect_flags[1] = value;
};
inline bool get_will_flag() const {
return this->variable_header.connect_flags[2];
};
inline void set_will_flag(bool value) {
this->variable_header.connect_flags[2] = value;
};
inline uint get_will_qos() const {
return static_cast<uint_fast8_t>(
(this->variable_header.connect_flags >> 3).to_ulong());
};
inline bool get_will_retain() const {
return this->variable_header.connect_flags[5];
};
inline void set_will_retain(bool value) {
this->variable_header.connect_flags[5] = value;
};
inline bool get_password_flag() const {
return this->variable_header.connect_flags[6];
};
inline void set_password_flag(bool value) {
this->variable_header.connect_flags[6] = value;
}
inline bool get_username_flag() const {
return this->variable_header.connect_flags[7];
};
inline void set_username_flag(bool value) {
this->variable_header.connect_flags[7] = value;
};
private:
struct {
uint16_t length;
std::string protocol_name;
std::byte protocol_version;
std::bitset<8> connect_flags;
uint16_t keep_alive;
} variable_header;
};
#endif // INCLUDE_NETWORK_CONNECT_PACKET_HPP_

View File

@@ -0,0 +1,51 @@
#ifndef INCLUDE_NETWORK_CONTROL_PACKET_CPP_
#define INCLUDE_NETWORK_CONTROL_PACKET_CPP_
#include <bitset>
#include <control_packet_type.hpp>
#include <variable_byte_int.hpp>
class ControlPacket {
public:
ControlPacket() : fixed_header(){};
~ControlPacket(){};
inline ControlPacketType get_packet_type() const {
return this->fixed_header.packet_type;
};
inline void set_packet_type(int value) {
this->fixed_header.packet_type = static_cast<ControlPacketType>(value);
};
inline std::bitset<4> get_packet_flags() const {
return this->fixed_header.packet_flags;
};
inline void set_packet_flags(std::bitset<4> value) {
this->fixed_header.packet_flags = value;
};
inline int_vb get_remaining_length() const {
return this->fixed_header.remaining_length;
};
inline void set_remaining_length(int value) {
this->fixed_header.remaining_length = int_vb(value);
};
inline uint length() const {
return sizeof(this->fixed_header) + this->fixed_header.remaining_length;
}
private:
struct fixed_header_t {
ControlPacketType packet_type;
std::bitset<4> packet_flags;
int_vb remaining_length;
fixed_header_t()
: packet_type(static_cast<ControlPacketType>(0)), packet_flags(0),
remaining_length(0) {}
} fixed_header;
};
#endif // INCLUDE_NETWORK_CONTROL_PACKET_CPP_

View File

@@ -0,0 +1,23 @@
#ifndef INCLUDE_NETWORK_CONTROL_PACKET_TYPE_HPP_
#define INCLUDE_NETWORK_CONTROL_PACKET_TYPE_HPP_
enum ControlPacketType {
RESERVERD = 0,
CONNECT = 1,
CONNACK = 2,
PUBLISH = 3,
PUBACK = 4,
PUBREC = 5,
PUBREL = 6,
PUBCOMP = 7,
SUBSCRIBE = 8,
SUBACK = 9,
UNSUBSCRIBE = 10,
UNSUBACK = 11,
PINGREQ = 12,
PINGRESP = 13,
DISCONNECT = 14,
AUTH = 15
};
#endif // INCLUDE_NETWORK_CONTROL_PACKET_TYPE_HPP_

View File

@@ -0,0 +1,9 @@
FILE(GLOB CPP_FILES_LOCAL CONFIGURE_DEPENDS *.cpp)
FILE(GLOB HPP_FILES_LOCAL CONFIGURE_DEPENDS *.hpp)
target_sources(libmqttd
PRIVATE ${CPP_FILES_LOCAL}
PUBLIC ${HPP_FILES_LOCAL}
)
target_include_directories(libmqttd PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

View File

@@ -0,0 +1,3 @@
#include <session.hpp>

View File

@@ -0,0 +1,17 @@
#ifndef INCLUDE_PROTOCOL_SESSION_HPP_
#define INCLUDE_PROTOCOL_SESSION_HPP_
#include <string>
class Session {
public:
Session(int);
~Session();
private:
int socket;
std::string client_id;
time_t last_keepalive;
};
#endif // INCLUDE_PROTOCOL_SESSION_HPP_

View File

@@ -1,5 +1,10 @@
FILE(GLOB CPP_FILES CONFIGURE_DEPENDS *.cpp)
FILE(GLOB HPP_FILES CONFIGURE_DEPENDS *.hpp)
FILE(GLOB CPP_FILES_LOCAL CONFIGURE_DEPENDS *.cpp)
FILE(GLOB HPP_FILES_LOCAL CONFIGURE_DEPENDS *.hpp)
target_sources(libmqttd
PRIVATE ${CPP_FILES_LOCAL}
PUBLIC ${HPP_FILES_LOCAL}
)
add_library(libmqttd ${CPP_FILES})
target_include_directories(libmqttd PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

View File

@@ -1,70 +1,62 @@
#include "version.hpp"
#include <csignal>
#include <iostream>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include "version.hpp"
#include <backtrace.hpp>
#include <connect_packet.hpp>
#include <connection_listener.hpp>
#include <set>
#define UNW_LOCAL_ONLY
#include <libunwind.h>
ConnectionListener clistener;
#include <cxxabi.h>
void termination_handler(int signal_num) {
spdlog::info("Received signal " + std::string(strsignal(signal_num)));
std::set<unsigned int> backtrace_signals = {SIGILL, SIGBUS, SIGFPE, SIGSEGV};
#include <stdio.h>
#include <stdlib.h>
auto is_backtrace_signal = backtrace_signals.find(signal_num);
if (is_backtrace_signal != backtrace_signals.end()) {
backtrace::backtrace();
exit(ExitCode::ERROR);
}
void backtrace()
{
unw_cursor_t cursor;
unw_context_t context;
exit(ExitCode::SUCCESS);
}
unw_getcontext(&context);
unw_init_local(&cursor, &context);
void set_signal_handlers() {
std::set<unsigned int> termination_signals = {
SIGILL, SIGBUS, SIGFPE, SIGSEGV, SIGSYS,
SIGABRT, SIGHUP, SIGINT, SIGQUIT, SIGTERM};
int n=0;
while ( unw_step(&cursor) ) {
unw_word_t ip, sp, off;
// Blocking signals with set
sigset_t block_mask;
sigemptyset(&block_mask);
sigprocmask(SIG_BLOCK, &block_mask, NULL);
for (auto it = termination_signals.begin(); it != termination_signals.end();
it++) {
sigaddset(&block_mask, *it);
}
unw_get_reg(&cursor, UNW_REG_IP, &ip);
unw_get_reg(&cursor, UNW_REG_SP, &sp);
char symbol[256] = {"<unknown>"};
char *name = symbol;
if ( !unw_get_proc_name(&cursor, symbol, sizeof(symbol), &off) ) {
int status;
if ( (name = abi::__cxa_demangle(symbol, NULL, NULL, &status)) == 0 )
name = symbol;
}
printf("#%-2d 0x%016" PRIxPTR " sp=0x%016" PRIxPTR " %s + 0x%" PRIxPTR "\n",
++n,
static_cast<uintptr_t>(ip),
static_cast<uintptr_t>(sp),
name,
static_cast<uintptr_t>(off));
if ( name != symbol )
free(name);
struct sigaction sigHandler;
memset(&sigHandler, 0, sizeof(sigHandler));
sigHandler.sa_handler = termination_handler;
sigHandler.sa_mask = block_mask;
sigHandler.sa_flags = 0;
for (auto it = termination_signals.begin(); it != termination_signals.end();
it++) {
sigaction(*it, &sigHandler, NULL);
}
}
int fac(int n)
{
if ( n == 0 ) {
backtrace();
return 1;
} else {
return n*fac(n-1);
}
}
int main(int argc, char *argv[]) {
int main() {
std::cout << "MQTTd Version " << MQTTD_VERSION_MAJOR << "."
<< MQTTD_VERSION_MINOR << "."
<< MQTTD_VERSION_PATCH << std::endl;
fac(10);
<< MQTTD_VERSION_MINOR << "." << MQTTD_VERSION_PATCH << std::endl;
set_signal_handlers();
clistener.start();
clistener.join();
return EXIT_SUCCESS;
return 1;
}

View File

@@ -1,5 +1,5 @@
#define MQTTD_VERSION_MAJOR 0
#define MQTTD_VERSION_MINOR 0
#define MQTTD_VERSION_PATCH 0
#define MQTTD_COMMIT_HASH 770aef77f8415b2f856e2b1bedcf3d44be877b1f
#define MQTTD_BUILD_TIMESTAMP 1706013355
#define MQTTD_VERSION_PATCH 1
#define MQTTD_COMMIT_HASH 91e34f07dce2db74ce336244668d65988fb106d4
#define MQTTD_BUILD_TIMESTAMP 1706107831