mqtt_client.c
Go to the documentation of this file.
1 /**
2  * @file mqtt_client.c
3  * @brief MQTT client
4  *
5  * @section License
6  *
7  * SPDX-License-Identifier: GPL-2.0-or-later
8  *
9  * Copyright (C) 2010-2025 Oryx Embedded SARL. All rights reserved.
10  *
11  * This file is part of CycloneTCP Open.
12  *
13  * This program is free software; you can redistribute it and/or
14  * modify it under the terms of the GNU General Public License
15  * as published by the Free Software Foundation; either version 2
16  * of the License, or (at your option) any later version.
17  *
18  * This program is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software Foundation,
25  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26  *
27  * @author Oryx Embedded SARL (www.oryx-embedded.com)
28  * @version 2.5.4
29  **/
30 
31 //Switch to the appropriate trace level
32 #define TRACE_LEVEL MQTT_TRACE_LEVEL
33 
34 //Dependencies
35 #include "core/net.h"
36 #include "mqtt/mqtt_client.h"
39 #include "mqtt/mqtt_client_misc.h"
40 #include "debug.h"
41 
42 //Check TCP/IP stack configuration
43 #if (MQTT_CLIENT_SUPPORT == ENABLED)
44 
45 
46 /**
47  * @brief Initialize MQTT client context
48  * @param[in] context Pointer to the MQTT client context
49  * @return Error code
50  **/
51 
53 {
54 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
55  error_t error;
56 #endif
57 
58  //Make sure the MQTT client context is valid
59  if(context == NULL)
61 
62  //Clear MQTT client context
63  osMemset(context, 0, sizeof(MqttClientContext));
64 
65 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
66  //Initialize TLS session state
67  error = tlsInitSessionState(&context->tlsSession);
68  //Any error to report?
69  if(error)
70  return error;
71 #endif
72 
73  //Default protocol version
74  context->settings.version = MQTT_VERSION_3_1_1;
75  //Default transport protocol
76  context->settings.transportProtocol = MQTT_TRANSPORT_PROTOCOL_TCP;
77  //Default keep-alive time interval
78  context->settings.keepAlive = MQTT_CLIENT_DEFAULT_KEEP_ALIVE;
79  //Default communication timeout
80  context->settings.timeout = MQTT_CLIENT_DEFAULT_TIMEOUT;
81 
82 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
83  //Default resource name (for WebSocket connections only)
84  osStrcpy(context->settings.uri, "/");
85 #endif
86 
87  //Initialize MQTT client state
88  context->state = MQTT_CLIENT_STATE_DISCONNECTED;
89  //Initialize packet identifier
90  context->packetId = 0;
91 
92  //Successful initialization
93  return NO_ERROR;
94 }
95 
96 
97 /**
98  * @brief Initialize callback structure
99  * @param[in] callbacks Pointer to a structure that contains callback functions
100  **/
101 
103 {
104  //Initialize callback structure
105  osMemset(callbacks, 0, sizeof(MqttClientCallbacks));
106 }
107 
108 
109 /**
110  * @brief Register MQTT client callbacks
111  * @param[in] context Pointer to the MQTT client context
112  * @param[in] callbacks Pointer to a structure that contains callback functions
113  * @return Error code
114  **/
115 
117  const MqttClientCallbacks *callbacks)
118 {
119  //Make sure the MQTT client context is valid
120  if(context == NULL)
122 
123  //Attach callback functions
124  context->callbacks = *callbacks;
125 
126  //Successful processing
127  return NO_ERROR;
128 }
129 
130 
131 /**
132  * @brief Set the MQTT protocol version to be used
133  * @param[in] context Pointer to the MQTT client context
134  * @param[in] version MQTT protocol version (3.1 or 3.1.1)
135  * @return Error code
136  **/
137 
139 {
140  //Make sure the MQTT client context is valid
141  if(context == NULL)
143 
144  //Save the MQTT protocol version to be used
145  context->settings.version = version;
146 
147  //Successful processing
148  return NO_ERROR;
149 }
150 
151 
152 /**
153  * @brief Set the transport protocol to be used
154  * @param[in] context Pointer to the MQTT client context
155  * @param[in] transportProtocol Transport protocol to be used (TCP, TLS,
156  * WebSocket, or secure WebSocket)
157  * @return Error code
158  **/
159 
161  MqttTransportProtocol transportProtocol)
162 {
163  //Make sure the MQTT client context is valid
164  if(context == NULL)
166 
167  //Save the transport protocol to be used
168  context->settings.transportProtocol = transportProtocol;
169 
170  //Successful processing
171  return NO_ERROR;
172 }
173 
174 
175 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
176 
177 /**
178  * @brief Register TLS initialization callback function
179  * @param[in] context Pointer to the MQTT- client context
180  * @param[in] callback TLS initialization callback function
181  * @return Error code
182  **/
183 
185  MqttClientTlsInitCallback callback)
186 {
187  //Check parameters
188  if(context == NULL || callback == NULL)
190 
191  //Save callback function
192  context->callbacks.tlsInitCallback = callback;
193 
194  //Successful processing
195  return NO_ERROR;
196 }
197 
198 #endif
199 
200 
201 /**
202  * @brief Register publish callback function
203  * @param[in] context Pointer to the MQTT client context
204  * @param[in] callback Callback function to be called when a PUBLISH message
205  * is received
206  * @return Error code
207  **/
208 
210  MqttClientPublishCallback callback)
211 {
212  //Make sure the MQTT client context is valid
213  if(context == NULL)
215 
216  //Save callback function
217  context->callbacks.publishCallback = callback;
218 
219  //Successful processing
220  return NO_ERROR;
221 }
222 
223 
224 /**
225  * @brief Set communication timeout
226  * @param[in] context Pointer to the MQTT client context
227  * @param[in] timeout Timeout value, in seconds
228  * @return Error code
229  **/
230 
232 {
233  //Make sure the MQTT client context is valid
234  if(context == NULL)
236 
237  //Save timeout value
238  context->settings.timeout = timeout;
239 
240  //Successful processing
241  return NO_ERROR;
242 }
243 
244 
245 /**
246  * @brief Set keep-alive value
247  * @param[in] context Pointer to the MQTT client context
248  * @param[in] keepAlive Maximum time interval that is permitted to elapse
249  * between the point at which the client finishes transmitting one control
250  * packet and the point it starts sending the next
251  * @return Error code
252  **/
253 
254 error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
255 {
256  //Make sure the MQTT client context is valid
257  if(context == NULL)
259 
260  //Save keep-alive value
261  context->settings.keepAlive = keepAlive;
262 
263  //Successful processing
264  return NO_ERROR;
265 }
266 
267 
268 /**
269  * @brief Set the domain name of the server (for virtual hosting)
270  * @param[in] context Pointer to the MQTT client context
271  * @param[in] host NULL-terminated string containing the hostname
272  * @return Error code
273  **/
274 
276 {
277  //Check parameters
278  if(context == NULL || host == NULL)
280 
281  //Make sure the length of the hostname is acceptable
283  return ERROR_INVALID_LENGTH;
284 
285 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
286  //Save hostname (for WebSocket connections only)
287  osStrcpy(context->settings.host, host);
288 #endif
289 
290  //Successful processing
291  return NO_ERROR;
292 }
293 
294 
295 /**
296  * @brief Set the name of the resource being requested
297  * @param[in] context Pointer to the MQTT client context
298  * @param[in] uri NULL-terminated string that contains the resource name
299  * @return Error code
300  **/
301 
303 {
304  //Check parameters
305  if(context == NULL || uri == NULL)
307 
308  //Make sure the length of the resource name is acceptable
310  return ERROR_INVALID_LENGTH;
311 
312 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
313  //Save resource name (for WebSocket connections only)
314  osStrcpy(context->settings.uri, uri);
315 #endif
316 
317  //Successful processing
318  return NO_ERROR;
319 }
320 
321 
322 /**
323  * @brief Set client identifier
324  * @param[in] context Pointer to the MQTT client context
325  * @param[in] clientId NULL-terminated string containing the client identifier
326  * @return Error code
327  **/
328 
330  const char_t *clientId)
331 {
332  //Check parameters
333  if(context == NULL || clientId == NULL)
335 
336  //Make sure the length of the client identifier is acceptable
338  return ERROR_INVALID_LENGTH;
339 
340  //Save client identifier
341  osStrcpy(context->settings.clientId, clientId);
342 
343  //Successful processing
344  return NO_ERROR;
345 }
346 
347 
348 /**
349  * @brief Set authentication information
350  * @param[in] context Pointer to the MQTT client context
351  * @param[in] username NULL-terminated string containing the user name to be used
352  * @param[in] password NULL-terminated string containing the password to be used
353  * @return Error code
354  **/
355 
357  const char_t *username, const char_t *password)
358 {
359  //Check parameters
360  if(context == NULL || username == NULL || password == NULL)
362 
363  //Make sure the length of the user name is acceptable
364  if(osStrlen(username) > MQTT_CLIENT_MAX_USERNAME_LEN)
365  return ERROR_INVALID_LENGTH;
366 
367  //Make sure the length of the password is acceptable
368  if(osStrlen(password) > MQTT_CLIENT_MAX_PASSWORD_LEN)
369  return ERROR_INVALID_LENGTH;
370 
371  //Save user name
372  osStrcpy(context->settings.username, username);
373  //Save password
374  osStrcpy(context->settings.password, password);
375 
376  //Successful processing
377  return NO_ERROR;
378 }
379 
380 
381 /**
382  * @brief Specify the Will message
383  * @param[in] context Pointer to the MQTT client context
384  * @param[in] topic Will topic name
385  * @param[in] message Will message
386  * @param[in] length Length of the Will message
387  * @param[in] qos QoS level to be used when publishing the Will message
388  * @param[in] retain This flag specifies if the Will message is to be retained
389  * @return Error code
390  **/
391 
393  const void *message, size_t length, MqttQosLevel qos, bool_t retain)
394 {
395  MqttClientWillMessage *willMessage;
396 
397  //Check parameters
398  if(context == NULL || topic == NULL)
400 
401  //Make sure the length of the Will topic is acceptable
403  return ERROR_INVALID_LENGTH;
404 
405  //Point to the Will message
406  willMessage = &context->settings.willMessage;
407 
408  //Save Will topic
409  osStrcpy(willMessage->topic, topic);
410 
411  //Any message payload
412  if(length > 0)
413  {
414  //Sanity check
415  if(message == NULL)
417 
418  //Make sure the length of the Will message payload is acceptable
420  return ERROR_INVALID_LENGTH;
421 
422  //Save Will message payload
423  osMemcpy(willMessage->payload, message, length);
424  }
425 
426  //Length of the Will message payload
427  willMessage->length = length;
428  //QoS level to be used when publishing the Will message
429  willMessage->qos = qos;
430  //This flag specifies if the Will message is to be retained
431  willMessage->retain = retain;
432 
433  //Successful processing
434  return NO_ERROR;
435 }
436 
437 
438 /**
439  * @brief Bind the MQTT client to a particular network interface
440  * @param[in] context Pointer to the MQTT client context
441  * @param[in] interface Network interface to be used
442  * @return Error code
443  **/
444 
446  NetInterface *interface)
447 {
448  //Make sure the MQTT client context is valid
449  if(context == NULL)
451 
452  //Explicitly associate the MQTT client with the specified interface
453  context->interface = interface;
454 
455  //Successful processing
456  return NO_ERROR;
457 }
458 
459 
460 /**
461  * @brief Establish connection with the MQTT server
462  * @param[in] context Pointer to the MQTT client context
463  * @param[in] serverIpAddr IP address of the MQTT server to connect to
464  * @param[in] serverPort TCP port number that will be used to establish the
465  * connection
466  * @param[in] cleanSession If this flag is set, then the client and server
467  * must discard any previous session and start a new one
468  * @return Error code
469  **/
470 
472  const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
473 {
474  error_t error;
475 
476  //Check parameters
477  if(context == NULL || serverIpAddr == NULL)
479 
480  //Initialize status code
481  error = NO_ERROR;
482 
483  //Establish network connection
484  while(!error)
485  {
486  //Check current state
487  if(context->state == MQTT_CLIENT_STATE_DISCONNECTED)
488  {
489  //Open network connection
490  error = mqttClientOpenConnection(context);
491 
492  //Check status code
493  if(!error)
494  {
495  //Debug message
496  TRACE_INFO("MQTT: Connecting to server %s port %" PRIu16 "...\r\n",
497  ipAddrToString(serverIpAddr, NULL), serverPort);
498 
499  //The network connection is open
501  //Save current time
502  context->startTime = osGetSystemTime();
503  }
504  }
505  else if(context->state == MQTT_CLIENT_STATE_CONNECTING)
506  {
507  //Establish network connection
508  error = mqttClientEstablishConnection(context, serverIpAddr,
509  serverPort);
510 
511  //Check status code
512  if(!error)
513  {
514  //Debug message
515  TRACE_INFO("MQTT: Connected to server\r\n");
516 
517  //The network connection is established
519  }
520  }
521  else if(context->state == MQTT_CLIENT_STATE_CONNECTED)
522  {
523  //Format CONNECT packet
524  error = mqttClientFormatConnect(context, cleanSession);
525 
526  //Check status code
527  if(!error)
528  {
529  //Debug message
530  TRACE_INFO("MQTT: Sending CONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
531  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
532 
533  //Save the type of the MQTT packet to be sent
534  context->packetType = MQTT_PACKET_TYPE_CONNECT;
535  //Point to the beginning of the packet
536  context->packetPos = 0;
537 
538  //Send CONNECT packet
540  }
541  }
542  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
543  {
544  //Send more data
545  error = mqttClientProcessEvents(context, context->settings.timeout);
546  }
547  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
548  {
549  //Wait for CONNACK packet
550  error = mqttClientProcessEvents(context, context->settings.timeout);
551  }
552  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
553  {
554  //Receive more data
555  error = mqttClientProcessEvents(context, context->settings.timeout);
556  }
557  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
558  {
559  //Reset packet type
560  context->packetType = MQTT_PACKET_TYPE_INVALID;
561  //A CONNACK packet has been received
563  }
564  else if(context->state == MQTT_CLIENT_STATE_IDLE)
565  {
566  //The MQTT client is connected
567  break;
568  }
569  else
570  {
571  //Invalid state
572  error = ERROR_NOT_CONNECTED;
573  }
574  }
575 
576  //Check status code
577  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
578  {
579  //Check whether the timeout has elapsed
580  error = mqttClientCheckTimeout(context);
581  }
582 
583  //Failed to establish connection with the MQTT server?
584  if(error != NO_ERROR && error != ERROR_WOULD_BLOCK)
585  {
586  //Clean up side effects
587  mqttClientCloseConnection(context);
588  //Update MQTT client state
590  }
591 
592  //Return status code
593  return error;
594 }
595 
596 
597 /**
598  * @brief Set packet identifier
599  * @param[in] context Pointer to the MQTT client context
600  * @param[out] packetId Packet identifier to be used
601  * @return Error code
602  **/
603 
604 error_t mqttClientSetPacketId(MqttClientContext *context, uint16_t packetId)
605 {
606  //Check parameters
607  if(context == NULL || packetId == 0)
609 
610  //Restore packet identifier
611  if(context->packetId > 1)
612  {
613  context->packetId = packetId - 1;
614  }
615  else
616  {
617  context->packetId = UINT16_MAX;
618  }
619 
620  //Successful processing
621  return NO_ERROR;
622 }
623 
624 
625 /**
626  * @brief Publish message
627  * @param[in] context Pointer to the MQTT client context
628  * @param[in] topic Topic name
629  * @param[in] message Message payload
630  * @param[in] length Length of the message payload
631  * @param[in] qos QoS level to be used when publishing the message
632  * @param[in] retain This flag specifies if the message is to be retained
633  * @param[out] packetId Packet identifier used to send the PUBLISH packet
634  * @return Error code
635  **/
636 
638  const void *message, size_t length, MqttQosLevel qos, bool_t retain,
639  uint16_t *packetId)
640 {
641  //Publish message
642  return mqttClientPublishEx(context, topic, message, length, FALSE,
643  qos, retain, packetId);
644 }
645 
646 
647 /**
648  * @brief Publish message
649  * @param[in] context Pointer to the MQTT client context
650  * @param[in] topic Topic name
651  * @param[in] message Message payload
652  * @param[in] length Length of the message payload
653  * @param[in] dup DUP flag
654  * @param[in] qos QoS level to be used when publishing the message
655  * @param[in] retain This flag specifies if the message is to be retained
656  * @param[out] packetId Packet identifier used to send the PUBLISH packet
657  * @return Error code
658  **/
659 
661  const void *message, size_t length, bool_t dup, MqttQosLevel qos,
662  bool_t retain, uint16_t *packetId)
663 {
664  error_t error;
665 
666  //Check parameters
667  if(context == NULL || topic == NULL)
669 
670  if(message == NULL && length != 0)
672 
673  //Initialize status code
674  error = NO_ERROR;
675 
676  //Send PUBLISH packet and wait for PUBACK/PUBCOMP packet to be received
677  while(!error)
678  {
679  //Check current state
680  if(context->state == MQTT_CLIENT_STATE_IDLE)
681  {
682  //Check for transmission completion
683  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
684  {
685  //Format PUBLISH packet
686  error = mqttClientFormatPublish(context, topic, message, length,
687  dup, qos, retain);
688 
689  //Check status code
690  if(!error)
691  {
692  //Save the packet identifier used to send the PUBLISH packet
693  if(packetId != NULL)
694  {
695  *packetId = context->packetId;
696  }
697 
698  //Debug message
699  TRACE_INFO("MQTT: Sending PUBLISH packet (%" PRIuSIZE " bytes)...\r\n",
700  context->packetLen);
701 
702  //Dump the contents of the PUBLISH packet
703  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
704 
705  //Save the type of the MQTT packet to be sent
706  context->packetType = MQTT_PACKET_TYPE_PUBLISH;
707  //Point to the beginning of the packet
708  context->packetPos = 0;
709 
710  //Send PUBLISH packet
712  //Save the time at which the packet was sent
713  context->startTime = osGetSystemTime();
714  }
715  }
716  else
717  {
718  //Reset packet type
719  context->packetType = MQTT_PACKET_TYPE_INVALID;
720  //We are done
721  break;
722  }
723  }
724  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
725  {
726  //Send more data
727  error = mqttClientProcessEvents(context, context->settings.timeout);
728  }
729  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
730  {
731  //The last parameter is optional
732  if(packetId != NULL)
733  {
734  //Do not wait for PUBACK/PUBCOMP packet
736  }
737  else
738  {
739  //Check QoS level
740  if(qos == MQTT_QOS_LEVEL_0)
741  {
742  //No response is sent by the receiver and no retry is performed by the sender
744  }
745  else
746  {
747  //Wait for PUBACK/PUBCOMP packet
748  error = mqttClientProcessEvents(context, context->settings.timeout);
749  }
750  }
751  }
752  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
753  {
754  //Receive more data
755  error = mqttClientProcessEvents(context, context->settings.timeout);
756  }
757  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
758  {
759  //A PUBACK/PUBCOMP packet has been received
761  }
762  else
763  {
764  //Invalid state
765  error = ERROR_NOT_CONNECTED;
766  }
767  }
768 
769  //Check status code
770  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
771  {
772  //Check whether the timeout has elapsed
773  error = mqttClientCheckTimeout(context);
774  }
775 
776  //Return status code
777  return error;
778 }
779 
780 
781 /**
782  * @brief Subscribe to topic
783  * @param[in] context Pointer to the MQTT client context
784  * @param[in] topic Topic filter
785  * @param[in] qos Maximum QoS level at which the server can send application
786  * messages to the client
787  * @param[out] packetId Packet identifier used to send the SUBSCRIBE packet
788  * @return Error code
789  **/
790 
792  MqttQosLevel qos, uint16_t *packetId)
793 {
794  error_t error;
795 
796  //Check parameters
797  if(context == NULL || topic == NULL)
799 
800  //Initialize status code
801  error = NO_ERROR;
802 
803  //Send SUBSCRIBE packet and wait for SUBACK packet to be received
804  while(!error)
805  {
806  //Check current state
807  if(context->state == MQTT_CLIENT_STATE_IDLE)
808  {
809  //Check for transmission completion
810  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
811  {
812  //Format SUBSCRIBE packet
813  error = mqttClientFormatSubscribe(context, topic, qos);
814 
815  //Check status code
816  if(!error)
817  {
818  //Save the packet identifier used to send the SUBSCRIBE packet
819  if(packetId != NULL)
820  {
821  *packetId = context->packetId;
822  }
823 
824  //Debug message
825  TRACE_INFO("MQTT: Sending SUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
826  context->packetLen);
827 
828  //Dump the contents of the SUBSCRIBE packet
829  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
830 
831  //Save the type of the MQTT packet to be sent
832  context->packetType = MQTT_PACKET_TYPE_SUBSCRIBE;
833  //Point to the beginning of the packet
834  context->packetPos = 0;
835 
836  //Send SUBSCRIBE packet
838  //Save the time at which the packet was sent
839  context->startTime = osGetSystemTime();
840  }
841  }
842  else
843  {
844  //Reset packet type
845  context->packetType = MQTT_PACKET_TYPE_INVALID;
846  //We are done
847  break;
848  }
849  }
850  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
851  {
852  //Send more data
853  error = mqttClientProcessEvents(context, context->settings.timeout);
854  }
855  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
856  {
857  //The last parameter is optional
858  if(packetId != NULL)
859  {
860  //Do not wait for SUBACK packet
862  }
863  else
864  {
865  //Wait for SUBACK packet
866  error = mqttClientProcessEvents(context, context->settings.timeout);
867  }
868  }
869  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
870  {
871  //Receive more data
872  error = mqttClientProcessEvents(context, context->settings.timeout);
873  }
874  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
875  {
876  //A SUBACK packet has been received
878  }
879  else
880  {
881  //Invalid state
882  error = ERROR_NOT_CONNECTED;
883  }
884  }
885 
886  //Check status code
887  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
888  {
889  //Check whether the timeout has elapsed
890  error = mqttClientCheckTimeout(context);
891  }
892 
893  //Return status code
894  return error;
895 }
896 
897 
898 /**
899  * @brief Unsubscribe from topic
900  * @param[in] context Pointer to the MQTT client context
901  * @param[in] topic Topic filter
902  * @param[out] packetId Packet identifier used to send the UNSUBSCRIBE packet
903  * @return Error code
904  **/
905 
907  uint16_t *packetId)
908 {
909  error_t error;
910 
911  //Check parameters
912  if(context == NULL || topic == NULL)
914 
915  //Initialize status code
916  error = NO_ERROR;
917 
918  //Send UNSUBSCRIBE packet and wait for UNSUBACK packet to be received
919  while(!error)
920  {
921  //Check current state
922  if(context->state == MQTT_CLIENT_STATE_IDLE)
923  {
924  //Check for transmission completion
925  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
926  {
927  //Format UNSUBSCRIBE packet
928  error = mqttClientFormatUnsubscribe(context, topic);
929 
930  //Check status code
931  if(!error)
932  {
933  //Save the packet identifier used to send the UNSUBSCRIBE packet
934  if(packetId != NULL)
935  {
936  *packetId = context->packetId;
937  }
938 
939  //Debug message
940  TRACE_INFO("MQTT: Sending UNSUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
941  context->packetLen);
942 
943  //Dump the contents of the UNSUBSCRIBE packet
944  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
945 
946  //Save the type of the MQTT packet to be sent
947  context->packetType = MQTT_PACKET_TYPE_UNSUBSCRIBE;
948  //Point to the beginning of the packet
949  context->packetPos = 0;
950 
951  //Send UNSUBSCRIBE packet
953  //Save the time at which the packet was sent
954  context->startTime = osGetSystemTime();
955  }
956  }
957  else
958  {
959  //Reset packet type
960  context->packetType = MQTT_PACKET_TYPE_INVALID;
961  //We are done
962  break;
963  }
964  }
965  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
966  {
967  //Send more data
968  error = mqttClientProcessEvents(context, context->settings.timeout);
969  }
970  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
971  {
972  //The last parameter is optional
973  if(packetId != NULL)
974  {
975  //Do not wait for UNSUBACK packet
977  }
978  else
979  {
980  //Wait for UNSUBACK packet
981  error = mqttClientProcessEvents(context, context->settings.timeout);
982  }
983  }
984  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
985  {
986  //Receive more data
987  error = mqttClientProcessEvents(context, context->settings.timeout);
988  }
989  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
990  {
991  //An UNSUBACK packet has been received
993  }
994  else
995  {
996  //Invalid state
997  error = ERROR_NOT_CONNECTED;
998  }
999  }
1000 
1001  //Check status code
1002  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
1003  {
1004  //Check whether the timeout has elapsed
1005  error = mqttClientCheckTimeout(context);
1006  }
1007 
1008  //Return status code
1009  return error;
1010 }
1011 
1012 
1013 /**
1014  * @brief Send ping request
1015  * @param[in] context Pointer to the MQTT client context
1016  * @param[out] rtt Round-trip time (optional parameter)
1017  * @return Error code
1018  **/
1019 
1021 {
1022  error_t error;
1023 
1024  //Make sure the MQTT client context is valid
1025  if(context == NULL)
1026  return ERROR_INVALID_PARAMETER;
1027 
1028  //Initialize status code
1029  error = NO_ERROR;
1030 
1031  //Send PINGREQ packet and wait for PINGRESP packet to be received
1032  while(!error)
1033  {
1034  //Check current state
1035  if(context->state == MQTT_CLIENT_STATE_IDLE)
1036  {
1037  //Check for transmission completion
1038  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
1039  {
1040  //Format PINGREQ packet
1041  error = mqttClientFormatPingReq(context);
1042 
1043  //Check status code
1044  if(!error)
1045  {
1046  //Debug message
1047  TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n",
1048  context->packetLen);
1049 
1050  //Dump the contents of the PINGREQ packet
1051  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
1052 
1053  //Save the type of the MQTT packet to be sent
1054  context->packetType = MQTT_PACKET_TYPE_PINGREQ;
1055  //Point to the beginning of the packet
1056  context->packetPos = 0;
1057 
1058  //Send PINGREQ packet
1060  //Save the time at which the packet was sent
1061  context->startTime = osGetSystemTime();
1062  }
1063  }
1064  else
1065  {
1066  //Reset packet type
1067  context->packetType = MQTT_PACKET_TYPE_INVALID;
1068  //We are done
1069  break;
1070  }
1071  }
1072  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1073  {
1074  //Send more data
1075  error = mqttClientProcessEvents(context, context->settings.timeout);
1076  }
1077  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1078  {
1079  //The last parameter is optional
1080  if(rtt != NULL)
1081  {
1082  //Wait for PINGRESP packet
1083  error = mqttClientProcessEvents(context, context->settings.timeout);
1084  }
1085  else
1086  {
1087  //Do not wait for PINGRESP packet
1089  }
1090  }
1091  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
1092  {
1093  //Receive more data
1094  error = mqttClientProcessEvents(context, context->settings.timeout);
1095  }
1096  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
1097  {
1098  //The last parameter is optional
1099  if(rtt != NULL)
1100  {
1101  //Compute round-trip time
1102  *rtt = osGetSystemTime() - context->startTime;
1103  }
1104 
1105  //A PINGRESP packet has been received
1107  }
1108  else
1109  {
1110  //Invalid state
1111  error = ERROR_NOT_CONNECTED;
1112  }
1113  }
1114 
1115  //Check status code
1116  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
1117  {
1118  //Check whether the timeout has elapsed
1119  error = mqttClientCheckTimeout(context);
1120  }
1121 
1122  //Return status code
1123  return error;
1124 }
1125 
1126 
1127 /**
1128  * @brief Process MQTT client events
1129  * @param[in] context Pointer to the MQTT client context
1130  * @param[in] timeout Maximum time to wait before returning
1131  * @return Error code
1132  **/
1133 
1135 {
1136  error_t error;
1137 
1138  //Make sure the MQTT client context is valid
1139  if(context == NULL)
1140  return ERROR_INVALID_PARAMETER;
1141 
1142  //Process MQTT client events
1143  error = mqttClientProcessEvents(context, timeout);
1144 
1145  //Check status code
1146  if(error == ERROR_TIMEOUT)
1147  {
1148  //Catch exception
1149  error = NO_ERROR;
1150  }
1151 
1152  //Return status code
1153  return error;
1154 }
1155 
1156 
1157 /**
1158  * @brief Gracefully disconnect from the MQTT server
1159  * @param[in] context Pointer to the MQTT client context
1160  * @return Error code
1161  **/
1162 
1164 {
1165  error_t error;
1166 
1167  //Make sure the MQTT client context is valid
1168  if(context == NULL)
1169  return ERROR_INVALID_PARAMETER;
1170 
1171  //Initialize status code
1172  error = NO_ERROR;
1173 
1174  //Send DISCONNECT packet and shutdown network connection
1175  while(!error)
1176  {
1177  //Check current state
1178  if(context->state == MQTT_CLIENT_STATE_IDLE)
1179  {
1180  //Format DISCONNECT packet
1181  error = mqttClientFormatDisconnect(context);
1182 
1183  //Check status code
1184  if(!error)
1185  {
1186  //Debug message
1187  TRACE_INFO("MQTT: Sending DISCONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
1188  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
1189 
1190  //Save the type of the MQTT packet to be sent
1191  context->packetType = MQTT_PACKET_TYPE_DISCONNECT;
1192  //Point to the beginning of the packet
1193  context->packetPos = 0;
1194 
1195  //Send DISCONNECT packet
1197  //Save the time at which the packet was sent
1198  context->startTime = osGetSystemTime();
1199  }
1200  }
1201  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1202  {
1203  //Send more data
1204  error = mqttClientProcessEvents(context, context->settings.timeout);
1205  }
1206  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1207  {
1208  //Debug message
1209  TRACE_INFO("MQTT: Shutting down connection...\r\n");
1210 
1211  //After sending a DISCONNECT packet the client must not send any
1212  //more control packets on that network connection
1214  }
1215  else if(context->state == MQTT_CLIENT_STATE_DISCONNECTING)
1216  {
1217  //Properly dispose the network connection
1218  error = mqttClientShutdownConnection(context);
1219 
1220  //Check status code
1221  if(!error)
1222  {
1223  //The MQTT client is disconnected
1225  }
1226  }
1227  else if(context->state == MQTT_CLIENT_STATE_DISCONNECTED)
1228  {
1229  //The MQTT client is disconnected
1230  break;
1231  }
1232  else
1233  {
1234  //Invalid state
1235  error = ERROR_NOT_CONNECTED;
1236  }
1237  }
1238 
1239  //Check status code
1240  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
1241  {
1242  //Check whether the timeout has elapsed
1243  error = mqttClientCheckTimeout(context);
1244  }
1245 
1246  //Failed to gracefully disconnect from the MQTT server?
1247  if(error != NO_ERROR && error != ERROR_WOULD_BLOCK)
1248  {
1249  //Close connection
1250  mqttClientCloseConnection(context);
1251  //Update MQTT client state
1253  }
1254 
1255  //Return status code
1256  return error;
1257 }
1258 
1259 
1260 /**
1261  * @brief Close the connection with the MQTT server
1262  * @param[in] context Pointer to the MQTT client context
1263  * @return Error code
1264  **/
1265 
1267 {
1268  //Make sure the MQTT client context is valid
1269  if(context == NULL)
1270  return ERROR_INVALID_PARAMETER;
1271 
1272  //Close connection
1273  mqttClientCloseConnection(context);
1274  //Update MQTT client state
1276 
1277  //Successful processing
1278  return NO_ERROR;
1279 }
1280 
1281 
1282 /**
1283  * @brief Release MQTT client context
1284  * @param[in] context Pointer to the MQTT client context
1285  **/
1286 
1288 {
1289  //Make sure the MQTT client context is valid
1290  if(context != NULL)
1291  {
1292  //Close connection
1293  mqttClientCloseConnection(context);
1294 
1295 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
1296  //Release TLS session state
1297  tlsFreeSessionState(&context->tlsSession);
1298 #endif
1299 
1300  //Clear MQTT client context
1301  osMemset(context, 0, sizeof(MqttClientContext));
1302  }
1303 }
1304 
1305 #endif
void mqttClientCloseConnection(MqttClientContext *context)
Close network connection.
error_t mqttClientEstablishConnection(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort)
Establish network connection.
int bool_t
Definition: compiler_port.h:61
@ MQTT_PACKET_TYPE_INVALID
Invalid packet.
Definition: mqtt_common.h:100
@ MQTT_PACKET_TYPE_DISCONNECT
Client is disconnecting.
Definition: mqtt_common.h:114
error_t(* MqttClientTlsInitCallback)(MqttClientContext *context, TlsContext *tlsContext)
TLS initialization callback.
Definition: mqtt_client.h:253
@ ERROR_WOULD_BLOCK
Definition: error.h:96
error_t mqttClientSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
Subscribe to topic.
Definition: mqtt_client.c:791
error_t mqttClientFormatPingReq(MqttClientContext *context)
Format PINGREQ packet.
IP network address.
Definition: ip.h:90
MqttTransportProtocol
Transport protocol.
Definition: mqtt_common.h:74
error_t mqttClientPing(MqttClientContext *context, systime_t *rtt)
Send ping request.
Definition: mqtt_client.c:1020
uint8_t message[]
Definition: chap.h:154
@ MQTT_PACKET_TYPE_PUBLISH
Publish message.
Definition: mqtt_common.h:103
error_t mqttClientFormatPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, bool_t dup, MqttQosLevel qos, bool_t retain)
Format PUBLISH packet.
error_t mqttClientOpenConnection(MqttClientContext *context)
Open network connection.
char_t * ipAddrToString(const IpAddr *ipAddr, char_t *str)
Convert a binary IP address to a string representation.
Definition: ip.c:804
@ MQTT_CLIENT_STATE_DISCONNECTED
Definition: mqtt_client.h:161
error_t mqttClientConnect(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
Establish connection with the MQTT server.
Definition: mqtt_client.c:471
uint8_t qos
Definition: mqtt_common.h:181
#define osStrlen(s)
Definition: os_port.h:168
uint8_t version
Definition: coap_common.h:177
uint8_t payload[MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN]
Will message payload.
Definition: mqtt_client.h:266
@ MQTT_CLIENT_STATE_SENDING_PACKET
Definition: mqtt_client.h:165
void tlsFreeSessionState(TlsSessionState *session)
Properly dispose a session state.
Definition: tls.c:3044
MQTT client callback functions.
Definition: mqtt_client.h:278
error_t mqttClientInit(MqttClientContext *context)
Initialize MQTT client context.
Definition: mqtt_client.c:52
error_t mqttClientFormatConnect(MqttClientContext *context, bool_t cleanSession)
Format CONNECT packet.
@ MQTT_CLIENT_STATE_PACKET_RECEIVED
Definition: mqtt_client.h:169
error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
error_t mqttClientPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain, uint16_t *packetId)
Publish message.
Definition: mqtt_client.c:637
#define FALSE
Definition: os_port.h:46
error_t mqttClientDisconnect(MqttClientContext *context)
Gracefully disconnect from the MQTT server.
Definition: mqtt_client.c:1163
@ ERROR_INVALID_PARAMETER
Invalid parameter.
Definition: error.h:47
#define MQTT_CLIENT_MAX_USERNAME_LEN
Definition: mqtt_client.h:96
#define osMemcpy(dest, src, length)
Definition: os_port.h:144
Helper functions for MQTT client.
void mqttClientChangeState(MqttClientContext *context, MqttClientState newState)
Update MQTT client state.
error_t
Error codes.
Definition: error.h:43
Transport protocol abstraction layer.
@ MQTT_CLIENT_STATE_PACKET_SENT
Definition: mqtt_client.h:166
@ MQTT_QOS_LEVEL_0
At most once delivery.
Definition: mqtt_common.h:88
MqttQosLevel
Quality of service level.
Definition: mqtt_common.h:87
@ MQTT_PACKET_TYPE_PINGREQ
Ping request.
Definition: mqtt_common.h:112
#define MQTT_CLIENT_MAX_WILL_TOPIC_LEN
Definition: mqtt_client.h:110
@ MQTT_TRANSPORT_PROTOCOL_TCP
Definition: mqtt_common.h:75
error_t mqttClientSetUri(MqttClientContext *context, const char_t *uri)
Set the name of the resource being requested.
Definition: mqtt_client.c:302
#define NetInterface
Definition: net.h:36
error_t mqttClientSetTransportProtocol(MqttClientContext *context, MqttTransportProtocol transportProtocol)
Set the transport protocol to be used.
Definition: mqtt_client.c:160
@ ERROR_INVALID_LENGTH
Definition: error.h:111
error_t mqttClientFormatUnsubscribe(MqttClientContext *context, const char_t *topic)
Format UNSUBSCRIBE packet.
error_t mqttClientClose(MqttClientContext *context)
Close the connection with the MQTT server.
Definition: mqtt_client.c:1266
error_t mqttClientBindToInterface(MqttClientContext *context, NetInterface *interface)
Bind the MQTT client to a particular network interface.
Definition: mqtt_client.c:445
@ MQTT_PACKET_TYPE_CONNECT
Client request to connect to server.
Definition: mqtt_common.h:101
@ MQTT_PACKET_TYPE_SUBSCRIBE
Client subscribe request.
Definition: mqtt_common.h:108
MQTT packet parsing and formatting.
void mqttClientDeinit(MqttClientContext *context)
Release MQTT client context.
Definition: mqtt_client.c:1287
#define MQTT_CLIENT_MAX_URI_LEN
Definition: mqtt_client.h:82
#define TRACE_INFO(...)
Definition: debug.h:105
uint8_t length
Definition: tcp.h:375
MqttVersion
MQTT protocol level.
Definition: mqtt_common.h:63
error_t mqttClientRegisterCallbacks(MqttClientContext *context, const MqttClientCallbacks *callbacks)
Register MQTT client callbacks.
Definition: mqtt_client.c:116
bool_t retain
Specifies if the Will message is to be retained.
Definition: mqtt_client.h:269
@ MQTT_CLIENT_STATE_RECEIVING_PACKET
Definition: mqtt_client.h:168
#define MQTT_CLIENT_DEFAULT_KEEP_ALIVE
Definition: mqtt_client.h:61
error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
Set keep-alive value.
Definition: mqtt_client.c:254
@ MQTT_PACKET_TYPE_UNSUBSCRIBE
Unsubscribe request.
Definition: mqtt_common.h:110
error_t mqttClientFormatDisconnect(MqttClientContext *context)
Format DISCONNECT packet.
error_t mqttClientShutdownConnection(MqttClientContext *context)
Shutdown network connection.
size_t length
Length of the Will message payload.
Definition: mqtt_client.h:267
uint32_t systime_t
System time.
error_t mqttClientSetVersion(MqttClientContext *context, MqttVersion version)
Set the MQTT protocol version to be used.
Definition: mqtt_client.c:138
@ MQTT_CLIENT_STATE_IDLE
Definition: mqtt_client.h:164
@ ERROR_TIMEOUT
Definition: error.h:95
char char_t
Definition: compiler_port.h:55
error_t mqttClientTask(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
Definition: mqtt_client.c:1134
#define TRACE_DEBUG_ARRAY(p, a, n)
Definition: debug.h:120
error_t mqttClientSetTimeout(MqttClientContext *context, systime_t timeout)
Set communication timeout.
Definition: mqtt_client.c:231
@ ERROR_NOT_CONNECTED
Definition: error.h:80
@ MQTT_CLIENT_STATE_CONNECTING
Definition: mqtt_client.h:162
#define MQTT_CLIENT_MAX_ID_LEN
Definition: mqtt_client.h:89
error_t mqttClientCheckTimeout(MqttClientContext *context)
Determine whether a timeout error has occurred.
void(* MqttClientPublishCallback)(MqttClientContext *context, const char_t *topic, const uint8_t *message, size_t length, bool_t dup, MqttQosLevel qos, bool_t retain, uint16_t packetId)
PUBLISH message received callback.
Definition: mqtt_client.h:186
error_t mqttClientSetPacketId(MqttClientContext *context, uint16_t packetId)
Set packet identifier.
Definition: mqtt_client.c:604
error_t mqttClientFormatSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos)
Format SUBSCRIBE packet.
uint8_t dup
Definition: mqtt_common.h:182
@ MQTT_CLIENT_STATE_DISCONNECTING
Definition: mqtt_client.h:170
@ MQTT_CLIENT_STATE_CONNECTED
Definition: mqtt_client.h:163
error_t mqttClientPublishEx(MqttClientContext *context, const char_t *topic, const void *message, size_t length, bool_t dup, MqttQosLevel qos, bool_t retain, uint16_t *packetId)
Publish message.
Definition: mqtt_client.c:660
@ MQTT_VERSION_3_1_1
MQTT version 3.1.1.
Definition: mqtt_common.h:65
#define MQTT_CLIENT_MAX_PASSWORD_LEN
Definition: mqtt_client.h:103
#define MqttClientContext
Definition: mqtt_client.h:147
MqttQosLevel qos
QoS level to be used when publishing the Will message.
Definition: mqtt_client.h:268
char_t clientId[]
void mqttClientInitCallbacks(MqttClientCallbacks *callbacks)
Initialize callback structure.
Definition: mqtt_client.c:102
error_t mqttClientRegisterPublishCallback(MqttClientContext *context, MqttClientPublishCallback callback)
Register publish callback function.
Definition: mqtt_client.c:209
#define MQTT_CLIENT_DEFAULT_TIMEOUT
Definition: mqtt_client.h:68
#define MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN
Definition: mqtt_client.h:117
error_t mqttClientUnsubscribe(MqttClientContext *context, const char_t *topic, uint16_t *packetId)
Unsubscribe from topic.
Definition: mqtt_client.c:906
#define PRIuSIZE
#define osMemset(p, value, length)
Definition: os_port.h:138
TCP/IP stack core.
#define MQTT_CLIENT_MAX_HOST_LEN
Definition: mqtt_client.h:75
error_t tlsInitSessionState(TlsSessionState *session)
Initialize session state.
Definition: tls.c:2901
#define osStrcpy(s1, s2)
Definition: os_port.h:210
error_t mqttClientSetAuthInfo(MqttClientContext *context, const char_t *username, const char_t *password)
Set authentication information.
Definition: mqtt_client.c:356
error_t mqttClientSetHost(MqttClientContext *context, const char_t *host)
Set the domain name of the server (for virtual hosting)
Definition: mqtt_client.c:275
error_t mqttClientSetWillMessage(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Specify the Will message.
Definition: mqtt_client.c:392
char_t topic[MQTT_CLIENT_MAX_WILL_TOPIC_LEN+1]
Will topic name.
Definition: mqtt_client.h:265
@ NO_ERROR
Success.
Definition: error.h:44
Debugging facilities.
error_t mqttClientRegisterTlsInitCallback(MqttClientContext *context, MqttClientTlsInitCallback callback)
Register TLS initialization callback function.
Definition: mqtt_client.c:184
systime_t osGetSystemTime(void)
Retrieve system time.
MQTT client.
error_t mqttClientSetIdentifier(MqttClientContext *context, const char_t *clientId)
Set client identifier.
Definition: mqtt_client.c:329