mqtt_client.c
Go to the documentation of this file.
1 /**
2  * @file mqtt_client.c
3  * @brief MQTT client
4  *
5  * @section License
6  *
7  * SPDX-License-Identifier: GPL-2.0-or-later
8  *
9  * Copyright (C) 2010-2024 Oryx Embedded SARL. All rights reserved.
10  *
11  * This file is part of CycloneTCP Open.
12  *
13  * This program is free software; you can redistribute it and/or
14  * modify it under the terms of the GNU General Public License
15  * as published by the Free Software Foundation; either version 2
16  * of the License, or (at your option) any later version.
17  *
18  * This program is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software Foundation,
25  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26  *
27  * @author Oryx Embedded SARL (www.oryx-embedded.com)
28  * @version 2.4.0
29  **/
30 
31 //Switch to the appropriate trace level
32 #define TRACE_LEVEL MQTT_TRACE_LEVEL
33 
34 //Dependencies
35 #include "core/net.h"
36 #include "mqtt/mqtt_client.h"
39 #include "mqtt/mqtt_client_misc.h"
40 #include "debug.h"
41 
42 //Check TCP/IP stack configuration
43 #if (MQTT_CLIENT_SUPPORT == ENABLED)
44 
45 
46 /**
47  * @brief Initialize MQTT client context
48  * @param[in] context Pointer to the MQTT client context
49  * @return Error code
50  **/
51 
53 {
54 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
55  error_t error;
56 #endif
57 
58  //Make sure the MQTT client context is valid
59  if(context == NULL)
61 
62  //Clear MQTT client context
63  osMemset(context, 0, sizeof(MqttClientContext));
64 
65 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
66  //Initialize TLS session state
67  error = tlsInitSessionState(&context->tlsSession);
68  //Any error to report?
69  if(error)
70  return error;
71 #endif
72 
73  //Default protocol version
74  context->settings.version = MQTT_VERSION_3_1_1;
75  //Default transport protocol
76  context->settings.transportProtocol = MQTT_TRANSPORT_PROTOCOL_TCP;
77  //Default keep-alive time interval
78  context->settings.keepAlive = MQTT_CLIENT_DEFAULT_KEEP_ALIVE;
79  //Default communication timeout
80  context->settings.timeout = MQTT_CLIENT_DEFAULT_TIMEOUT;
81 
82 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
83  //Default resource name (for WebSocket connections only)
84  osStrcpy(context->settings.uri, "/");
85 #endif
86 
87  //Initialize MQTT client state
88  context->state = MQTT_CLIENT_STATE_DISCONNECTED;
89  //Initialize packet identifier
90  context->packetId = 0;
91 
92  //Successful initialization
93  return NO_ERROR;
94 }
95 
96 
97 /**
98  * @brief Initialize callback structure
99  * @param[in] callbacks Pointer to a structure that contains callback functions
100  **/
101 
103 {
104  //Initialize callback structure
105  osMemset(callbacks, 0, sizeof(MqttClientCallbacks));
106 }
107 
108 
109 /**
110  * @brief Register MQTT client callbacks
111  * @param[in] context Pointer to the MQTT client context
112  * @param[in] callbacks Pointer to a structure that contains callback functions
113  * @return Error code
114  **/
115 
117  const MqttClientCallbacks *callbacks)
118 {
119  //Make sure the MQTT client context is valid
120  if(context == NULL)
122 
123  //Attach callback functions
124  context->callbacks = *callbacks;
125 
126  //Successful processing
127  return NO_ERROR;
128 }
129 
130 
131 /**
132  * @brief Set the MQTT protocol version to be used
133  * @param[in] context Pointer to the MQTT client context
134  * @param[in] version MQTT protocol version (3.1 or 3.1.1)
135  * @return Error code
136  **/
137 
139 {
140  //Make sure the MQTT client context is valid
141  if(context == NULL)
143 
144  //Save the MQTT protocol version to be used
145  context->settings.version = version;
146 
147  //Successful processing
148  return NO_ERROR;
149 }
150 
151 
152 /**
153  * @brief Set the transport protocol to be used
154  * @param[in] context Pointer to the MQTT client context
155  * @param[in] transportProtocol Transport protocol to be used (TCP, TLS,
156  * WebSocket, or secure WebSocket)
157  * @return Error code
158  **/
159 
161  MqttTransportProtocol transportProtocol)
162 {
163  //Make sure the MQTT client context is valid
164  if(context == NULL)
166 
167  //Save the transport protocol to be used
168  context->settings.transportProtocol = transportProtocol;
169 
170  //Successful processing
171  return NO_ERROR;
172 }
173 
174 
175 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
176 
177 /**
178  * @brief Register TLS initialization callback function
179  * @param[in] context Pointer to the MQTT client context
180  * @param[in] callback TLS initialization callback function
181  * @return Error code
182  **/
183 
185  MqttClientTlsInitCallback callback)
186 {
187  //Check parameters
188  if(context == NULL || callback == NULL)
190 
191  //Save callback function
192  context->callbacks.tlsInitCallback = callback;
193 
194  //Successful processing
195  return NO_ERROR;
196 }
197 
198 #endif
199 
200 
201 /**
202  * @brief Register publish callback function
203  * @param[in] context Pointer to the MQTT client context
204  * @param[in] callback Callback function to be called when a PUBLISH message
205  * is received
206  * @return Error code
207  **/
208 
210  MqttClientPublishCallback callback)
211 {
212  //Make sure the MQTT client context is valid
213  if(context == NULL)
215 
216  //Save callback function
217  context->callbacks.publishCallback = callback;
218 
219  //Successful processing
220  return NO_ERROR;
221 }
222 
223 
224 /**
225  * @brief Set communication timeout
226  * @param[in] context Pointer to the MQTT client context
227  * @param[in] timeout Timeout value, in seconds
228  * @return Error code
229  **/
230 
232 {
233  //Make sure the MQTT client context is valid
234  if(context == NULL)
236 
237  //Save timeout value
238  context->settings.timeout = timeout;
239 
240  //Successful processing
241  return NO_ERROR;
242 }
243 
244 
245 /**
246  * @brief Set keep-alive value
247  * @param[in] context Pointer to the MQTT client context
248  * @param[in] keepAlive Maximum time interval that is permitted to elapse
249  * between the point at which the client finishes transmitting one control
250  * packet and the point it starts sending the next
251  * @return Error code
252  **/
253 
254 error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
255 {
256  //Make sure the MQTT client context is valid
257  if(context == NULL)
259 
260  //Save keep-alive value
261  context->settings.keepAlive = keepAlive;
262 
263  //Successful processing
264  return NO_ERROR;
265 }
266 
267 
268 /**
269  * @brief Set the domain name of the server (for virtual hosting)
270  * @param[in] context Pointer to the MQTT client context
271  * @param[in] host NULL-terminated string containing the hostname
272  * @return Error code
273  **/
274 
276 {
277  //Check parameters
278  if(context == NULL || host == NULL)
280 
281  //Make sure the length of the hostname is acceptable
283  return ERROR_INVALID_LENGTH;
284 
285 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
286  //Save hostname (for WebSocket connections only)
287  osStrcpy(context->settings.host, host);
288 #endif
289 
290  //Successful processing
291  return NO_ERROR;
292 }
293 
294 
295 /**
296  * @brief Set the name of the resource being requested
297  * @param[in] context Pointer to the MQTT client context
298  * @param[in] uri NULL-terminated string that contains the resource name
299  * @return Error code
300  **/
301 
303 {
304  //Check parameters
305  if(context == NULL || uri == NULL)
307 
308  //Make sure the length of the resource name is acceptable
310  return ERROR_INVALID_LENGTH;
311 
312 #if (MQTT_CLIENT_WS_SUPPORT == ENABLED)
313  //Save resource name (for WebSocket connections only)
314  osStrcpy(context->settings.uri, uri);
315 #endif
316 
317  //Successful processing
318  return NO_ERROR;
319 }
320 
321 
322 /**
323  * @brief Set client identifier
324  * @param[in] context Pointer to the MQTT client context
325  * @param[in] clientId NULL-terminated string containing the client identifier
326  * @return Error code
327  **/
328 
330  const char_t *clientId)
331 {
332  //Check parameters
333  if(context == NULL || clientId == NULL)
335 
336  //Make sure the length of the client identifier is acceptable
338  return ERROR_INVALID_LENGTH;
339 
340  //Save client identifier
341  osStrcpy(context->settings.clientId, clientId);
342 
343  //Successful processing
344  return NO_ERROR;
345 }
346 
347 
348 /**
349  * @brief Set authentication information
350  * @param[in] context Pointer to the MQTT client context
351  * @param[in] username NULL-terminated string containing the user name to be used
352  * @param[in] password NULL-terminated string containing the password to be used
353  * @return Error code
354  **/
355 
357  const char_t *username, const char_t *password)
358 {
359  //Check parameters
360  if(context == NULL || username == NULL || password == NULL)
362 
363  //Make sure the length of the user name is acceptable
364  if(osStrlen(username) > MQTT_CLIENT_MAX_USERNAME_LEN)
365  return ERROR_INVALID_LENGTH;
366 
367  //Make sure the length of the password is acceptable
368  if(osStrlen(password) > MQTT_CLIENT_MAX_PASSWORD_LEN)
369  return ERROR_INVALID_LENGTH;
370 
371  //Save user name
372  osStrcpy(context->settings.username, username);
373  //Save password
374  osStrcpy(context->settings.password, password);
375 
376  //Successful processing
377  return NO_ERROR;
378 }
379 
380 
381 /**
382  * @brief Specify the Will message
383  * @param[in] context Pointer to the MQTT client context
384  * @param[in] topic Will topic name
385  * @param[in] message Will message
386  * @param[in] length Length of the Will message
387  * @param[in] qos QoS level to be used when publishing the Will message
388  * @param[in] retain This flag specifies if the Will message is to be retained
389  * @return Error code
390  **/
391 
393  const void *message, size_t length, MqttQosLevel qos, bool_t retain)
394 {
395  MqttClientWillMessage *willMessage;
396 
397  //Check parameters
398  if(context == NULL || topic == NULL)
400 
401  //Make sure the length of the Will topic is acceptable
403  return ERROR_INVALID_LENGTH;
404 
405  //Point to the Will message
406  willMessage = &context->settings.willMessage;
407 
408  //Save Will topic
409  osStrcpy(willMessage->topic, topic);
410 
411  //Any message payload?
412  if(length > 0)
413  {
414  //Sanity check
415  if(message == NULL)
417 
418  //Make sure the length of the Will message payload is acceptable
420  return ERROR_INVALID_LENGTH;
421 
422  //Save Will message payload
423  osMemcpy(willMessage->payload, message, length);
424  }
425 
426  //Length of the Will message payload
427  willMessage->length = length;
428  //QoS level to be used when publishing the Will message
429  willMessage->qos = qos;
430  //This flag specifies if the Will message is to be retained
431  willMessage->retain = retain;
432 
433  //Successful processing
434  return NO_ERROR;
435 }
436 
437 
438 /**
439  * @brief Bind the MQTT client to a particular network interface
440  * @param[in] context Pointer to the MQTT client context
441  * @param[in] interface Network interface to be used
442  * @return Error code
443  **/
444 
446  NetInterface *interface)
447 {
448  //Make sure the MQTT client context is valid
449  if(context == NULL)
451 
452  //Explicitly associate the MQTT client with the specified interface
453  context->interface = interface;
454 
455  //Successful processing
456  return NO_ERROR;
457 }
458 
459 
460 /**
461  * @brief Establish connection with the MQTT server
462  * @param[in] context Pointer to the MQTT client context
463  * @param[in] serverIpAddr IP address of the MQTT server to connect to
464  * @param[in] serverPort TCP port number that will be used to establish the
465  * connection
466  * @param[in] cleanSession If this flag is set, then the client and server
467  * must discard any previous session and start a new one
468  * @return Error code
469  **/
470 
472  const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
473 {
474  error_t error;
475 
476  //Check parameters
477  if(context == NULL || serverIpAddr == NULL)
479 
480  //Initialize status code
481  error = NO_ERROR;
482 
483  //Establish network connection
484  while(!error)
485  {
486  //Check current state
487  if(context->state == MQTT_CLIENT_STATE_DISCONNECTED)
488  {
489  //Open network connection
490  error = mqttClientOpenConnection(context);
491 
492  //Check status code
493  if(!error)
494  {
495  //Debug message
496  TRACE_INFO("MQTT: Connecting to server %s port %" PRIu16 "...\r\n",
497  ipAddrToString(serverIpAddr, NULL), serverPort);
498 
499  //The network connection is open
501  //Save current time
502  context->startTime = osGetSystemTime();
503  }
504  }
505  else if(context->state == MQTT_CLIENT_STATE_CONNECTING)
506  {
507  //Establish network connection
508  error = mqttClientEstablishConnection(context, serverIpAddr,
509  serverPort);
510 
511  //Check status code
512  if(!error)
513  {
514  //Debug message
515  TRACE_INFO("MQTT: Connected to server\r\n");
516 
517  //The network connection is established
519  }
520  }
521  else if(context->state == MQTT_CLIENT_STATE_CONNECTED)
522  {
523  //Format CONNECT packet
524  error = mqttClientFormatConnect(context, cleanSession);
525 
526  //Check status code
527  if(!error)
528  {
529  //Debug message
530  TRACE_INFO("MQTT: Sending CONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
531  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
532 
533  //Save the type of the MQTT packet to be sent
534  context->packetType = MQTT_PACKET_TYPE_CONNECT;
535  //Point to the beginning of the packet
536  context->packetPos = 0;
537 
538  //Send CONNECT packet
540  }
541  }
542  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
543  {
544  //Send more data
545  error = mqttClientProcessEvents(context, context->settings.timeout);
546  }
547  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
548  {
549  //Wait for CONNACK packet
550  error = mqttClientProcessEvents(context, context->settings.timeout);
551  }
552  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
553  {
554  //Receive more data
555  error = mqttClientProcessEvents(context, context->settings.timeout);
556  }
557  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
558  {
559  //Reset packet type
560  context->packetType = MQTT_PACKET_TYPE_INVALID;
561  //A CONNACK packet has been received
563  }
564  else if(context->state == MQTT_CLIENT_STATE_IDLE)
565  {
566  //The MQTT client is connected
567  break;
568  }
569  else
570  {
571  //Invalid state
572  error = ERROR_NOT_CONNECTED;
573  }
574  }
575 
576  //Check status code
577  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
578  {
579  //Check whether the timeout has elapsed
580  error = mqttClientCheckTimeout(context);
581  }
582 
583  //Failed to establish connection with the MQTT server?
584  if(error != NO_ERROR && error != ERROR_WOULD_BLOCK)
585  {
586  //Clean up side effects
587  mqttClientCloseConnection(context);
588  //Update MQTT client state
590  }
591 
592  //Return status code
593  return error;
594 }
595 
596 
597 /**
598  * @brief Publish message
599  * @param[in] context Pointer to the MQTT client context
600  * @param[in] topic Topic name
601  * @param[in] message Message payload
602  * @param[in] length Length of the message payload
603  * @param[in] qos QoS level to be used when publishing the message
604  * @param[in] retain This flag specifies if the message is to be retained
605  * @param[out] packetId Packet identifier used to send the PUBLISH packet (optional parameter; if non-NULL, the function does not wait for the PUBACK/PUBCOMP packet)
606  * @return Error code
607  **/
608 
610  const char_t *topic, const void *message, size_t length,
611  MqttQosLevel qos, bool_t retain, uint16_t *packetId)
612 {
613  error_t error;
614 
615  //Check parameters
616  if(context == NULL || topic == NULL)
618  if(message == NULL && length != 0)
620 
621  //Initialize status code
622  error = NO_ERROR;
623 
624  //Send PUBLISH packet and wait for PUBACK/PUBCOMP packet to be received
625  while(!error)
626  {
627  //Check current state
628  if(context->state == MQTT_CLIENT_STATE_IDLE)
629  {
630  //Check for transmission completion
631  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
632  {
633  //Format PUBLISH packet
634  error = mqttClientFormatPublish(context, topic, message,
635  length, qos, retain);
636 
637  //Check status code
638  if(!error)
639  {
640  //Save the packet identifier used to send the PUBLISH packet
641  if(packetId != NULL)
642  *packetId = context->packetId;
643 
644  //Debug message
645  TRACE_INFO("MQTT: Sending PUBLISH packet (%" PRIuSIZE " bytes)...\r\n",
646  context->packetLen);
647 
648  //Dump the contents of the PUBLISH packet
649  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
650 
651  //Save the type of the MQTT packet to be sent
652  context->packetType = MQTT_PACKET_TYPE_PUBLISH;
653  //Point to the beginning of the packet
654  context->packetPos = 0;
655 
656  //Send PUBLISH packet
658  //Save the time at which the packet was sent
659  context->startTime = osGetSystemTime();
660  }
661  }
662  else
663  {
664  //Reset packet type
665  context->packetType = MQTT_PACKET_TYPE_INVALID;
666  //We are done
667  break;
668  }
669  }
670  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
671  {
672  //Send more data
673  error = mqttClientProcessEvents(context, context->settings.timeout);
674  }
675  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
676  {
677  //The last parameter is optional
678  if(packetId != NULL)
679  {
680  //Do not wait for PUBACK/PUBCOMP packet
682  }
683  else
684  {
685  //Check QoS level
686  if(qos == MQTT_QOS_LEVEL_0)
687  {
688  //No response is sent by the receiver and no retry is performed by the sender
690  }
691  else
692  {
693  //Wait for PUBACK/PUBCOMP packet
694  error = mqttClientProcessEvents(context, context->settings.timeout);
695  }
696  }
697  }
698  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
699  {
700  //Receive more data
701  error = mqttClientProcessEvents(context, context->settings.timeout);
702  }
703  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
704  {
705  //A PUBACK/PUBCOMP packet has been received
707  }
708  else
709  {
710  //Invalid state
711  error = ERROR_NOT_CONNECTED;
712  }
713  }
714 
715  //Check status code
716  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
717  {
718  //Check whether the timeout has elapsed
719  error = mqttClientCheckTimeout(context);
720  }
721 
722  //Return status code
723  return error;
724 }
725 
726 
727 /**
728  * @brief Subscribe to topic
729  * @param[in] context Pointer to the MQTT client context
730  * @param[in] topic Topic filter
731  * @param[in] qos Maximum QoS level at which the server can send application
732  * messages to the client
733  * @param[out] packetId Packet identifier used to send the SUBSCRIBE packet (optional parameter; if non-NULL, the function does not wait for the SUBACK packet)
734  * @return Error code
735  **/
736 
738  const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
739 {
740  error_t error;
741 
742  //Check parameters
743  if(context == NULL || topic == NULL)
745 
746  //Initialize status code
747  error = NO_ERROR;
748 
749  //Send SUBSCRIBE packet and wait for SUBACK packet to be received
750  while(!error)
751  {
752  //Check current state
753  if(context->state == MQTT_CLIENT_STATE_IDLE)
754  {
755  //Check for transmission completion
756  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
757  {
758  //Format SUBSCRIBE packet
759  error = mqttClientFormatSubscribe(context, topic, qos);
760 
761  //Check status code
762  if(!error)
763  {
764  //Save the packet identifier used to send the SUBSCRIBE packet
765  if(packetId != NULL)
766  *packetId = context->packetId;
767 
768  //Debug message
769  TRACE_INFO("MQTT: Sending SUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
770  context->packetLen);
771 
772  //Dump the contents of the SUBSCRIBE packet
773  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
774 
775  //Save the type of the MQTT packet to be sent
776  context->packetType = MQTT_PACKET_TYPE_SUBSCRIBE;
777  //Point to the beginning of the packet
778  context->packetPos = 0;
779 
780  //Send SUBSCRIBE packet
782  //Save the time at which the packet was sent
783  context->startTime = osGetSystemTime();
784  }
785  }
786  else
787  {
788  //Reset packet type
789  context->packetType = MQTT_PACKET_TYPE_INVALID;
790  //We are done
791  break;
792  }
793  }
794  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
795  {
796  //Send more data
797  error = mqttClientProcessEvents(context, context->settings.timeout);
798  }
799  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
800  {
801  //The last parameter is optional
802  if(packetId != NULL)
803  {
804  //Do not wait for SUBACK packet
806  }
807  else
808  {
809  //Wait for SUBACK packet
810  error = mqttClientProcessEvents(context, context->settings.timeout);
811  }
812  }
813  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
814  {
815  //Receive more data
816  error = mqttClientProcessEvents(context, context->settings.timeout);
817  }
818  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
819  {
820  //A SUBACK packet has been received
822  }
823  else
824  {
825  //Invalid state
826  error = ERROR_NOT_CONNECTED;
827  }
828  }
829 
830  //Check status code
831  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
832  {
833  //Check whether the timeout has elapsed
834  error = mqttClientCheckTimeout(context);
835  }
836 
837  //Return status code
838  return error;
839 }
840 
841 
842 /**
843  * @brief Unsubscribe from topic
844  * @param[in] context Pointer to the MQTT client context
845  * @param[in] topic Topic filter
846  * @param[out] packetId Packet identifier used to send the UNSUBSCRIBE packet (optional parameter; if non-NULL, the function does not wait for the UNSUBACK packet)
847  * @return Error code
848  **/
849 
851  const char_t *topic, uint16_t *packetId)
852 {
853  error_t error;
854 
855  //Check parameters
856  if(context == NULL || topic == NULL)
858 
859  //Initialize status code
860  error = NO_ERROR;
861 
862  //Send UNSUBSCRIBE packet and wait for UNSUBACK packet to be received
863  while(!error)
864  {
865  //Check current state
866  if(context->state == MQTT_CLIENT_STATE_IDLE)
867  {
868  //Check for transmission completion
869  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
870  {
871  //Format UNSUBSCRIBE packet
872  error = mqttClientFormatUnsubscribe(context, topic);
873 
874  //Check status code
875  if(!error)
876  {
877  //Save the packet identifier used to send the UNSUBSCRIBE packet
878  if(packetId != NULL)
879  *packetId = context->packetId;
880 
881  //Debug message
882  TRACE_INFO("MQTT: Sending UNSUBSCRIBE packet (%" PRIuSIZE " bytes)...\r\n",
883  context->packetLen);
884 
885  //Dump the contents of the UNSUBSCRIBE packet
886  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
887 
888  //Save the type of the MQTT packet to be sent
889  context->packetType = MQTT_PACKET_TYPE_UNSUBSCRIBE;
890  //Point to the beginning of the packet
891  context->packetPos = 0;
892 
893  //Send UNSUBSCRIBE packet
895  //Save the time at which the packet was sent
896  context->startTime = osGetSystemTime();
897  }
898  }
899  else
900  {
901  //Reset packet type
902  context->packetType = MQTT_PACKET_TYPE_INVALID;
903  //We are done
904  break;
905  }
906  }
907  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
908  {
909  //Send more data
910  error = mqttClientProcessEvents(context, context->settings.timeout);
911  }
912  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
913  {
914  //The last parameter is optional
915  if(packetId != NULL)
916  {
917  //Do not wait for UNSUBACK packet
919  }
920  else
921  {
922  //Wait for UNSUBACK packet
923  error = mqttClientProcessEvents(context, context->settings.timeout);
924  }
925  }
926  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
927  {
928  //Receive more data
929  error = mqttClientProcessEvents(context, context->settings.timeout);
930  }
931  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
932  {
933  //An UNSUBACK packet has been received
935  }
936  else
937  {
938  //Invalid state
939  error = ERROR_NOT_CONNECTED;
940  }
941  }
942 
943  //Check status code
944  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
945  {
946  //Check whether the timeout has elapsed
947  error = mqttClientCheckTimeout(context);
948  }
949 
950  //Return status code
951  return error;
952 }
953 
954 
955 /**
956  * @brief Send ping request
957  * @param[in] context Pointer to the MQTT client context
958  * @param[out] rtt Round-trip time (optional parameter; if NULL, the function does not wait for the PINGRESP packet)
959  * @return Error code
960  **/
961 
963 {
964  error_t error;
965 
966  //Make sure the MQTT client context is valid
967  if(context == NULL)
969 
970  //Initialize status code
971  error = NO_ERROR;
972 
973  //Send PINGREQ packet and wait for PINGRESP packet to be received
974  while(!error)
975  {
976  //Check current state
977  if(context->state == MQTT_CLIENT_STATE_IDLE)
978  {
979  //Check for transmission completion
980  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
981  {
982  //Format PINGREQ packet
983  error = mqttClientFormatPingReq(context);
984 
985  //Check status code
986  if(!error)
987  {
988  //Debug message
989  TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n",
990  context->packetLen);
991 
992  //Dump the contents of the PINGREQ packet
993  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
994 
995  //Save the type of the MQTT packet to be sent
996  context->packetType = MQTT_PACKET_TYPE_PINGREQ;
997  //Point to the beginning of the packet
998  context->packetPos = 0;
999 
1000  //Send PINGREQ packet
1002  //Save the time at which the packet was sent
1003  context->startTime = osGetSystemTime();
1004  }
1005  }
1006  else
1007  {
1008  //Reset packet type
1009  context->packetType = MQTT_PACKET_TYPE_INVALID;
1010  //We are done
1011  break;
1012  }
1013  }
1014  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1015  {
1016  //Send more data
1017  error = mqttClientProcessEvents(context, context->settings.timeout);
1018  }
1019  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1020  {
1021  //The last parameter is optional
1022  if(rtt != NULL)
1023  {
1024  //Wait for PINGRESP packet
1025  error = mqttClientProcessEvents(context, context->settings.timeout);
1026  }
1027  else
1028  {
1029  //Do not wait for PINGRESP packet
1031  }
1032  }
1033  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
1034  {
1035  //Receive more data
1036  error = mqttClientProcessEvents(context, context->settings.timeout);
1037  }
1038  else if(context->state == MQTT_CLIENT_STATE_PACKET_RECEIVED)
1039  {
1040  //The last parameter is optional
1041  if(rtt != NULL)
1042  {
1043  //Compute round-trip time
1044  *rtt = osGetSystemTime() - context->startTime;
1045  }
1046 
1047  //A PINGRESP packet has been received
1049  }
1050  else
1051  {
1052  //Invalid state
1053  error = ERROR_NOT_CONNECTED;
1054  }
1055  }
1056 
1057  //Check status code
1058  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
1059  {
1060  //Check whether the timeout has elapsed
1061  error = mqttClientCheckTimeout(context);
1062  }
1063 
1064  //Return status code
1065  return error;
1066 }
1067 
1068 
1069 /**
1070  * @brief Process MQTT client events
1071  * @param[in] context Pointer to the MQTT client context
1072  * @param[in] timeout Maximum time to wait before returning
1073  * @return Error code (ERROR_TIMEOUT is not reported to the caller; it is converted to NO_ERROR)
1074  **/
1075 
1077 {
1078  error_t error;
1079 
1080  //Make sure the MQTT client context is valid
1081  if(context == NULL)
1082  return ERROR_INVALID_PARAMETER;
1083 
1084  //Process MQTT client events
1085  error = mqttClientProcessEvents(context, timeout);
1086 
1087  //Check status code
1088  if(error == ERROR_TIMEOUT)
1089  {
1090  //Catch exception
1091  error = NO_ERROR;
1092  }
1093 
1094  //Return status code
1095  return error;
1096 }
1097 
1098 
1099 /**
1100  * @brief Gracefully disconnect from the MQTT server
1101  * @param[in] context Pointer to the MQTT client context
1102  * @return Error code
1103  **/
1104 
1106 {
1107  error_t error;
1108 
1109  //Make sure the MQTT client context is valid
1110  if(context == NULL)
1111  return ERROR_INVALID_PARAMETER;
1112 
1113  //Initialize status code
1114  error = NO_ERROR;
1115 
1116  //Send DISCONNECT packet and shutdown network connection
1117  while(!error)
1118  {
1119  //Check current state
1120  if(context->state == MQTT_CLIENT_STATE_IDLE)
1121  {
1122  //Format DISCONNECT packet
1123  error = mqttClientFormatDisconnect(context);
1124 
1125  //Check status code
1126  if(!error)
1127  {
1128  //Debug message
1129  TRACE_INFO("MQTT: Sending DISCONNECT packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
1130  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
1131 
1132  //Save the type of the MQTT packet to be sent
1133  context->packetType = MQTT_PACKET_TYPE_DISCONNECT;
1134  //Point to the beginning of the packet
1135  context->packetPos = 0;
1136 
1137  //Send DISCONNECT packet
1139  //Save the time at which the packet was sent
1140  context->startTime = osGetSystemTime();
1141  }
1142  }
1143  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
1144  {
1145  //Send more data
1146  error = mqttClientProcessEvents(context, context->settings.timeout);
1147  }
1148  else if(context->state == MQTT_CLIENT_STATE_PACKET_SENT)
1149  {
1150  //Debug message
1151  TRACE_INFO("MQTT: Shutting down connection...\r\n");
1152 
1153  //After sending a DISCONNECT packet the client must not send any
1154  //more control packets on that network connection
1156  }
1157  else if(context->state == MQTT_CLIENT_STATE_DISCONNECTING)
1158  {
1159  //Properly dispose of the network connection
1160  error = mqttClientShutdownConnection(context);
1161 
1162  //Check status code
1163  if(!error)
1164  {
1165  //The MQTT client is disconnected
1167  }
1168  }
1169  else if(context->state == MQTT_CLIENT_STATE_DISCONNECTED)
1170  {
1171  //The MQTT client is disconnected
1172  break;
1173  }
1174  else
1175  {
1176  //Invalid state
1177  error = ERROR_NOT_CONNECTED;
1178  }
1179  }
1180 
1181  //Check status code
1182  if(error == ERROR_WOULD_BLOCK || error == ERROR_TIMEOUT)
1183  {
1184  //Check whether the timeout has elapsed
1185  error = mqttClientCheckTimeout(context);
1186  }
1187 
1188  //Failed to gracefully disconnect from the MQTT server?
1189  if(error != NO_ERROR && error != ERROR_WOULD_BLOCK)
1190  {
1191  //Close connection
1192  mqttClientCloseConnection(context);
1193  //Update MQTT client state
1195  }
1196 
1197  //Return status code
1198  return error;
1199 }
1200 
1201 
1202 /**
1203  * @brief Close the connection with the MQTT server
1204  * @param[in] context Pointer to the MQTT client context
1205  * @return Error code
1206  **/
1207 
1209 {
1210  //Make sure the MQTT client context is valid
1211  if(context == NULL)
1212  return ERROR_INVALID_PARAMETER;
1213 
1214  //Close connection
1215  mqttClientCloseConnection(context);
1216  //Update MQTT client state
1218 
1219  //Successful processing
1220  return NO_ERROR;
1221 }
1222 
1223 
1224 /**
1225  * @brief Release MQTT client context
1226  * @param[in] context Pointer to the MQTT client context
1227  **/
1228 
1230 {
1231  //Make sure the MQTT client context is valid
1232  if(context != NULL)
1233  {
1234  //Close connection
1235  mqttClientCloseConnection(context);
1236 
1237 #if (MQTT_CLIENT_TLS_SUPPORT == ENABLED)
1238  //Release TLS session state
1239  tlsFreeSessionState(&context->tlsSession);
1240 #endif
1241 
1242  //Clear MQTT client context
1243  osMemset(context, 0, sizeof(MqttClientContext));
1244  }
1245 }
1246 
1247 #endif
uint8_t message[]
Definition: chap.h:154
uint8_t version
Definition: coap_common.h:177
#define PRIuSIZE
char char_t
Definition: compiler_port.h:48
int bool_t
Definition: compiler_port.h:53
Debugging facilities.
#define TRACE_DEBUG_ARRAY(p, a, n)
Definition: debug.h:108
#define TRACE_INFO(...)
Definition: debug.h:95
error_t
Error codes.
Definition: error.h:43
@ ERROR_WOULD_BLOCK
Definition: error.h:96
@ ERROR_TIMEOUT
Definition: error.h:95
@ ERROR_NOT_CONNECTED
Definition: error.h:80
@ NO_ERROR
Success.
Definition: error.h:44
@ ERROR_INVALID_LENGTH
Definition: error.h:111
@ ERROR_INVALID_PARAMETER
Invalid parameter.
Definition: error.h:47
char_t * ipAddrToString(const IpAddr *ipAddr, char_t *str)
Convert a binary IP address to a string representation.
Definition: ip.c:838
error_t mqttClientSetVersion(MqttClientContext *context, MqttVersion version)
Set the MQTT protocol version to be used.
Definition: mqtt_client.c:138
error_t mqttClientInit(MqttClientContext *context)
Initialize MQTT client context.
Definition: mqtt_client.c:52
error_t mqttClientSetHost(MqttClientContext *context, const char_t *host)
Set the domain name of the server (for virtual hosting)
Definition: mqtt_client.c:275
error_t mqttClientPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain, uint16_t *packetId)
Publish message.
Definition: mqtt_client.c:609
void mqttClientDeinit(MqttClientContext *context)
Release MQTT client context.
Definition: mqtt_client.c:1229
error_t mqttClientDisconnect(MqttClientContext *context)
Gracefully disconnect from the MQTT server.
Definition: mqtt_client.c:1105
error_t mqttClientSetTransportProtocol(MqttClientContext *context, MqttTransportProtocol transportProtocol)
Set the transport protocol to be used.
Definition: mqtt_client.c:160
error_t mqttClientSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos, uint16_t *packetId)
Subscribe to topic.
Definition: mqtt_client.c:737
error_t mqttClientSetKeepAlive(MqttClientContext *context, uint16_t keepAlive)
Set keep-alive value.
Definition: mqtt_client.c:254
error_t mqttClientUnsubscribe(MqttClientContext *context, const char_t *topic, uint16_t *packetId)
Unsubscribe from topic.
Definition: mqtt_client.c:850
error_t mqttClientConnect(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort, bool_t cleanSession)
Establish connection with the MQTT server.
Definition: mqtt_client.c:471
error_t mqttClientClose(MqttClientContext *context)
Close the connection with the MQTT server.
Definition: mqtt_client.c:1208
error_t mqttClientSetIdentifier(MqttClientContext *context, const char_t *clientId)
Set client identifier.
Definition: mqtt_client.c:329
error_t mqttClientRegisterTlsInitCallback(MqttClientContext *context, MqttClientTlsInitCallback callback)
Register TLS initialization callback function.
Definition: mqtt_client.c:184
error_t mqttClientPing(MqttClientContext *context, systime_t *rtt)
Send ping request.
Definition: mqtt_client.c:962
error_t mqttClientSetUri(MqttClientContext *context, const char_t *uri)
Set the name of the resource being requested.
Definition: mqtt_client.c:302
void mqttClientInitCallbacks(MqttClientCallbacks *callbacks)
Initialize callback structure.
Definition: mqtt_client.c:102
error_t mqttClientBindToInterface(MqttClientContext *context, NetInterface *interface)
Bind the MQTT client to a particular network interface.
Definition: mqtt_client.c:445
error_t mqttClientRegisterPublishCallback(MqttClientContext *context, MqttClientPublishCallback callback)
Register publish callback function.
Definition: mqtt_client.c:209
error_t mqttClientSetTimeout(MqttClientContext *context, systime_t timeout)
Set communication timeout.
Definition: mqtt_client.c:231
error_t mqttClientSetWillMessage(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Specify the Will message.
Definition: mqtt_client.c:392
error_t mqttClientSetAuthInfo(MqttClientContext *context, const char_t *username, const char_t *password)
Set authentication information.
Definition: mqtt_client.c:356
error_t mqttClientTask(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
Definition: mqtt_client.c:1076
error_t mqttClientRegisterCallbacks(MqttClientContext *context, const MqttClientCallbacks *callbacks)
Register MQTT client callbacks.
Definition: mqtt_client.c:116
MQTT client.
#define MQTT_CLIENT_DEFAULT_TIMEOUT
Definition: mqtt_client.h:68
#define MQTT_CLIENT_MAX_HOST_LEN
Definition: mqtt_client.h:75
#define MQTT_CLIENT_DEFAULT_KEEP_ALIVE
Definition: mqtt_client.h:61
void(* MqttClientPublishCallback)(MqttClientContext *context, const char_t *topic, const uint8_t *message, size_t length, bool_t dup, MqttQosLevel qos, bool_t retain, uint16_t packetId)
PUBLISH message received callback.
Definition: mqtt_client.h:186
@ MQTT_CLIENT_STATE_PACKET_RECEIVED
Definition: mqtt_client.h:169
@ MQTT_CLIENT_STATE_RECEIVING_PACKET
Definition: mqtt_client.h:168
@ MQTT_CLIENT_STATE_DISCONNECTING
Definition: mqtt_client.h:170
@ MQTT_CLIENT_STATE_IDLE
Definition: mqtt_client.h:164
@ MQTT_CLIENT_STATE_PACKET_SENT
Definition: mqtt_client.h:166
@ MQTT_CLIENT_STATE_CONNECTING
Definition: mqtt_client.h:162
@ MQTT_CLIENT_STATE_CONNECTED
Definition: mqtt_client.h:163
@ MQTT_CLIENT_STATE_SENDING_PACKET
Definition: mqtt_client.h:165
@ MQTT_CLIENT_STATE_DISCONNECTED
Definition: mqtt_client.h:161
#define MQTT_CLIENT_MAX_USERNAME_LEN
Definition: mqtt_client.h:96
#define MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN
Definition: mqtt_client.h:117
error_t(* MqttClientTlsInitCallback)(MqttClientContext *context, TlsContext *tlsContext)
TLS initialization callback.
Definition: mqtt_client.h:253
#define MQTT_CLIENT_MAX_WILL_TOPIC_LEN
Definition: mqtt_client.h:110
#define MqttClientContext
Definition: mqtt_client.h:147
#define MQTT_CLIENT_MAX_PASSWORD_LEN
Definition: mqtt_client.h:103
#define MQTT_CLIENT_MAX_ID_LEN
Definition: mqtt_client.h:89
error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
#define MQTT_CLIENT_MAX_URI_LEN
Definition: mqtt_client.h:82
error_t mqttClientCheckTimeout(MqttClientContext *context)
Determine whether a timeout error has occurred.
void mqttClientChangeState(MqttClientContext *context, MqttClientState newState)
Update MQTT client state.
Helper functions for MQTT client.
error_t mqttClientFormatPublish(MqttClientContext *context, const char_t *topic, const void *message, size_t length, MqttQosLevel qos, bool_t retain)
Format PUBLISH packet.
error_t mqttClientFormatUnsubscribe(MqttClientContext *context, const char_t *topic)
Format UNSUBSCRIBE packet.
error_t mqttClientFormatSubscribe(MqttClientContext *context, const char_t *topic, MqttQosLevel qos)
Format SUBSCRIBE packet.
error_t mqttClientFormatConnect(MqttClientContext *context, bool_t cleanSession)
Format CONNECT packet.
error_t mqttClientFormatPingReq(MqttClientContext *context)
Format PINGREQ packet.
error_t mqttClientFormatDisconnect(MqttClientContext *context)
Format DISCONNECT packet.
MQTT packet parsing and formatting.
error_t mqttClientOpenConnection(MqttClientContext *context)
Open network connection.
void mqttClientCloseConnection(MqttClientContext *context)
Close network connection.
error_t mqttClientShutdownConnection(MqttClientContext *context)
Shutdown network connection.
error_t mqttClientEstablishConnection(MqttClientContext *context, const IpAddr *serverIpAddr, uint16_t serverPort)
Establish network connection.
Transport protocol abstraction layer.
uint8_t qos
Definition: mqtt_common.h:181
MqttQosLevel
Quality of service level.
Definition: mqtt_common.h:87
@ MQTT_QOS_LEVEL_0
At most once delivery.
Definition: mqtt_common.h:88
MqttTransportProtocol
Transport protocol.
Definition: mqtt_common.h:74
@ MQTT_TRANSPORT_PROTOCOL_TCP
Definition: mqtt_common.h:75
MqttVersion
MQTT protocol level.
Definition: mqtt_common.h:63
@ MQTT_VERSION_3_1_1
MQTT version 3.1.1.
Definition: mqtt_common.h:65
@ MQTT_PACKET_TYPE_DISCONNECT
Client is disconnecting.
Definition: mqtt_common.h:114
@ MQTT_PACKET_TYPE_SUBSCRIBE
Client subscribe request.
Definition: mqtt_common.h:108
@ MQTT_PACKET_TYPE_UNSUBSCRIBE
Unsubscribe request.
Definition: mqtt_common.h:110
@ MQTT_PACKET_TYPE_INVALID
Invalid packet.
Definition: mqtt_common.h:100
@ MQTT_PACKET_TYPE_PINGREQ
Ping request.
Definition: mqtt_common.h:112
@ MQTT_PACKET_TYPE_CONNECT
Client request to connect to server.
Definition: mqtt_common.h:101
@ MQTT_PACKET_TYPE_PUBLISH
Publish message.
Definition: mqtt_common.h:103
char_t clientId[]
TCP/IP stack core.
#define NetInterface
Definition: net.h:36
#define osMemset(p, value, length)
Definition: os_port.h:135
#define osMemcpy(dest, src, length)
Definition: os_port.h:141
#define osStrlen(s)
Definition: os_port.h:165
#define osStrcpy(s1, s2)
Definition: os_port.h:207
systime_t osGetSystemTime(void)
Retrieve system time.
uint32_t systime_t
System time.
IP network address.
Definition: ip.h:79
MQTT client callback functions.
Definition: mqtt_client.h:278
uint8_t payload[MQTT_CLIENT_MAX_WILL_PAYLOAD_LEN]
Will message payload.
Definition: mqtt_client.h:266
char_t topic[MQTT_CLIENT_MAX_WILL_TOPIC_LEN+1]
Will topic name.
Definition: mqtt_client.h:265
size_t length
Length of the Will message payload.
Definition: mqtt_client.h:267
MqttQosLevel qos
QoS level to be used when publishing the Will message.
Definition: mqtt_client.h:268
bool_t retain
Specifies if the Will message is to be retained.
Definition: mqtt_client.h:269
uint8_t length
Definition: tcp.h:368
void tlsFreeSessionState(TlsSessionState *session)
Properly dispose a session state.
Definition: tls.c:2743
error_t tlsInitSessionState(TlsSessionState *session)
Initialize session state.
Definition: tls.c:2600