1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
36
37
38
39
40
41
48
49
50
51
54
55
56
57
58
59
60
61
62
63
64
65
66
67
74
75
78
79
80
81
82
83
84
85
90
91
97
98
99
100
101
102
103
104
105
108
109
110
111
120
121
122
125
126
127
128
129
130
131
132
133
134
137
138
139
140
141
142
143
144
145
146
153
154
157
158
159
160
161
162
163
164
165
166
175
176
177
180
181
182
183
184
185
188
189
190
191
192
193
194
195
196
197
203
204
207
208
209
210
213
214
215
216
217
218
219
220
221
222
223
224
225
226
233
234
235
238
239
240
241
244
245
246
247
253
254
257
258
263
264
265
266
267
268
269
270
274
275
276
277
278
279
280
283
284
285
286
287
288
289
295
296
299
300
301
302
305
306
307
315
316
317
318
319
320
321
322
323
324
325
335
337
338
339
340
341
342
343
344
345
346
351
352
355
356
357
358
359
360
361
373
374
375
376
377
378
379
382
383
392
393
394
397
398
401
402
403
406
407
408
409
416
417
435
436
437
438
439
440
441
442
443
446
447
450
451
454
455
462
463
476
477
478
479
487
488
496
497
505
506
514
515
516
517
518
519
522
523
524
527
528
531
532
535
536
537
538
539
540
541
542
543
544
545
546
549
550
553
554
557
558
559
560
561
566
567
572
573
578
579
583
584
585
586
587
590
591
592
593
594
595
596
597
598
599
600
602
603
604
606
614
616
617
618
619
620
621
622
623
626
627
628
629
630
636
637
/* ... */
#include <string.h>
#include "mqtt_client.h"
#include "mqtt_msg.h"
#include "mqtt_config.h"
#include "platform.h"
/* Bytes reserved at the front of the outbound buffer for the fixed header:
 * one type/flags byte plus up to four "remaining length" bytes. */
#define MQTT_MAX_FIXED_HEADER_SIZE 5
/* CONNECT variable header size for protocol 3.1: 2-byte name length +
 * "MQIsdp" (6) + protocol version (1) + connect flags (1) + keepalive (2). */
#define MQTT_3_1_VARIABLE_HEADER_SIZE 12
/* CONNECT variable header size for protocol 3.1.1: 2-byte name length +
 * "MQTT" (4) + protocol version (1) + connect flags (1) + keepalive (2). */
#define MQTT_3_1_1_VARIABLE_HEADER_SIZE 10
/*
 * Bit masks for the "connect flags" byte of a CONNECT packet.
 * The will-QoS field (bits 3-4) has no mask here; mqtt_msg_connect() places
 * it with an explicit shift.
 */
enum mqtt_connect_flag {
    MQTT_CONNECT_FLAG_USERNAME      = 1 << 7,
    MQTT_CONNECT_FLAG_PASSWORD      = 1 << 6,
    MQTT_CONNECT_FLAG_WILL_RETAIN   = 1 << 5,
    MQTT_CONNECT_FLAG_WILL          = 1 << 2,
    MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
};
/*
 * Append a length-prefixed byte string to the outbound message: a 2-byte
 * big-endian length followed by `len` bytes copied from `string`.
 *
 * Returns len + 2 (the number of bytes appended) on success, or -1 when
 *   - len cannot be represented in the 16-bit prefix (negative or > 65535),
 *   - string is NULL while len > 0, or
 *   - the buffer does not have room for the prefix plus the payload.
 * On failure nothing is written and the message length is unchanged.
 */
static int append_string(mqtt_connection_t *connection, const char *string, int len)
{
    /* The prefix is only 16 bits wide; anything else would be silently
     * truncated on the wire (or, if negative, become a huge memcpy size). */
    if (len < 0 || len > 0xFFFF) {
        return -1;
    }
    if (string == NULL && len > 0) {
        return -1;
    }
    if (connection->outbound_message.length + len + 2 > connection->buffer_length) {
        return -1;
    }

    connection->buffer[connection->outbound_message.length++] = len >> 8;
    connection->buffer[connection->outbound_message.length++] = len & 0xff;

    /* Skip the copy for empty strings so a NULL source never reaches memcpy. */
    if (len > 0) {
        memcpy(connection->buffer + connection->outbound_message.length, string, len);
        connection->outbound_message.length += len;
    }
    return len + 2;
}
/*
 * Append a 2-byte big-endian message id to the outbound message.
 *
 * A message_id of 0 means "pick one": depending on MQTT_MSG_ID_INCREMENTAL the
 * id is taken from the connection's running counter or from platform_random(),
 * repeating until the result is non-zero.
 *
 * Returns the id that was written, or 0 if the buffer has no room for it.
 * Note that the id is chosen before the capacity check, so the counter
 * may advance even when 0 is returned.
 */
