feat: Connection packet and multi socket

Added Packet Interface with headers
Added Connection Packet base class
Added multi connection socket based on threads
This commit is contained in:
brenodetomini
2024-01-24 20:53:53 -03:00
parent 6c4f723de7
commit 03dfb18e27
22 changed files with 281 additions and 177 deletions

View File

@@ -9,4 +9,4 @@ target_sources(libcore
)
target_include_directories(libcore PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(libcore PRIVATE spdlog)
target_link_libraries(libcore PRIVATE spdlog libmqttd)

View File

@@ -4,9 +4,7 @@ ConnectionListener::ConnectionListener(int port) {
this->connection_listener_port = port;
}
ConnectionListener::~ConnectionListener() {
this->stop();
}
ConnectionListener::~ConnectionListener() { this->stop(); }
void ConnectionListener::start() {
this->listener = std::thread{&ConnectionListener::listen, this};
@@ -20,7 +18,7 @@ void ConnectionListener::stop() {
}
void ConnectionListener::join() {
if(this->listener.joinable())
if (this->listener.joinable())
this->listener.join();
}
@@ -35,26 +33,31 @@ void ConnectionListener::listen() {
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(this->connection_listener_port);
int rc = bind(connection_listener_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if(rc != 0) {
int rc = bind(connection_listener_socket, (struct sockaddr *)&serverAddr,
sizeof(serverAddr));
if (rc != 0) {
int errno_tmp = errno;
spdlog::error("Failed to bind socket: " + std::string(strerror(errno_tmp)));
exit(ExitCode::BIND_ERROR);
}
rc = ::listen(connection_listener_socket, 50);
if (rc != 0) {
int errno_tmp = errno;
spdlog::error("Failed to listen on socket: " + std::string(strerror(errno_tmp)));
spdlog::error("Failed to listen on socket: " +
std::string(strerror(errno_tmp)));
exit(ExitCode::LISTEN_ERROR);
}
this->is_listening = true;
spdlog::info("Listening on port " + std::to_string(this->connection_listener_port));
spdlog::info("Listening on port " +
std::to_string(this->connection_listener_port));
while (this->is_listening) {
addr_size = sizeof(serverStorage);
int new_connection = accept(connection_listener_socket, (struct sockaddr *)&serverStorage, &addr_size);
int new_connection_socket =
accept(connection_listener_socket, (struct sockaddr *)&serverStorage,
&addr_size);
Session session(new_connection_socket);
}
}

View File

@@ -9,6 +9,8 @@
#include <spdlog/spdlog.h>
#include <exit_codes.hpp>
#include <session.hpp>
class ConnectionListener{
public:
@@ -24,6 +26,8 @@ private:
int connection_listener_port;
std::thread listener;
// std::vector<Session> current_sessions;
void listen();
};

View File

@@ -3,9 +3,10 @@
enum ExitCode {
SUCCESS = 0,
ERROR=1,
BIND_ERROR=101,
LISTEN_ERROR=102
ERROR = 1,
BIND_ERROR = 101,
LISTEN_ERROR = 102,
RECV_ERROR = 103,
};
#endif // INCLUDE_CORE_EXIT_CODES_HPP_
#endif // INCLUDE_CORE_EXIT_CODES_HPP_

View File

@@ -6,4 +6,7 @@ target_sources(libmqttd
PUBLIC ${HPP_FILES_LOCAL}
)
add_subdirectory(packet_interface)
add_subdirectory(connection)
target_include_directories(libmqttd PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

View File

@@ -1,9 +0,0 @@
#include <connect_packet.hpp>
ConnectPacket::ConnectPacket() : ControlPacket() {
return;
}
ConnectPacket::~ConnectPacket() {
return;
}

View File

@@ -1,71 +0,0 @@
#ifndef INCLUDE_NETWORK_CONNECT_PACKET_HPP_
#define INCLUDE_NETWORK_CONNECT_PACKET_HPP_
#include <control_packet.hpp>
#include <cstdint>
#include <variable_byte_int.hpp>
class ConnectPacket : public ControlPacket {
public:
ConnectPacket();
~ConnectPacket();
inline void set_protocol_name(std::string value) {
this->variable_header.protocol_name = value;
};
inline void set_protocol_version(std::byte value) {
this->variable_header.protocol_version = value;
};
inline bool get_clean_start() const {
return this->variable_header.connect_flags[1];
};
inline void set_clean_start(bool value) {
this->variable_header.connect_flags[1] = value;
};
inline bool get_will_flag() const {
return this->variable_header.connect_flags[2];
};
inline void set_will_flag(bool value) {
this->variable_header.connect_flags[2] = value;
};
inline uint get_will_qos() const {
return static_cast<uint_fast8_t>(
(this->variable_header.connect_flags >> 3).to_ulong());
};
inline bool get_will_retain() const {
return this->variable_header.connect_flags[5];
};
inline void set_will_retain(bool value) {
this->variable_header.connect_flags[5] = value;
};
inline bool get_password_flag() const {
return this->variable_header.connect_flags[6];
};
inline void set_password_flag(bool value) {
this->variable_header.connect_flags[6] = value;
}
inline bool get_username_flag() const {
return this->variable_header.connect_flags[7];
};
inline void set_username_flag(bool value) {
this->variable_header.connect_flags[7] = value;
};
private:
struct {
uint16_t length;
std::string protocol_name;
std::byte protocol_version;
std::bitset<8> connect_flags;
uint16_t keep_alive;
} variable_header;
};
#endif // INCLUDE_NETWORK_CONNECT_PACKET_HPP_

View File

@@ -0,0 +1,9 @@
FILE(GLOB CPP_FILES_LOCAL CONFIGURE_DEPENDS *.cpp)
FILE(GLOB HPP_FILES_LOCAL CONFIGURE_DEPENDS *.hpp)
target_sources(libmqttd
PRIVATE ${CPP_FILES_LOCAL}
PUBLIC ${HPP_FILES_LOCAL}
)
target_include_directories(libmqttd PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

View File

@@ -0,0 +1,14 @@
#include "connect_packet.hpp"
ConnectPacket::ConnectPacket(PacketInterface &packet) : PacketInterface(packet) {
}
ConnectPacket::~ConnectPacket() {
}
ConnectPacket::ConnectPacket(std::vector<std::byte> data) : PacketInterface(data) {
}

View File

@@ -0,0 +1,18 @@
#ifndef INCLUDE_CONNECTION_CONNECT_PACKET_HPP_
#define INCLUDE_CONNECTION_CONNECT_PACKET_HPP_
#include <packet_interface/packet_interface.hpp>
#include <vector>
class ConnectPacket : public PacketInterface{
public:
ConnectPacket(PacketInterface&);
ConnectPacket(std::vector<std::byte>);
~ConnectPacket();
private:
};
#endif // INCLUDE_CONNECTION_CONNECT_PACKET_HPP_

View File

@@ -1,51 +0,0 @@
#ifndef INCLUDE_NETWORK_CONTROL_PACKET_CPP_
#define INCLUDE_NETWORK_CONTROL_PACKET_CPP_
#include <bitset>
#include <control_packet_type.hpp>
#include <variable_byte_int.hpp>
class ControlPacket {
public:
ControlPacket() : fixed_header(){};
~ControlPacket(){};
inline ControlPacketType get_packet_type() const {
return this->fixed_header.packet_type;
};
inline void set_packet_type(int value) {
this->fixed_header.packet_type = static_cast<ControlPacketType>(value);
};
inline std::bitset<4> get_packet_flags() const {
return this->fixed_header.packet_flags;
};
inline void set_packet_flags(std::bitset<4> value) {
this->fixed_header.packet_flags = value;
};
inline int_vb get_remaining_length() const {
return this->fixed_header.remaining_length;
};
inline void set_remaining_length(int value) {
this->fixed_header.remaining_length = int_vb(value);
};
inline uint length() const {
return sizeof(this->fixed_header) + this->fixed_header.remaining_length;
}
private:
struct fixed_header_t {
ControlPacketType packet_type;
std::bitset<4> packet_flags;
int_vb remaining_length;
fixed_header_t()
: packet_type(static_cast<ControlPacketType>(0)), packet_flags(0),
remaining_length(0) {}
} fixed_header;
};
#endif // INCLUDE_NETWORK_CONTROL_PACKET_CPP_

View File

@@ -1,23 +0,0 @@
#ifndef INCLUDE_NETWORK_CONTROL_PACKET_TYPE_HPP_
#define INCLUDE_NETWORK_CONTROL_PACKET_TYPE_HPP_
enum ControlPacketType {
RESERVERD = 0,
CONNECT = 1,
CONNACK = 2,
PUBLISH = 3,
PUBACK = 4,
PUBREC = 5,
PUBREL = 6,
PUBCOMP = 7,
SUBSCRIBE = 8,
SUBACK = 9,
UNSUBSCRIBE = 10,
UNSUBACK = 11,
PINGREQ = 12,
PINGRESP = 13,
DISCONNECT = 14,
AUTH = 15
};
#endif // INCLUDE_NETWORK_CONTROL_PACKET_TYPE_HPP_

View File

@@ -0,0 +1,9 @@
FILE(GLOB CPP_FILES CONFIGURE_DEPENDS *.cpp)
FILE(GLOB HPP_FILES CONFIGURE_DEPENDS *.hpp)
target_sources(libmqttd
PRIVATE ${CPP_FILES}
PUBLIC ${HPP_FILES}
)
target_include_directories(libmqttd PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

View File

@@ -0,0 +1,34 @@
#ifndef INCLUDE_PACKET_INTERFACE_FIXED_HEADER_HPP_
#define INCLUDE_PACKET_INTERFACE_FIXED_HEADER_HPP_
#include "variable_byte_int.hpp"
#include <bitset>
typedef struct fixed_header fixed_header_t;
enum class PacketType : uint32_t {
RESERVERD = 0,
CONNECT = 1,
CONNACK = 2,
PUBLISH = 3,
PUBACK = 4,
PUBREC = 5,
PUBREL = 6,
PUBCOMP = 7,
SUBSCRIBE = 8,
SUBACK = 9,
UNSUBSCRIBE = 10,
UNSUBACK = 11,
PINGREQ = 12,
PINGRESP = 13,
DISCONNECT = 14,
AUTH = 15
};
struct fixed_header {
PacketType packet_type = PacketType(0);
std::bitset<4> packet_flags = {0};
int_vb remaining_length = int_vb(0);
};
#endif // INCLUDE_PACKET_INTERFACE_FIXED_HEADER_HPP_

View File

@@ -0,0 +1,21 @@
#include "packet_interface.hpp"
PacketInterface::PacketInterface(const std::vector<std::byte> &data) {
unsigned char fixed_header_packet_type = (static_cast<unsigned char>(data[0]) >> 4) & 0x0F;
this->fixed_header.packet_type = PacketType(fixed_header_packet_type);
unsigned char fixed_header_packet_flags = (static_cast<unsigned char>(data[0]) & 0x0F);
this->fixed_header.packet_flags = fixed_header_packet_flags;
this->fixed_header.remaining_length = int_vb(static_cast<unsigned int>(data[1]));
this->variable_header.reset(new variable_header_t);
}
PacketInterface::PacketInterface(PacketInterface &packet) {
this->fixed_header.packet_type = packet.fixed_header.packet_type;
this->fixed_header.packet_flags = packet.fixed_header.packet_flags;
this->fixed_header.remaining_length = packet.fixed_header.remaining_length;
this->variable_header = std::move(packet.variable_header);
}

View File

@@ -0,0 +1,28 @@
#ifndef INCLUDE_NETWORK_PACKET_INTERFACE_HPP_
#define INCLUDE_NETWORK_PACKET_INTERFACE_HPP_
#include "fixed_header.hpp"
#include "variable_header.hpp"
#include <memory>
#include <variable_byte_int.hpp>
#include <vector>
class PacketInterface {
public:
PacketInterface(const std::vector<std::byte>&);
PacketInterface(PacketInterface&);
~PacketInterface() = default;
inline uint length() const {
return sizeof(this->fixed_header) + this->fixed_header.remaining_length;
}
inline PacketType get_packet_type() const { return this->fixed_header.packet_type; };
inline std::bitset<4> get_packet_flags() const { return this->fixed_header.packet_flags; };
protected:
fixed_header_t fixed_header;
std::unique_ptr<variable_header_t> variable_header;
};
#endif // INCLUDE_NETWORK_PACKET_INTERFACE_HPP_

View File

@@ -0,0 +1,50 @@
#ifndef INCLUDE_PACKET_INTERFACE_VARIABLE_HEADER_HPP_
#define INCLUDE_PACKET_INTERFACE_VARIABLE_HEADER_HPP_
#include "variable_byte_int.hpp"
#include <map>
#include <variant>
enum class PropertyIdentifier : uint32_t {
PAYLOAD_FORMAT_INDICATOR = 1,
NESSAGE_EXPIRY_INTERVAL = 2,
CONTENT_TYPE = 3,
RESPONSE_TOPIC = 8,
CORRELATION_DATA = 9,
SUBSCRIPTION_IDENTIFIER = 11,
SESSION_EXPIRY_INTERVAL = 17,
ASSIGEND_CLIENT_IDENTIFIER = 18,
SEREVR_KEEP_ALIVE = 19,
AUTHENTICATION_METHOD = 21,
AUTHENTICATION_DATA = 22,
REQUEST_PROBLEM_INFORMATION = 23,
WILL_DELAY_INTERVAL = 24,
REQUEST_RESPONSE_INFORMATION = 25,
RESPONSE_INFORMATION = 26,
SERVER_REFERENCE = 28,
REASON_STRING = 31,
RECEIVE_MAXIMUM = 33,
TOPIC_ALIAS_MAXIMUM = 34,
TOPIC_ALIAS = 35,
MAXIMUM_QOS = 36,
RETAIN_AVAILABLE = 37,
USER_PROPERTY = 38,
MAXIMUM_PACKET_SIZE = 39,
WILCARD_SUBSCRIPTION_AVAILABLE = 40,
SUBSCRIPTION_IDENTIFIER_AVAILABLE = 41,
SHARED_SUBSCRIPTION_AVAILABLE = 42
};
using PropertyValue = std::variant<bool, uint32_t, std::string>;
using PropertyMap = std::map<PropertyIdentifier, PropertyValue>;
struct variable_header {
uint16_t packet_identifier = 0;
int_vb property_length = int_vb(0);
PropertyMap properties;
};
typedef struct variable_header variable_header_t;
#endif // INCLUDE_PACKET_INTERFACE_VARIABLE_HEADER_HPP_

View File

@@ -5,5 +5,5 @@ target_sources(libmqttd
PRIVATE ${CPP_FILES_LOCAL}
PUBLIC ${HPP_FILES_LOCAL}
)
target_link_libraries(libmqttd PRIVATE spdlog)
target_include_directories(libmqttd PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})

View File

@@ -1,3 +1,48 @@
#include <session.hpp>
Session::Session(const int &socket_fd) {
this->socket = socket_fd;
this->start();
}
Session::~Session() {
this->stop();
}
void Session::start() {
this->is_session_active = true;
this->communication_thread = std::thread{&Session::listen, this};
this->join();
}
void Session::stop() {
this->is_session_active = false;
shutdown(this->socket, SHUT_RDWR);
close(this->socket);
this->join();
}
void Session::join() {
if (this->communication_thread.joinable())
this->communication_thread.join();
if (this->keepalive_thread.joinable())
this->keepalive_thread.join();
}
void Session::listen() {
std::vector<std::byte> buffer(buffer_size);
while (this->is_active()) {
int bytes_received = recv(this->socket, buffer.data(), buffer.size(), 0);
if (bytes_received <= 0) {
int errno_tmp = errno;
spdlog::error("Failed to receive data on socket: " +
std::string(strerror(errno_tmp)));
return;
}
PacketInterface packet(buffer);
spdlog::info(static_cast<int>(packet.get_packet_type()));
}
}

View File

@@ -2,16 +2,36 @@
#define INCLUDE_PROTOCOL_SESSION_HPP_
#include <string>
#include <thread>
#include <sys/socket.h>
#include <unistd.h>
#include <array>
#include <spdlog/spdlog.h>
#include <packet_interface.hpp>
const unsigned int buffer_size = 2048;
class Session {
public:
Session(int);
Session();
Session(const int& socket_fd);
~Session();
void start();
void stop();
void join();
inline bool is_active() const { return this->is_session_active; };
private:
bool is_session_active;
int socket;
std::string client_id;
time_t last_keepalive;
std::thread communication_thread;
std::thread keepalive_thread;
void listen();
};
#endif // INCLUDE_PROTOCOL_SESSION_HPP_

View File

@@ -3,7 +3,6 @@
#include "version.hpp"
#include <backtrace.hpp>
#include <connect_packet.hpp>
#include <connection_listener.hpp>
#include <set>
@@ -11,7 +10,7 @@ ConnectionListener clistener;
void termination_handler(int signal_num) {
spdlog::info("Received signal " + std::string(strsignal(signal_num)));
std::set<unsigned int> backtrace_signals = {SIGILL, SIGBUS, SIGFPE, SIGSEGV};
std::set<unsigned int> backtrace_signals = {SIGILL, SIGBUS, SIGFPE, SIGSEGV, SIGABRT};
auto is_backtrace_signal = backtrace_signals.find(signal_num);
if (is_backtrace_signal != backtrace_signals.end()) {

View File

@@ -1,5 +1,5 @@
#define MQTTD_VERSION_MAJOR 0
#define MQTTD_VERSION_MINOR 0
#define MQTTD_VERSION_PATCH 1
#define MQTTD_COMMIT_HASH 91e34f07dce2db74ce336244668d65988fb106d4
#define MQTTD_BUILD_TIMESTAMP 1706107831
#define MQTTD_COMMIT_HASH 6c4f723de7a83fd35793aadab4e8457db4d04c7c
#define MQTTD_BUILD_TIMESTAMP 1706138231