fix: fixing int_vb and connect_ack

Fixing default constructor of int_vb to initalize values to 0
Implementing basic connect ACK
Implemeting basic send function in session
This commit is contained in:
brenodetomini
2024-02-11 13:29:01 -03:00
parent fd49879ba9
commit 886ee7f281
14 changed files with 82 additions and 46 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
# VScode
.vscode/
# CMake
.cache/
build/

View File

@@ -1,32 +1,36 @@
#include "packet_interface.hpp"
#include "fixed_header.hpp"
#include "packet_interface.hpp"
#include "property.hpp"
#include <connect_ack.hpp>
#include <cstddef>
#include <vector>
ConnectACK::ConnectACK() : PacketInterface(){
ConnectACK::ConnectACK() : PacketInterface() {
PacketInterface::fixed_header.packet_type = PacketType::CONNACK;
PacketInterface::fixed_header.packet_flags = 0;
// There is a obrigatory reason code and connect acknowledge flags after fixed header, thus we are garanteed to have at least 2 bytes
PacketInterface::fixed_header.remaining_length = 2;
};
void set_properties(const MQTTProperties &value) {
void ConnectACK::add_property(const PropertyIdentifier &prop, const PropertyValue &value) {
this->properties[prop] = value;
}
std::vector<std::byte> ConnectACK::as_bytes() const {
std::vector<std::byte> byte_vector;
byte_vector = PacketInterface::as_bytes();
if(session_present)
byte_vector.push_back(std::byte(1));
std::vector<std::byte> ConnectACK::as_bytes() {
std::vector<std::byte> variable_header_bytes;
if (session_present)
variable_header_bytes.emplace_back(std::byte(1));
else
byte_vector.push_back(std::byte(0));
variable_header_bytes.emplace_back(std::byte(0));
byte_vector.push_back(std::byte(reason_code));
variable_header_bytes.emplace_back(std::byte(reason_code));
std::vector<std::byte> properties_bytes = properties.as_bytes();
byte_vector.insert(byte_vector.end(), properties_bytes.begin(), properties_bytes.end());
variable_header_bytes.insert(variable_header_bytes.end(), properties_bytes.begin(), properties_bytes.end());
return byte_vector;
PacketInterface::fixed_header.remaining_length = variable_header_bytes.size();
std::vector<std::byte> fixed_header_bytes = PacketInterface::as_bytes();
fixed_header_bytes.insert(fixed_header_bytes.end(), variable_header_bytes.begin(), variable_header_bytes.end());
return fixed_header_bytes;
}

View File

@@ -36,8 +36,8 @@ public:
void set_session_present(const bool &value) { this->session_present = value; };
void set_reason_code(const ConnectReasonCode &value) { this->reason_code = value; };
void set_properties(const MQTTProperties &value);
std::vector<std::byte> as_bytes() const;
void add_property(const PropertyIdentifier&, const PropertyValue&);
std::vector<std::byte> as_bytes();
private:
//Variable Header

View File

@@ -51,6 +51,7 @@ ConnectionFlagValue ConnectPacket::get_connection_flag(ConnectFlags flag) const
return static_cast<bool>(utilities::bit::get(connection_flags_value, flag_pos));
}
std::string ConnectPacket::as_string() const {
std::ostringstream packet_str;

View File

@@ -30,7 +30,7 @@ public:
ConnectionFlagValue get_connection_flag(ConnectFlags flag) const;
std::string as_string() const final;
std::vector<std::byte> as_bytes() const final { throw NotImplemented(); };
std::vector<std::byte> as_bytes() final { throw NotImplemented(); };
private:
void parse_variable_header() final;

View File

@@ -40,7 +40,7 @@ std::string PacketInterface::as_string() const {
return packet_str.str();
}
std::vector<std::byte> PacketInterface::as_bytes() const {
std::vector<std::byte> PacketInterface::as_bytes() {
std::vector<std::byte> packet_bytes;
uint8_t flags_value = static_cast<uint8_t>(fixed_header.packet_flags.to_ulong());
@@ -48,7 +48,7 @@ std::vector<std::byte> PacketInterface::as_bytes() const {
first_byte = static_cast<uint16_t>(fixed_header.packet_type) << 4; // Shift de 4 bits para deixar espaço para os flags
first_byte |= flags_value;
packet_bytes.push_back(static_cast<std::byte>(first_byte));
packet_bytes.emplace_back(static_cast<std::byte>(first_byte));
std::vector<std::byte> remaining_length_bytes = fixed_header.remaining_length.as_bytes();
packet_bytes.insert(packet_bytes.end(), remaining_length_bytes.begin(), remaining_length_bytes.end());

View File

@@ -21,7 +21,7 @@ public:
inline std::bitset<4> get_packet_flags() const { return this->fixed_header.packet_flags; };
virtual std::string as_string() const;
virtual std::vector<std::byte> as_bytes() const;
virtual std::vector<std::byte> as_bytes();
protected:
std::vector<std::byte> raw_data;

View File

@@ -13,7 +13,10 @@
#include <type_traits>
#include <vector>
MQTTProperties::MQTTProperties() { this->properties[PropertyIdentifier::USER_PROPERTY] = UserProperty(); }
MQTTProperties::MQTTProperties() {
this->properties[PropertyIdentifier::USER_PROPERTY] = UserProperty();
this->length = 0;
}
MQTTProperties::MQTTProperties(const std::vector<std::byte>::const_iterator &property_start) {
this->properties[PropertyIdentifier::USER_PROPERTY] = UserProperty();
@@ -239,6 +242,11 @@ std::vector<std::byte> MQTTProperties::as_bytes() const {
return properties_bytes;
}
PropertyValue& MQTTProperties::operator[](PropertyIdentifier prop) {
return this->properties[prop];
this->length = int(this->length)+ 1;
}
struct VariantPrinter {
std::ostream &os;
template <typename T> void operator()(const T &value) const { os << value; }

View File

@@ -5,6 +5,7 @@
#include <cstddef>
#include <cstdint>
#include <map>
#include <ostream>
#include <queue>
#include <spdlog/spdlog.h>
#include <types.hpp>
@@ -65,6 +66,7 @@ public:
friend std::ostream &operator<<(std::ostream &os, const std::byte &value);
friend std::ostream &operator<<(std::ostream &os, const MQTTProperties &value);
PropertyValue &operator[](PropertyIdentifier);
private:
int_vb length;

View File

@@ -1,7 +1,12 @@
#include "connect_ack.hpp"
#include "connect_packet.hpp"
#include "property.hpp"
#include "spdlog/spdlog.h"
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <session.hpp>
#include <sstream>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
@@ -34,6 +39,16 @@ void Session::join() {
this->keepalive_thread.join();
}
std::size_t Session::send(const std::vector<std::byte> &buffer) {
std::ostringstream log_msg;
log_msg << "Sending " << buffer.size() << " bytes";
spdlog::trace(log_msg.str());
::send(this->socket, buffer.data(), buffer.size(), 0);
return buffer.size();
}
void Session::listen() {
std::vector<std::byte> buffer(buffer_size);
@@ -41,11 +56,10 @@ void Session::listen() {
buffer.clear();
buffer.resize(buffer_size);
int bytes_received = recv(this->socket, buffer.data(), buffer.size(), 0);
std::size_t bytes_received = recv(this->socket, buffer.data(), buffer.size(), 0);
if (bytes_received < 0) {
int errno_tmp = errno;
spdlog::error("Failed to receive data on socket: " +
std::string(strerror(errno_tmp)));
spdlog::error("Failed to receive data on socket: " + std::string(strerror(errno_tmp)));
return;
}
@@ -53,18 +67,22 @@ void Session::listen() {
switch (packet.get_packet_type()) {
case PacketType::CONNECT: {
ConnectPacket conn_packet(packet);
spdlog::trace(conn_packet.as_string());
// spdlog::trace(conn_packet.as_string());
// ConnectACK ack;
// ack.set_reason_code(ConnectReasonCode::SUCCESS);
// std::vector<std::byte> ack_bytes = ack.as_bytes();
// send(this->socket, ack_bytes.data(), ack_bytes.size(), 0);
ConnectACK ack;
ack.set_reason_code(ConnectReasonCode::SUCCESS);
// ack.add_property(PropertyIdentifier::SESSION_EXPIRY_INTERVAL, PropertyValue(uint32_t(60)));
// ack.add_property(PropertyIdentifier::RECEIVE_MAXIMUM, PropertyValue(uint16_t(60)));
// ack.add_property(PropertyIdentifier::MAXIMUM_QOS, PropertyValue(std::byte(0)));
// ack.add_property(PropertyIdentifier::RETAIN_AVAILABLE, PropertyValue(std::byte(0)));
// ack.add_property(PropertyIdentifier::MAXIMUM_PACKET_SIZE, PropertyValue(uint32_t(UINT32_MAX)));
std::vector<std::byte> ack_bytes = ack.as_bytes();
this->send(ack_bytes);
break;
}
default: {
spdlog::error("Received unknown packet type " +
std::to_string(static_cast<int>(packet.get_packet_type())));
spdlog::error("Received unknown packet type " + std::to_string(static_cast<int>(packet.get_packet_type())));
return;
}
}

View File

@@ -1,12 +1,14 @@
#ifndef INCLUDE_PROTOCOL_SESSION_HPP_
#define INCLUDE_PROTOCOL_SESSION_HPP_
#include <cstddef>
#include <packet_interface.hpp>
#include <spdlog/spdlog.h>
#include <string>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
#include <vector>
const unsigned int buffer_size = 2048;
@@ -31,6 +33,7 @@ private:
std::thread keepalive_thread;
void listen();
std::size_t send(const std::vector<std::byte> &);
};
#endif // INCLUDE_PROTOCOL_SESSION_HPP_

View File

@@ -6,6 +6,8 @@
VariableByteInteger::VariableByteInteger() {
this->decoded_value = 0;
this->encoded_value = {std::byte(0)};
this->bytes_count = 1;
}
VariableByteInteger::VariableByteInteger(const uint32_t &value) {

View File

@@ -22,16 +22,13 @@ void termination_handler(int signal_num) {
}
void set_signal_handlers() {
std::set<unsigned int> termination_signals = {
SIGILL, SIGBUS, SIGFPE, SIGSEGV, SIGSYS,
SIGABRT, SIGHUP, SIGINT, SIGQUIT, SIGTERM};
std::set<unsigned int> termination_signals = {SIGILL, SIGBUS, SIGFPE, SIGSEGV, SIGSYS, SIGABRT, SIGHUP, SIGINT, SIGQUIT, SIGTERM};
// Blocking signals with set
sigset_t block_mask;
sigemptyset(&block_mask);
sigprocmask(SIG_BLOCK, &block_mask, NULL);
for (auto it = termination_signals.begin(); it != termination_signals.end();
it++) {
for (auto it = termination_signals.begin(); it != termination_signals.end(); it++) {
sigaddset(&block_mask, *it);
}
@@ -40,19 +37,17 @@ void set_signal_handlers() {
sigHandler.sa_handler = termination_handler;
sigHandler.sa_mask = block_mask;
sigHandler.sa_flags = 0;
for (auto it = termination_signals.begin(); it != termination_signals.end();
it++) {
for (auto it = termination_signals.begin(); it != termination_signals.end(); it++) {
sigaction(*it, &sigHandler, NULL);
}
}
int main() {
std::cout << "MQTTd Version " << MQTTD_VERSION_MAJOR << "."
<< MQTTD_VERSION_MINOR << "." << MQTTD_VERSION_PATCH << std::endl;
std::cout << "MQTTd Version " << MQTTD_VERSION_MAJOR << "." << MQTTD_VERSION_MINOR << "." << MQTTD_VERSION_PATCH << std::endl;
set_signal_handlers();
spdlog::set_level(spdlog::level::trace);
clistener.start();
clistener.join();

View File

@@ -1,5 +1,5 @@
#define MQTTD_VERSION_MAJOR 0
#define MQTTD_VERSION_MINOR 0
#define MQTTD_VERSION_PATCH 1
#define MQTTD_COMMIT_HASH 65302458650650a7d293cf3fbe8bbf65501f242e
#define MQTTD_BUILD_TIMESTAMP 1707058988
#define MQTTD_COMMIT_HASH fd49879ba9e777030db160223c904c842f2983a6
#define MQTTD_BUILD_TIMESTAMP 1707613195