mqtt_client_misc.c
Go to the documentation of this file.
1 /**
2  * @file mqtt_client_misc.c
3  * @brief Helper functions for MQTT client
4  *
5  * @section License
6  *
7  * SPDX-License-Identifier: GPL-2.0-or-later
8  *
9  * Copyright (C) 2010-2023 Oryx Embedded SARL. All rights reserved.
10  *
11  * This file is part of CycloneTCP Open.
12  *
13  * This program is free software; you can redistribute it and/or
14  * modify it under the terms of the GNU General Public License
15  * as published by the Free Software Foundation; either version 2
16  * of the License, or (at your option) any later version.
17  *
18  * This program is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software Foundation,
25  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26  *
27  * @author Oryx Embedded SARL (www.oryx-embedded.com)
28  * @version 2.2.4
29  **/
30 
31 //Switch to the appropriate trace level
32 #define TRACE_LEVEL MQTT_TRACE_LEVEL
33 
34 //Dependencies
35 #include "core/net.h"
36 #include "mqtt/mqtt_client.h"
39 #include "mqtt/mqtt_client_misc.h"
40 #include "debug.h"
41 
42 //Check TCP/IP stack configuration
43 #if (MQTT_CLIENT_SUPPORT == ENABLED)
44 
45 
46 /**
47  * @brief Update MQTT client state
48  * @param[in] context Pointer to the MQTT client context
49  * @param[in] newState New state to switch to
50  **/
51 
53  MqttClientState newState)
54 {
55  //Switch to the new state
56  context->state = newState;
57 }
58 
59 
60 /**
61  * @brief Process MQTT client events
62  * @param[in] context Pointer to the MQTT client context
63  * @param[in] timeout Maximum time to wait before returning
64  * @return Error code
65  **/
66 
//Body of the event processing routine. According to the Doxygen header
//above, it receives the client context and a timeout, and returns an
//error code. The work done depends on context->state:
// - IDLE or PACKET_SENT: wait for incoming data, then reset the packet
//   bookkeeping fields
// - RECEIVING_PACKET: receive the packet, then process it
// - SENDING_PACKET: push the bytes not yet sent, or record the send time
//NOTE(review): the line holding the function signature (listing line 67)
//is absent from this extract - restore it from the upstream source
68 {
69  error_t error;
70  size_t n;
71 
72  //It is the responsibility of the client to ensure that the interval
73  //between control packets being sent does not exceed the keep-alive value
74  error = mqttClientCheckKeepAlive(context);
75 
76  //Check status code
77  if(!error)
78  {
79  //Check current state
80  if(context->state == MQTT_CLIENT_STATE_IDLE ||
81  context->state == MQTT_CLIENT_STATE_PACKET_SENT)
82  {
83  //Wait for incoming data
84  error = mqttClientWaitForData(context, timeout);
85 
86  //Check status code
87  if(!error)
88  {
89  //Initialize context
//The packet is assembled in the context's own buffer, starting from an
//empty packet
90  context->packet = context->buffer;
91  context->packetPos = 0;
92  context->packetLen = 0;
93  context->remainingLen = 0;
94 
95  //Start receiving the packet
//NOTE(review): the statement announced by the comment above is absent
//from this extract (listing line 96) - presumably a state change, confirm
//against the upstream source
97  }
98  }
99  else if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
100  {
101  //Receive the incoming packet
102  error = mqttClientReceivePacket(context);
103 
104  //Check status code
105  if(!error)
106  {
107  //Process MQTT control packet
108  error = mqttClientProcessPacket(context);
109 
110  //Update MQTT client state
//The state is only updated here when it is still RECEIVING_PACKET after
//the packet has been processed
111  if(context->state == MQTT_CLIENT_STATE_RECEIVING_PACKET)
112  {
113  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
//NOTE(review): the statements controlled by this if/else are absent from
//this extract (listing lines 114 and 116) - restore them from the
//upstream source
115  else
117  }
118  }
119  }
120  else if(context->state == MQTT_CLIENT_STATE_SENDING_PACKET)
121  {
122  //Any remaining data to be sent?
123  if(context->packetPos < context->packetLen)
124  {
125  //Send more data
126  error = mqttClientSendData(context, context->packet + context->packetPos,
127  context->packetLen - context->packetPos, &n, 0);
128 
129  //Advance data pointer
//NOTE(review): n is added whether or not the call above reported an
//error, and n has no initial value; whether mqttClientSendData always
//stores a byte count through its output parameter is not visible here -
//confirm
130  context->packetPos += n;
131  }
132  else
133  {
134  //Save the time at which the message was sent
//This timestamp is the reference used by the keep-alive check
135  context->keepAliveTimestamp = osGetSystemTime();
136 
137  //Update MQTT client state
138  if(context->packetType == MQTT_PACKET_TYPE_INVALID)
//NOTE(review): the statements controlled by this if/else are absent from
//this extract (listing lines 139 and 141) - restore them from the
//upstream source
140  else
142  }
143  }
144  }
145 
146  //Return status code
147  return error;
148 }
149 
150 
151 /**
152  * @brief Check keep-alive time interval
153  * @param[in] context Pointer to the MQTT client context
154  * @return Error code
155  **/
156 
//Body of the keep-alive check. According to the Doxygen header above, it
//receives the client context and returns an error code. When the client is
//in the IDLE or PACKET_SENT state, keep-alive is enabled and the keep-alive
//interval has elapsed since keepAliveTimestamp, a PINGREQ packet is
//formatted. In every other case NO_ERROR is returned without side effect.
//NOTE(review): the line holding the function signature (listing line 157)
//is absent from this extract - restore it from the upstream source
158 {
159  error_t error;
160  systime_t time;
161  systime_t keepAlive;
162 
163  //Initialize status code
164  error = NO_ERROR;
165 
166  //In the absence of sending any other control packets, the client must
167  //send a PINGREQ packet
168  if(context->state == MQTT_CLIENT_STATE_IDLE ||
169  context->state == MQTT_CLIENT_STATE_PACKET_SENT)
170  {
171  //A keep-alive value of zero has the effect of turning off the keep
172  //alive mechanism
173  if(context->settings.keepAlive != 0)
174  {
175  //Get current time
176  time = osGetSystemTime();
177 
178  //Convert the keep-alive value to milliseconds
//NOTE(review): assumes settings.keepAlive is expressed in seconds and that
//the product fits the type used for the multiplication - confirm
179  keepAlive = context->settings.keepAlive * 1000;
180 
181  //It is the responsibility of the client to ensure that the interval
182  //between control packets being sent does not exceed the keep-alive value
//The deadline is the last recorded timestamp plus the keep-alive interval;
//timeCompare is a macro from os_port.h, presumably a wrap-safe comparison
//of system times - confirm
183  if(timeCompare(time, context->keepAliveTimestamp + keepAlive) >= 0)
184  {
185  //Format PINGREQ packet
186  error = mqttClientFormatPingReq(context);
187 
188  //Check status code
189  if(!error)
190  {
191  //Debug message
192  TRACE_INFO("MQTT: Sending PINGREQ packet (%" PRIuSIZE " bytes)...\r\n", context->packetLen);
193  TRACE_DEBUG_ARRAY(" ", context->packet, context->packetLen);
194 
195  //Point to the beginning of the packet
196  context->packetPos = 0;
197 
198  //Send PINGREQ packet
//NOTE(review): the statement announced by the comment above is absent
//from this extract (listing line 199) - presumably a state change, confirm
//against the upstream source
200  }
201  }
202  }
203  }
204 
205  //Return status code
206  return error;
207 }
208 
209 
210 /**
211  * @brief Serialize fixed header
212  * @param[in] buffer Pointer to the output buffer
213  * @param[in,out] pos Current position
214  * @param[in] type MQTT control packet type
215  * @param[in] dup DUP flag
216  * @param[in] qos QoS field
217  * @param[in] retain RETAIN flag
218  * @param[in] remainingLen Length of the variable header and the payload
219  * @return Error code
220  **/
221 
222 error_t mqttSerializeHeader(uint8_t *buffer, size_t *pos, MqttPacketType type,
223  bool_t dup, MqttQosLevel qos, bool_t retain, size_t remainingLen)
224 {
225  uint_t i;
226  uint_t k;
227  size_t n;
228  MqttPacketHeader *header;
229 
230  //Point to the current position
231  n = *pos;
232 
233  //The Remaining Length is encoded using a variable length encoding scheme
234  if(remainingLen < 128)
235  k = 1;
236  else if(remainingLen < 16384)
237  k = 2;
238  else if(remainingLen < 2097152)
239  k = 3;
240  else if(remainingLen < 268435456)
241  k = 4;
242  else
243  return ERROR_INVALID_LENGTH;
244 
245  //Sanity check
246  if(n < (sizeof(MqttPacketHeader) + k))
247  return ERROR_BUFFER_OVERFLOW;
248 
249  //Position where to format the header
250  n -= sizeof(MqttPacketHeader) + k;
251 
252  //Point to the MQTT packet header
253  header = (MqttPacketHeader *) (buffer + n);
254 
255  //Encode the first byte of the header
256  header->type = type;
257  header->dup = dup;
258  header->qos = qos;
259  header->retain = retain;
260 
261  //Encode the Remaining Length field
262  for(i = 0; i < k; i++)
263  {
264  //The least significant seven bits of each byte encode the data
265  header->length[i] = remainingLen & 0xFF;
266  remainingLen >>= 7;
267 
268  //The most significant bit is used to indicate that there are
269  //following bytes in the representation
270  if(remainingLen > 0)
271  header->length[i] |= 0x80;
272  }
273 
274  //Update current position
275  *pos = n;
276 
277  //Successful processing
278  return NO_ERROR;
279 }
280 
281 
282 /**
283  * @brief Write a 8-bit integer to the output buffer
284  * @param[in] buffer Pointer to the output buffer
285  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
286  * @param[in,out] pos Current position
287  * @param[in] value 8-bit integer to be serialized
288  * @return Error code
289  **/
290 
291 error_t mqttSerializeByte(uint8_t *buffer, size_t bufferLen,
292  size_t *pos, uint8_t value)
293 {
294  size_t n;
295 
296  //Point to the current position
297  n = *pos;
298 
299  //Make sure the output buffer is large enough
300  if((n + sizeof(uint8_t)) > bufferLen)
301  return ERROR_BUFFER_OVERFLOW;
302 
303  //Write the byte to the output buffer
304  buffer[n++] = value;
305 
306  //Advance current position
307  *pos = n;
308 
309  //Successful processing
310  return NO_ERROR;
311 }
312 
313 
314 /**
315  * @brief Write a 16-bit integer to the output buffer
316  * @param[in] buffer Pointer to the output buffer
317  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
318  * @param[in,out] pos Current position
319  * @param[in] value 16-bit integer to be serialized
320  * @return Error code
321  **/
322 
323 error_t mqttSerializeShort(uint8_t *buffer, size_t bufferLen,
324  size_t *pos, uint16_t value)
325 {
326  size_t n;
327 
328  //Point to the current position
329  n = *pos;
330 
331  //Make sure the output buffer is large enough
332  if((n + sizeof(uint16_t)) > bufferLen)
333  return ERROR_BUFFER_OVERFLOW;
334 
335  //Write the short integer to the output buffer
336  buffer[n++] = MSB(value);
337  buffer[n++] = LSB(value);
338 
339  //Advance current position
340  *pos = n;
341 
342  //Successful processing
343  return NO_ERROR;
344 }
345 
346 
347 /**
348  * @brief Serialize string
349  * @param[in] buffer Pointer to the output buffer
350  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
351  * @param[in,out] pos Current position
352  * @param[in] string Pointer to the string to be serialized
353  * @param[in] stringLen Length of the string, in bytes
354  * @return Error code
355  **/
356 
357 error_t mqttSerializeString(uint8_t *buffer, size_t bufferLen,
358  size_t *pos, const void *string, size_t stringLen)
359 {
360  size_t n;
361 
362  //Point to the current position
363  n = *pos;
364 
365  //Make sure the output buffer is large enough to hold the string
366  if((n + sizeof(uint16_t) + stringLen) > bufferLen)
367  return ERROR_BUFFER_OVERFLOW;
368 
369  //Encode the length field
370  buffer[n++] = MSB(stringLen);
371  buffer[n++] = LSB(stringLen);
372 
373  //Write the string to the output buffer
374  osMemcpy(buffer + n, string, stringLen);
375 
376  //Advance current position
377  *pos = n + stringLen;
378 
379  //Successful processing
380  return NO_ERROR;
381 }
382 
383 
384 /**
385  * @brief Serialize raw data
386  * @param[in] buffer Pointer to the output buffer
387  * @param[in] bufferLen Maximum number of bytes the output buffer can hold
388  * @param[in,out] pos Current position
389  * @param[in] data Pointer to the raw data to be serialized
390  * @param[in] dataLen Length of the raw data, in bytes
391  * @return Error code
392  **/
393 
394 error_t mqttSerializeData(uint8_t *buffer, size_t bufferLen,
395  size_t *pos, const void *data, size_t dataLen)
396 {
397  size_t n;
398 
399  //Point to the current position
400  n = *pos;
401 
402  //Make sure the output buffer is large enough to hold the data
403  if((n + dataLen) > bufferLen)
404  return ERROR_BUFFER_OVERFLOW;
405 
406  //Write the data to the output buffer
407  osMemcpy(buffer + n, data, dataLen);
408 
409  //Advance current position
410  *pos = n + dataLen;
411 
412  //Successful processing
413  return NO_ERROR;
414 }
415 
416 
417 /**
418  * @brief Deserialize fixed header
419  * @param[in] buffer Pointer to the input buffer
420  * @param[in] bufferLen Length of the input buffer
421  * @param[in,out] pos Current position
422  * @param[out] type MQTT control packet type
423  * @param[out] dup DUP flag from the fixed header
424  * @param[out] qos QoS field from the fixed header
425  * @param[out] retain RETAIN flag from the fixed header
426  * @param[out] remainingLen Length of the variable header and the payload
427  * @return Error code
428  **/
429 
430 error_t mqttDeserializeHeader(uint8_t *buffer, size_t bufferLen, size_t *pos,
431  MqttPacketType *type, bool_t *dup, MqttQosLevel *qos, bool_t *retain, size_t *remainingLen)
432 {
433  uint_t i;
434  size_t n;
435  MqttPacketHeader *header;
436 
437  //Point to the current position
438  n = *pos;
439 
440  //Make sure the input buffer is large enough
441  if((n + sizeof(MqttPacketHeader)) > bufferLen)
442  return ERROR_INVALID_LENGTH;
443 
444  //Point to the MQTT packet header
445  header = (MqttPacketHeader *) (buffer + n);
446 
447  //Save MQTT control packet type
448  *type = (MqttPacketType) header->type;
449 
450  //Save flags
451  *dup = header->dup;
452  *qos = (MqttQosLevel) header->qos;
453  *retain = header->retain;
454 
455  //Advance current position
456  n += sizeof(MqttPacketHeader);
457 
458  //Prepare to decode the Remaining Length field
459  *remainingLen = 0;
460 
461  //The Remaining Length is encoded using a variable length encoding scheme
462  for(i = 0; i < 4; i++)
463  {
464  //Sanity check
465  if((n + sizeof(uint8_t)) > bufferLen)
466  return ERROR_INVALID_LENGTH;
467 
468  //Advance current position
469  n += sizeof(uint8_t);
470 
471  //The most significant bit is used to indicate that there are
472  //following bytes in the representation
473  if(header->length[i] & 0x80)
474  {
475  //Applications can send control packets of size up to 256 MB
476  if(i == 3)
477  return ERROR_INVALID_SYNTAX;
478 
479  //The least significant seven bits of each byte encode the data
480  *remainingLen |= (header->length[i] & 0x7F) << (7 * i);
481  }
482  else
483  {
484  //The least significant seven bits of each byte encode the data
485  *remainingLen |= header->length[i] << (7 * i);
486  //This is the last byte
487  break;
488  }
489  }
490 
491  //Return the current position
492  *pos = n;
493 
494  //Successful processing
495  return NO_ERROR;
496 }
497 
498 
499 /**
500  * @brief Read a 8-bit integer from the input buffer
501  * @param[in] buffer Pointer to the input buffer
502  * @param[in] bufferLen Length of the input buffer
503  * @param[in,out] pos Current position
504  * @param[out] value Value of the 8-bit integer
505  * @return Error code
506  **/
507 
508 error_t mqttDeserializeByte(uint8_t *buffer, size_t bufferLen,
509  size_t *pos, uint8_t *value)
510 {
511  size_t n;
512 
513  //Point to the current position
514  n = *pos;
515 
516  //Make sure the input buffer is large enough
517  if((n + sizeof(uint8_t)) > bufferLen)
518  return ERROR_BUFFER_OVERFLOW;
519 
520  //Read the short integer from the input buffer
521  *value = buffer[n];
522 
523  //Advance current position
524  *pos = n + sizeof(uint8_t);
525 
526  //Successful processing
527  return NO_ERROR;
528 }
529 
530 
531 /**
532  * @brief Read a 16-bit integer from the input buffer
533  * @param[in] buffer Pointer to the input buffer
534  * @param[in] bufferLen Length of the input buffer
535  * @param[in,out] pos Current position
536  * @param[out] value Value of the 16-bit integer
537  * @return Error code
538  **/
539 
540 error_t mqttDeserializeShort(uint8_t *buffer, size_t bufferLen,
541  size_t *pos, uint16_t *value)
542 {
543  size_t n;
544 
545  //Point to the current position
546  n = *pos;
547 
548  //Make sure the input buffer is large enough
549  if((n + sizeof(uint16_t)) > bufferLen)
550  return ERROR_BUFFER_OVERFLOW;
551 
552  //Read the short integer from the input buffer
553  *value = (buffer[n] << 8) | buffer[n + 1];
554 
555  //Advance current position
556  *pos = n + sizeof(uint16_t);
557 
558  //Successful processing
559  return NO_ERROR;
560 }
561 
562 
563 /**
564  * @brief Deserialize string
565  * @param[in] buffer Pointer to the input buffer
566  * @param[in] bufferLen Length of the input buffer
567  * @param[in,out] pos Current position
568  * @param[out] string Pointer to the string
569  * @param[out] stringLen Length of the string, in bytes
570  * @return Error code
571  **/
572 
573 error_t mqttDeserializeString(uint8_t *buffer, size_t bufferLen,
574  size_t *pos, char_t **string, size_t *stringLen)
575 {
576  size_t n;
577 
578  //Point to the current position
579  n = *pos;
580 
581  //Make sure the input buffer is large enough
582  if((n + sizeof(uint16_t)) > bufferLen)
583  return ERROR_BUFFER_OVERFLOW;
584 
585  //Decode the length field
586  *stringLen = (buffer[n] << 8) | buffer[n + 1];
587 
588  //Make sure the input buffer is large enough
589  if((n + sizeof(uint16_t) + *stringLen) > bufferLen)
590  return ERROR_BUFFER_OVERFLOW;
591 
592  //Read the string from the input buffer
593  *string = (char_t *) buffer + n + 2;
594 
595  //Advance current position
596  *pos = n + 2 + *stringLen;
597 
598  //Successful processing
599  return NO_ERROR;
600 }
601 
602 
603 /**
604  * @brief Determine whether a timeout error has occurred
605  * @param[in] context Pointer to the MQTT client context
606  * @return Error code
607  **/
608 
610 {
611 #if (NET_RTOS_SUPPORT == DISABLED)
612  error_t error;
613  systime_t time;
614 
615  //Get current time
616  time = osGetSystemTime();
617 
618  //Check whether the timeout has elapsed
619  if(timeCompare(time, context->startTime + context->settings.timeout) >= 0)
620  {
621  //Report a timeout error
622  error = ERROR_TIMEOUT;
623  }
624  else
625  {
626  //The operation would block
627  error = ERROR_WOULD_BLOCK;
628  }
629 
630  //Return status code
631  return error;
632 #else
633  //Report a timeout error
634  return ERROR_TIMEOUT;
635 #endif
636 }
637 
638 #endif
error_t mqttSerializeData(uint8_t *buffer, size_t bufferLen, size_t *pos, const void *data, size_t dataLen)
Serialize raw data.
int bool_t
Definition: compiler_port.h:53
@ MQTT_PACKET_TYPE_INVALID
Invalid packet.
Definition: mqtt_common.h:100
@ ERROR_WOULD_BLOCK
Definition: error.h:96
uint8_t data[]
Definition: ethernet.h:220
error_t mqttClientFormatPingReq(MqttClientContext *context)
Format PINGREQ packet.
@ ERROR_BUFFER_OVERFLOW
Definition: error.h:142
error_t mqttDeserializeHeader(uint8_t *buffer, size_t bufferLen, size_t *pos, MqttPacketType *type, bool_t *dup, MqttQosLevel *qos, bool_t *retain, size_t *remainingLen)
Deserialize fixed header.
__start_packed struct @0 MqttPacketHeader
Fixed header.
error_t mqttClientReceivePacket(MqttClientContext *context)
Receive MQTT packet.
error_t mqttClientSendData(MqttClientContext *context, const void *data, size_t length, size_t *written, uint_t flags)
Send data using the relevant transport protocol.
error_t mqttDeserializeString(uint8_t *buffer, size_t bufferLen, size_t *pos, char_t **string, size_t *stringLen)
Deserialize string.
error_t mqttClientWaitForData(MqttClientContext *context, systime_t timeout)
Wait for incoming data.
uint8_t qos
Definition: mqtt_common.h:179
error_t mqttDeserializeByte(uint8_t *buffer, size_t bufferLen, size_t *pos, uint8_t *value)
Read a 8-bit integer from the input buffer.
@ MQTT_CLIENT_STATE_SENDING_PACKET
Definition: mqtt_client.h:165
#define timeCompare(t1, t2)
Definition: os_port.h:42
error_t mqttSerializeHeader(uint8_t *buffer, size_t *pos, MqttPacketType type, bool_t dup, MqttQosLevel qos, bool_t retain, size_t remainingLen)
Serialize fixed header.
#define osMemcpy(dest, src, length)
Definition: os_port.h:140
Helper functions for MQTT client.
void mqttClientChangeState(MqttClientContext *context, MqttClientState newState)
Update MQTT client state.
char_t type
error_t
Error codes.
Definition: error.h:43
Transport protocol abstraction layer.
error_t mqttClientProcessEvents(MqttClientContext *context, systime_t timeout)
Process MQTT client events.
@ MQTT_CLIENT_STATE_PACKET_SENT
Definition: mqtt_client.h:166
error_t mqttSerializeByte(uint8_t *buffer, size_t bufferLen, size_t *pos, uint8_t value)
Write a 8-bit integer to the output buffer.
MqttQosLevel
Quality of service level.
Definition: mqtt_common.h:87
uint8_t retain
Definition: mqtt_common.h:178
uint8_t value[]
Definition: tcp.h:367
@ ERROR_INVALID_LENGTH
Definition: error.h:111
MQTT packet parsing and formatting.
#define MSB(x)
Definition: os_port.h:61
#define TRACE_INFO(...)
Definition: debug.h:95
#define LSB(x)
Definition: os_port.h:57
error_t mqttClientProcessPacket(MqttClientContext *context)
Process incoming MQTT packet.
uint32_t dataLen
Definition: sftp_common.h:227
@ MQTT_CLIENT_STATE_RECEIVING_PACKET
Definition: mqtt_client.h:168
uint32_t systime_t
System time.
@ MQTT_CLIENT_STATE_IDLE
Definition: mqtt_client.h:164
@ ERROR_TIMEOUT
Definition: error.h:95
char char_t
Definition: compiler_port.h:48
uint32_t time
#define TRACE_DEBUG_ARRAY(p, a, n)
Definition: debug.h:108
uint8_t n
error_t mqttClientCheckTimeout(MqttClientContext *context)
Determine whether a timeout error has occurred.
error_t mqttClientCheckKeepAlive(MqttClientContext *context)
Check keep-alive time interval.
MqttClientState
MQTT client states.
Definition: mqtt_client.h:160
@ ERROR_INVALID_SYNTAX
Definition: error.h:68
uint8_t dup
Definition: mqtt_common.h:180
#define MqttClientContext
Definition: mqtt_client.h:147
error_t mqttDeserializeShort(uint8_t *buffer, size_t bufferLen, size_t *pos, uint16_t *value)
Read a 16-bit integer from the input buffer.
error_t mqttSerializeShort(uint8_t *buffer, size_t bufferLen, size_t *pos, uint16_t value)
Write a 16-bit integer to the output buffer.
#define PRIuSIZE
unsigned int uint_t
Definition: compiler_port.h:50
TCP/IP stack core.
error_t mqttSerializeString(uint8_t *buffer, size_t bufferLen, size_t *pos, const void *string, size_t stringLen)
Serialize string.
@ NO_ERROR
Success.
Definition: error.h:44
MqttPacketType
MQTT control packet type.
Definition: mqtt_common.h:99
Debugging facilities.
systime_t osGetSystemTime(void)
Retrieve system time.
MQTT client.