mqtt_client_misc.c
Go to the documentation of this file.
1 /**
2  * @file mqtt_client_misc.c
3  * @brief Helper functions for MQTT client
4  *
5  * @section License
6  *
7  * SPDX-License-Identifier: GPL-2.0-or-later
8  *
9  * Copyright (C) 2010-2024 Oryx Embedded SARL. All rights reserved.
10  *
11  * This file is part of CycloneTCP Open.
12  *
13  * This program is free software; you can redistribute it and/or
14  * modify it under the terms of the GNU General Public License
15  * as published by the Free Software Foundation; either version 2
16  * of the License, or (at your option) any later version.
17  *
18  * This program is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software Foundation,
25  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26  *
27  * @author Oryx Embedded SARL (www.oryx-embedded.com)
28  * @version 2.4.0
29  **/
30 
31 //Switch to the appropriate trace level
32 #define TRACE_LEVEL MQTT_TRACE_LEVEL
33 
34 //Dependencies
35 #include "core/net.h"
36 #include "mqtt/mqtt_client.h"
39 #include "mqtt/mqtt_client_misc.h"
40 #include "debug.h"
41 
42 //Check TCP/IP stack configuration
43 #if (MQTT_CLIENT_SUPPORT == ENABLED)
44 
45 
46 /**
47  * @brief Update MQTT client state
48  * @param[in] context Pointer to the MQTT client context
49  * @param[in] newState New state to switch to
50  **/
51 
53  MqttClientState newState)
54 {
55  //Switch to the new state
56  context->state = newState;
57 }
58 
59 
60 /**
61  * @brief Process MQTT client events
62  * @param[in] context Pointer to the MQTT client context
63  * @param[in] timeout Maximum time to wait before returning
64  * @return Error code
65  **/
66 
68 {
69  error_t error;
70  size_t n;
71 
72  //It is the responsibility of the client to ensure that the interval
73  //between control packets being sent does not exceed the keep-alive value
74  error = mqttClientCheckKeepAlive(context);
75 
76  //Check status code
77  if(!error)
78  {
79  //Check current state
80  if(context->state == MQTT_CLIENT_STATE_IDLE ||
81  context->state == MQTT_CLIENT_STATE_PACKET_SENT)
82  {
83  //Wait for incoming data
84  error = mqttClientWaitForData(context, timeout);
85 
86  //Check status code
87  if(!error)
88  {
89  //Initialize context
90  context->packet = context->buffer;
91  context->packetPos = 0;
92  context->packetLen = 0;
93  context->remainingLen = 0;
94 
95  //Start receiving the packet
97  }
98  }
99  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
100  {
101  //Receive the incoming packet
102  error = mqttClientReceivePacket(context);
103 
104  //Check status code
105  if(!error)
106  {
107  //Process MQTT control packet
108  error = mqttClientProcessPacket(context);
109 
110  //Update MQTT client state
111  if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
112  {
113  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
114  {
116  }
117  else
118  {
120  }
121  }
122  }
123  }
124  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
125  {
126  //Any remaining data to be sent?
127  if(context->packetPos < context->packetLen)
128  {
129  //Send more data
130  error = mqttClientSendData(context, context->packet + context->packetPos,
131  context->packetLen - context->packetPos, &n, 0);
132 
133  //Advance data pointer
134  context->packetPos += n;
135  }
136  else
137  {
138  //Save the time at which the message was sent
139  context->keepAliveTimestamp = osGetSystemTime();
140 
141  //Update MQTT client state
142  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
143  {
145  }
146  else
147  {
149  }
150  }
151  }
152  }
153 
154  //Return status code
155  return error;
156 }
157 
158 
159 /**
160  * @brief Check keep-alive time interval
161  * @param[in] context Pointer to the MQTT client context
162  * @return Error code
163  **/
164 
166 {
167  error_t error;
168  systime_t time;
169  systime_t keepAlive;
170 
171  //Initialize status code
172  error = NO_ERROR;
173 
174  //In the absence of sending any other control packets, the client must
175  //send a PINGREQ packet
176  if(context->state == MQTT_CLIENT_STATE_IDLE ||
177  context->state == MQTT_CLIENT_STATE_PACKET_SENT)
178  {
179  //A keep-alive value of zero has the effect of turning off the keep
180  //alive mechanism
181  if(context->settings.keepAlive != 0)
182  {
183  //Get current time
184  time = osGetSystemTime();
185 
186  //Convert the keep-alive value to milliseconds
187  keepAlive = context->settings.keepAlive * 1000;
188 
189  //It is the responsibility of the client to ensure that the interval
190  //between control packets being sent does not exceed the keep-alive value
191  if(timeCompare(time, context->keepAliveTimestamp + keepAlive) >= 0)
192  {
193  //Format PINGREQ packet
194  error = mqttClientFormatPingReq(context);
195 
196  //Check status code
197  if(!error)
198  {
199  //Debug message
200  TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
201  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
202 
203  //Point to the beginning of the packet
204  context->packetPos = 0;
205 
206  //Send PINGREQ packet
208  }
209  }
210  }
211  }
212 
213  //Return status code
214  return error;
215 }
216 
217 
218 /**
219  * @brief Serialize fixed header
220  * @param[in] buffer Pointer to the output buffer
221  * @param[in,out] pos Current position
222  * @param[in] type MQTT control packet type
223  * @param[in] dup DUP flag
224  * @param[in] qos QoS field
225  * @param[in] retain RETAIN flag
226  * @param[in] remainingLen Length of the variable header and the payload
227  * @return Error code
228  **/
229 
230 error_t mqttSerializeHeader(uint8_t *buffer, size_t *pos, MqttPacketType type,
231  bool_t dup, MqttQosLevel qos, bool_t retain, size_t remainingLen)
232 {
233  uint_t i;
234  uint_t k;
235  size_t n;
236  MqttPacketHeader *header;
237 
238  //Point to the current position
239  n = *pos;
240 
241  //The Remaining Length is encoded using a variable length encoding scheme
242  if(remainingLen < 128)
243  {
244  k = 1;
245  }
246  else if(remainingLen < 16384)
247  {
248  k = 2;
249  }
250  else if(remainingLen < 2097152)
251  {
252  k = 3;
253  }
254  else if(remainingLen < 268435456)
255  {
256  k = 4;
257  }
258  else
259  {
260  return ERROR_INVALID_LENGTH;
261  }
262 
263  //Sanity check
264  if(n < (sizeof(MqttPacketHeader) + k))
265  return ERROR_BUFFER_OVERFLOW;
266 
267  //Position where to format the header
268  n -= sizeof(MqttPacketHeader) + k;
269 
270  //Point to the MQTT packet header
271  header = (MqttPacketHeader *) (buffer + n);
272 
273  //Encode the first byte of the header
274  header->type = type;
275  header->dup = dup;
276  header->qos = qos;
277  header->retain = retain;
278 
279  //Encode the Remaining Length field
280  for(i = 0; i < k; i++)
281  {
282  //The least significant seven bits of each byte encode the data
283  header->length[i] = remainingLen & 0xFF;
284  remainingLen >>= 7;
285 
286  //The most significant bit is used to indicate that there are
287  //following bytes in the representation
288  if(remainingLen > 0)
289  header->length[i] |= 0x80;
290  }
291 
292  //Update current position
293  *pos = n;
294 
295  //Successful processing
296  return NO_ERROR;
297 }
298 
299 
300 /**
301  * @brief Write a 8-bit integer to the output buffer
302  * @param[in] buffer Pointer to the output buffer
303  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
304  * @param[in,out] pos Current position
305  * @param[in] value 8-bit integer to be serialized
306  * @return Error code
307  **/
308 
309 error_t mqttSerializeByte(uint8_t *buffer, size_t bufferLen,
310  size_t *pos, uint8_t value)
311 {
312  size_t n;
313 
314  //Point to the current position
315  n = *pos;
316 
317  //Make sure the output buffer is large enough
318  if((n + sizeof(uint8_t)) > bufferLen)
319  return ERROR_BUFFER_OVERFLOW;
320 
321  //Write the byte to the output buffer
322  buffer[n++] = value;
323 
324  //Advance current position
325  *pos = n;
326 
327  //Successful processing
328  return NO_ERROR;
329 }
330 
331 
332 /**
333  * @brief Write a 16-bit integer to the output buffer
334  * @param[in] buffer Pointer to the output buffer
335  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
336  * @param[in,out] pos Current position
337  * @param[in] value 16-bit integer to be serialized
338  * @return Error code
339  **/
340 
341 error_t mqttSerializeShort(uint8_t *buffer, size_t bufferLen,
342  size_t *pos, uint16_t value)
343 {
344  size_t n;
345 
346  //Point to the current position
347  n = *pos;
348 
349  //Make sure the output buffer is large enough
350  if((n + sizeof(uint16_t)) > bufferLen)
351  return ERROR_BUFFER_OVERFLOW;
352 
353  //Write the short integer to the output buffer
354  buffer[n++] = MSB(value);
355  buffer[n++] = LSB(value);
356 
357  //Advance current position
358  *pos = n;
359 
360  //Successful processing
361  return NO_ERROR;
362 }
363 
364 
365 /**
366  * @brief Serialize string
367  * @param[in] buffer Pointer to the output buffer
368  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
369  * @param[in,out] pos Current position
370  * @param[in] string Pointer to the string to be serialized
371  * @param[in] stringLen Length of the string, in bytes
372  * @return Error code
373  **/
374 
375 error_t mqttSerializeString(uint8_t *buffer, size_t bufferLen,
376  size_t *pos, const void *string, size_t stringLen)
377 {
378  size_t n;
379 
380  //Point to the current position
381  n = *pos;
382 
383  //Make sure the output buffer is large enough to hold the string
384  if((n + sizeof(uint16_t) + stringLen) > bufferLen)
385  return ERROR_BUFFER_OVERFLOW;
386 
387  //Encode the length field
388  buffer[n++] = MSB(stringLen);
389  buffer[n++] = LSB(stringLen);
390 
391  //Write the string to the output buffer
392  osMemcpy(buffer + n, string, stringLen);
393 
394  //Advance current position
395  *pos = n + stringLen;
396 
397  //Successful processing
398  return NO_ERROR;
399 }
400 
401 
402 /**
403  * @brief Serialize raw data
404  * @param[in] buffer Pointer to the output buffer
405  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
406  * @param[in,out] pos Current position
407  * @param[in] data Pointer to the raw data to be serialized
408  * @param[in] dataLen Length of the raw data, in bytes
409  * @return Error code
410  **/
411 
412 error_t mqttSerializeData(uint8_t *buffer, size_t bufferLen,
413  size_t *pos, const void *data, size_t dataLen)
414 {
415  size_t n;
416 
417  //Point to the current position
418  n = *pos;
419 
420  //Make sure the output buffer is large enough to hold the data
421  if((n + dataLen) > bufferLen)
422  return ERROR_BUFFER_OVERFLOW;
423 
424  //Write the data to the output buffer
425  osMemcpy(buffer + n, data, dataLen);
426 
427  //Advance current position
428  *pos = n + dataLen;
429 
430  //Successful processing
431  return NO_ERROR;
432 }
433 
434 
435 /**
436  * @brief Deserialize fixed header
437  * @param[in] buffer Pointer to the input buffer
438  * @param[in] bufferLen Length of the input buffer
439  * @param[in,out] pos Current position
440  * @param[out] type MQTT control packet type
441  * @param[out] dup DUP flag from the fixed header
442  * @param[out] qos QoS field from the fixed header
443  * @param[out] retain RETAIN flag from the fixed header
444  * @param[out] remainingLen Length of the variable header and the payload
445  * @return Error code
446  **/
447 
448 error_t mqttDeserializeHeader(uint8_t *buffer, size_t bufferLen, size_t *pos,
449  MqttPacketType *type, bool_t *dup, MqttQosLevel *qos, bool_t *retain, size_t *remainingLen)
450 {
451  uint_t i;
452  size_t n;
453  MqttPacketHeader *header;
454 
455  //Point to the current position
456  n = *pos;
457 
458  //Make sure the input buffer is large enough
459  if((n + sizeof(MqttPacketHeader)) > bufferLen)
460  return ERROR_INVALID_LENGTH;
461 
462  //Point to the MQTT packet header
463  header = (MqttPacketHeader *) (buffer + n);
464 
465  //Save MQTT control packet type
466  *type = (MqttPacketType) header->type;
467 
468  //Save flags
469  *dup = header->dup;
470  *qos = (MqttQosLevel) header->qos;
471  *retain = header->retain;
472 
473  //Advance current position
474  n += sizeof(MqttPacketHeader);
475 
476  //Prepare to decode the Remaining Length field
477  *remainingLen = 0;
478 
479  //The Remaining Length is encoded using a variable length encoding scheme
480  for(i = 0; i < 4; i++)
481  {
482  //Sanity check
483  if((n + sizeof(uint8_t)) > bufferLen)
484  return ERROR_INVALID_LENGTH;
485 
486  //Advance current position
487  n += sizeof(uint8_t);
488 
489  //The most significant bit is used to indicate that there are
490  //following bytes in the representation
491  if(header->length[i] & 0x80)
492  {
493  //Applications can send control packets of size up to 256 MB
494  if(i == 3)
495  return ERROR_INVALID_SYNTAX;
496 
497  //The least significant seven bits of each byte encode the data
498  *remainingLen |= (header->length[i] & 0x7F) << (7 * i);
499  }
500  else
501  {
502  //The least significant seven bits of each byte encode the data
503  *remainingLen |= header->length[i] << (7 * i);
504  //This is the last byte
505  break;
506  }
507  }
508 
509  //Return the current position
510  *pos = n;
511 
512  //Successful processing
513  return NO_ERROR;
514 }
515 
516 
517 /**
518  * @brief Read a 8-bit integer from the input buffer
519  * @param[in] buffer Pointer to the input buffer
520  * @param[in] bufferLen Length of the input buffer
521  * @param[in,out] pos Current position
522  * @param[out] value Value of the 8-bit integer
523  * @return Error code
524  **/
525 
526 error_t mqttDeserializeByte(uint8_t *buffer, size_t bufferLen,
527  size_t *pos, uint8_t *value)
528 {
529  size_t n;
530 
531  //Point to the current position
532  n = *pos;
533 
534  //Make sure the input buffer is large enough
535  if((n + sizeof(uint8_t)) > bufferLen)
536  return ERROR_BUFFER_OVERFLOW;
537 
538  //Read the short integer from the input buffer
539  *value = buffer[n];
540 
541  //Advance current position
542  *pos = n + sizeof(uint8_t);
543 
544  //Successful processing
545  return NO_ERROR;
546 }
547 
548 
549 /**
550  * @brief Read a 16-bit integer from the input buffer
551  * @param[in] buffer Pointer to the input buffer
552  * @param[in] bufferLen Length of the input buffer
553  * @param[in,out] pos Current position
554  * @param[out] value Value of the 16-bit integer
555  * @return Error code
556  **/
557 
558 error_t mqttDeserializeShort(uint8_t *buffer, size_t bufferLen,
559  size_t *pos, uint16_t *value)
560 {
561  size_t n;
562 
563  //Point to the current position
564  n = *pos;
565 
566  //Make sure the input buffer is large enough
567  if((n + sizeof(uint16_t)) > bufferLen)
568  return ERROR_BUFFER_OVERFLOW;
569 
570  //Read the short integer from the input buffer
571  *value = (buffer[n] << 8) | buffer[n + 1];
572 
573  //Advance current position
574  *pos = n + sizeof(uint16_t);
575 
576  //Successful processing
577  return NO_ERROR;
578 }
579 
580 
581 /**
582  * @brief Deserialize string
583  * @param[in] buffer Pointer to the input buffer
584  * @param[in] bufferLen Length of the input buffer
585  * @param[in,out] pos Current position
586  * @param[out] string Pointer to the string
587  * @param[out] stringLen Length of the string, in bytes
588  * @return Error code
589  **/
590 
591 error_t mqttDeserializeString(uint8_t *buffer, size_t bufferLen,
592  size_t *pos, char_t **string, size_t *stringLen)
593 {
594  size_t n;
595 
596  //Point to the current position
597  n = *pos;
598 
599  //Make sure the input buffer is large enough
600  if((n + sizeof(uint16_t)) > bufferLen)
601  return ERROR_BUFFER_OVERFLOW;
602 
603  //Decode the length field
604  *stringLen = (buffer[n] << 8) | buffer[n + 1];
605 
606  //Make sure the input buffer is large enough
607  if((n + sizeof(uint16_t) + *stringLen) > bufferLen)
608  return ERROR_BUFFER_OVERFLOW;
609 
610  //Read the string from the input buffer
611  *string = (char_t *) buffer + n + 2;
612 
613  //Advance current position
614  *pos = n + 2 + *stringLen;
615 
616  //Successful processing
617  return NO_ERROR;
618 }
619 
620 
621 /**
622  * @brief Determine whether a timeout error has occurred
623  * @param[in] context Pointer to the MQTT client context
624  * @return Error code
625  **/
626 
628 {
629 #if (NET_RTOS_SUPPORT == DISABLED)
630  error_t error;
631  systime_t time;
632 
633  //Get current time
634  time = osGetSystemTime();
635 
636  //Check whether the timeout has elapsed
637  if(timeCompare(time, context->startTime + context->settings.timeout) >= 0)
638  {
639  //Report a timeout error
640  error = ERROR_TIMEOUT;
641  }
642  else
643  {
644  //The operation would block
645  error = ERROR_WOULD_BLOCK;
646  }
647 
648  //Return status code
649  return error;
650 #else
651  //Report a timeout error
652  return ERROR_TIMEOUT;
653 #endif
654 }
655 
656 #endif
uint8_t type
Definition: coap_common.h:176
unsigned int uint_t
Definition: compiler_port.h:50
#define PRIuSIZE
char char_t
Definition: compiler_port.h:48
int bool_t
Definition: compiler_port.h:53
Debugging facilities.
#define TRACE_DEBUG_ARRAY(p, a, n)
Definition: debug.h:108
#define TRACE_INFO(...)
Definition: debug.h:95
uint8_t n
uint32_t time
error_t
Error codes.
Definition: error.h:43
@ ERROR_WOULD_BLOCK
Definition: error.h:96
@ ERROR_TIMEOUT
Definition: error.h:95
@ ERROR_INVALID_SYNTAX
Definition: error.h:68
@ NO_ERROR
Success.
Definition: error.h:44
@ ERROR_BUFFER_OVERFLOW
Definition: error.h:142
@ ERROR_INVALID_LENGTH
Definition: error.h:111
uint8_t data[]
Definition: ethernet.h:222
MQTT client.
MqttClientState
MQTT client states.
Definition: mqtt_client.h:160
@ MQTT_CLIENT_STATE_RECEIVING_PACKET
Definition: mqtt_client.h:168
@ MQTT_CLIENT_STATE_IDLE
Definition: mqtt_client.h:164
@ MQTT_CLIENT_STATE_PACKET_SENT
Definition: mqtt_client.h:166
@ MQTT_CLIENT_STATE_SENDING_PACKET
Definition: mqtt_client.h:165
#define MqttClientContext
Definition: mqtt_client.h:147
error_t mqttDeserializeHeader(uint8_t *buffer, size_t bufferLen, size_t *pos, MqttPacketType *type, bool_t *dup, MqttQosLevel *qos, bool_t *retain, size_t *remainingLen)
Deserialize fixed header.
error_t mqttDeserializeByte(uint8_t *buffer, size_t bufferLen, size_t *pos, uint8_t *value)
Read a 8-bit integer from the input buffer.
error_t mqttSerializeData(uint8_t *buffer, size_t bufferLen, size_t *pos, const void *data, size_t dataLen)
Serialize raw data.
error_t mqttClientCheckTimeout(MqttClientContext *context)
Determine whether a timeout error has occurred.
error_t mqttSerializeString(uint8_t *buffer, size_t bufferLen, size_t *pos, const void *string, size_t stringLen)
Serialize string.
error_t mqttDeserializeShort(uint8_t *buffer, size_t bufferLen, size_t *pos, uint16_t *value)
Read a 16-bit integer from the input buffer.
error_t mqttSerializeHeader(uint8_t *buffer, size_t *pos, MqttPacketType type, bool_t dup, MqttQosLevel qos, bool_t retain, size_t remainingLen)
Serialize fixed header.
error_t mqttSerializeShort(uint8_t *buffer, size_t bufferLen, size_t *pos, uint16_t value)
Write a 16-bit integer to the output buffer.
error_t mqttDeserializeString(uint8_t *buffer, size_t bufferLen, size_t *pos, char_t **string, size_t *stringLen)
Deserialize string.
error_t mqttSerializeByte(uint8_t *buffer, size_t bufferLen, size_t *pos, uint8_t value)
Write a 8-bit integer to the output buffer.
error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
void mqttClientChangeState(MqttClientContext *context, MqttClientState newState)
Update MQTT client state.
error_t mqttClientCheckKeepAlive(MqttClientContext *context)
Check keep-alive time interval.
Helper functions for MQTT client.
error_t mqttClientFormatPingReq(MqttClientContext *context)
Format PINGREQ packet.
error_t mqttClientProcessPacket(MqttClientContext *context)
Process incoming MQTT packet.
error_t mqttClientReceivePacket(MqttClientContext *context)
Receive MQTT packet.
MQTT packet parsing and formatting.
error_t mqttClientSendData(MqttClientContext *context, const void *data, size_t length, size_t *written, uint_t flags)
Send data using the relevant transport protocol.
error_t mqttClientWaitForData(MqttClientContext *context, systime_t timeout)
Wait for incoming data.
Transport protocol abstraction layer.
uint8_t dup
Definition: mqtt_common.h:182
uint8_t qos
Definition: mqtt_common.h:181
MqttQosLevel
Quality of service level.
Definition: mqtt_common.h:87
MqttPacketHeader
Definition: mqtt_common.h:186
MqttPacketType
MQTT control packet type.
Definition: mqtt_common.h:99
@ MQTT_PACKET_TYPE_INVALID
Invalid packet.
Definition: mqtt_common.h:100
TCP/IP stack core.
#define osMemcpy(dest, src, length)
Definition: os_port.h:141
#define timeCompare(t1, t2)
Definition: os_port.h:40
#define LSB(x)
Definition: os_port.h:55
#define MSB(x)
Definition: os_port.h:59
systime_t osGetSystemTime(void)
Retrieve system time.
uint32_t systime_t
System time.
uint32_t dataLen
Definition: sftp_common.h:229
uint8_t value[]
Definition: tcp.h:369