How to allow EasyNetQ to finish processing a message on application stop
This post was written as an answer to one of the questions on the EasyNetQ user group.
The main principle of the EasyNetQ bus is simplicity. It abstracts away the nuances of communicating with RabbitMQ, giving developers time to concentrate on writing the core application. The framework is great and makes processing messages really simple. But there are a few scenarios where this simplicity becomes a small pain. One of them is when you want to stop the application gracefully, allowing it to finish processing the current message.
This post is rather lengthy, but I wanted to provide a good explanation.
When it comes to acknowledging messages, EasyNetQ takes care of that entirely. The only case in which a message is NACKed is when the subscriber is disposed while processing a message. In any other case, including errors, the message is ACKed. There is no manual ACK or NACK command available on the bus.
Unsubscribing from the bus
To unsubscribe from the bus you need to call the Dispose() method on the result returned from the Subscribe() call. This stops processing of the current message and prevents new messages from being delivered.
var consumer = bus.SubscribeAsync("stockChecker", m => Task.Factory.StartNew(() => StockCheckerHandler(m)));
consumer.Dispose();
Allowing the consumer to finish processing a message
If you want to allow your consumer code to finish processing a message, you need to wait until the handling is completed and the bus ACKs the message before calling Dispose().
Waiting for the bus to ACK the message
Unfortunately, no event is raised when the ACK happens. You can assume the message was ACKed when a new message is delivered for processing, so as long as there are messages waiting in the queue we can detect the ACK. But what if there are no more messages in the queue? In that case you need to wait an arbitrary amount of time to allow the bus to finish the operation. In most cases the ACK is very fast, and a second or a few should be enough. In the worst case the message returns to the queue and is processed again.
Putting it all together
I am using a CancellationToken to signal the application stop request and a ManualResetEvent to signal when a message is being processed. The same ManualResetEvent is also used to signal delivery of a new message for processing, which tells us that the previous message was ACKed and we may Dispose the consumer. When waiting on the ManualResetEvent, you can specify a timeout, which is used here to force the application to stop if processing doesn't finish in a timely fashion.
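The two primitives described above can be sketched without any bus at all. This is a minimal illustration, not the code from the gist: the class name ConsumerSignals and the methods RunHandler and RequestStopAndWait are hypothetical, and the event here is assumed to be set while the handler is idle and reset while it is busy.

```csharp
using System;
using System.Threading;

// Hypothetical helper: pairs a stop request with a "handler is idle" signal.
public sealed class ConsumerSignals
{
    // Cancelled when the application is asked to stop.
    public CancellationTokenSource StopRequested { get; } = new CancellationTokenSource();

    // Set while no message is being handled, reset while a handler is running.
    public ManualResetEvent HandlerIdle { get; } = new ManualResetEvent(true);

    // Wraps the real message handling so the event always reflects its state.
    public void RunHandler(Action work)
    {
        HandlerIdle.Reset();
        try { work(); }
        finally { HandlerIdle.Set(); }
    }

    // Signals the stop request, then waits up to 'timeout' for the handler.
    // Returns false if the handler was still busy when the timeout expired.
    public bool RequestStopAndWait(TimeSpan timeout)
    {
        StopRequested.Cancel();
        return HandlerIdle.WaitOne(timeout);
    }
}
```

The boolean returned by WaitOne is what lets the main thread distinguish "handler finished" from "gave up waiting".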
There are four cases to consider.
APPLICATION STOPPED WHEN NOTHING IS PROCESSED
This is the simplest case. When an application stop is requested and the consumer is not processing any message, we can safely dispose the consumer.
CONSUMER BUSY AND THERE ARE MORE MESSAGES IN THE QUEUE
As mentioned earlier, a new message delivered to the consumer is a signal that the current message was ACKed (line 73) and that we can proceed with shutting down. To prevent losing the new message, the consumer thread sleeps indefinitely (line 74). Once the consumer is disposed, the bus NACKs that message, moving it back to the top of the queue.
The main application thread signals that the stop was requested using the CancellationToken (line 43) and starts waiting for the consumer to finish handling the current message (line 49). Passing a timeout value to the WaitOne method makes it possible to force the handler to stop if it takes too long to finish, and to show a message on the screen. If handling the message finishes before the timeout (line 86), the main thread receives the signal (line 54) and starts waiting to see whether there is a new message for processing.
The bus delivers a new message to the consumer, which detects that an application stop was requested (line 71), signals that back to the main thread (line 72) and puts its thread to sleep indefinitely.
The main thread receives the signal (line 57), meaning the previous message was ACKed and the consumer can be disposed (line 59). This triggers a NACK of the new message delivered by the bus. Finally, the bus is disposed and the application stops.
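The consumer side of this sequence can be sketched as follows. This is an illustration under assumptions rather than the gist itself: StopAwareHandler and its members are hypothetical names, messages are plain strings, and the handler is invoked directly instead of by the bus.

```csharp
using System;
using System.Threading;

// Hypothetical handler wrapper: processes messages normally, but parks the
// thread when a message arrives after a stop was requested.
public sealed class StopAwareHandler
{
    private readonly CancellationToken _stop;
    private readonly ManualResetEvent _signal;
    private readonly Action<string> _process;

    public StopAwareHandler(CancellationToken stop, ManualResetEvent signal, Action<string> process)
    {
        _stop = stop;
        _signal = signal;
        _process = process;
    }

    public void Handle(string message)
    {
        if (_stop.IsCancellationRequested)
        {
            // A new delivery implies the previous message was already ACKed.
            _signal.Set();
            // Never return, so this message is never reported as handled.
            Thread.Sleep(Timeout.Infinite);
        }

        _signal.Reset();                 // a message is now being processed
        try { _process(message); }
        finally { _signal.Set(); }       // processing finished (or failed)
    }
}
```

Because the parked thread never returns, it should be a background or thread-pool thread; otherwise it would keep the process alive after the main thread exits.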
CONSUMER BUSY BUT NO MORE MESSAGES IN THE QUEUE
This is the trickiest path. Because there is no notification when the bus ACKs the message, you can only put the main thread to sleep, giving the bus enough time to finish the operation. Using WaitOne with a timeout (line 57) of a few seconds should be enough for the ACK to succeed in most cases.
CONSUMER DOESN’T FINISH PROCESSING MESSAGE IN TIMELY FASHION
To make sure the application doesn't hang on exit because of a long-running consumer, you can use a timeout value (line 49) when waiting for the signal that the current message was processed. If the wait times out, the application shows a message and disposes the consumer. The message goes back to the queue (the bus NACKs it) and is ready for processing again.
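The main-thread side of the cases above can be condensed into one routine. The sketch below is an assumption-laden illustration, not the gist: GracefulShutdown, ShutdownOutcome and the parameter names are hypothetical, and consumer stands in for whatever IDisposable the subscribe call returned.

```csharp
using System;
using System.Threading;

public enum ShutdownOutcome { HandlerFinished, HandlerTimedOut }

public static class GracefulShutdown
{
    public static ShutdownOutcome Stop(
        CancellationTokenSource stop,
        ManualResetEvent signal,
        IDisposable consumer,
        TimeSpan handlerTimeout,
        TimeSpan ackGrace)
    {
        stop.Cancel();   // tell the handler that the application is stopping

        // Wait for the current message to finish, but not forever.
        bool finished = signal.WaitOne(handlerTimeout);
        if (finished)
        {
            // Give the bus time to ACK. A newly delivered message sets the
            // signal early; with an empty queue the grace period simply elapses.
            // If that delivery slips in before Reset(), we only wait longer.
            signal.Reset();
            signal.WaitOne(ackGrace);
        }

        // Disposing with a message still in flight is expected to return it to the queue.
        consumer.Dispose();
        return finished ? ShutdownOutcome.HandlerFinished : ShutdownOutcome.HandlerTimedOut;
    }
}
```

Returning an outcome instead of printing inside the routine leaves it to the caller to decide how to report a forced stop.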
Generally, message handlers should be designed so that processing doesn't take too long. Where long processing is required, it is good practice to check the CancellationToken periodically for a signal that the application is shutting down and, if that happens, to save the current state of processing. When the message is picked up again, you can continue processing from the last known stage.
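A long-running handler that checks the token between steps might look like the sketch below. ResumableWork and its parameters are hypothetical, and where the returned step number gets persisted (database, file, message header) is left to the caller.

```csharp
using System;
using System.Threading;

public static class ResumableWork
{
    // Runs steps [startStep, totalSteps) and returns the next step to run.
    // A return value equal to totalSteps means the work completed.
    public static int Run(int startStep, int totalSteps, CancellationToken stop, Action<int> doStep)
    {
        int step = startStep;
        while (step < totalSteps)
        {
            // Check between steps so a stop request is noticed promptly.
            if (stop.IsCancellationRequested)
                break;

            doStep(step);
            step++;
        }

        // The caller saves this value so a redelivered message can resume here.
        return step;
    }
}
```

When the message is delivered again, the handler would load the saved step and pass it as startStep.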
As mentioned earlier, in a small number of cases the consumer can be disposed before the bus ACKs the message. This happens only when there is no new message for processing and the system gets busy, slowing down code execution in EasyNetQ. If that happens, the message goes back to the queue and is processed again. Assuming all consumer code is idempotent, this only results in losing some processing time.
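One simple way to make a handler tolerate such a redelivery is to remember which messages were already applied. The sketch below is only an illustration: IdempotentHandler is a hypothetical name, it assumes every message carries a unique id, and it keeps the ids in memory, whereas a real consumer would need a store that survives restarts.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class IdempotentHandler
{
    private readonly ConcurrentDictionary<string, bool> _processed =
        new ConcurrentDictionary<string, bool>();
    private readonly Action<string> _apply;

    public IdempotentHandler(Action<string> apply) => _apply = apply;

    // Returns true if the message was applied, false if it was a duplicate.
    // Note: two concurrent deliveries of the same id could both be applied;
    // this sketch assumes one message is handled at a time.
    public bool Handle(string messageId)
    {
        if (_processed.ContainsKey(messageId))
            return false;

        _apply(messageId);
        // Mark as processed only after success, so a failed attempt is retried.
        _processed[messageId] = true;
        return true;
    }
}
```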
Below is a gist with sample implementation. To run it just call
After application ends you should see 4 messages left on the