Skip to content

Commit 6901445

Browse files
committed
MQTT5 support.
1 parent 2ba5912 commit 6901445

File tree

8 files changed

+844
-132
lines changed

8 files changed

+844
-132
lines changed

src/MQTTClient.cpp

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ inline lwmqtt_err_t lwmqtt_arduino_network_write(void *ref, uint8_t *buffer, siz
5656
}
5757

5858
static void MQTTClientHandler(lwmqtt_client_t * /*client*/, void *ref, lwmqtt_string_t topic,
59-
lwmqtt_message_t message) {
59+
lwmqtt_message_t message, lwmqtt_serialized_properties_t props) {
6060
// get callback
6161
auto cb = (MQTTClientCallback *)ref;
6262

@@ -72,7 +72,7 @@ static void MQTTClientHandler(lwmqtt_client_t * /*client*/, void *ref, lwmqtt_st
7272

7373
// call the advanced callback and return if available
7474
if (cb->advanced != nullptr) {
75-
cb->advanced(cb->client, terminated_topic, (char *)message.payload, (int)message.payload_len);
75+
cb->advanced(cb->client, terminated_topic, (char *)message.payload, (int)message.payload_len, props);
7676
return;
7777
}
7878

@@ -118,7 +118,7 @@ MQTTClient::~MQTTClient() {
118118
free(this->writeBuf);
119119
}
120120

121-
void MQTTClient::begin(const char hostname[], int port, Client &client) {
121+
void MQTTClient::begin(const char hostname[], int port, Client &client, lwmqtt_protocol_t protocol) {
122122
// set hostname and port
123123
this->setHost(hostname, port);
124124

@@ -127,6 +127,7 @@ void MQTTClient::begin(const char hostname[], int port, Client &client) {
127127

128128
// initialize client
129129
lwmqtt_init(&this->client, this->writeBuf, this->bufSize, this->readBuf, this->bufSize);
130+
lwmqtt_set_protocol(&this->client, protocol);
130131

131132
// set timers
132133
lwmqtt_set_timers(&this->client, &this->timer1, &this->timer2, lwmqtt_arduino_timer_set, lwmqtt_arduino_timer_get);
@@ -222,7 +223,8 @@ void MQTTClient::setOptions(int keepAlive, bool cleanSession, int timeout) {
222223
this->timeout = (uint32_t)timeout;
223224
}
224225

225-
bool MQTTClient::publish(const char topic[], const char payload[], int length, bool retained, int qos) {
226+
bool MQTTClient::publish(const char topic[], const char payload[], int length, bool retained, int qos,
227+
lwmqtt_properties_t props) {
226228
// return immediately if not connected
227229
if (!this->connected()) {
228230
return false;
@@ -236,7 +238,7 @@ bool MQTTClient::publish(const char topic[], const char payload[], int length, b
236238
message.qos = lwmqtt_qos_t(qos);
237239

238240
// publish message
239-
this->_lastError = lwmqtt_publish(&this->client, lwmqtt_string(topic), message, this->timeout);
241+
this->_lastError = lwmqtt_publish(&this->client, lwmqtt_string(topic), message, props, this->timeout);
240242
if (this->_lastError != LWMQTT_SUCCESS) {
241243
// close connection
242244
this->close();
@@ -294,14 +296,15 @@ bool MQTTClient::connect(const char clientId[], const char username[], const cha
294296
return true;
295297
}
296298

297-
bool MQTTClient::subscribe(const char topic[], int qos) {
299+
bool MQTTClient::subscribe(const char topic[], int qos, lwmqtt_properties_t props) {
298300
// return immediately if not connected
299301
if (!this->connected()) {
300302
return false;
301303
}
302304

303305
// subscribe to topic
304-
this->_lastError = lwmqtt_subscribe_one(&this->client, lwmqtt_string(topic), (lwmqtt_qos_t)qos, this->timeout);
306+
lwmqtt_sub_options_t subopts = {.qos = (lwmqtt_qos_t)qos};
307+
this->_lastError = lwmqtt_subscribe_one(&this->client, lwmqtt_string(topic), subopts, props, this->timeout);
305308
if (this->_lastError != LWMQTT_SUCCESS) {
306309
// close connection
307310
this->close();
@@ -312,14 +315,14 @@ bool MQTTClient::subscribe(const char topic[], int qos) {
312315
return true;
313316
}
314317

315-
bool MQTTClient::unsubscribe(const char topic[]) {
318+
bool MQTTClient::unsubscribe(const char topic[], lwmqtt_properties_t props) {
316319
// return immediately if not connected
317320
if (!this->connected()) {
318321
return false;
319322
}
320323

321324
// unsubscribe from topic
322-
this->_lastError = lwmqtt_unsubscribe_one(&this->client, lwmqtt_string(topic), this->timeout);
325+
this->_lastError = lwmqtt_unsubscribe_one(&this->client, lwmqtt_string(topic), props, this->timeout);
323326
if (this->_lastError != LWMQTT_SUCCESS) {
324327
// close connection
325328
this->close();
@@ -368,14 +371,14 @@ bool MQTTClient::connected() {
368371
return this->netClient != nullptr && this->netClient->connected() == 1 && this->_connected;
369372
}
370373

371-
bool MQTTClient::disconnect() {
374+
bool MQTTClient::disconnect(uint8_t reason, lwmqtt_properties_t props) {
372375
// return immediately if not connected anymore
373376
if (!this->connected()) {
374377
return false;
375378
}
376379

377380
// cleanly disconnect
378-
this->_lastError = lwmqtt_disconnect(&this->client, this->timeout);
381+
this->_lastError = lwmqtt_disconnect(&this->client, reason, props, this->timeout);
379382

380383
// close
381384
this->close();

src/MQTTClient.h

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ typedef struct {
2323
class MQTTClient;
2424

2525
typedef void (*MQTTClientCallbackSimple)(String &topic, String &payload);
26-
typedef void (*MQTTClientCallbackAdvanced)(MQTTClient *client, char topic[], char bytes[], int length);
26+
typedef void (*MQTTClientCallbackAdvanced)(MQTTClient *client, char topic[], char bytes[], int length,
27+
lwmqtt_serialized_properties_t props);
2728

2829
typedef struct {
2930
MQTTClient *client = nullptr;
@@ -50,7 +51,7 @@ class MQTTClient {
5051
lwmqtt_arduino_network_t network = {nullptr};
5152
lwmqtt_arduino_timer_t timer1 = {0, nullptr};
5253
lwmqtt_arduino_timer_t timer2 = {0, nullptr};
53-
lwmqtt_client_t client = {0};
54+
lwmqtt_client_t client = {LWMQTT_MQTT311, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
5455

5556
bool _connected = false;
5657
lwmqtt_return_code_t _returnCode = (lwmqtt_return_code_t)0;
@@ -61,8 +62,9 @@ class MQTTClient {
6162

6263
~MQTTClient();
6364

64-
void begin(const char hostname[], Client &client) { this->begin(hostname, 1883, client); }
65-
void begin(const char hostname[], int port, Client &client);
65+
void begin(const char hostname[], Client &client) { this->begin(hostname, 1883, client, LWMQTT_MQTT311); }
66+
void begin(const char hostname[], int port, Client &client) { this->begin(hostname, port, client, LWMQTT_MQTT311); }
67+
void begin(const char hostname[], int port, Client &client, lwmqtt_protocol_t protocol);
6668

6769
void onMessage(MQTTClientCallbackSimple cb);
6870
void onMessageAdvanced(MQTTClientCallbackAdvanced cb);
@@ -91,6 +93,10 @@ class MQTTClient {
9193
bool publish(const String &topic, const String &payload, bool retained, int qos) {
9294
return this->publish(topic.c_str(), payload.c_str(), retained, qos);
9395
}
96+
bool publish(const String &topic, const String &payload, bool retained, int qos, lwmqtt_properties_t props) {
97+
return this->publish(topic.c_str(), payload.c_str(), payload.length(), retained, qos, props);
98+
}
99+
94100
bool publish(const char topic[], const String &payload) { return this->publish(topic, payload.c_str()); }
95101
bool publish(const char topic[], const String &payload, bool retained, int qos) {
96102
return this->publish(topic, payload.c_str(), retained, qos);
@@ -104,23 +110,41 @@ class MQTTClient {
104110
bool publish(const char topic[], const char payload[], int length) {
105111
return this->publish(topic, payload, length, false, 0);
106112
}
107-
bool publish(const char topic[], const char payload[], int length, bool retained, int qos);
113+
bool publish(const char topic[], const char payload[], int length, bool retained, int qos) {
114+
lwmqtt_properties_t props = lwmqtt_empty_props;
115+
return this->publish(topic, payload, length, retained, qos, props);
116+
}
117+
118+
bool publish(const char topic[], const char payload[], int length, bool retained, int qos, lwmqtt_properties_t props);
108119

109120
bool subscribe(const String &topic) { return this->subscribe(topic.c_str()); }
110121
bool subscribe(const String &topic, int qos) { return this->subscribe(topic.c_str(), qos); }
111122
bool subscribe(const char topic[]) { return this->subscribe(topic, 0); }
112-
bool subscribe(const char topic[], int qos);
123+
bool subscribe(const char topic[], int qos) {
124+
lwmqtt_properties_t props = lwmqtt_empty_props;
125+
return this->subscribe(topic, qos, props);
126+
}
127+
bool subscribe(const char topic[], int qos, lwmqtt_properties_t props);
113128

114129
bool unsubscribe(const String &topic) { return this->unsubscribe(topic.c_str()); }
115-
bool unsubscribe(const char topic[]);
130+
bool unsubscribe(const char topic[]) {
131+
lwmqtt_properties_t props = lwmqtt_empty_props;
132+
return this->unsubscribe(topic, props);
133+
}
134+
bool unsubscribe(const char topic[], lwmqtt_properties_t props);
116135

117136
bool loop();
118137
bool connected();
119138

120139
lwmqtt_err_t lastError() { return this->_lastError; }
121140
lwmqtt_return_code_t returnCode() { return this->_returnCode; }
122141

123-
bool disconnect();
142+
bool disconnect() {
143+
lwmqtt_properties_t props = lwmqtt_empty_props;
144+
return this->disconnect(0, props);
145+
}
146+
147+
bool disconnect(uint8_t reason, lwmqtt_properties_t props);
124148

125149
private:
126150
void close();

0 commit comments

Comments
 (0)