1
11
12
13
14
15
16
17
18
19
20
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
68
69
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
95
96
102
103
108
109
110
114
115
116
117
126
127
130
133
136
138
139
140
141
142
143
144
145
146
165
178
179
192
193
194
202
203
211
212
225
226
239
240
241
242
243
244
254
255
256
257
258
259
260
261
262
263
264
265
266
270
271
272
273
274
277
278
286
287
291
292
293
294
295
306
307
308
309
310
314
315
316
317
318
319
320
321
322
325
326
327
328
329
330
331
334
335
336
337
338
339
341
342
343
344
345
346
347
348
351
352
353
354
355
356
357
360
361
362
363
364
365
367
368
369
370
372
373
377
381
382
383
384
385
386
390
391
392
393
400
401
402
403
406
407
408
409
412
413
414
415
416
417
418
419
422
423
424
427
428
429
432
433
436
437
438
439
440
441
442
443
452
453
454
455
456
468
471
474
475
481
484
487
493
494
497
509
510
511
514
515
518
519
520
523
524
529
532
533
538
539
540
543
544
545
546
547
548
551
552
553
554
555
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
582
583
584
585
586
590
591
596
597
602
603
608
609
610
611
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
634
635
636
637
638
639
640
643
644
645
646
647
648
649
650
651
652
653
654
655
656
660
661
662
663
664
665
666
671
672
699
700
701
702
703
704
705
706
707
708
713
714
718
719
720
721
722
723
724
725
726
727
728
737
741
742
743
752
753
754
755
756
759
760
761
762
763
764
765
768
769
773
774
778
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
859
860
861
862
863
866
867
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
890
891
892
893
894
895
896
897
898
901
904
905
908
911
914
917
918
919
920
921
922
923
924
925
928
929
930
931
932
933
934
935
936
937
938
939
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
987
988
991
992
993
994
995
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1021
1022
1023
1024
1025
1026
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1042
1043
1044
1045
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1071
1072
1073
1077
1078
1079
1080
1081
1085
1086
1087
1088
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1143
1144
1147
1151
1152
1153
1154
1155
1156
1157
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1225
1226
1229
1230
1234
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1269
1270
1271
1272
1273
1276
1277
1278
1283
1284
1285
1286
1287
1288
1294
1295
1299
1300
1301
1302
1303
1304
1307
1308
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1333
1341
1345
1346
1347
1348
1349
1350
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1390
1391
1408
1412
1413
1414
1415
1419
1420
1421
1422
1423
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1446
1450
1451
1452
1453
1454
1455
1456
1464
1468
1469
1470
1471
1472
1473
1474
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1513
1514
1515
1520
1521
1522
1523
1524
1525
1526
1527
1528
1538
1539
1540
1541
1542
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1602
1603
1604
1605
1610
1611
1612
1615
1616
1617
1618
1619
1620
1621
1622
1623
1629
1630
1635
1636
1639
1640
1641
1642
1643
1644
1645
1646
1647
1652
1653
1657
1658
1662
1663
1664
1665
1666
1697
1698
1701
1702
1703
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1725
1726
1727
1728
1729
1730
1733
1734
1735
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1757
1758
1763
1764
1765
1771
1777
1778
1779
1780
1781
1782
1783
1787
1788
1789
1790
1791
1792
1793
1794
1798
1799
1803
1804
1805
1806
1807
1808
1809
1810
1811
1823
1827
1830
1831
1832
1833
1834
1835
1839
1840
1841
1842
1843
1848
1849
1850
1856
1857
1858
1859
1860
1861
1862
1867
1868
1869
1870
1871
1872
1876
1877
1881
1882
1883
1884
1885
1886
1887
1888
1892
1893
1896
1897
1901
1902
1903
1904
1905
1906
1911
1916
1917
1918
1919
1922
1923
1928
1933
1934
1935
1936
1940
1941
1942
1947
1948
1949
1950
1951
1952
1953
1958
1959
1960
1961
1965
1969
1970
1984
1989
1990
1991
1992
1996
1997
1998
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2029
2030
2034
2035
2036
2037
2038
2039
2040
2043
2044
2045
2046
2047
2048
2049
2050
2061
2062
2063
2064
2065
2066
2067
2071
2072
2077
2078
2079
2080
2088
2089
2094
2097
2098
2104
2105
2106
2110
2111
2112
2113
2114
2115
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2158
2159
2160
2161
2162
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2179
2180
2181
2182
2183
2184
2185
2186
2190
2191
2196
2199
2200
2206
2207
2208
2209
2217
2218
2219
2223
2224
2225
2226
2227
2228
2231
2232
2234
2235
2237
2238
2239
2240
2241
2242
2245
2246
2248
2249
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2266
2267
2268
2269
2270
2271
2272
2273
2274
2277
2278
2279
2280
2283
2284
2285
2286
2287
#include <stdint.h>
#include <stdlib.h>
#include "esp_err.h"
#include "esp_log.h"
#include "esp_heap_caps.h"
#include "esp_transport.h"
#include "mqtt_client.h"
#include "mqtt_client_priv.h"
#include "mqtt_msg.h"
#include "mqtt_outbox.h"10 includes
_Static_assert(sizeof(uint64_t) == sizeof(outbox_tick_t), "mqtt-client tick type size different from outbox tick type");
#ifdef ESP_EVENT_ANY_ID
_Static_assert(MQTT_EVENT_ANY == ESP_EVENT_ANY_ID, "mqtt-client event enum does not match the global EVENT_ANY_ID");
#endif
static const char *TAG = "mqtt_client";
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
/* ... */
ESP_EVENT_DEFINE_BASE(MQTT_EVENTS);/* ... */
#endif
#define MQTT_OVER_TCP_SCHEME "mqtt"
#define MQTT_OVER_SSL_SCHEME "mqtts"
#define MQTT_OVER_WS_SCHEME "ws"
#define MQTT_OVER_WSS_SCHEME "wss"
const static int STOPPED_BIT = (1 << 0);
const static int RECONNECT_BIT = (1 << 1);
const static int DISCONNECT_BIT = (1 << 2);
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms);
static void esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client);
static char *create_string(const char *ptr, int len);
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms);
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client);
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client);
/* ... */
static int esp_mqtt_handle_transport_read_error(int err, esp_mqtt_client_handle_t client, bool mid_message)
{
if (err == ERR_TCP_TRANSPORT_CONNECTION_TIMEOUT) {
if (mid_message) {
return -1;
}{...}
ESP_LOGV(TAG, "%s: transport_read(): call timed out before data was ready!", __func__);
return 0;
}{...}
if (err == ERR_TCP_TRANSPORT_CONNECTION_CLOSED_BY_FIN) {
ESP_LOGE(TAG, "%s: transport_read(): EOF", __func__);
}{...}
ESP_LOGE(TAG, "%s: transport_read() error: errno=%d", __func__, errno);
esp_mqtt_client_dispatch_transport_error(client);
return -2;
}{ ... }
#if MQTT_ENABLE_SSL
enum esp_mqtt_ssl_cert_key_api {
MQTT_SSL_DATA_API_CA_CERT,
MQTT_SSL_DATA_API_CLIENT_CERT,
MQTT_SSL_DATA_API_CLIENT_KEY,
MQTT_SSL_DATA_API_MAX,
}{ ... };
static esp_err_t esp_mqtt_set_cert_key_data(esp_transport_handle_t ssl, enum esp_mqtt_ssl_cert_key_api what, const char *cert_key_data, int cert_key_len)
{
char *data = (char *)cert_key_data;
int ssl_transport_api_id = what;
int len = cert_key_len;
if (!data) {
return ESP_OK;
}{...}
if (len == 0) {
ssl_transport_api_id += MQTT_SSL_DATA_API_MAX;
len = strlen(data);
}{...}
#ifndef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
else {
ESP_LOGE(TAG, "Explicit cert-/key-len is not available in IDF version %s", IDF_VER);
return ESP_ERR_NOT_SUPPORTED;
}{...}
#endif/* ... */
if (0 == strcmp(data, "NULL")) {
data = NULL;
len = 0;
}{...}
switch (ssl_transport_api_id) {
#ifdef MQTT_SUPPORTED_FEATURE_DER_CERTIFICATES
case MQTT_SSL_DATA_API_CA_CERT:
esp_transport_ssl_set_cert_data_der(ssl, data, len);
break;...
case MQTT_SSL_DATA_API_CLIENT_CERT:
esp_transport_ssl_set_client_cert_data_der(ssl, data, len);
break;...
case MQTT_SSL_DATA_API_CLIENT_KEY:
esp_transport_ssl_set_client_key_data_der(ssl, data, len);
break;/* ... */
#endif
case MQTT_SSL_DATA_API_CA_CERT + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_cert_data(ssl, data, len);
break;...
case MQTT_SSL_DATA_API_CLIENT_CERT + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_client_cert_data(ssl, data, len);
break;...
case MQTT_SSL_DATA_API_CLIENT_KEY + MQTT_SSL_DATA_API_MAX:
esp_transport_ssl_set_client_key_data(ssl, data, len);
break;...
default:
return ESP_ERR_INVALID_ARG;...
}{...}
return ESP_OK;
}{ ... }
static esp_err_t esp_mqtt_set_ssl_transport_properties(esp_transport_list_handle_t transport_list, mqtt_config_storage_t *cfg)
{
esp_transport_handle_t ssl = esp_transport_list_get_transport(transport_list, MQTT_OVER_SSL_SCHEME);
if (cfg->use_global_ca_store == true) {
esp_transport_ssl_enable_global_ca_store(ssl);
}{...} else if (cfg->crt_bundle_attach != NULL) {
#ifdef MQTT_SUPPORTED_FEATURE_CERTIFICATE_BUNDLE
#ifdef CONFIG_MBEDTLS_CERTIFICATE_BUNDLE
esp_transport_ssl_crt_bundle_attach(ssl, cfg->crt_bundle_attach);
#else
ESP_LOGE(TAG, "Certificate bundle is not enabled for mbedTLS in menuconfig");
goto esp_mqtt_set_transport_failed;/* ... */
#endif /* ... */
#else
ESP_LOGE(TAG, "Certificate bundle feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;/* ... */
#endif
}{...} else {
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CA_CERT, cfg->cacert_buf, cfg->cacert_bytes),
goto esp_mqtt_set_transport_failed);
}{...}
if (cfg->psk_hint_key) {
#if defined(MQTT_SUPPORTED_FEATURE_PSK_AUTHENTICATION) && MQTT_ENABLE_SSL
#ifdef CONFIG_ESP_TLS_PSK_VERIFICATION
esp_transport_ssl_set_psk_key_hint(ssl, cfg->psk_hint_key);
#else
ESP_LOGE(TAG, "PSK authentication configured but not enabled in menuconfig: Please enable ESP_TLS_PSK_VERIFICATION option");
goto esp_mqtt_set_transport_failed;/* ... */
#endif/* ... */
#else
ESP_LOGE(TAG, "PSK authentication is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;/* ... */
#endif
}{...}
if (cfg->alpn_protos) {
#if defined(MQTT_SUPPORTED_FEATURE_ALPN) && MQTT_ENABLE_SSL
#if defined(CONFIG_MBEDTLS_SSL_ALPN) || defined(CONFIG_WOLFSSL_HAVE_ALPN)
esp_transport_ssl_set_alpn_protocol(ssl, (const char **)cfg->alpn_protos);
#else
ESP_LOGE(TAG, "APLN configured but not enabled in menuconfig: Please enable MBEDTLS_SSL_ALPN or WOLFSSL_HAVE_ALPN option");
goto esp_mqtt_set_transport_failed;/* ... */
#endif/* ... */
#else
ESP_LOGE(TAG, "APLN is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;/* ... */
#endif
}{...}
if (cfg->skip_cert_common_name_check) {
#if defined(MQTT_SUPPORTED_FEATURE_SKIP_CRT_CMN_NAME_CHECK) && MQTT_ENABLE_SSL
esp_transport_ssl_skip_common_name_check(ssl);
#else
ESP_LOGE(TAG, "Skip certificate common name check is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;/* ... */
#endif
}{...}
if (cfg->common_name) {
#if defined(MQTT_SUPPORTED_FEATURE_CRT_CMN_NAME) && MQTT_ENABLE_SSL
esp_transport_ssl_set_common_name(ssl, cfg->common_name);
#else
ESP_LOGE(TAG, "Setting expected certificate common name is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;/* ... */
#endif
}{...}
if (cfg->use_secure_element) {
#ifdef MQTT_SUPPORTED_FEATURE_SECURE_ELEMENT
#ifdef CONFIG_ESP_TLS_USE_SECURE_ELEMENT
esp_transport_ssl_use_secure_element(ssl);
#else
ESP_LOGE(TAG, "Secure element not enabled for esp-tls in menuconfig");
goto esp_mqtt_set_transport_failed;/* ... */
#endif /* ... */
#else
ESP_LOGE(TAG, "Secure element feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;/* ... */
#endif
}{...}
if (cfg->ds_data != NULL) {
#ifdef MQTT_SUPPORTED_FEATURE_DIGITAL_SIGNATURE
#ifdef CONFIG_ESP_TLS_USE_DS_PERIPHERAL
esp_transport_ssl_set_ds_data(ssl, cfg->ds_data);
#else
ESP_LOGE(TAG, "Digital Signature not enabled for esp-tls in menuconfig");
goto esp_mqtt_set_transport_failed;/* ... */
#endif /* ... */
#else
ESP_LOGE(TAG, "Digital Signature feature is not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;/* ... */
#endif
}{...}
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_CERT, cfg->clientcert_buf, cfg->clientcert_bytes),
goto esp_mqtt_set_transport_failed);
ESP_OK_CHECK(TAG, esp_mqtt_set_cert_key_data(ssl, MQTT_SSL_DATA_API_CLIENT_KEY, cfg->clientkey_buf, cfg->clientkey_bytes),
goto esp_mqtt_set_transport_failed);
if (cfg->clientkey_password && cfg->clientkey_password_len) {
#if defined(MQTT_SUPPORTED_FEATURE_CLIENT_KEY_PASSWORD) && MQTT_ENABLE_SSL
esp_transport_ssl_set_client_key_password(ssl,
cfg->clientkey_password,
cfg->clientkey_password_len);/* ... */
#else
ESP_LOGE(TAG, "Password protected keys are not available in IDF version %s", IDF_VER);
goto esp_mqtt_set_transport_failed;/* ... */
#endif
}{...}
return ESP_OK;
esp_mqtt_set_transport_failed:
return ESP_FAIL;
}{ ... }
#endif/* ... */
static esp_err_t esp_mqtt_check_cfg_conflict(const mqtt_config_storage_t *cfg, const esp_mqtt_client_config_t *user_cfg)
{
if (cfg == NULL || user_cfg == NULL) {
ESP_LOGE(TAG, "Invalid configuration");
return ESP_ERR_INVALID_ARG;
}{...}
esp_err_t ret = ESP_OK;
bool ssl_cfg_enabled = cfg->use_global_ca_store || cfg->cacert_buf || cfg->clientcert_buf || cfg->psk_hint_key || cfg->alpn_protos;
bool is_ssl_scheme = false;
if (cfg->scheme) {
is_ssl_scheme = (strncasecmp(cfg->scheme, MQTT_OVER_SSL_SCHEME, sizeof(MQTT_OVER_SSL_SCHEME)) == 0) || (strncasecmp(cfg->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0);
}{...}
if (!is_ssl_scheme && ssl_cfg_enabled) {
if (cfg->uri) {
ESP_LOGW(TAG, "SSL related configs set, but the URI scheme specifies a non-SSL scheme, scheme = %s", cfg->scheme);
}{...} else {
ESP_LOGW(TAG, "SSL related configs set, but the transport protocol is a non-SSL scheme, transport = %d", user_cfg->broker.address.transport);
}{...}
ret = ESP_ERR_INVALID_ARG;
}{...}
if (cfg->uri && user_cfg->broker.address.transport) {
ESP_LOGW(TAG, "Transport config set, but overridden by scheme from URI: transport = %d, uri scheme = %s", user_cfg->broker.address.transport, cfg->scheme);
ret = ESP_ERR_INVALID_ARG;
}{...}
return ret;
}{ ... }
bool esp_mqtt_set_if_config(char const *const new_config, char **old_config)
{
if (new_config) {
free(*old_config);
*old_config = strdup(new_config);
if (*old_config == NULL) {
return false;
}{...}
}{...}
return true;
}{ ... }
static esp_err_t esp_mqtt_client_create_transport(esp_mqtt_client_handle_t client)
{
esp_err_t ret = ESP_OK;
if (client->transport_list) {
esp_transport_list_destroy(client->transport_list);
client->transport_list = NULL;
}{...}
if (client->config->scheme) {
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, return ESP_ERR_NO_MEM);
if ((strncasecmp(client->config->scheme, MQTT_OVER_TCP_SCHEME, sizeof(MQTT_OVER_TCP_SCHEME)) == 0) || (strncasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME, sizeof(MQTT_OVER_WS_SCHEME)) == 0)) {
esp_transport_handle_t tcp = esp_transport_tcp_init();
ESP_MEM_CHECK(TAG, tcp, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(tcp, MQTT_TCP_DEFAULT_PORT);
if (client->config->if_name) {
esp_transport_tcp_set_interface_name(tcp, client->config->if_name);
}{...}
esp_transport_list_add(client->transport_list, tcp, MQTT_OVER_TCP_SCHEME);
if (strncasecmp(client->config->scheme, MQTT_OVER_WS_SCHEME, sizeof(MQTT_OVER_WS_SCHEME)) == 0) {
#if MQTT_ENABLE_WS
esp_transport_handle_t ws = esp_transport_ws_init(tcp);
ESP_MEM_CHECK(TAG, ws, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(ws, MQTT_WS_DEFAULT_PORT);
if (client->config->path) {
esp_transport_ws_set_path(ws, client->config->path);
}{...}
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(ws, MQTT_OVER_TCP_SCHEME);
#endif
esp_transport_list_add(client->transport_list, ws, MQTT_OVER_WS_SCHEME);/* ... */
#else
ESP_LOGE(TAG, "Please enable MQTT_ENABLE_WS to use %s", client->config->scheme);
ret = ESP_FAIL;/* ... */
#endif
}{...}
}{...} else if ((strncasecmp(client->config->scheme, MQTT_OVER_SSL_SCHEME, sizeof(MQTT_OVER_SSL_SCHEME)) == 0) || (strncasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0)) {
#if MQTT_ENABLE_SSL
esp_transport_handle_t ssl = esp_transport_ssl_init();
ESP_MEM_CHECK(TAG, ssl, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(ssl, MQTT_SSL_DEFAULT_PORT);
if (client->config->if_name) {
esp_transport_ssl_set_interface_name(ssl, client->config->if_name);
}{...}
esp_transport_list_add(client->transport_list, ssl, MQTT_OVER_SSL_SCHEME);
if (strncasecmp(client->config->scheme, MQTT_OVER_WSS_SCHEME, sizeof(MQTT_OVER_WSS_SCHEME)) == 0) {
#if MQTT_ENABLE_WS
esp_transport_handle_t wss = esp_transport_ws_init(ssl);
ESP_MEM_CHECK(TAG, wss, return ESP_ERR_NO_MEM);
esp_transport_set_default_port(wss, MQTT_WSS_DEFAULT_PORT);
if (client->config->path) {
esp_transport_ws_set_path(wss, client->config->path);
}{...}
#ifdef MQTT_SUPPORTED_FEATURE_WS_SUBPROTOCOL
esp_transport_ws_set_subprotocol(wss, MQTT_OVER_TCP_SCHEME);
#endif
esp_transport_list_add(client->transport_list, wss, MQTT_OVER_WSS_SCHEME);/* ... */
#else
ESP_LOGE(TAG, "Please enable MQTT_ENABLE_WS to use %s", client->config->scheme);
ret = ESP_FAIL;/* ... */
#endif
}{...}
#else/* ... */
ESP_LOGE(TAG, "Please enable MQTT_ENABLE_SSL to use %s", client->config->scheme);
ret = ESP_FAIL;/* ... */
#endif
}{...} else {
ESP_LOGE(TAG, "Not support this mqtt scheme %s", client->config->scheme);
ret = ESP_FAIL;
}{...}
}{...} else {
ESP_LOGE(TAG, "No scheme found");
ret = ESP_FAIL;
}{...}
return ret;
}{ ... }
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}{...}
MQTT_API_LOCK(client);
esp_err_t err = ESP_OK;
if (!client->config) {
client->config = calloc(1, sizeof(mqtt_config_storage_t));
ESP_MEM_CHECK(TAG, client->config, {
MQTT_API_UNLOCK(client);
return ESP_ERR_NO_MEM;
}{...});
}{...}
mqtt_msg_buffer_destroy(&client->mqtt_state.connection);
int buffer_size = config->buffer.size;
if (buffer_size <= 0) {
buffer_size = MQTT_BUFFER_SIZE_BYTE;
}{...}
int out_buffer_size = config->buffer.out_size > 0 ? config->buffer.out_size : buffer_size;
if (mqtt_msg_buffer_init(&client->mqtt_state.connection, out_buffer_size) != ESP_OK) {
goto _mqtt_set_config_failed;
}{...}
free(client->mqtt_state.in_buffer);
client->mqtt_state.in_buffer = (uint8_t *)malloc(buffer_size);
ESP_MEM_CHECK(TAG, client->mqtt_state.in_buffer, goto _mqtt_set_config_failed);
client->mqtt_state.in_buffer_length = buffer_size;
client->config->message_retransmit_timeout = config->session.message_retransmit_timeout;
if (config->session.message_retransmit_timeout <= 0) {
client->config->message_retransmit_timeout = MQTT_DEFAULT_RETRANSMIT_TIMEOUT_MS;
}{...}
client->config->task_prio = config->task.priority;
if (client->config->task_prio <= 0) {
client->config->task_prio = MQTT_TASK_PRIORITY;
}{...}
client->config->task_stack = config->task.stack_size;
if (client->config->task_stack <= 0) {
client->config->task_stack = MQTT_TASK_STACK;
}{...}
if (config->broker.address.port) {
client->config->port = config->broker.address.port;
}{...}
err = ESP_ERR_NO_MEM;
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.hostname, &client->config->host), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.path, &client->config->path), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.username, &client->mqtt_state.connection.information.username), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.authentication.password, &client->mqtt_state.connection.information.password), goto _mqtt_set_config_failed);
if (!config->credentials.set_null_client_id) {
if (config->credentials.client_id) {
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->credentials.client_id, &client->mqtt_state.connection.information.client_id), goto _mqtt_set_config_failed);
}{...} else if (client->mqtt_state.connection.information.client_id == NULL) {
client->mqtt_state.connection.information.client_id = platform_create_id_string();
}{...}
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.client_id, goto _mqtt_set_config_failed);
ESP_LOGD(TAG, "MQTT client_id=%s", client->mqtt_state.connection.information.client_id);
}{...}
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->broker.address.uri, &client->config->uri), goto _mqtt_set_config_failed);
ESP_MEM_CHECK(TAG, esp_mqtt_set_if_config(config->session.last_will.topic, &client->mqtt_state.connection.information.will_topic), goto _mqtt_set_config_failed);
if (config->session.last_will.msg_len && config->session.last_will.msg) {
free(client->mqtt_state.connection.information.will_message);
client->mqtt_state.connection.information.will_message = malloc(config->session.last_will.msg_len);
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed);
memcpy(client->mqtt_state.connection.information.will_message, config->session.last_will.msg, config->session.last_will.msg_len);
client->mqtt_state.connection.information.will_length = config->session.last_will.msg_len;
}{...} else if (config->session.last_will.msg) {
free(client->mqtt_state.connection.information.will_message);
client->mqtt_state.connection.information.will_message = strdup(config->session.last_will.msg);
ESP_MEM_CHECK(TAG, client->mqtt_state.connection.information.will_message, goto _mqtt_set_config_failed);
client->mqtt_state.connection.information.will_length = strlen(config->session.last_will.msg);
}{...}
if (config->session.last_will.qos) {
client->mqtt_state.connection.information.will_qos = config->session.last_will.qos;
}{...}
if (config->session.last_will.retain) {
client->mqtt_state.connection.information.will_retain = config->session.last_will.retain;
}{...}
if (config->session.disable_clean_session == client->mqtt_state.connection.information.clean_session) {
client->mqtt_state.connection.information.clean_session = !config->session.disable_clean_session;
if (!client->mqtt_state.connection.information.clean_session && config->credentials.set_null_client_id) {
ESP_LOGE(TAG, "Clean Session flag must be true if client has a null id");
}{...}
}{...}
if (config->session.keepalive) {
client->mqtt_state.connection.information.keepalive = config->session.keepalive;
}{...}
if (client->mqtt_state.connection.information.keepalive == 0) {
client->mqtt_state.connection.information.keepalive = MQTT_KEEPALIVE_TICK;
}{...}
if (config->session.disable_keepalive) {
client->mqtt_state.connection.information.keepalive = 0;
}{...}
if (config->session.protocol_ver) {
client->mqtt_state.connection.information.protocol_ver = config->session.protocol_ver;
}{...}
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_UNDEFINED) {
#ifdef MQTT_PROTOCOL_311
client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1_1;
#else
client->mqtt_state.connection.information.protocol_ver = MQTT_PROTOCOL_V_3_1;
#endif
}{...} else if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifndef MQTT_PROTOCOL_5
ESP_LOGE(TAG, "Please first enable MQTT_PROTOCOL_5 feature in menuconfig");
goto _mqtt_set_config_failed;/* ... */
#endif
}{...}
client->config->network_timeout_ms = config->network.timeout_ms;
if (client->config->network_timeout_ms <= 0) {
client->config->network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS;
}{...}
if (config->network.refresh_connection_after_ms) {
client->config->refresh_connection_after_ms = config->network.refresh_connection_after_ms;
}{...}
client->config->auto_reconnect = true;
if (config->network.disable_auto_reconnect == client->config->auto_reconnect) {
client->config->auto_reconnect = !config->network.disable_auto_reconnect;
}{...}
if (config->network.reconnect_timeout_ms) {
client->config->reconnect_timeout_ms = config->network.reconnect_timeout_ms;
}{...} else {
client->config->reconnect_timeout_ms = MQTT_RECON_DEFAULT_MS;
}{...}
if (config->network.transport) {
client->config->transport = config->network.transport;
}{...}
if (config->network.if_name) {
client->config->if_name = calloc(1, sizeof(struct ifreq) + 1);
ESP_MEM_CHECK(TAG, client->config->if_name, goto _mqtt_set_config_failed);
memcpy(client->config->if_name, config->network.if_name, sizeof(struct ifreq));
}{...}
if (config->broker.verification.alpn_protos) {
for (int i = 0; i < client->config->num_alpn_protos; i++) {
free(client->config->alpn_protos[i]);
}{...}
free(client->config->alpn_protos);
client->config->num_alpn_protos = 0;
const char **p;
for (p = config->broker.verification.alpn_protos; *p != NULL; p++ ) {
client->config->num_alpn_protos++;
}{...}
client->config->alpn_protos = calloc(client->config->num_alpn_protos + 1, sizeof(*config->broker.verification.alpn_protos));
ESP_MEM_CHECK(TAG, client->config->alpn_protos, goto _mqtt_set_config_failed);
for (int i = 0; i < client->config->num_alpn_protos; i++) {
client->config->alpn_protos[i] = strdup(config->broker.verification.alpn_protos[i]);
ESP_MEM_CHECK(TAG, client->config->alpn_protos[i], goto _mqtt_set_config_failed);
}{...}
}{...}
client->config->use_global_ca_store = config->broker.verification.use_global_ca_store;
client->config->cacert_buf = config->broker.verification.certificate;
client->config->cacert_bytes = config->broker.verification.certificate_len;
client->config->psk_hint_key = config->broker.verification.psk_hint_key;
client->config->crt_bundle_attach = config->broker.verification.crt_bundle_attach;
client->config->clientcert_buf = config->credentials.authentication.certificate;
client->config->clientcert_bytes = config->credentials.authentication.certificate_len;
client->config->clientkey_buf = config->credentials.authentication.key;
client->config->clientkey_bytes = config->credentials.authentication.key_len;
client->config->skip_cert_common_name_check = config->broker.verification.skip_cert_common_name_check;
client->config->common_name = config->broker.verification.common_name;
client->config->use_secure_element = config->credentials.authentication.use_secure_element;
client->config->ds_data = config->credentials.authentication.ds_data;
if (config->credentials.authentication.key_password && config->credentials.authentication.key_password_len) {
client->config->clientkey_password_len = config->credentials.authentication.key_password_len;
client->config->clientkey_password = malloc(client->config->clientkey_password_len);
ESP_MEM_CHECK(TAG, client->config->clientkey_password, goto _mqtt_set_config_failed);
memcpy(client->config->clientkey_password, config->credentials.authentication.key_password, client->config->clientkey_password_len);
}{...}
if (config->broker.address.transport) {
free(client->config->scheme);
client->config->scheme = NULL;
if (config->broker.address.transport == MQTT_TRANSPORT_OVER_TCP) {
client->config->scheme = create_string(MQTT_OVER_TCP_SCHEME, strlen(MQTT_OVER_TCP_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}{...}
#if MQTT_ENABLE_WS
else if (config->broker.address.transport == MQTT_TRANSPORT_OVER_WS) {
client->config->scheme = create_string(MQTT_OVER_WS_SCHEME, strlen(MQTT_OVER_WS_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}{...}
#endif/* ... */
#if MQTT_ENABLE_SSL
else if (config->broker.address.transport == MQTT_TRANSPORT_OVER_SSL) {
client->config->scheme = create_string(MQTT_OVER_SSL_SCHEME, strlen(MQTT_OVER_SSL_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}{...}
#endif/* ... */
#if MQTT_ENABLE_WSS
else if (config->broker.address.transport == MQTT_TRANSPORT_OVER_WSS) {
client->config->scheme = create_string(MQTT_OVER_WSS_SCHEME, strlen(MQTT_OVER_WSS_SCHEME));
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_set_config_failed);
}{...}
#endif/* ... */
}{...}
if (config->broker.address.uri) {
if (esp_mqtt_client_set_uri(client, client->config->uri) != ESP_OK) {
err = ESP_FAIL;
goto _mqtt_set_config_failed;
}{...}
}{...}
client->config->outbox_limit = config->outbox.limit;
esp_err_t config_has_conflict = esp_mqtt_check_cfg_conflict(client->config, config);
MQTT_API_UNLOCK(client);
return config_has_conflict;
_mqtt_set_config_failed:
esp_mqtt_destroy_config(client);
MQTT_API_UNLOCK(client);
return err;
}{ ... }
void esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
{
if (client->config == NULL) {
return;
}{...}
free(client->mqtt_state.in_buffer);
mqtt_msg_buffer_destroy(&client->mqtt_state.connection);
free(client->config->host);
free(client->config->uri);
free(client->config->path);
free(client->config->scheme);
for (int i = 0; i < client->config->num_alpn_protos; i++) {
free(client->config->alpn_protos[i]);
}{...}
free(client->config->alpn_protos);
free(client->config->clientkey_password);
free(client->config->if_name);
free(client->mqtt_state.connection.information.will_topic);
free(client->mqtt_state.connection.information.will_message);
free(client->mqtt_state.connection.information.client_id);
free(client->mqtt_state.connection.information.username);
free(client->mqtt_state.connection.information.password);
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_client_destory(client);
#endif
memset(&client->mqtt_state.connection.information, 0, sizeof(mqtt_connect_info_t));
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
if (client->config->event_loop_handle) {
esp_event_loop_delete(client->config->event_loop_handle);
}{...}
#endif/* ... */
esp_transport_destroy(client->config->transport);
memset(client->config, 0, sizeof(mqtt_config_storage_t));
free(client->config);
client->config = NULL;
}{ ... }
static inline bool has_timed_out(uint64_t last_tick, uint64_t timeout)
{
uint64_t next = last_tick + timeout;
return (int64_t)(next - platform_tick_get_ms()) <= 0;
}{ ... }
static esp_err_t process_keepalive(esp_mqtt_client_handle_t client)
{
if (client->mqtt_state.connection.information.keepalive > 0) {
const uint64_t keepalive_ms = client->mqtt_state.connection.information.keepalive * 1000;
if (client->wait_for_ping_resp == true ) {
if (has_timed_out(client->keepalive_tick, keepalive_ms)) {
ESP_LOGE(TAG, "No PING_RESP, disconnected");
esp_mqtt_abort_connection(client);
client->wait_for_ping_resp = false;
return ESP_FAIL;
}{...}
return ESP_OK;
}{...}
if (has_timed_out(client->keepalive_tick, keepalive_ms / 2)) {
if (esp_mqtt_client_ping(client) == ESP_FAIL) {
ESP_LOGE(TAG, "Can't send ping, disconnected");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}{...}
client->wait_for_ping_resp = true;
return ESP_OK;
}{...}
}{...}
return ESP_OK;
}{ ... }
static inline esp_err_t esp_mqtt_write(esp_mqtt_client_handle_t client)
{
int wlen = 0, widx = 0, len = client->mqtt_state.connection.outbound_message.length;
while (len > 0) {
wlen = esp_transport_write(client->transport,
(char *)client->mqtt_state.connection.outbound_message.data + widx,
len,
client->config->network_timeout_ms);
if (wlen < 0) {
ESP_LOGE(TAG, "Writing failed: errno=%d", errno);
esp_mqtt_client_dispatch_transport_error(client);
return ESP_FAIL;
}{...}
if (wlen == 0) {
ESP_LOGE(TAG, "Writing didn't complete in specified timeout: errno=%d", errno);
return ESP_ERR_TIMEOUT;
}{...}
widx += wlen;
len -= wlen;
}{...}
return ESP_OK;
}{ ... }
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms)
{
int read_len, connect_rsp_code = 0;
client->wait_for_ping_resp = false;
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_connect(&client->mqtt_state.connection,
&client->mqtt_state.connection.information, &client->mqtt5_config->connect_property_info, &client->mqtt5_config->will_property_info);/* ... */
#endif
}{...} else {
mqtt_msg_connect(&client->mqtt_state.connection,
&client->mqtt_state.connection.information);
}{...}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Connect message cannot be created");
return ESP_FAIL;
}{...}
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->mqtt_state.pending_msg_id = mqtt5_get_id(client->mqtt_state.connection.outbound_message.data,
client->mqtt_state.connection.outbound_message.length);/* ... */
#endif
}{...} else {
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.connection.outbound_message.data,
client->mqtt_state.connection.outbound_message.length);
}{...}
ESP_LOGD(TAG, "Sending MQTT CONNECT message, type: %d, id: %04X",
client->mqtt_state.pending_msg_type,
client->mqtt_state.pending_msg_id);
if (esp_mqtt_write(client) != ESP_OK) {
return ESP_FAIL;
}{...}
client->mqtt_state.in_buffer_read_len = 0;
client->mqtt_state.message_length = 0;
uint64_t connack_recv_started = platform_tick_get_ms();
do {
read_len = mqtt_message_receive(client, client->config->network_timeout_ms);
}{...} while (read_len == 0 && platform_tick_get_ms() - connack_recv_started < client->config->network_timeout_ms);
if (read_len <= 0) {
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, read_len);
return ESP_FAIL;
}{...}
if (mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK) {
ESP_LOGE(TAG, "Invalid MSG_TYPE response: %d, read_len: %d", mqtt_get_type(client->mqtt_state.in_buffer), read_len);
return ESP_FAIL;
}{...}
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_parse_connack(client, &connect_rsp_code) == ESP_OK) {
client->send_publish_packet_count = 0;
return ESP_OK;
}{...}
#endif/* ... */
}{...} else {
client->mqtt_state.in_buffer_read_len = 0;
connect_rsp_code = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
if (connect_rsp_code == MQTT_CONNECTION_ACCEPTED) {
ESP_LOGD(TAG, "Connected");
return ESP_OK;
}{...}
switch (connect_rsp_code) {
case MQTT_CONNECTION_REFUSE_PROTOCOL:
ESP_LOGW(TAG, "Connection refused, bad protocol");
break;...
case MQTT_CONNECTION_REFUSE_SERVER_UNAVAILABLE:
ESP_LOGW(TAG, "Connection refused, server unavailable");
break;...
case MQTT_CONNECTION_REFUSE_BAD_USERNAME:
ESP_LOGW(TAG, "Connection refused, bad username or password");
break;...
case MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED:
ESP_LOGW(TAG, "Connection refused, not authorized");
break;...
default:
ESP_LOGW(TAG, "Connection refused, Unknow reason");
break;...
}{...}
}{...}
client->event.event_id = MQTT_EVENT_ERROR;
client->event.error_handle->error_type = MQTT_ERROR_TYPE_CONNECTION_REFUSED;
client->event.error_handle->connect_return_code = connect_rsp_code;
client->event.error_handle->esp_tls_stack_err = 0;
client->event.error_handle->esp_tls_last_esp_err = 0;
client->event.error_handle->esp_tls_cert_verify_flags = 0;
esp_mqtt_dispatch_event_with_msgid(client);
return ESP_FAIL;
}{ ... }
static void esp_mqtt_abort_connection(esp_mqtt_client_handle_t client)
{
MQTT_API_LOCK(client);
esp_transport_close(client->transport);
client->wait_timeout_ms = client->config->reconnect_timeout_ms;
client->reconnect_tick = platform_tick_get_ms();
client->state = MQTT_STATE_WAIT_RECONNECT;
ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms);
client->event.event_id = MQTT_EVENT_DISCONNECTED;
client->wait_for_ping_resp = false;
esp_mqtt_dispatch_event_with_msgid(client);
MQTT_API_UNLOCK(client);
}{ ... }
static bool create_client_data(esp_mqtt_client_handle_t client)
{
client->event.error_handle = calloc(1, sizeof(esp_mqtt_error_codes_t));
ESP_MEM_CHECK(TAG, client->event.error_handle, return false)
client->api_lock = xSemaphoreCreateRecursiveMutex();
ESP_MEM_CHECK(TAG, client->api_lock, return false);
client->outbox = outbox_init();
ESP_MEM_CHECK(TAG, client->outbox, return false);
client->status_bits = xEventGroupCreate();
ESP_MEM_CHECK(TAG, client->status_bits, return false);
return true;
}{ ... }
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config)
{
esp_mqtt_client_handle_t client = heap_caps_calloc(1, sizeof(struct esp_mqtt_client),
#if MQTT_EVENT_QUEUE_SIZE > 1
MALLOC_CAP_INTERNAL | MALLOC_CAP_8BIT);/* ... */
#else
MALLOC_CAP_DEFAULT);
#endif
ESP_MEM_CHECK(TAG, client, return NULL);
if (!create_client_data(client)) {
goto _mqtt_init_failed;
}{...}
if (esp_mqtt_set_config(client, config) != ESP_OK) {
goto _mqtt_init_failed;
}{...}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_loop_args_t no_task_loop = {
.queue_size = MQTT_EVENT_QUEUE_SIZE,
.task_name = NULL,
}{...};
esp_event_loop_create(&no_task_loop, &client->config->event_loop_handle);
#if MQTT_EVENT_QUEUE_SIZE > 1
atomic_init(&client->queued_events, 0);
#endif/* ... */
#endif
client->keepalive_tick = platform_tick_get_ms();
client->reconnect_tick = platform_tick_get_ms();
client->refresh_connection_tick = platform_tick_get_ms();
client->wait_for_ping_resp = false;
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_create_default_config(client) != ESP_OK) {
goto _mqtt_init_failed;
}{...}
#endif/* ... */
return client;
_mqtt_init_failed:
esp_mqtt_client_destroy(client);
return NULL;
}{ ... }
esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
{
if (client == NULL) {
return ESP_ERR_INVALID_ARG;
}{...}
if (client->run) {
esp_mqtt_client_stop(client);
}{...}
esp_mqtt_destroy_config(client);
if (client->transport_list) {
esp_transport_list_destroy(client->transport_list);
}{...}
if (client->outbox) {
outbox_destroy(client->outbox);
}{...}
if (client->status_bits) {
vEventGroupDelete(client->status_bits);
}{...}
if (client->api_lock) {
vSemaphoreDelete(client->api_lock);
}{...}
free(client->event.error_handle);
free(client);
return ESP_OK;
}{ ... }
static char *create_string(const char *ptr, int len)
{
char *ret;
if (len <= 0) {
return NULL;
}{...}
ret = calloc(1, len + 1);
ESP_MEM_CHECK(TAG, ret, return NULL);
memcpy(ret, ptr, len);
return ret;
}{ ... }
esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *uri)
{
struct http_parser_url puri;
http_parser_url_init(&puri);
int parser_status = http_parser_parse_url(uri, strlen(uri), 0, &puri);
if (parser_status != 0) {
ESP_LOGE(TAG, "Error parse uri = %s", uri);
return ESP_FAIL;
}{...}
MQTT_API_LOCK(client);
#pragma GCC diagnostic push
#if __clang__
#pragma clang diagnostic ignored "-Wunknown-warning-option"
#else
#pragma GCC diagnostic ignored "-Wpragmas"
#endif
#pragma GCC diagnostic ignored "-Wanalyzer-malloc-leak"
free(client->config->scheme);
free(client->config->host);
free(client->config->path);
client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len);
client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len);
client->config->path = NULL;
#pragma GCC diagnostic pop
if (puri.field_data[UF_PATH].len || puri.field_data[UF_QUERY].len) {
int asprintf_ret_value;
if (puri.field_data[UF_QUERY].len == 0) {
asprintf_ret_value = asprintf(&client->config->path,
"%.*s",
puri.field_data[UF_PATH].len, uri + puri.field_data[UF_PATH].off);
}{...} else if (puri.field_data[UF_PATH].len == 0) {
asprintf_ret_value = asprintf(&client->config->path,
"/?%.*s",
puri.field_data[UF_QUERY].len, uri + puri.field_data[UF_QUERY].off);
}{...} else {
asprintf_ret_value = asprintf(&client->config->path,
"%.*s?%.*s",
puri.field_data[UF_PATH].len, uri + puri.field_data[UF_PATH].off,
puri.field_data[UF_QUERY].len, uri + puri.field_data[UF_QUERY].off);
}{...}
if (asprintf_ret_value == -1) {
ESP_LOGE(TAG,"%s(%d): %s", __FUNCTION__, __LINE__, "Memory exhausted");
MQTT_API_UNLOCK(client);
return ESP_ERR_NO_MEM;
}{...}
}{...}
if (puri.field_data[UF_PORT].len) {
client->config->port = strtol((const char *)(uri + puri.field_data[UF_PORT].off), NULL, 10);
}{...}
char *user_info = create_string(uri + puri.field_data[UF_USERINFO].off, puri.field_data[UF_USERINFO].len);
if (user_info) {
char *pass = strchr(user_info, ':');
if (pass) {
pass[0] = 0;
pass ++;
client->mqtt_state.connection.information.password = strdup(pass);
}{...}
client->mqtt_state.connection.information.username = strdup(user_info);
free(user_info);
}{...}
MQTT_API_UNLOCK(client);
return ESP_OK;
}{ ... }
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
{
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->event.msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
#endif
}{...} else {
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
}{...}
return esp_mqtt_dispatch_event(client);
}{ ... }
esp_err_t esp_mqtt_dispatch_custom_event(esp_mqtt_client_handle_t client, esp_mqtt_event_t *event)
{
esp_err_t ret = esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, MQTT_USER_EVENT, event, sizeof(*event), 0);
#if MQTT_EVENT_QUEUE_SIZE > 1
if (ret == ESP_OK) {
atomic_fetch_add(&client->queued_events, 1);
}{...}
#endif/* ... */
return ret;
}{ ... }
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
{
client->event.client = client;
client->event.protocol_ver = client->mqtt_state.connection.information.protocol_ver;
esp_err_t ret = ESP_FAIL;
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
esp_event_post_to(client->config->event_loop_handle, MQTT_EVENTS, client->event.event_id, &client->event, sizeof(client->event), portMAX_DELAY);
ret = esp_event_loop_run(client->config->event_loop_handle, 0);/* ... */
#else
return ESP_FAIL;
#endif
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_client_delete_user_property(client->event.property->user_property);
client->event.property->user_property = NULL;/* ... */
#endif
}{...}
return ret;
}{ ... }
static esp_err_t deliver_publish(esp_mqtt_client_handle_t client)
{
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_read_len = client->mqtt_state.in_buffer_read_len;
size_t msg_total_len = client->mqtt_state.message_length;
size_t msg_topic_len = msg_read_len, msg_data_len = msg_read_len;
size_t msg_data_offset = 0;
char *msg_topic = NULL, *msg_data = NULL;
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
if (esp_mqtt5_get_publish_data(client, msg_buf, msg_read_len, &msg_topic, &msg_topic_len, &msg_data, &msg_data_len) != ESP_OK) {
ESP_LOGE(TAG, "%s: esp_mqtt5_get_publish_data() failed", __func__);
return ESP_FAIL;
}{...}
#endif/* ... */
}{...} else {
msg_topic = mqtt_get_publish_topic(msg_buf, &msg_topic_len);
if (msg_topic == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_topic() failed", __func__);
return ESP_FAIL;
}{...}
ESP_LOGD(TAG, "%s: msg_topic_len=%"NEWLIB_NANO_COMPAT_FORMAT, __func__, NEWLIB_NANO_COMPAT_CAST(msg_topic_len));
msg_data = mqtt_get_publish_data(msg_buf, &msg_data_len);
if (msg_data_len > 0 && msg_data == NULL) {
ESP_LOGE(TAG, "%s: mqtt_get_publish_data() failed", __func__);
return ESP_FAIL;
}{...}
}{...}
client->event.retain = mqtt_get_retain(msg_buf);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
client->event.msg_id = mqtt5_get_id(msg_buf, msg_read_len);
#endif
}{...} else {
client->event.msg_id = mqtt_get_id(msg_buf, msg_read_len);
}{...}
client->event.qos = mqtt_get_qos(msg_buf);
client->event.dup = mqtt_get_dup(msg_buf);
client->event.total_data_len = msg_data_len + msg_total_len - msg_read_len;
post_data_event:
ESP_LOGD(TAG, "Get data len= %"NEWLIB_NANO_COMPAT_FORMAT", topic len=%"NEWLIB_NANO_COMPAT_FORMAT", total_data: %d offset: %"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(msg_data_len), NEWLIB_NANO_COMPAT_CAST(msg_topic_len),
client->event.total_data_len, NEWLIB_NANO_COMPAT_CAST(msg_data_offset));
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = msg_data_len > 0 ? msg_data : NULL;
client->event.data_len = msg_data_len;
client->event.current_data_offset = msg_data_offset;
client->event.topic = msg_topic;
client->event.topic_len = msg_topic_len;
esp_mqtt_dispatch_event(client);
if (msg_read_len < msg_total_len) {
size_t buf_len = client->mqtt_state.in_buffer_length;
msg_data = (char *)client->mqtt_state.in_buffer;
msg_topic = NULL;
msg_topic_len = 0;
msg_data_offset += msg_data_len;
int ret = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer,
msg_total_len - msg_read_len > buf_len ? buf_len : msg_total_len - msg_read_len,
client->config->network_timeout_ms);
if (ret <= 0) {
return esp_mqtt_handle_transport_read_error(ret, client, false) == 0 ? ESP_OK : ESP_FAIL;
}{...}
msg_data_len = ret;
msg_read_len += msg_data_len;
goto post_data_event;
}{...}
return ESP_OK;
}{ ... }
static esp_err_t deliver_suback(esp_mqtt_client_handle_t client)
{
uint8_t *msg_buf = client->mqtt_state.in_buffer;
size_t msg_data_len = client->mqtt_state.in_buffer_read_len;
char *msg_data = NULL;
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
msg_data = mqtt5_get_suback_data(msg_buf, &msg_data_len, &client->event.property->user_property);
#else
return ESP_FAIL;/* ... */
#endif
}{...} else {
msg_data = mqtt_get_suback_data(msg_buf, &msg_data_len);
}{...}
if (msg_data_len <= 0) {
ESP_LOGE(TAG, "Failed to acquire suback data");
return ESP_FAIL;
}{...}
client->event.error_handle->esp_tls_stack_err = 0;
client->event.error_handle->esp_tls_last_esp_err = 0;
client->event.error_handle->esp_tls_cert_verify_flags = 0;
client->event.error_handle->error_type = MQTT_ERROR_TYPE_NONE;
client->event.error_handle->connect_return_code = MQTT_CONNECTION_ACCEPTED;
for (int topic = 0; topic < msg_data_len; ++topic) {
if ((uint8_t)msg_data[topic] >= 0x80) {
client->event.error_handle->error_type = MQTT_ERROR_TYPE_SUBSCRIBE_FAILED;
break;
}{...}
}{...}
client->event.data_len = msg_data_len;
client->event.total_data_len = msg_data_len;
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
client->event.data = msg_data;
client->event.current_data_offset = 0;
esp_mqtt_dispatch_event_with_msgid(client);
return ESP_OK;
}{ ... }
static bool remove_initiator_message(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
{
if (outbox_delete(client->outbox, msg_id, msg_type) == ESP_OK) {
ESP_LOGD(TAG, "Removed pending_id=%d", client->mqtt_state.pending_msg_id);
return true;
}{...}
ESP_LOGD(TAG, "Failed to remove pending_id=%d", client->mqtt_state.pending_msg_id);
return false;
}{ ... }
static outbox_item_handle_t mqtt_enqueue(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
{
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.connection.outbound_message.data;
msg.len = client->mqtt_state.connection.outbound_message.length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
return outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}{ ... }
/* ... */
static int mqtt_message_receive(esp_mqtt_client_handle_t client, int read_poll_timeout_ms)
{
int read_len, total_len, fixed_header_len;
uint8_t *buf = client->mqtt_state.in_buffer + client->mqtt_state.in_buffer_read_len;
esp_transport_handle_t t = client->transport;
client->mqtt_state.message_length = 0;
if (client->mqtt_state.in_buffer_read_len == 0) {
/* ... */
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client, false);
}{...}
ESP_LOGD(TAG, "%s: first byte: 0x%x", __func__, *buf);
/* ... */
if (!mqtt_has_valid_msg_hdr(buf, read_len)) {
ESP_LOGE(TAG, "%s: received a message with an invalid header=0x%x", __func__, *buf);
goto err;
}{...}
buf++;
client->mqtt_state.in_buffer_read_len++;
}{...}
if ((client->mqtt_state.in_buffer_read_len == 1) ||
((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80))) {
do {
/* ... */
read_len = esp_transport_read(t, (char *)buf, 1, read_poll_timeout_ms);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}{...}
ESP_LOGD(TAG, "%s: read \"remaining length\" byte: 0x%x", __func__, *buf);
buf++;
client->mqtt_state.in_buffer_read_len++;
}{...} while ((client->mqtt_state.in_buffer_read_len < 6) && (*(buf - 1) & 0x80));
}{...}
total_len = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len, &fixed_header_len);
ESP_LOGD(TAG, "%s: total message length: %d (already read: %"NEWLIB_NANO_COMPAT_FORMAT")", __func__, total_len, NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len));
client->mqtt_state.message_length = total_len;
if (client->mqtt_state.in_buffer_length < total_len) {
if (mqtt_get_type(client->mqtt_state.in_buffer) == MQTT_MSG_TYPE_PUBLISH) {
/* ... */
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
read_len = esp_transport_read(t, (char *)buf, client->mqtt_state.in_buffer_read_len - fixed_header_len + 2, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}{...}
client->mqtt_state.in_buffer_read_len += read_len;
buf += read_len;
if (client->mqtt_state.in_buffer_read_len < fixed_header_len + 2) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %"NEWLIB_NANO_COMPAT_FORMAT")",
__func__, total_len, NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len));
return 0;
}{...}
}{...}
int topic_len = client->mqtt_state.in_buffer[fixed_header_len] << 8;
topic_len |= client->mqtt_state.in_buffer[fixed_header_len + 1];
total_len = fixed_header_len + topic_len + (mqtt_get_qos(client->mqtt_state.in_buffer) > 0 ? 2 : 0);
ESP_LOGD(TAG, "%s: total len modified to %d as message longer than input buffer", __func__, total_len);
if (client->mqtt_state.in_buffer_length < total_len) {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
}{...} else {
total_len = client->mqtt_state.in_buffer_length;
}{...}
}{...} else {
ESP_LOGE(TAG, "%s: message is too big, insufficient buffer size", __func__);
goto err;
}{...}
}{...}
if (client->mqtt_state.in_buffer_read_len < total_len) {
read_len = esp_transport_read(t, (char *)buf, total_len - client->mqtt_state.in_buffer_read_len, read_poll_timeout_ms);
ESP_LOGD(TAG, "%s: read_len=%d", __func__, read_len);
if (read_len <= 0) {
return esp_mqtt_handle_transport_read_error(read_len, client, true);
}{...}
client->mqtt_state.in_buffer_read_len += read_len;
if (client->mqtt_state.in_buffer_read_len < total_len) {
ESP_LOGD(TAG, "%s: transport_read(): message reading left in progress :: total message length: %d (already read: %"NEWLIB_NANO_COMPAT_FORMAT")",
__func__, total_len, NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len));
return 0;
}{...}
}{...}
ESP_LOGV(TAG, "%s: transport_read():%"NEWLIB_NANO_COMPAT_FORMAT" %"NEWLIB_NANO_COMPAT_FORMAT, __func__,
NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len), NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.message_length));
return 1;
err:
esp_mqtt_client_dispatch_transport_error(client);
return -2;
}{ ... }
static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
{
uint8_t msg_type = 0, msg_qos = 0;
uint16_t msg_id = 0;
size_t previous_in_buffer_read_len = client->mqtt_state.in_buffer_read_len;
int recv = mqtt_message_receive(client, 0);
if (recv == 0) {
return ESP_OK;
}{...}
if (recv == -1) {
if (previous_in_buffer_read_len == client->mqtt_state.in_buffer_read_len) {
ESP_LOGE(TAG, "%s: Network timeout while reading MQTT message", __func__);
return ESP_FAIL;
}{...}
return ESP_OK;
}{...}
if (recv < 0) {
ESP_LOGE(TAG, "%s: mqtt_message_receive() returned %d", __func__, recv);
return ESP_FAIL;
}{...}
int read_len = client->mqtt_state.message_length;
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
msg_id = mqtt5_get_id(client->mqtt_state.in_buffer, read_len);
#endif
}{...} else {
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, read_len);
}{...}
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
switch (msg_type) {
case MQTT_MSG_TYPE_SUBACK:
if (remove_initiator_message(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_suback(client);
#endif
ESP_LOGD(TAG, "deliver_suback, message_length_read=%"NEWLIB_NANO_COMPAT_FORMAT", message_length=%"NEWLIB_NANO_COMPAT_FORMAT,
NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len), NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.message_length));
if (deliver_suback(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver suback message id=%d", msg_id);
return ESP_FAIL;
}{...}
}{...}
break;...
case MQTT_MSG_TYPE_UNSUBACK:
if (remove_initiator_message(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_unsuback(client);
#endif
ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
}{...}
break;...
case MQTT_MSG_TYPE_PUBLISH:
ESP_LOGD(TAG, "deliver_publish, message_length_read=%"NEWLIB_NANO_COMPAT_FORMAT", message_length=%"NEWLIB_NANO_COMPAT_FORMAT, NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.in_buffer_read_len), NEWLIB_NANO_COMPAT_CAST(client->mqtt_state.message_length));
if (deliver_publish(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to deliver publish message id=%d", msg_id);
return ESP_FAIL;
}{...}
if (msg_qos == 1 || msg_qos == 2) {
if (msg_qos == 1) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_puback(&client->mqtt_state.connection, msg_id);
#endif
}{...} else {
mqtt_msg_puback(&client->mqtt_state.connection, msg_id);
}{...}
}{...} else if (msg_qos == 2) {
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_pubrec(&client->mqtt_state.connection, msg_id);
#endif
}{...} else {
mqtt_msg_pubrec(&client->mqtt_state.connection, msg_id);
}{...}
}{...}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBACK or PUBREC cannot be created");
return ESP_FAIL;
}{...}
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
return ESP_FAIL;
}{...}
}{...}
break;...
case MQTT_MSG_TYPE_PUBACK:
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}{...}
#endif/* ... */
if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_puback(client);
#endif
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}{...}
break;...
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREC return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
mqtt5_msg_pubrel(&client->mqtt_state.connection, msg_id);/* ... */
#endif
}{...} else {
mqtt_msg_pubrel(&client->mqtt_state.connection, msg_id);
}{...}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBREL cannot be created");
return ESP_FAIL;
}{...}
outbox_set_pending(client->outbox, msg_id, ACKNOWLEDGED);
esp_mqtt_write(client);
break;...
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
ESP_LOGI(TAG, "MQTT_MSG_TYPE_PUBREL return code is %d", mqtt5_msg_get_reason_code(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_read_len));
mqtt5_msg_pubcomp(&client->mqtt_state.connection, msg_id);/* ... */
#endif
}{...} else {
mqtt_msg_pubcomp(&client->mqtt_state.connection, msg_id);
}{...}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish response message PUBCOMP cannot be created");
return ESP_FAIL;
}{...}
esp_mqtt_write(client);
break;...
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_decrement_packet_counter(client);
}{...}
#endif/* ... */
if (remove_initiator_message(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
#ifdef MQTT_PROTOCOL_5
esp_mqtt5_parse_pubcomp(client);
#endif
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}{...}
break;...
case MQTT_MSG_TYPE_PINGRESP:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
client->wait_for_ping_resp = false;
/* ... */
client->keepalive_tick = platform_tick_get_ms();
break;...
}{...}
client->mqtt_state.in_buffer_read_len = 0;
return ESP_OK;
}{ ... }
static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item)
{
client->mqtt_state.connection.outbound_message.data = outbox_item_get_data(item, &client->mqtt_state.connection.outbound_message.length, &client->mqtt_state.pending_msg_id,
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos > 0 && (outbox_item_get_pending(item) == TRANSMITTED)) {
mqtt_set_dup(client->mqtt_state.connection.outbound_message.data);
ESP_LOGD(TAG, "Sending Duplicated QoS%d message with id=%d", client->mqtt_state.pending_publish_qos, client->mqtt_state.pending_msg_id);
}{...}
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to resend data ");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}{...}
return ESP_OK;
}{ ... }
static void mqtt_delete_expired_messages(esp_mqtt_client_handle_t client)
{
#if MQTT_REPORT_DELETED_MESSAGES
int msg_id = 0;
while ((msg_id = outbox_delete_single_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS)) > 0) {
client->event.event_id = MQTT_EVENT_DELETED;
client->event.msg_id = msg_id;
if (esp_mqtt_dispatch_event(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to post event on deleting message id=%d", msg_id);
}{...}
}{...}
#else/* ... */
outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
#endif
}{ ... }
/* ... */
static inline int max_poll_timeout(esp_mqtt_client_handle_t client, int max_timeout)
{
return
#if MQTT_EVENT_QUEUE_SIZE > 1
atomic_load(&client->queued_events) > 0 ? 10 : max_timeout;
#else
max_timeout;
#endif
}{ ... }
static inline void run_event_loop(esp_mqtt_client_handle_t client)
{
#if MQTT_EVENT_QUEUE_SIZE > 1
if (atomic_load(&client->queued_events) > 0) {
atomic_fetch_sub(&client->queued_events, 1);
#else
{
#endif
esp_err_t ret = esp_event_loop_run(client->config->event_loop_handle, 0);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "Error in running event_loop %d", ret);
}{...}
}{...}
}{ ... }
static void esp_mqtt_task(void *pv)
{
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
uint64_t last_retransmit = 0;
outbox_tick_t msg_tick = 0;
client->run = true;
client->state = MQTT_STATE_INIT;
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
while (client->run) {
MQTT_API_LOCK(client);
run_event_loop(client);
mqtt_delete_expired_messages(client);
mqtt_client_state_t state = client->state;
switch (state) {
case MQTT_STATE_DISCONNECTED:
break;...
case MQTT_STATE_INIT:
xEventGroupClearBits(client->status_bits, RECONNECT_BIT | DISCONNECT_BIT);
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
esp_mqtt_dispatch_event_with_msgid(client);
client->transport = client->config->transport;
if (!client->transport) {
if (esp_mqtt_client_create_transport(client) != ESP_OK) {
ESP_LOGE(TAG, "Failed to create transport list");
client->run = false;
break;
}{...}
client->transport = esp_transport_list_get_transport(client->transport_list, client->config->scheme);
if (client->transport == NULL) {
ESP_LOGE(TAG, "There are no transports valid, stop mqtt client, config scheme = %s", client->config->scheme);
client->run = false;
break;
}{...}
}{...}
if (client->config->port == 0) {
client->config->port = esp_transport_get_default_port(client->transport);
}{...}
#if MQTT_ENABLE_SSL
esp_mqtt_set_ssl_transport_properties(client->transport_list, client->config);
#endif
if (esp_transport_connect(client->transport,
client->config->host,
client->config->port,
client->config->network_timeout_ms) < 0) {
ESP_LOGE(TAG, "Error transport connect");
esp_mqtt_client_dispatch_transport_error(client);
esp_mqtt_abort_connection(client);
break;
}{...}
ESP_LOGD(TAG, "Transport connected to %s://%s:%d", client->config->scheme, client->config->host, client->config->port);
if (esp_mqtt_connect(client, client->config->network_timeout_ms) != ESP_OK) {
ESP_LOGE(TAG, "MQTT connect failed");
esp_mqtt_abort_connection(client);
break;
}{...}
client->event.event_id = MQTT_EVENT_CONNECTED;
if (client->mqtt_state.connection.information.protocol_ver != MQTT_PROTOCOL_V_5) {
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
}{...}
client->state = MQTT_STATE_CONNECTED;
esp_mqtt_dispatch_event_with_msgid(client);
client->refresh_connection_tick = platform_tick_get_ms();
client->keepalive_tick = platform_tick_get_ms();
break;...
case MQTT_STATE_CONNECTED:
if (xEventGroupWaitBits(client->status_bits, DISCONNECT_BIT, true, true, 0) & DISCONNECT_BIT) {
send_disconnect_msg(client);
esp_mqtt_abort_connection(client);
break;
}{...}
if (mqtt_process_receive(client) == ESP_FAIL) {
esp_mqtt_abort_connection(client);
break;
}{...}
if (last_retransmit == 0) {
last_retransmit = platform_tick_get_ms();
}{...}
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
if (item) {
if (mqtt_resend_queued(client, item) == ESP_OK) {
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
if (outbox_delete_item(client->outbox, item) != ESP_OK) {
ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
}{...}
}{...}
if (client->mqtt_state.pending_publish_qos > 0) {
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}{...}
#endif/* ... */
}{...}
}{...}
}{...} else if (has_timed_out(last_retransmit, client->config->message_retransmit_timeout)) {
last_retransmit = platform_tick_get_ms();
item = outbox_dequeue(client->outbox, TRANSMITTED, &msg_tick);
if (item && (last_retransmit - msg_tick > client->config->message_retransmit_timeout)) {
if (mqtt_resend_queued(client, item) == ESP_OK) {
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}{...}
#endif/* ... */
}{...}
}{...}
}{...}
if (process_keepalive(client) != ESP_OK) {
break;
}{...}
if (client->config->refresh_connection_after_ms &&
has_timed_out(client->refresh_connection_tick, client->config->refresh_connection_after_ms)) {
ESP_LOGD(TAG, "Refreshing the connection...");
esp_mqtt_abort_connection(client);
client->state = MQTT_STATE_INIT;
}{...}
break;...
case MQTT_STATE_WAIT_RECONNECT:
if (!client->config->auto_reconnect && xEventGroupGetBits(client->status_bits)&RECONNECT_BIT) {
xEventGroupClearBits(client->status_bits, RECONNECT_BIT);
client->state = MQTT_STATE_INIT;
client->wait_timeout_ms = MQTT_RECON_DEFAULT_MS;
ESP_LOGD(TAG, "Reconnecting per user request...");
break;
}{...} else if (client->config->auto_reconnect &&
platform_tick_get_ms() - client->reconnect_tick > client->wait_timeout_ms) {
client->state = MQTT_STATE_INIT;
client->reconnect_tick = platform_tick_get_ms();
ESP_LOGD(TAG, "Reconnecting...");
break;
}{...}
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
max_poll_timeout(client, client->wait_timeout_ms / 2 / portTICK_PERIOD_MS));
continue;...
default:
ESP_LOGE(TAG, "MQTT client error, client is in an unrecoverable state.");
break;...
}{...}
MQTT_API_UNLOCK(client);
if (MQTT_STATE_CONNECTED == client->state) {
if (esp_transport_poll_read(client->transport, max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS)) < 0) {
ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
esp_mqtt_abort_connection(client);
}{...}
}{...}
}{...}
esp_transport_close(client->transport);
outbox_delete_all_items(client->outbox);
xEventGroupSetBits(client->status_bits, STOPPED_BIT);
client->state = MQTT_STATE_DISCONNECTED;
vTaskDelete(NULL);
}{ ... }
esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}{...}
MQTT_API_LOCK(client);
if (client->state != MQTT_STATE_INIT && client->state != MQTT_STATE_DISCONNECTED) {
ESP_LOGE(TAG, "Client has started");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}{...}
esp_err_t err = ESP_OK;
#if MQTT_CORE_SELECTION_ENABLED
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
err = ESP_FAIL;
}{...}
#else/* ... */
ESP_LOGD(TAG, "Core selection disabled");
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
err = ESP_FAIL;
}{...}
#endif/* ... */
MQTT_API_UNLOCK(client);
return err;
}{ ... }
esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}{...}
ESP_LOGI(TAG, "Client asked to disconnect");
xEventGroupSetBits(client->status_bits, DISCONNECT_BIT);
return ESP_OK;
}{ ... }
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}{...}
ESP_LOGI(TAG, "Client force reconnect requested");
if (client->state != MQTT_STATE_WAIT_RECONNECT) {
ESP_LOGD(TAG, "The client is not waiting for reconnection. Ignore the request");
return ESP_FAIL;
}{...}
client->wait_timeout_ms = 0;
xEventGroupSetBits(client->status_bits, RECONNECT_BIT);
return ESP_OK;
}{ ... }
static esp_err_t send_disconnect_msg(esp_mqtt_client_handle_t client)
{
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_disconnect(&client->mqtt_state.connection, &client->mqtt5_config->disconnect_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
esp_mqtt5_client_delete_user_property(client->mqtt5_config->disconnect_property_info.user_property);
client->mqtt5_config->disconnect_property_info.user_property = NULL;
memset(&client->mqtt5_config->disconnect_property_info, 0, sizeof(esp_mqtt5_disconnect_property_config_t));
}{...}
#endif/* ... */
}{...} else {
mqtt_msg_disconnect(&client->mqtt_state.connection);
}{...}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Disconnect message cannot be created");
return ESP_FAIL;
}{...}
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending disconnect message");
}{...}
return ESP_OK;
}{ ... }
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return ESP_ERR_INVALID_ARG;
}{...}
MQTT_API_LOCK(client);
if (client->run) {
TaskHandle_t running_task = xTaskGetCurrentTaskHandle();
if (running_task == client->task_handle) {
MQTT_API_UNLOCK(client);
ESP_LOGE(TAG, "Client cannot be stopped from MQTT task");
return ESP_FAIL;
}{...}
if (client->state == MQTT_STATE_CONNECTED) {
if (send_disconnect_msg(client) != ESP_OK) {
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}{...}
}{...}
client->run = false;
client->state = MQTT_STATE_DISCONNECTED;
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, STOPPED_BIT, false, true, portMAX_DELAY);
return ESP_OK;
}{...} else {
ESP_LOGW(TAG, "Client asked to stop, but was not started");
MQTT_API_UNLOCK(client);
return ESP_FAIL;
}{...}
}{ ... }
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client)
{
mqtt_msg_pingreq(&client->mqtt_state.connection);
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Ping message cannot be created");
return ESP_FAIL;
}{...}
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error sending ping");
return ESP_FAIL;
}{...}
ESP_LOGD(TAG, "Sent PING successful");
return ESP_OK;
}{ ... }
int esp_mqtt_client_subscribe_multiple(esp_mqtt_client_handle_t client,
const esp_mqtt_topic_t *topic_list, int size)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}{...}
if (client->config->outbox_limit > 0 && outbox_get_size(client->outbox) > client->config->outbox_limit) {
return -2;
}{...}
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
return -1;
}{...}
MQTT_API_LOCK(client);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
int max_qos = topic_list[0].qos;
for (int topic_number = 0; topic_number < size; ++topic_number) {
if (topic_list[topic_number].qos > max_qos) {
max_qos = topic_list[topic_number].qos;
}{...}
}{...}
if (esp_mqtt5_client_subscribe_check(client, max_qos) != ESP_OK) {
ESP_LOGI(TAG, "MQTT5 subscribe check fail: QoS %d not accepted by broker ", max_qos);
MQTT_API_UNLOCK(client);
return -1;
}{...}
mqtt5_msg_subscribe(&client->mqtt_state.connection,
topic_list, size,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->subscribe_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
client->mqtt5_config->subscribe_property_info = NULL;
}{...}
#endif/* ... */
}{...} else {
mqtt_msg_subscribe(&client->mqtt_state.connection,
topic_list, size,
&client->mqtt_state.pending_msg_id);
}{...}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Subscribe message cannot be created");
MQTT_API_UNLOCK(client);
return -1;
}{...}
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
if (!mqtt_enqueue(client, NULL, 0)) {
MQTT_API_UNLOCK(client);
return -1;
}{...}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to send subscribe message, first topic: %s, qos: %d", topic_list[0].filter, topic_list[0].qos);
MQTT_API_UNLOCK(client);
return -1;
}{...}
ESP_LOGD(TAG, "Sent subscribe, first topic=%s, id: %d", topic_list[0].filter, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK(client);
return client->mqtt_state.pending_msg_id;
}{ ... }
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos)
{
esp_mqtt_topic_t user_topic = {.filter = topic, .qos = qos};
return esp_mqtt_client_subscribe_multiple(client, &user_topic, 1);
}{ ... }
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}{...}
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
return -1;
}{...}
MQTT_API_LOCK(client);
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_unsubscribe(&client->mqtt_state.connection,
topic,
&client->mqtt_state.pending_msg_id, client->mqtt5_config->unsubscribe_property_info);
if (client->mqtt_state.connection.outbound_message.length) {
client->mqtt5_config->unsubscribe_property_info = NULL;
}{...}
#endif/* ... */
}{...} else {
mqtt_msg_unsubscribe(&client->mqtt_state.connection,
topic,
&client->mqtt_state.pending_msg_id);
}{...}
if (client->mqtt_state.connection.outbound_message.length == 0) {
MQTT_API_UNLOCK(client);
ESP_LOGE(TAG, "Unubscribe message cannot be created");
return -1;
}{...}
ESP_LOGD(TAG, "unsubscribe, topic\"%s\", id: %d", topic, client->mqtt_state.pending_msg_id);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
if (!mqtt_enqueue(client, NULL, 0)) {
MQTT_API_UNLOCK(client);
return -1;
}{...}
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
if (esp_mqtt_write(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
MQTT_API_UNLOCK(client);
return -1;
}{...}
ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK(client);
return client->mqtt_state.pending_msg_id;
}{ ... }
static int make_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data,
int len, int qos, int retain)
{
uint16_t pending_msg_id = 0;
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
#ifdef MQTT_PROTOCOL_5
mqtt5_msg_publish(&client->mqtt_state.connection,
topic, data, len,
qos, retain,
&pending_msg_id, client->mqtt5_config->publish_property_info, client->mqtt5_config->server_resp_property_info.response_info);
if (client->mqtt_state.connection.outbound_message.length) {
client->mqtt5_config->publish_property_info = NULL;
}{...}
#endif/* ... */
}{...} else {
mqtt_msg_publish(&client->mqtt_state.connection,
topic, data, len,
qos, retain,
&pending_msg_id);
}{...}
if (client->mqtt_state.connection.outbound_message.length == 0) {
ESP_LOGE(TAG, "Publish message cannot be created");
return -1;
}{...}
return pending_msg_id;
}{ ... }
static inline int mqtt_client_enqueue_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data,
int len, int qos, int retain, bool store)
{
int pending_msg_id = make_publish(client, topic, data, len, qos, retain);
if (pending_msg_id < 0) {
return -1;
}{...}
if (qos > 0 || store) {
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.connection.outbound_message.data);
client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos;
if (client->mqtt_state.connection.outbound_message.fragmented_msg_total_length == 0) {
if (!mqtt_enqueue(client, NULL, 0)) {
return -1;
}{...}
}{...} else {
int first_fragment = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset;
if (!mqtt_enqueue(client, ((uint8_t *)data) + first_fragment, len - first_fragment)) {
return -1;
}{...}
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
}{...}
}{...}
return pending_msg_id;
}{ ... }
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}{...}
#if MQTT_SKIP_PUBLISH_IF_DISCONNECTED
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGI(TAG, "Publishing skipped: client is not connected");
return -1;
}{...}
#endif/* ... */
MQTT_API_LOCK(client);
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) {
ESP_LOGI(TAG, "MQTT5 publish check fail");
MQTT_API_UNLOCK(client);
return -1;
}{...}
}{...}
#endif/* ... */
/* ... */
if (len <= 0 && data != NULL) {
len = strlen(data);
}{...}
if (client->config->outbox_limit > 0 && qos > 0) {
if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) {
MQTT_API_UNLOCK(client);
return -2;
}{...}
}{...}
int pending_msg_id = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, false);
if (pending_msg_id < 0) {
MQTT_API_UNLOCK(client);
return -1;
}{...}
int ret = 0;
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGD(TAG, "Publish: client is not connected");
if (qos > 0) {
ret = pending_msg_id;
}{...} else {
ret = -1;
}{...}
mqtt_delete_expired_messages(client);
goto cannot_publish;
}{...}
int remaining_len = len;
const char *current_data = data;
bool sending = true;
while (sending) {
if (esp_mqtt_write(client) != ESP_OK) {
esp_mqtt_abort_connection(client);
ret = -1;
goto cannot_publish;
}{...}
int data_sent = client->mqtt_state.connection.outbound_message.length - client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset;
client->mqtt_state.connection.outbound_message.fragmented_msg_data_offset = 0;
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
remaining_len -= data_sent;
current_data += data_sent;
if (remaining_len > 0) {
mqtt_connection_t *connection = &client->mqtt_state.connection;
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
int write_len = remaining_len > connection->buffer_length ? connection->buffer_length : remaining_len;
memcpy(connection->buffer, current_data, write_len);
connection->outbound_message.data = connection->buffer;
connection->outbound_message.length = write_len;
sending = true;
}{...} else {
sending = false;
}{...}
}{...}
if (qos > 0) {
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
esp_mqtt5_increment_packet_counter(client);
}{...}
#endif/* ... */
outbox_set_tick(client->outbox, pending_msg_id, platform_tick_get_ms());
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
}{...}
MQTT_API_UNLOCK(client);
return pending_msg_id;
cannot_publish:
client->mqtt_state.connection.outbound_message.fragmented_msg_total_length = 0;
if (qos == 0) {
ESP_LOGW(TAG, "Publish: Losing qos0 data when client not connected");
}{...}
MQTT_API_UNLOCK(client);
return ret;
}{ ... }
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store)
{
if (!client) {
ESP_LOGE(TAG, "Client was not initialized");
return -1;
}{...}
/* ... */
if (len <= 0 && data != NULL) {
len = strlen(data);
}{...}
if (client->config->outbox_limit > 0) {
if (len + outbox_get_size(client->outbox) > client->config->outbox_limit) {
return -2;
}{...}
}{...}
MQTT_API_LOCK(client);
#ifdef MQTT_PROTOCOL_5
if (client->mqtt_state.connection.information.protocol_ver == MQTT_PROTOCOL_V_5) {
if (esp_mqtt5_client_publish_check(client, qos, retain) != ESP_OK) {
ESP_LOGI(TAG, "esp_mqtt_client_enqueue check fail");
MQTT_API_UNLOCK(client);
return -1;
}{...}
}{...}
#endif/* ... */
int ret = mqtt_client_enqueue_publish(client, topic, data, len, qos, retain, store);
MQTT_API_UNLOCK(client);
if (ret == 0 && store == false) {
return -1;
}{...}
return ret;
}{ ... }
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void *event_handler_arg)
{
if (client == NULL) {
return ESP_ERR_INVALID_ARG;
}{...}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
return esp_event_handler_register_with(client->config->event_loop_handle, MQTT_EVENTS, event, event_handler, event_handler_arg);/* ... */
#else
ESP_LOGE(TAG, "Registering event handler while event loop not available in IDF version %s", IDF_VER);
return ESP_FAIL;/* ... */
#endif
}{ ... }
esp_err_t esp_mqtt_client_unregister_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler)
{
if (client == NULL) {
return ESP_ERR_INVALID_ARG;
}{...}
#ifdef MQTT_SUPPORTED_FEATURE_EVENT_LOOP
return esp_event_handler_unregister_with(client->config->event_loop_handle, MQTT_EVENTS, event, event_handler);/* ... */
#else
ESP_LOGE(TAG, "Unregistering event handler while event loop not available in IDF version %s", IDF_VER);
return ESP_FAIL;/* ... */
#endif
}{ ... }
static void esp_mqtt_client_dispatch_transport_error(esp_mqtt_client_handle_t client)
{
client->event.event_id = MQTT_EVENT_ERROR;
client->event.error_handle->error_type = MQTT_ERROR_TYPE_TCP_TRANSPORT;
client->event.error_handle->connect_return_code = 0;
#ifdef MQTT_SUPPORTED_FEATURE_TRANSPORT_ERR_REPORTING
client->event.error_handle->esp_tls_last_esp_err = esp_tls_get_and_clear_last_error(esp_transport_get_error_handle(client->transport),
&client->event.error_handle->esp_tls_stack_err,
&client->event.error_handle->esp_tls_cert_verify_flags);
#ifdef MQTT_SUPPORTED_FEATURE_TRANSPORT_SOCK_ERRNO_REPORTING
client->event.error_handle->esp_transport_sock_errno = esp_transport_get_errno(client->transport);
#endif/* ... */
#endif
esp_mqtt_dispatch_event_with_msgid(client);
}{ ... }
int esp_mqtt_client_get_outbox_size(esp_mqtt_client_handle_t client)
{
int outbox_size = 0;
if (client == NULL) {
return 0;
}{...}
MQTT_API_LOCK(client);
if (client->outbox) {
outbox_size = outbox_get_size(client->outbox);
}{...}
MQTT_API_UNLOCK(client);
return outbox_size;
}{ ... }