static uint16_t append_message_id(mqtt_connection_t *connection, uint16_t message_id)
{
    while (message_id == 0) {
#if MQTT_MSG_ID_INCREMENTAL
        ++connection->last_message_id;
        message_id = connection->last_message_id;
#else
        message_id = platform_random(65535);
#endif
    }

    if (connection->outbound_message.length + 2 > connection->buffer_length) {
        return 0;
    }

    uint8_t *out = connection->buffer + connection->outbound_message.length;
    out[0] = message_id >> 8;
    out[1] = message_id & 0xff;
    connection->outbound_message.length += 2;

    return message_id;
}
/*
 * Start a new outbound message: set the message length to the space reserved
 * for the fixed header, so that subsequent appends land right after it.
 * Returns the reserved size.
 */
static int set_message_header_size(mqtt_connection_t *connection)
{
    const int reserved = MQTT_MAX_FIXED_HEADER_SIZE;

    connection->outbound_message.length = reserved;
    return reserved;
}
/*
 * Mark the outbound message as failed: data points at the start of the buffer
 * and length is 0. Returns the connection's outbound message.
 */
static mqtt_message_t *fail_message(mqtt_connection_t *connection)
{
    mqtt_message_t *msg = &connection->outbound_message;

    msg->length = 0;
    msg->data = connection->buffer;
    return msg;
}
/*
 * Finish the outbound message by writing its fixed header.
 *
 * The body was built starting at offset MQTT_MAX_FIXED_HEADER_SIZE. The
 * header (type/flags byte + variable-length "remaining length") is written
 * right-aligned in that reserved space so it directly precedes the body, and
 * outbound_message.data is pointed at its first byte.
 *
 * If fragmented_msg_total_length is non-zero, the encoded remaining length is
 * derived from it instead of from the bytes currently in the buffer, and
 * fragmented_msg_data_offset is rebased by the same offset as data.
 *
 * Returns the outbound message, or the result of fail_message() when the
 * remaining length does not fit in the four bytes available.
 */
static mqtt_message_t *fini_message(mqtt_connection_t *connection, int type, int dup, int qos, int retain)
{
    int message_length = connection->outbound_message.length - MQTT_MAX_FIXED_HEADER_SIZE;
    int total_length = message_length;
    uint8_t encoded_lens[MQTT_MAX_FIXED_HEADER_SIZE - 1] = {0};
    int len_bytes = 0;

    /* A fragmented message advertises its full size, not just what is buffered. */
    if (connection->outbound_message.fragmented_msg_total_length) {
        total_length = connection->outbound_message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
    }

    /* Encode 7 bits per byte, least significant group first; bit 7 flags
     * that another byte follows. */
    do {
        /* Check capacity BEFORE storing, so an oversized length can never
         * write past the end of encoded_lens. */
        if (len_bytes >= (int)sizeof(encoded_lens)) {
            return fail_message(connection);
        }
        int encoded_length = total_length % 128;
        total_length /= 128;
        if (total_length > 0) {
            encoded_length |= 0x80;
        }
        encoded_lens[len_bytes] = encoded_length;
        len_bytes++;
    } while (total_length > 0);

    connection->outbound_message.length = message_length + len_bytes + 1;

    /* First byte of the header inside the reserved area. */
    int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
    connection->outbound_message.data = connection->buffer + offs;
    connection->outbound_message.fragmented_msg_data_offset -= offs;

    connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
    for (int j = 0; j < len_bytes; j++) {
        connection->buffer[offs++] = encoded_lens[j];
    }
    return &connection->outbound_message;
}
/*
 * Compute the total size of an MQTT packet (fixed header + remaining length)
 * from the bytes at the start of `buffer`.
 *
 * buffer          start of the packet; buffer[0] is the type/flags byte.
 * length          number of valid bytes in buffer.
 * fixed_size_len  optional out-parameter; receives the size of the fixed
 *                 header (1 + number of length bytes consumed).
 *
 * At most four "remaining length" bytes are decoded, which is the MQTT
 * limit. A malformed packet with more continuation bytes therefore stops
 * after the fourth instead of shifting past the width of the accumulator.
 * If `length` ends before the length field does, the value decoded so far
 * is returned.
 */
