Skip to content

Commit bbc8465

Browse files
authored
🐎 Make client safer when TCP buffer is filled (marvinroger#48)
* 🐎 Implement initial code for safer ACKs * 🐎 Make ACKs safer Part of marvinroger#46 (PUBACK, PUBREC, PUBREL, PUBCOMP) * 🐎 Make CONNECT and DISCONNECT packets saf * 📝 Provide information on memory management Closes marvinroger#9 * 👕 Fix lint
1 parent 0d20a53 commit bbc8465

11 files changed

+240
-142
lines changed

docs/2.-API-reference.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,12 @@ Return if the client is currently connected to the broker or not.
122122

123123
Connect to the server.
124124

125-
#### void disconnect()
125+
#### void disconnect(bool `force` = false)
126126

127127
Disconnect from the server.
128128

129+
* **`force`**: Whether to force the disconnection. Defaults to `false` (clean disconnection).
130+
129131
#### uint16_t subscribe(const char\* `topic`, uint8_t `qos`)
130132

131133
Subscribe to the given topic at the given QoS.

docs/3.-Memory-management.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Memory management
2+
3+
AsyncMqttClient does not use an internal buffer, it uses the raw TCP buffer.
4+
5+
The max receive size is about 1460 bytes per call to your onMessage callback. But the amount of data you can receive is unlimited, as if you receive, say, a 300kB payload (such as an OTA payload), then your `onMessage` callback will be called about 200 times, with the according len, index and total parameters. Keep in mind the library will call your `onMessage` callbacks with the same topic buffer, so if you change the buffer on one call, the buffer will remain changed on subsequent calls.
6+
7+
You can send data as long as you stay below the available TCP window (which is about 3-4kB on the ESP8266). The data is indeed held in memory by the async TCP code until ACK is received. If the TCP window was sufficient to send your packet, the `publish` method will return a packet ID indicating the packet was sent. Otherwise, a `0` will be returned, and it's your responsability to resend the packet with `publish`.
File renamed without changes.

docs/index.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Welcome to the AsyncMqttClient for ESP8266 docs.
55
-----
66

77
#### 1. [Getting started](1.-Getting-started.md)
8-
#### 2. [API reference](2.-API-reference.md)
9-
#### 3. [Limitations and known issues](3.-Limitations-and-known-issues.md)
10-
#### 4. [Troubleshooting](4.-Troubleshooting.md)
8+
#### 2. [API reference](2.-API-reference.md)
9+
#### 3. [Memory management](3.-Memory-management.md)
10+
#### 4. [Limitations and known issues](4.-Limitations-and-known-issues.md)
11+
#### 5. [Troubleshooting](5.-Troubleshooting.md)

keywords.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ setCredentials KEYWORD2
1818
setWill KEYWORD2
1919
setServer KEYWORD2
2020
setSecure KEYWORD2
21-
addServerFingerprin KEYWORD2
21+
addServerFingerprint KEYWORD2
2222

2323
onConnect KEYWORD2
2424
onDisconnect KEYWORD2
2525
onSubscribe KEYWORD2
2626
onUnsubscribe KEYWORD2
27-
onPublish KEYWORD2
28-
onPublish KEYWORD2
27+
onMessage KEYWORD2
28+
onPublish KEYWORD2
2929

3030
connected KEYWORD2
3131
connect KEYWORD2

src/AsyncMqttClient.cpp

Lines changed: 155 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
AsyncMqttClient::AsyncMqttClient()
44
: _connected(false)
5+
, _connectPacketNotEnoughSpace(false)
6+
, _disconnectFlagged(false)
57
, _lastClientActivity(0)
68
, _lastServerActivity(0)
79
, _lastPingRequestTime(0)
@@ -147,10 +149,16 @@ void AsyncMqttClient::_freeCurrentParsedPacket() {
147149
void AsyncMqttClient::_clear() {
148150
_lastPingRequestTime = 0;
149151
_connected = false;
152+
_disconnectFlagged = false;
153+
_connectPacketNotEnoughSpace = false;
150154
_freeCurrentParsedPacket();
155+
151156
_pendingPubRels.clear();
152157
_pendingPubRels.shrink_to_fit();
153158

159+
_toSendAcks.clear();
160+
_toSendAcks.shrink_to_fit();
161+
154162
_nextPacketId = 1;
155163
_parsingInformation.bufferState = AsyncMqttClientInternals::BufferState::NONE;
156164
}
@@ -257,6 +265,37 @@ void AsyncMqttClient::_onConnect(AsyncClient* client) {
257265
if (_username != nullptr) remainingLength += 2 + usernameLength;
258266
if (_password != nullptr) remainingLength += 2 + passwordLength;
259267
uint8_t remainingLengthLength = AsyncMqttClientInternals::Helpers::encodeRemainingLength(remainingLength, fixedHeader + 1);
268+
269+
uint32_t neededSpace = 1 + remainingLengthLength;
270+
neededSpace += 2;
271+
neededSpace += protocolNameLength;
272+
neededSpace += 1;
273+
neededSpace += 1;
274+
neededSpace += 2;
275+
neededSpace += 2;
276+
neededSpace += clientIdLength;
277+
if (_willTopic != nullptr) {
278+
neededSpace += 2;
279+
neededSpace += willTopicLength;
280+
281+
neededSpace += 2;
282+
if (_willPayload != nullptr) neededSpace += willPayloadLength;
283+
}
284+
if (_username != nullptr) {
285+
neededSpace += 2;
286+
neededSpace += usernameLength;
287+
}
288+
if (_password != nullptr) {
289+
neededSpace += 2;
290+
neededSpace += passwordLength;
291+
}
292+
293+
if (_client.space() < neededSpace) {
294+
_connectPacketNotEnoughSpace = true;
295+
_client.close(true);
296+
return;
297+
}
298+
260299
_client.add(fixedHeader, 1 + remainingLengthLength);
261300
_client.add(protocolNameLengthBytes, 2);
262301
_client.add("MQTT", protocolNameLength);
@@ -287,7 +326,17 @@ void AsyncMqttClient::_onConnect(AsyncClient* client) {
287326
void AsyncMqttClient::_onDisconnect(AsyncClient* client) {
288327
(void)client;
289328
_clear();
290-
for (auto callback : _onDisconnectUserCallbacks) callback(AsyncMqttClientDisconnectReason::TCP_DISCONNECTED);
329+
AsyncMqttClientDisconnectReason reason;
330+
331+
if (_connectPacketNotEnoughSpace) {
332+
reason = AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE;
333+
} else {
334+
reason = AsyncMqttClientDisconnectReason::TCP_DISCONNECTED;
335+
}
336+
337+
for (auto callback : _onDisconnectUserCallbacks) callback(reason);
338+
339+
_connectPacketNotEnoughSpace = false;
291340
}
292341

293342
void AsyncMqttClient::_onError(AsyncClient* client, int8_t error) {
@@ -299,8 +348,7 @@ void AsyncMqttClient::_onError(AsyncClient* client, int8_t error) {
299348
void AsyncMqttClient::_onTimeout(AsyncClient* client, uint32_t time) {
300349
(void)client;
301350
(void)time;
302-
// disconnection will be handled by ping/pong management now
303-
// _clear();
351+
// disconnection will be handled by ping/pong management
304352
}
305353

306354
void AsyncMqttClient::_onAck(AsyncClient* client, size_t len, uint32_t time) {
@@ -381,19 +429,29 @@ void AsyncMqttClient::_onData(AsyncClient* client, char* data, size_t len) {
381429
}
382430

383431
void AsyncMqttClient::_onPoll(AsyncClient* client) {
384-
if (_connected) {
385-
// if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections
386-
if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) {
387-
disconnect();
388-
389-
// send ping to ensure the server will receive at least one message inside keepalive window
390-
} else if (_lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) {
391-
_sendPing();
392-
393-
// send ping to verify if the server is still there (ensure this is not a half connection)
394-
} else if (_connected && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) {
395-
_sendPing();
396-
}
432+
if (!_connected) return;
433+
434+
// if there is too much time the client has sent a ping request without a response, disconnect client to avoid half open connections
435+
if (_lastPingRequestTime != 0 && (millis() - _lastPingRequestTime) >= (_keepAlive * 1000 * 2)) {
436+
disconnect();
437+
return;
438+
// send ping to ensure the server will receive at least one message inside keepalive window
439+
} else if (_lastPingRequestTime == 0 && (millis() - _lastClientActivity) >= (_keepAlive * 1000 * 0.7)) {
440+
_sendPing();
441+
442+
// send ping to verify if the server is still there (ensure this is not a half connection)
443+
} else if (_connected && _lastPingRequestTime == 0 && (millis() - _lastServerActivity) >= (_keepAlive * 1000 * 0.7)) {
444+
_sendPing();
445+
}
446+
447+
// handle to send ack packets
448+
449+
_sendAcks();
450+
451+
// handle disconnect
452+
453+
if (_disconnectFlagged) {
454+
_sendDisconnect();
397455
}
398456
}
399457

@@ -451,36 +509,18 @@ void AsyncMqttClient::_onMessage(char* topic, char* payload, uint8_t qos, bool d
451509
}
452510

453511
void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) {
454-
if (qos == 1) {
455-
char fixedHeader[2];
456-
fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBACK;
457-
fixedHeader[0] = fixedHeader[0] << 4;
458-
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED;
459-
fixedHeader[1] = 2;
460-
461-
char packetIdBytes[2];
462-
packetIdBytes[0] = packetId >> 8;
463-
packetIdBytes[1] = packetId & 0xFF;
512+
AsyncMqttClientInternals::PendingAck pendingAck;
464513

465-
_client.add(fixedHeader, 2);
466-
_client.add(packetIdBytes, 2);
467-
_client.send();
468-
_lastClientActivity = millis();
514+
if (qos == 1) {
515+
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBACK;
516+
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBACK_RESERVED;
517+
pendingAck.packetId = packetId;
518+
_toSendAcks.push_back(pendingAck);
469519
} else if (qos == 2) {
470-
char fixedHeader[2];
471-
fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBREC;
472-
fixedHeader[0] = fixedHeader[0] << 4;
473-
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED;
474-
fixedHeader[1] = 2;
475-
476-
char packetIdBytes[2];
477-
packetIdBytes[0] = packetId >> 8;
478-
packetIdBytes[1] = packetId & 0xFF;
479-
480-
_client.add(fixedHeader, 2);
481-
_client.add(packetIdBytes, 2);
482-
_client.send();
483-
_lastClientActivity = millis();
520+
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREC;
521+
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREC_RESERVED;
522+
pendingAck.packetId = packetId;
523+
_toSendAcks.push_back(pendingAck);
484524

485525
bool pubRelAwaiting = false;
486526
for (AsyncMqttClientInternals::PendingPubRel pendingPubRel : _pendingPubRels) {
@@ -495,6 +535,8 @@ void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) {
495535
pendingPubRel.packetId = packetId;
496536
_pendingPubRels.push_back(pendingPubRel);
497537
}
538+
539+
_sendAcks();
498540
}
499541

500542
_freeCurrentParsedPacket();
@@ -503,27 +545,20 @@ void AsyncMqttClient::_onPublish(uint16_t packetId, uint8_t qos) {
503545
void AsyncMqttClient::_onPubRel(uint16_t packetId) {
504546
_freeCurrentParsedPacket();
505547

506-
char fixedHeader[2];
507-
fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBCOMP;
508-
fixedHeader[0] = fixedHeader[0] << 4;
509-
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED;
510-
fixedHeader[1] = 2;
511-
512-
char packetIdBytes[2];
513-
packetIdBytes[0] = packetId >> 8;
514-
packetIdBytes[1] = packetId & 0xFF;
515-
516-
_client.add(fixedHeader, 2);
517-
_client.add(packetIdBytes, 2);
518-
_client.send();
519-
_lastClientActivity = millis();
548+
AsyncMqttClientInternals::PendingAck pendingAck;
549+
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBCOMP;
550+
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBCOMP_RESERVED;
551+
pendingAck.packetId = packetId;
552+
_toSendAcks.push_back(pendingAck);
520553

521554
for (size_t i = 0; i < _pendingPubRels.size(); i++) {
522555
if (_pendingPubRels[i].packetId == packetId) {
523556
_pendingPubRels.erase(_pendingPubRels.begin() + i);
524557
_pendingPubRels.shrink_to_fit();
525558
}
526559
}
560+
561+
_sendAcks();
527562
}
528563

529564
void AsyncMqttClient::_onPubAck(uint16_t packetId) {
@@ -535,20 +570,13 @@ void AsyncMqttClient::_onPubAck(uint16_t packetId) {
535570
void AsyncMqttClient::_onPubRec(uint16_t packetId) {
536571
_freeCurrentParsedPacket();
537572

538-
char fixedHeader[2];
539-
fixedHeader[0] = AsyncMqttClientInternals::PacketType.PUBREL;
540-
fixedHeader[0] = fixedHeader[0] << 4;
541-
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED;
542-
fixedHeader[1] = 2;
543-
544-
char packetIdBytes[2];
545-
packetIdBytes[0] = packetId >> 8;
546-
packetIdBytes[1] = packetId & 0xFF;
573+
AsyncMqttClientInternals::PendingAck pendingAck;
574+
pendingAck.packetType = AsyncMqttClientInternals::PacketType.PUBREL;
575+
pendingAck.headerFlag = AsyncMqttClientInternals::HeaderFlag.PUBREL_RESERVED;
576+
pendingAck.packetId = packetId;
577+
_toSendAcks.push_back(pendingAck);
547578

548-
_client.add(fixedHeader, 2);
549-
_client.add(packetIdBytes, 2);
550-
_client.send();
551-
_lastClientActivity = millis();
579+
_sendAcks();
552580
}
553581

554582
void AsyncMqttClient::_onPubComp(uint16_t packetId) {
@@ -576,6 +604,55 @@ bool AsyncMqttClient::_sendPing() {
576604
return true;
577605
}
578606

607+
void AsyncMqttClient::_sendAcks() {
608+
uint8_t neededAckSpace = 2 + 2;
609+
610+
for (size_t i = 0; i < _toSendAcks.size(); i++) {
611+
if (_client.space() < neededAckSpace) break;
612+
613+
AsyncMqttClientInternals::PendingAck pendingAck = _toSendAcks[i];
614+
615+
char fixedHeader[2];
616+
fixedHeader[0] = pendingAck.packetType;
617+
fixedHeader[0] = fixedHeader[0] << 4;
618+
fixedHeader[0] = fixedHeader[0] | pendingAck.headerFlag;
619+
fixedHeader[1] = 2;
620+
621+
char packetIdBytes[2];
622+
packetIdBytes[0] = pendingAck.packetId >> 8;
623+
packetIdBytes[1] = pendingAck.packetId & 0xFF;
624+
625+
_client.add(fixedHeader, 2);
626+
_client.add(packetIdBytes, 2);
627+
_client.send();
628+
629+
_toSendAcks.erase(_toSendAcks.begin() + i);
630+
_toSendAcks.shrink_to_fit();
631+
632+
_lastClientActivity = millis();
633+
}
634+
}
635+
636+
bool AsyncMqttClient::_sendDisconnect() {
637+
const uint8_t neededSpace = 2;
638+
639+
if (_client.space() < neededSpace) return false;
640+
641+
char fixedHeader[2];
642+
fixedHeader[0] = AsyncMqttClientInternals::PacketType.DISCONNECT;
643+
fixedHeader[0] = fixedHeader[0] << 4;
644+
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.DISCONNECT_RESERVED;
645+
fixedHeader[1] = 0;
646+
647+
_client.add(fixedHeader, 2);
648+
_client.send();
649+
_client.close(true);
650+
651+
_disconnectFlagged = false;
652+
653+
return true;
654+
}
655+
579656
uint16_t AsyncMqttClient::_getNextPacketId() {
580657
uint16_t nextPacketId = _nextPacketId;
581658

@@ -607,18 +684,15 @@ void AsyncMqttClient::connect() {
607684
#endif
608685
}
609686

610-
void AsyncMqttClient::disconnect() {
687+
void AsyncMqttClient::disconnect(bool force) {
611688
if (!_connected) return;
612689

613-
char fixedHeader[2];
614-
fixedHeader[0] = AsyncMqttClientInternals::PacketType.DISCONNECT;
615-
fixedHeader[0] = fixedHeader[0] << 4;
616-
fixedHeader[0] = fixedHeader[0] | AsyncMqttClientInternals::HeaderFlag.DISCONNECT_RESERVED;
617-
fixedHeader[1] = 0;
618-
619-
_client.add(fixedHeader, 2);
620-
_client.send();
621-
_client.close(true);
690+
if (force) {
691+
_client.close(true);
692+
} else {
693+
_disconnectFlagged = true;
694+
_sendDisconnect();
695+
}
622696
}
623697

624698
uint16_t AsyncMqttClient::subscribe(const char* topic, uint8_t qos) {

0 commit comments

Comments
 (0)