diff --git a/README.md b/README.md index 023e70a..53ba60d 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,14 @@ void setOptions(int keepAlive, bool cleanSession, int timeout); - The `cleanSession` option controls the session retention on the broker side (default: true). - The `timeout` option controls the default timeout for all commands in milliseconds (default: 1000). +```c++ +void setNetworkSegmentedWrite(size_t segmentLength, uint32_t writeDelayMs); +``` + +- The `segmentLength` option controls maximum segment write length for each successive segmented write into network buffer (default: 65535). +- The `writeDelayMs` option controls delay in ms between each successive segmented write into network buffer (default: 0). +- This function controls the flow of data into the network buffer, helping to prevent buffer overflows and network congestion, particularly when handling large payloads. + Set a custom clock source "custom millis" callback to enable deep sleep applications: ```c++ diff --git a/src/MQTTClient.cpp b/src/MQTTClient.cpp index 5925ec2..fc2c4c3 100644 --- a/src/MQTTClient.cpp +++ b/src/MQTTClient.cpp @@ -91,7 +91,29 @@ inline lwmqtt_err_t lwmqtt_arduino_network_write(void *ref, uint8_t *buffer, siz auto n = (lwmqtt_arduino_network_t *)ref; // write bytes - *sent = n->client->write(buffer, len); + size_t partial_written = 0; + size_t written = 0; + + while (true) { + if (len - written > n->segmentLength) { + partial_written = n->client->write(buffer + written, n->segmentLength); + written += partial_written; + } + else { + partial_written = n->client->write(buffer + written, len - written); + written += partial_written; + break; + } + + delay(n->writeDelayMs); + + if (partial_written <= 0) { + return LWMQTT_NETWORK_FAILED_WRITE; + } + } + + *sent = written; + if (*sent <= 0) { return LWMQTT_NETWORK_FAILED_WRITE; } @@ -195,6 +217,9 @@ void MQTTClient::begin(Client &_client) { // set callback lwmqtt_set_callback(&this->client, (void *)&this->callback, MQTTClientHandler); + + this->network.segmentLength = this->_segmentLength; + this->network.writeDelayMs = this->_writeDelayMs; } void MQTTClient::onMessage(MQTTClientCallbackSimple cb) { @@ -314,6 +339,14 @@ void MQTTClient::setCleanSession(bool _cleanSession) { this->cleanSession = _cle void MQTTClient::setTimeout(int _timeout) { this->timeout = _timeout; } +void MQTTClient::setNetworkSegmentedWrite(size_t segmentLength, uint32_t writeDelayMs) { + this->_segmentLength = segmentLength; + this->_writeDelayMs = writeDelayMs; + + this->network.segmentLength = this->_segmentLength; + this->network.writeDelayMs = this->_writeDelayMs; +} + void MQTTClient::dropOverflow(bool enabled) { // configure drop overflow lwmqtt_drop_overflow(&this->client, enabled, &this->_droppedMessages); diff --git a/src/MQTTClient.h b/src/MQTTClient.h index 55071e2..9ea718e 100644 --- a/src/MQTTClient.h +++ b/src/MQTTClient.h @@ -41,6 +41,8 @@ typedef struct { typedef struct { Client *client; + size_t segmentLength; + uint32_t writeDelayMs; } lwmqtt_arduino_network_t; class MQTTClient; @@ -93,6 +95,9 @@ class MQTTClient { lwmqtt_err_t _lastError = (lwmqtt_err_t)0; uint32_t _droppedMessages = 0; + size_t _segmentLength = 65535; + uint32_t _writeDelayMs = 0; + public: void *ref = nullptr; @@ -141,6 +146,8 @@ class MQTTClient { this->setTimeout(_timeout); } + void setNetworkSegmentedWrite(size_t segmentLength, uint32_t writeDelayMs); + void dropOverflow(bool enabled); uint32_t droppedMessages() { return this->_droppedMessages; }