size_t mqtt_get_total_length(const uint8_t *buffer, size_t length, int *fixed_size_len)
{
    /* 1 type/flags byte + up to 4 remaining-length bytes. */
    const size_t max_fixed_header = 5;
    size_t i;
    size_t totlen = 0;

    for (i = 1; i < length && i < max_fixed_header; ++i) {
        totlen += (size_t)(buffer[i] & 0x7f) << (7 * (i - 1));
        if ((buffer[i] & 0x80) == 0) {
            ++i;    /* step past the terminating length byte */
            break;
        }
    }
    totlen += i;    /* add the fixed header itself */

    if (fixed_size_len) {
        *fixed_size_len = (int)i;
    }
    return totlen;
}
/*
 * Report whether `buffer` already contains everything that precedes the
 * payload: the fixed header, the 2-byte topic length, the topic itself and,
 * when the QoS bits are non-zero, the 2-byte message id.
 * (The layout parsed here is that of a PUBLISH packet; presumably callers
 * only use this for PUBLISH - confirm against the call sites.)
 *
 * Returns false when more bytes are needed.
 */
bool mqtt_header_complete(uint8_t *buffer, size_t buffer_length)
{
    /* size_t rather than uint16_t: with a 16-bit index, adding a large topic
     * length wrapped around and made an incomplete header look complete. */
    size_t i;
    size_t topiclen;

    /* Skip the "remaining length" field (at most 4 bytes). */
    for (i = 1; i < MQTT_MAX_FIXED_HEADER_SIZE; ++i) {
        if (i >= buffer_length) {
            return false;
        }
        if ((buffer[i] & 0x80) == 0) {
            ++i;
            break;
        }
    }

    /* Need both topic-length bytes plus at least one more byte. */
    if (i + 2 >= buffer_length) {
        return false;
    }
    topiclen = (size_t)buffer[i++] << 8;
    topiclen |= buffer[i++];
    i += topiclen;

    if (mqtt_get_qos(buffer) > 0) {
        i += 2;     /* message id */
    }
    return buffer_length >= i;
}
/*
 * Locate the topic name inside a PUBLISH packet.
 *
 * buffer  start of the packet.
 * length  in: number of valid bytes in buffer; out: topic length in bytes
 *         (only updated on success).
 *
 * Returns a pointer into `buffer` at the first topic byte (the topic is NOT
 * NUL-terminated), or NULL when either argument is NULL or the buffer is too
 * short to hold the topic length field and the topic.
 *
 * The "remaining length" field is skipped using at most four bytes, the same
 * bound mqtt_header_complete() applies.
 */
char *mqtt_get_publish_topic(uint8_t *buffer, size_t *length)
{
    /* 1 type/flags byte + up to 4 remaining-length bytes. */
    const size_t max_fixed_header = 5;
    size_t i;
    size_t topiclen;

    if (buffer == NULL || length == NULL) {
        return NULL;
    }

    for (i = 1; i < *length && i < max_fixed_header; ++i) {
        if ((buffer[i] & 0x80) == 0) {
            ++i;
            break;
        }
    }

    if (i + 2 >= *length) {
        return NULL;
    }
    topiclen = (size_t)buffer[i++] << 8;
    topiclen |= buffer[i++];

    if (i + topiclen > *length) {
        return NULL;
    }

    *length = topiclen;
    return (char *)(buffer + i);
}
char *mqtt_get_publish_data(uint8_t *buffer, size_t *length)
{
int i;
int totlen = 0;
int topiclen;
int blength = *length;
*length = 0;
for (i = 1; i < blength; ++i) {
totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}{...}
}{...}
totlen += i;
if (i + 2 >= blength) {
return NULL;
}{...}
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if (i + topiclen >= blength) {
return NULL;
}{...}
i += topiclen;
if (mqtt_get_qos(buffer) > 0) {
if (i + 2 >= blength) {
return NULL;
}{...}
i += 2;
}{...}
if (totlen < i) {
return NULL;
}{...}
if (totlen <= blength) {
*length = totlen - i;
}{...} else {
*length = blength - i;
}{...}
return (char *)(buffer + i);
}{ ... }
/*
 * Return the bytes that follow the first four bytes of a SUBACK packet.
 * (Presumably those four bytes are a 2-byte fixed header plus the 2-byte
 * message id, i.e. this assumes a single-byte "remaining length" - confirm.)
 *
 * buffer  start of the packet.
 * length  in: number of valid bytes in buffer; out: number of bytes at the
 *         returned pointer (0 when NULL is returned).
 *
 * Returns buffer + 4 when more than four bytes are available; otherwise, or
 * when `buffer` is NULL, sets *length to 0 and returns NULL. A NULL `length`
 * yields NULL without touching anything.
 */
