eMQTT upcoming change to the publish cycle

In the current version 1 scheme, the MQTT publish cycle is simple and fast, but it puts some burden on the developer to handle the publish-while-receiving reentrancy issue.

Please refer to this page for a presentation of the main drawback.

The implementation was built this way for numerous reasons:

  1. No packet copying and storage. By calling the messageReceived callback as soon as it has received the packet (and before it overwrites it with a new one in the publish cycle), the client avoids copying the packet and can deal with it on low-memory systems (a sketch of such a callback follows this list).
  2. Simple state machine. By performing the publish cycle in the event loop (and in the publish method), the client doesn't have to track state for multiple packets; only one packet's state is tracked at any given time.
  3. Low latency. Calling the callback as soon as the packet is received achieves the lowest possible latency, whatever the packet's Quality of Service level.
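
As an illustration of the first point, user code has to consume the packet data in place, since the client's single receive buffer is reused for the next packet. The interface below is a minimal sketch with hypothetical names, not the library's actual callback signature:

```cpp
#include <cstdint>
#include <cstdio>

// Hypothetical callback shape: the pointers reference the client's single
// receive buffer, so they are only valid for the duration of the call.
struct MessageReceiver
{
    virtual void messageReceived(const char * topic, const uint8_t * payload, uint32_t payloadLength) = 0;
    virtual ~MessageReceiver() {}
};

// Example receiver that consumes the payload in place, without copying it.
struct PrintingReceiver : public MessageReceiver
{
    void messageReceived(const char * topic, const uint8_t * payload, uint32_t payloadLength) override
    {
        (void)payload; // the payload would be parsed here, before returning
        // Use the data now: once this returns, the buffer may be overwritten
        // by the next packet of the publish cycle.
        printf("Received %u bytes on topic %s\n", (unsigned)payloadLength, topic);
    }
};
```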

The drawbacks are:

  1. No publishing while receiving. If publishing while receiving were allowed, packets could be lost when the broker interleaves QoS acknowledgements with packet transmission.
  2. Only one packet on the wire at a given time. Because the client performs the complete publish cycle for each step, only one packet can be in transit at any time.
  3. Locking constraint on the client. Typically, the client locks the implementation when performing an operation (like receiving, publishing, subscribing, etc.). If you lock another mutex in your receive code and try to publish from another thread while holding that mutex, you'll get a classic A-B / B-A lock-ordering deadlock (see the sketch after this list).
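
The deadlock mentioned in the last point is the usual lock-order inversion. A minimal reproduction, using plain standard mutexes and independent of the library's real API:

```cpp
#include <mutex>

std::mutex clientLock;   // stands for the client's global action lock
std::mutex userLock;     // a mutex owned by the application

void receiveThread()
{
    std::lock_guard<std::mutex> a(clientLock);  // client locks itself while receiving
    // ... the messageReceived callback runs here and takes the user's mutex ...
    std::lock_guard<std::mutex> b(userLock);    // A then B
}

void publishThread()
{
    std::lock_guard<std::mutex> b(userLock);    // application holds its own mutex
    // ... and calls publish, which takes the client's action lock ...
    std::lock_guard<std::mutex> a(clientLock);  // B then A: deadlock with the other thread
}
```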

The MQTT v5 standard gives us the following guarantees:

  1. Sent packet ordering is preserved (section 4.6), so two QoS packets A then B cannot be processed in reverse order (said differently, the ACK for packet B will never arrive before the ACK for packet A).
  2. Packet re-sending only happens upon network disconnection and reconnection without a clean restart; it's not compliant to resend a packet while still connected (section 4.4).
  3. It's possible to mitigate the need for storing packets by setting the Receive Maximum CONNECT property to 1, so no new QoS packet can be started until the current one is completely acknowledged (see the sketch after this list).
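
As an illustration of the third point, a client advertising a Receive Maximum of 1 in its CONNECT packet tells the broker not to start a second QoS 1 or 2 delivery before the current one completes. The snippet below only sketches the idea with a hypothetical property holder, not the library's actual API:

```cpp
#include <cstdint>

// Hypothetical property container for a CONNECT packet.
struct ConnectProperties
{
    uint16_t receiveMaximum = 65535;   // MQTT v5 default when the property is absent
};

ConnectProperties makeLowMemoryConnectProperties()
{
    ConnectProperties props;
    props.receiveMaximum = 1;   // at most one in-flight QoS > 0 PUBLISH from the broker
    return props;
}
```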

Possible implementation

Locking constraint

To relax the locking constraint, we can remove the global action lock and move to a local socket lock instead (so we prevent mixing bytes in the socket buffer when it is used from multiple threads). With a socket lock, a complete packet is either received or sent, never half a packet.
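
A minimal sketch of that idea, assuming a plain POSIX socket and a hypothetical sendPacket helper, writes whole packets under a single mutex:

```cpp
#include <mutex>
#include <cstdint>
#include <cstddef>
#include <sys/types.h>
#include <sys/socket.h>

std::mutex socketLock;   // protects the socket, not the whole client

// Write the whole buffer, retrying on short writes.
static bool sendAll(int socket, const uint8_t * buffer, size_t length)
{
    size_t sent = 0;
    while (sent < length)
    {
        ssize_t ret = ::send(socket, buffer + sent, length - sent, 0);
        if (ret <= 0) return false;
        sent += (size_t)ret;
    }
    return true;
}

// Whole packets are written under the lock, so two threads can publish
// concurrently without interleaving bytes on the wire.
bool sendPacket(int socket, const uint8_t * packet, size_t length)
{
    std::lock_guard<std::mutex> guard(socketLock);
    return sendAll(socket, packet, length);
}
```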

We must also prevent two QoS publish actions from running at the same time (and, respectively, receiving two QoS publishes at the same time).

Doing so is only possible if we can guarantee progress in all of the client's tasks (for example, a publish lock is used during a publish cycle so that a second publish is serialized after the former is acknowledged).

Thus, a specific buffer sized to hold a temporary PUBACK/PUBREC/PUBREL/PUBCOMP packet is required too.
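
A sketch of those two pieces (the publish serialization lock and the small acknowledgement buffer); the buffer size here is purely illustrative, not a value taken from the library:

```cpp
#include <mutex>
#include <cstdint>

// Only one QoS publish cycle may run at a time; a second publish blocks here
// until the first one is fully acknowledged.
std::mutex publishLock;

// PUBACK/PUBREC/PUBREL/PUBCOMP are tiny packets; a small fixed buffer is
// enough to hold one of them while the receive buffer is busy with a PUBLISH.
uint8_t ackBuffer[16];   // illustrative size only
```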

With such a scheme, the communication could behave as follows:

```mermaid
sequenceDiagram
    Client->>+Broker: PUBLISH QoS 1, id=1
    Broker-->>+Client: PUBLISH QoS 1, id=357
    Note left of Client: Unexpected publish packet in answer
    Client-->>-Broker: PUBACK id=357
    Broker->>-Client: PUBACK id=1
```

When the client entered the publish cycle for packet ID 1, it was expecting a PUBACK from the broker, but instead got a new PUBLISH message. Typically, this can trigger the following actions in the code (a condensed sketch of this flow follows the list):

  1. User calls the publish method
  2. Client takes the publish action lock
  3. Client prepares the PUBLISH packet and sends it
  4. Client enters the publishCycle method with sending flag
  5. Client calls the publishReceive method, which fetches only the packet type.
  6. Case 1: most likely path
    1. Client receives a QoS ACK packet header
    2. Client finishes receiving that packet into the ACK temporary buffer
    3. Jumps to step 9
  7. Case 2: possible path
    1. Client receives a PUBLISH packet in the current recvBuffer
    2. Client remembers that a publish packet is in the recvBuffer and proceeds with acknowledging it using the ACK temporary buffers.
    3. Notice that the broker can't send another QoS packet here, so no reentrancy is required. It can send a QoS 0 PUBLISH packet here, which will be lost by the client since there's no place to store it (it'll be dropped).
    4. After the received PUBLISH packet is acknowledged, go back to step 5 above.
  8. Case 3: unlikely path
    1. Client receives a DISCONNECT packet or a network disconnection signal
    2. In that case, the message may need to be resent upon reconnection (or not), but this is left up to the caller to perform
    3. So the methods return in a chain with a network error up to the publish method, so the user can deal with the error the way she intended.
  9. Client finishes publishCycle, releases the lock and returns from publish method
  10. In the next eventLoop call, if there is a publish packet in the recvBuffer (from step 7.2), it'll trigger the usual messageReceived callback.
  11. Notice that in that case, the latency will be higher for this packet since the user will only be notified upon calling the eventLoop, even though the packet is already downloaded.
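
Keeping the step numbers above, a condensed sketch of that cycle could look like the following. All names are hypothetical stand-ins for the client's internals (not defined here), and error handling is reduced to a return code:

```cpp
#include <cstdint>

enum class PacketType { PublishAck, Publish, Disconnect, NetworkError };

// Hypothetical helpers standing in for the real client internals.
PacketType fetchPacketType();                 // step 5: peek at the next packet type
bool receiveAckInto(uint8_t * ackBuffer);     // step 6.2
bool receivePublishIntoRecvBuffer();          // step 7.1
bool acknowledgeReceivedPublish();            // step 7.2, uses the ACK temporary buffers

// Returns 0 on success, a negative error code on network failure.
int publishCycle(uint8_t * ackBuffer, bool & publishPendingInRecvBuffer)
{
    for (;;)
    {
        switch (fetchPacketType())                         // step 5
        {
        case PacketType::PublishAck:                       // case 1: most likely path
            if (!receiveAckInto(ackBuffer)) return -1;
            return 0;                                      // step 9: cycle done
        case PacketType::Publish:                          // case 2: possible path
            if (!receivePublishIntoRecvBuffer()) return -1;
            publishPendingInRecvBuffer = true;             // remembered for the next eventLoop
            if (!acknowledgeReceivedPublish()) return -1;
            break;                                         // go back to step 5
        case PacketType::Disconnect:                       // case 3: unlikely path
        case PacketType::NetworkError:
            return -1;                                     // bubble the error up to publish()
        }
    }
}
```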

Almighty event loop

Let's consider another possible implementation, probably closer to what the MQTT standard intended.

The client will be constructed with a user-defined Receive Maximum CONNECT property (defaulting to 1), later called RcvMax. The higher the property's value, the higher the memory buffer requirements in the client.

The property implies storing that number of PUBLISH packets in a buffer in case they need to be retransmitted.

Currently, the client simply errors out when disconnected (or when the network connection is dropped). It is up to the application to reconnect if that's what's required. If the client were to keep doing this, no packet buffer would be required, but it would not be compliant with the standard.

In the new implementation proposed below, the client will auto-reconnect, so it must store the connect properties and attributes (additional storage requirements).

Let's see what would happen in the different phases of the communication.

Asynchronous publishing

  1. In the publish method, the client checks the QoS level for the packet.
  2. If the packet is QoS 0, the client serializes the PUBLISH packet, then takes the socket's lock, immediately publishes it, and returns.
  3. If the QoS is non-zero, the client checks the packet ID buffer (see below) for an empty place.
  4. If no free place is found, the method returns with an error. The user must call the eventLoop to let the publish cycle proceed. It's now possible to do so in the caller code at the cost of increased stack usage.
  5. Else, the client serializes the PUBLISH packet, then stores the packet in a specific buffer (via a callback). It also stores the packet ID in a (RcvMax-sized) buffer (there is one such buffer per QoS level). The buffer for QoS 2 also contains RcvMax booleans tracking whether the PUBREC packet was received (see below).
  6. It then exits the method (a sketch of this publish path follows the list).
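
A condensed sketch of that publish path, mirroring the steps above; the state layout and helper names are hypothetical:

```cpp
#include <cstdint>

// Hypothetical pieces of client state and helpers (not the library's real layout).
struct Client
{
    static const unsigned RcvMax = 1;              // illustrative Receive Maximum
    uint16_t sendingQoS1Ids[RcvMax] = {};          // 0 means "free slot"
    uint16_t sendingQoS2Ids[RcvMax] = {};
    bool     pubRecReceived[RcvMax] = {};

    bool serializePublish(const char * topic, const uint8_t * payload, uint32_t len, uint8_t qos, uint16_t id);
    bool sendSerializedPacket();                   // takes the socket lock internally
    bool savePacket(uint16_t id);                  // via the save callback, for retransmission
    uint16_t nextPacketId();                       // never returns 0

    // Returns 0 on success, -1 on network error, -2 when no packet ID slot is free.
    int publish(const char * topic, const uint8_t * payload, uint32_t len, uint8_t qos)
    {
        if (qos == 0)
        {   // Step 2: QoS 0 is fire-and-forget, no state is kept.
            if (!serializePublish(topic, payload, len, 0, 0)) return -1;
            return sendSerializedPacket() ? 0 : -1;
        }
        uint16_t * ids = (qos == 1) ? sendingQoS1Ids : sendingQoS2Ids;
        unsigned slot = 0;
        while (slot < RcvMax && ids[slot] != 0) slot++;    // step 3: find a free slot
        if (slot == RcvMax) return -2;                     // step 4: caller must run the eventLoop
        uint16_t id = nextPacketId();
        if (!serializePublish(topic, payload, len, qos, id)) return -1;
        if (!savePacket(id)) return -1;                    // step 5: keep it for retransmission
        ids[slot] = id;
        if (qos == 2) pubRecReceived[slot] = false;
        return sendSerializedPacket() ? 0 : -1;            // step 6: the rest is handled by the eventLoop
    }
};
```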

Event loop

  1. The client takes the socket lock to receive a packet. It can be a publish cycle packet (PUBACK, PUBREC, PUBREL or PUBCOMP) or an unrelated packet (like PUBLISH or DISCONNECT or PING or PONG ...) (see the sketch after this list).
  2. If the packet is a publish cycle packet, the client checks its packet ID buffer (sending and receiving packet ID buffers).
  3. If a packet with a matching ID is found in the buffer, the next packet in the publish cycle is created and sent. Depending on the QoS level, the saved packet in the buffer might be released (on PUBACK or PUBREC) and the packet ID might be released too (on PUBACK or PUBCOMP).
  4. Else, if the packet isn't a publish cycle packet, the usual behavior applies as it does currently
  5. Finally, a received PUBLISH packet will trigger calling the messageReceived callback as usual.
  6. If this packet isn't QoS 0, then its ID is saved in a receive packet ID buffer (there are 2 buffers, one for QoS 1 and one for QoS 2)
  7. If the received packet ID buffer isn't empty, a publish cycle answer is created (PUBACK for QoS 1, or PUBREC and later PUBCOMP for QoS 2) and sent with the socket locked.
  8. For QoS 1, the packet ID is released from the buffer once the PUBACK is sent; for QoS 2, once the PUBCOMP is sent.
  9. The event loop returns.
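
A sketch of that dispatch, again with hypothetical helper names (not defined here) and with only the QoS 1 paths shown in detail:

```cpp
#include <cstdint>

enum class Type { PubAck, PubRec, PubRel, PubComp, Publish, Other, Error };

// Hypothetical helpers standing in for the real client internals.
Type receiveNextPacket(uint16_t & packetId);     // takes the socket lock
bool sendPubAck(uint16_t packetId);              // also sent under the socket lock
void releaseSavedPacket(uint16_t packetId);      // via the release callback
void dispatchMessageReceived();                  // calls the user's messageReceived callback
uint8_t receivedQoS();                           // QoS of the PUBLISH that was just dispatched
bool findAndClearSendingId(Type ackType, uint16_t packetId);

int eventLoop()
{
    uint16_t id = 0;
    switch (receiveNextPacket(id))               // step 1
    {
    case Type::PubAck:                           // steps 2-3 for a QoS 1 publish we sent
        if (findAndClearSendingId(Type::PubAck, id))
            releaseSavedPacket(id);              // packet and ID are both released
        break;
    case Type::PubRec: case Type::PubRel: case Type::PubComp:
        // Same idea for the QoS 2 cycle: look the ID up, emit the next packet,
        // release the stored packet on PUBREC and the ID on PUBCOMP.
        break;
    case Type::Publish:                          // steps 5-8 for a received PUBLISH
        dispatchMessageReceived();
        if (receivedQoS() == 1 && !sendPubAck(id)) return -1;
        // QoS 2 would store the ID in the receive buffer and answer PUBREC here instead.
        break;
    case Type::Other:                            // step 4: not a publish cycle packet
        break;
    case Type::Error:
        return -1;
    }
    return 0;                                    // step 9
}
```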

That implementation presents multiple advantages and a few drawbacks.

First, it doesn't suffer from reentrancy issues (it's possible to publish anytime, even inside the messageReceived callback), and no deadlock is possible (locks are held briefly and progress is always made). Callbacks are always called with no lock taken.

For QoS 0, the latency is minimal. For higher QoS, the latency is minimal for publishing or receiving (since any packet's action is performed immediately), but the QoS overhead is delayed (it might take longer since one must wait for the messageReceived callback's execution to continue). This means that if there are multiple serialized PUBLISH packets with high QoS on the line, the second packet will suffer from a longer latency.

Then, it can support publishing with multiple packets pending on the wire.

The main drawback of this implementation is the requirement for additional memory buffers:

  1. Storing the CONNECT parameters (it might be a security issue to keep them in memory) for automatic reconnection. Instead, the implementation will provide a connectionLost callback where the user can decide to connect again by providing the necessary parameters. This implies reentrancy for any method that can handle network failure.
  2. Storing 4 packet ID buffers. There are 2 types of buffers (one for QoS 1, where the 16-bit packet ID is stored per bucket, and one for QoS 2, where the 16-bit packet ID is stored plus a boolean value tracking which stage of the publish cycle it is in). The number of buckets in these buffers is small (there are RcvMax slots per buffer, so typically, for a RcvMax of 1, 12 bytes are required for all 4 buffers)
  3. Storing un-acknowledged packets. Since the client doesn't limit the PUBLISH packet size, creating a packet buffer for this beforehand would be a waste of resources. Instead, the implementation will provide savePacketBuffer, releasePacketBuffer and recallPacketBuffer callbacks where the user code decides what to do with the packet (a sketch of such an interface follows this list). A possible solution is to save the packet by allocating memory for it and deleting it when released (with the big drawback of the memory fragmentation this implies). Or the user could simply save the source of the packet (the data that were used to generate it, indexed by the packet ID), with the same drawback but likely better memory usage. Or the user could copy the packet into a fixed-size circular buffer (this only works if the packet size is reasonably small).
  4. If the user doesn't intend to reconnect upon network failure, then it's safe to simply do nothing in these callbacks since no retransmit will ever happen (the packet is then considered lost).
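
To make the storage requirements more concrete, here is a sketch of the two building blocks mentioned above: the packet ID buffers and a packet-storage callback interface. The callback names come from the list above, but the signatures and exact layout are illustrative, not the library's final API:

```cpp
#include <cstdint>

static const unsigned RcvMax = 1;   // illustrative value

// Packet ID buffers: one pair (sending and receiving) exists per QoS level.
struct QoS1IdBuffer { uint16_t ids[RcvMax]; };                          // 2 bytes per slot
struct QoS2IdBuffer { uint16_t ids[RcvMax]; bool relStage[RcvMax]; };   // 2 bytes + 1 flag per slot

// Storage callbacks: the user decides how un-acknowledged PUBLISH packets
// are kept around for a possible retransmission after reconnecting.
struct PacketStorage
{
    virtual bool     savePacketBuffer(uint16_t id, const uint8_t * packet, uint32_t length) = 0;
    virtual uint32_t recallPacketBuffer(uint16_t id, uint8_t * packet, uint32_t maxLength) = 0;
    virtual void     releasePacketBuffer(uint16_t id) = 0;
    virtual ~PacketStorage() {}
};

// A user who never reconnects after a network failure can safely implement
// these as no-ops: the packet is then simply considered lost.
struct DropEverything : public PacketStorage
{
    bool     savePacketBuffer(uint16_t, const uint8_t *, uint32_t) override { return true; }
    uint32_t recallPacketBuffer(uint16_t, uint8_t *, uint32_t) override { return 0; }
    void     releasePacketBuffer(uint16_t) override {}
};
```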

This implementation was chosen for version 2 of the library.