char *mqtt_get_suback_data(uint8_t *buffer, size_t *length)
{
    if (length == NULL) {
        return NULL;
    }
    if (buffer != NULL && *length > 4) {
        *length -= 4;
        return (char *)(buffer + 4);
    }
    *length = 0;
    return NULL;
}
uint16_t mqtt_get_id(uint8_t *buffer, size_t length)
{
if (length < 1) {
return 0;
}{...}
switch (mqtt_get_type(buffer)) {
case MQTT_MSG_TYPE_PUBLISH: {
int i;
int topiclen;
for (i = 1; i < length; ++i) {
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}{...}
}{...}
if (i + 2 >= length) {
return 0;
}{...}
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if (i + topiclen > length) {
return 0;
}{...}
i += topiclen;
if (mqtt_get_qos(buffer) > 0) {
if (i + 2 > length) {
return 0;
}{...}
}{...} else {
return 0;
}{...}
return (buffer[i] << 8) | buffer[i + 1];
}{...}
... case MQTT_MSG_TYPE_PUBACK:
case MQTT_MSG_TYPE_PUBREC:
case MQTT_MSG_TYPE_PUBREL:
case MQTT_MSG_TYPE_PUBCOMP:
case MQTT_MSG_TYPE_SUBACK:
case MQTT_MSG_TYPE_UNSUBACK:
case MQTT_MSG_TYPE_SUBSCRIBE:
case MQTT_MSG_TYPE_UNSUBSCRIBE: {
if (length >= 4 && (buffer[1] & 0x80) == 0) {
return (buffer[2] << 8) | buffer[3];
}{...} else {
return 0;
}{...}
}{...}
...
default:
return 0;...
}{...}
}{ ... }
/*
 * Build a CONNECT packet in the connection's buffer.
 *
 * The variable header (protocol name, protocol version, connect flags,
 * keepalive) is chosen by info->protocol_ver: "MQIsdp"/3 for
 * MQTT_PROTOCOL_V_3_1, "MQTT"/4 otherwise. The payload then carries, in
 * order: client id (an empty string when none is given), will topic and will
 * message (when a will topic is given), username, password. A password
 * without a username is preceded by an empty username, and both flags are set.
 *
 * Returns the finished message, or the result of fail_message() when the
 * buffer is too small for any part.
 */
mqtt_message_t *mqtt_msg_connect(mqtt_connection_t *connection, mqtt_connect_info_t *info)
{
    set_message_header_size(connection);

    const bool legacy_3_1 = (info->protocol_ver == MQTT_PROTOCOL_V_3_1);
    const int header_len = legacy_3_1 ? MQTT_3_1_VARIABLE_HEADER_SIZE
                                      : MQTT_3_1_1_VARIABLE_HEADER_SIZE;

    if (connection->outbound_message.length + header_len > connection->buffer_length) {
        return fail_message(connection);
    }

    char *variable_header = (char *)(connection->buffer + connection->outbound_message.length);
    connection->outbound_message.length += header_len;

    /* Protocol name (length-prefixed) and protocol version. */
    const char *proto_name = legacy_3_1 ? "MQIsdp" : "MQTT";
    const int proto_name_len = legacy_3_1 ? 6 : 4;
    const char proto_version = legacy_3_1 ? 3 : 4;

    char *cursor = variable_header;
    *cursor++ = 0;
    *cursor++ = proto_name_len;
    memcpy(cursor, proto_name, proto_name_len);
    cursor += proto_name_len;
    *cursor++ = proto_version;

    /* Flags start out clear and are filled in as payload fields are added. */
    char *connect_flags = cursor++;
    *connect_flags = 0;

    *cursor++ = info->keepalive >> 8;
    *cursor = info->keepalive & 0xff;

    if (info->clean_session) {
        *connect_flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
    }

    /* Client id is always present in the payload, possibly empty. */
    const bool has_client_id = info->client_id != NULL && info->client_id[0] != '\0';
    const char *client_id = has_client_id ? info->client_id : "";
    if (append_string(connection, client_id, strlen(client_id)) < 0) {
        return fail_message(connection);
    }

    const bool has_will = info->will_topic != NULL && info->will_topic[0] != '\0';
    if (has_will) {
        if (append_string(connection, info->will_topic, strlen(info->will_topic)) < 0 ||
            append_string(connection, info->will_message, info->will_length) < 0) {
            return fail_message(connection);
        }

        *connect_flags |= MQTT_CONNECT_FLAG_WILL;
        if (info->will_retain) {
            *connect_flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
        }
        /* Will QoS occupies bits 3-4 of the flags byte. */
        *connect_flags |= (info->will_qos & 3) << 3;
    }

    const bool has_username = info->username != NULL && info->username[0] != '\0';
    const bool has_password = info->password != NULL && info->password[0] != '\0';

    if (has_username) {
        if (append_string(connection, info->username, strlen(info->username)) < 0) {
            return fail_message(connection);
        }
        *connect_flags |= MQTT_CONNECT_FLAG_USERNAME;
    }

    if (has_password) {
        if (!has_username) {
            /* A password is never sent alone: emit an empty username first. */
            if (append_string(connection, "", 0) < 0) {
                return fail_message(connection);
            }
            *connect_flags |= MQTT_CONNECT_FLAG_USERNAME;
        }
        if (append_string(connection, info->password, strlen(info->password)) < 0) {
            return fail_message(connection);
        }
        *connect_flags |= MQTT_CONNECT_FLAG_PASSWORD;
    }

    return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);
}
/*
 * Build a PUBLISH packet in the connection's buffer.
 *
 * topic        NUL-terminated topic name; must be non-NULL and non-empty.
 * data         payload, may be NULL when data_length is 0.
 * data_length  payload size in bytes; must not be negative.
 * qos, retain  placed in the fixed header flags.
 * message_id   out: the id written into the packet for qos > 0, else 0.
 *              Not written when the topic or payload arguments are rejected.
 *
 * When the payload does not fit in the buffer, as much as fits is copied,
 * fragmented_msg_data_offset records where the payload starts in the buffer
 * and fragmented_msg_total_length records the size the whole message would
 * have had, so that the fixed header advertises the full length.
 *
 * Returns the finished message, or the result of fail_message() on invalid
 * arguments or insufficient buffer space for the topic / message id.
 */
mqtt_message_t *mqtt_msg_publish(mqtt_connection_t *connection, const char *topic, const char *data, int data_length, int qos, int retain, uint16_t *message_id)
{
    set_message_header_size(connection);

    /* Always start from "not fragmented". Previously this was only reset on
     * the fits-in-buffer path, so a publish without data could be finalized
     * with the total length of an earlier fragmented message. */
    connection->outbound_message.fragmented_msg_total_length = 0;

    if (topic == NULL || topic[0] == '\0') {
        return fail_message(connection);
    }
    /* A negative length would turn into a huge size inside memcpy. */
    if (data_length < 0 || (data == NULL && data_length > 0)) {
        return fail_message(connection);
    }

    if (append_string(connection, topic, strlen(topic)) < 0) {
        return fail_message(connection);
    }

    if (qos > 0) {
        if ((*message_id = append_message_id(connection, 0)) == 0) {
            return fail_message(connection);
        }
    } else {
        *message_id = 0;
    }

    if (data != NULL) {
        if (connection->outbound_message.length + data_length > connection->buffer_length) {
            /* Payload does not fit: fill the buffer and record the full size. */
            connection->outbound_message.fragmented_msg_data_offset = connection->outbound_message.length;
            memcpy(connection->buffer + connection->outbound_message.length, data,
                   connection->buffer_length - connection->outbound_message.length);
            connection->outbound_message.length = connection->buffer_length;
            connection->outbound_message.fragmented_msg_total_length =
                data_length + connection->outbound_message.fragmented_msg_data_offset;
        } else {
            memcpy(connection->buffer + connection->outbound_message.length, data, data_length);
            connection->outbound_message.length += data_length;
        }
    }

    return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
/*
 * Build a PUBACK packet carrying `message_id`.
 * Returns the finished message, or the result of fail_message() when the id
 * could not be appended.
 */
mqtt_message_t *mqtt_msg_puback(mqtt_connection_t *connection, uint16_t message_id)
{
    set_message_header_size(connection);

    const uint16_t written_id = append_message_id(connection, message_id);
    return (written_id != 0)
           ? fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0)
           : fail_message(connection);
}
/*
 * Build a PUBREC packet carrying `message_id`.
 * Returns the finished message, or the result of fail_message() when the id
 * could not be appended.
 */
mqtt_message_t *mqtt_msg_pubrec(mqtt_connection_t *connection, uint16_t message_id)
{
    set_message_header_size(connection);

    const uint16_t written_id = append_message_id(connection, message_id);
    return (written_id != 0)
           ? fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0)
           : fail_message(connection);
}
/*
 * Build a PUBREL packet carrying `message_id`. The packet is finalized with
 * qos = 1, which puts 0x02 in the fixed-header flag bits.
 * Returns the finished message, or the result of fail_message() when the id
 * could not be appended.
 */
mqtt_message_t *mqtt_msg_pubrel(mqtt_connection_t *connection, uint16_t message_id)
{
    set_message_header_size(connection);

    const uint16_t written_id = append_message_id(connection, message_id);
    return (written_id != 0)
           ? fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0)
           : fail_message(connection);
}
/*
 * Build a PUBCOMP packet carrying `message_id`.
 * Returns the finished message, or the result of fail_message() when the id
 * could not be appended.
 */
mqtt_message_t *mqtt_msg_pubcomp(mqtt_connection_t *connection, uint16_t message_id)
{
    set_message_header_size(connection);

    const uint16_t written_id = append_message_id(connection, message_id);
    return (written_id != 0)
           ? fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0)
           : fail_message(connection);
}
/*
 * Build a SUBSCRIBE packet for `size` entries of `topic_list`.
 *
 * Each entry contributes its length-prefixed filter followed by one byte
 * holding its requested QoS. The packet is finalized with qos = 1, which puts
 * 0x02 in the fixed-header flag bits.
 *
 * message_id  out: the freshly generated id written into the packet. Not
 *             written when the topic list itself is rejected.
 *
 * Returns the finished message, or the result of fail_message() when
 *   - topic_list is NULL or size is not positive (a SUBSCRIBE must carry at
 *     least one filter),
 *   - any filter is NULL or empty, or
 *   - the buffer is too small.
 */
mqtt_message_t *mqtt_msg_subscribe(mqtt_connection_t *connection, const esp_mqtt_topic_t topic_list[], int size, uint16_t *message_id)
{
    set_message_header_size(connection);

    if (topic_list == NULL || size <= 0) {
        return fail_message(connection);
    }

    if ((*message_id = append_message_id(connection, 0)) == 0) {
        return fail_message(connection);
    }

    for (int topic_number = 0; topic_number < size; ++topic_number) {
        const char *filter = topic_list[topic_number].filter;

        /* Same NULL/empty rule the publish and unsubscribe builders apply. */
        if (filter == NULL || filter[0] == '\0') {
            return fail_message(connection);
        }
        if (append_string(connection, filter, strlen(filter)) < 0) {
            return fail_message(connection);
        }

        /* One more byte for the requested QoS of this filter. */
        if (connection->outbound_message.length + 1 > connection->buffer_length) {
            return fail_message(connection);
        }
        connection->buffer[connection->outbound_message.length] = topic_list[topic_number].qos;
        connection->outbound_message.length++;
    }

    return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
/*
 * Build an UNSUBSCRIBE packet for a single topic filter.
 *
 * topic       NUL-terminated filter; must be non-NULL and non-empty.
 * message_id  out: the freshly generated id written into the packet (not
 *             written when the topic is rejected).
 *
 * The packet is finalized with qos = 1, which puts 0x02 in the fixed-header
 * flag bits. Returns the finished message, or the result of fail_message()
 * on an invalid topic or insufficient buffer space.
 */
mqtt_message_t *mqtt_msg_unsubscribe(mqtt_connection_t *connection, const char *topic, uint16_t *message_id)
{
    set_message_header_size(connection);

    const bool topic_missing = (topic == NULL) || (topic[0] == '\0');
    if (topic_missing) {
        return fail_message(connection);
    }

    *message_id = append_message_id(connection, 0);
    if (*message_id == 0) {
        return fail_message(connection);
    }

    const int appended = append_string(connection, topic, strlen(topic));
    if (appended < 0) {
        return fail_message(connection);
    }

    return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0);
}
/*
 * Build a PINGREQ packet: a fixed header with nothing appended after it.
 */
mqtt_message_t *mqtt_msg_pingreq(mqtt_connection_t *connection)
{
    (void)set_message_header_size(connection);

    mqtt_message_t *message = fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
    return message;
}
/*
 * Build a PINGRESP packet: a fixed header with nothing appended after it.
 */
mqtt_message_t *mqtt_msg_pingresp(mqtt_connection_t *connection)
{
    (void)set_message_header_size(connection);

    mqtt_message_t *message = fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
    return message;
}
/*
 * Build a DISCONNECT packet: a fixed header with nothing appended after it.
 */
mqtt_message_t *mqtt_msg_disconnect(mqtt_connection_t *connection)
{
    (void)set_message_header_size(connection);

    mqtt_message_t *message = fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
    return message;
}
/* ... */
int mqtt_has_valid_msg_hdr(uint8_t *buffer, size_t length)
{
int qos, dup;
if (length < 1) {
return 0;
}{...}
switch (mqtt_get_type(buffer)) {
case MQTT_MSG_TYPE_CONNECT:
case MQTT_MSG_TYPE_CONNACK:
case MQTT_MSG_TYPE_PUBACK:
case MQTT_MSG_TYPE_PUBREC:
case MQTT_MSG_TYPE_PUBCOMP:
case MQTT_MSG_TYPE_SUBACK:
case MQTT_MSG_TYPE_UNSUBACK:
case MQTT_MSG_TYPE_PINGREQ:
case MQTT_MSG_TYPE_PINGRESP:
case MQTT_MSG_TYPE_DISCONNECT:
return (buffer[0] & 0x0f) == 0; ...
case MQTT_MSG_TYPE_PUBREL:
case MQTT_MSG_TYPE_SUBSCRIBE:
case MQTT_MSG_TYPE_UNSUBSCRIBE:
return (buffer[0] & 0x0f) == 0x02; ...
case MQTT_MSG_TYPE_PUBLISH:
qos = mqtt_get_qos(buffer);
dup = mqtt_get_dup(buffer);
/* ... */
return (qos < 3) && ((qos > 0) || (dup == 0));...
default:
return 0;...
}{...}
}{ ... }
/*
 * Reset the connection's outbound message and allocate a zero-filled
 * message buffer of `buffer_size` bytes.
 *
 * Returns ESP_OK on success. Returns ESP_ERR_NO_MEM when the allocation
 * fails or when buffer_size is smaller than MQTT_MAX_FIXED_HEADER_SIZE:
 * every message builder reserves that many bytes up front, so a smaller
 * buffer would be written out of bounds. (ESP_ERR_NO_MEM is reused for the
 * size check because it is the only failure code this function has ever
 * returned, and negative sizes already ended up there via a failed calloc.)
 *
 * On failure connection->buffer is NULL and connection->buffer_length is 0.
 */
esp_err_t mqtt_msg_buffer_init(mqtt_connection_t *connection, int buffer_size)
{
    memset(&connection->outbound_message, 0, sizeof(mqtt_message_t));
    connection->buffer = NULL;
    connection->buffer_length = 0;

    if (buffer_size < MQTT_MAX_FIXED_HEADER_SIZE) {
        return ESP_ERR_NO_MEM;
    }

    connection->buffer = calloc((size_t)buffer_size, sizeof(uint8_t));
    if (!connection->buffer) {
        return ESP_ERR_NO_MEM;
    }
    connection->buffer_length = buffer_size;
    return ESP_OK;
}
/*
 * Release the connection's message buffer. Accepts a NULL connection.
 * The buffer pointer and length are cleared afterwards so that a repeated
 * destroy is harmless and stale pointers are not left behind.
 */
void mqtt_msg_buffer_destroy(mqtt_connection_t *connection)
{
    if (connection) {
        free(connection->buffer);
        connection->buffer = NULL;
        connection->buffer_length = 0;
    }
}