Wheleph's blog

On RabbitMQ automatic acknowledgement and reliable message processing

RabbitMQ automatic acknowledgement

One common task when working with RabbitMQ is asynchronous consumption of messages from a given queue. The RabbitMQ Java client library (com.rabbitmq:amqp-client) allows you to do that via a set of overloaded com.rabbitmq.client.Channel.basicConsume methods. Here's an example:

channel.basicConsume(QUEUE_NAME, true /* auto-ack */, consumer);

The second parameter in the example above is for auto acknowledgement mode:

@param autoAck true if the server should consider messages
acknowledged once delivered; false if the server should expect
explicit acknowledgements

An acknowledgement is basically a notification to the RabbitMQ broker that a particular message has been processed and the broker doesn't need to worry about it anymore. In particular, it shouldn't redeliver the message to this or any other consumer.

Auto acknowledgement frees a developer from the need to explicitly acknowledge incoming messages. However, it also has reliability implications, which we are going to consider further.

This article uses com.rabbitmq:amqp-client version 3.5.1 and rabbitmq-server version 3.5.1-1.

Reliability implications of automatic acknowledgements

Suppose that we have a consumer with auto acknowledgement enabled that is subscribed to queue hello and takes 0.5 seconds to process each message:

import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class ConcurrentRecv {
    private static final Logger logger = LoggerFactory.getLogger(ConcurrentRecv.class);

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        final Connection connection = connectionFactory.newConnection();
        final Channel channel = connection.createChannel();

        logger.info(" [*] Waiting for messages. To exit press CTRL+C");

        registerConsumer(channel, 500);
    }

    private static void registerConsumer(final Channel channel, final int timeout)
            throws IOException {
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                logger.info(String.format("Received (channel %d) %s",
                        channel.getChannelNumber(),
                        new String(body)));

                try {
                    Thread.sleep(timeout);
                } catch (InterruptedException e) {
                    // Restore the interrupt status instead of swallowing it
                    Thread.currentThread().interrupt();
                }
            }
        };

        channel.basicConsume(QUEUE_NAME, true /* auto-ack */, consumer);
    }
}

Now suppose that we produced 100 messages in hello via MultipleSend:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class MultipleSend {
    private static final Logger logger = LoggerFactory.getLogger(MultipleSend.class);

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            String message = "Hello world" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            logger.info(" [x] Sent '" + message + "'");
        }

        channel.close();
        connection.close();
    }
}

After that we launched our consumer ConcurrentRecv and, not surprisingly, it started consuming the messages, taking 0.5 seconds for each:

18:07:31,735 [main] ConcurrentRecv -  [*] Waiting for messages. To exit press CTRL+C
18:07:31,825 [pool-1-thread-3] ConcurrentRecv - Received (channel 1) Hello world0
18:07:32,325 [pool-1-thread-3] ConcurrentRecv - Received (channel 1) Hello world1
18:07:32,826 [pool-1-thread-3] ConcurrentRecv - Received (channel 1) Hello world2

So far so good, but let's suppose that at some point, before all the messages are consumed, we stop ConcurrentRecv. Here are the last log messages:

18:07:50,351 [pool-1-thread-5] ConcurrentRecv - Received (channel 1) Hello world37
18:07:50,852 [pool-1-thread-5] ConcurrentRecv - Received (channel 1) Hello world38
18:07:51,352 [pool-1-thread-5] ConcurrentRecv - Received (channel 1) Hello world39

Process finished with exit code 130

What's interesting, though, is that after we restart ConcurrentRecv, no messages are consumed:

18:22:50,989 [main] ConcurrentRecv -  [*] Waiting for messages. To exit press CTRL+C

This means that 60 messages (40-99) are lost.

But the stats for queue hello gathered by the RabbitMQ management plugin explain everything:

Management plugin screenshot: auto ack

The top chart shows that the number of queued messages quickly climbs to 100 when MultipleSend publishes its messages. It then stays at 100 until ConcurrentRecv is launched, after which it immediately drops. So at 18:07:40 there are 0 messages in the RabbitMQ queue according to the chart, yet by 18:07:51 only 40 messages had been processed according to the ConcurrentRecv log. It's no longer surprising that after a restart the consumer doesn't process the rest of the messages: no messages are left in the queue!

But why was the RabbitMQ queue drained so fast? Behind the implementation of com.rabbitmq.client.Connection there's a work pool (com.rabbitmq.client.impl.WorkPool) that stores incoming messages until they are picked up by free worker threads. If messages arrive faster than the threads can process them, they stack up in memory until a thread becomes available.

Sidenote: in fact, if we use a single instance of com.rabbitmq.client.Channel (as in our example), incoming messages are processed serially (as you can see in the log output), which slows down the processing even more. Why it happens that way is a topic for a separate discussion.

The reason is that our example uses automatic acknowledgement, which sends an ack back to the broker as soon as a message is put into the work pool, not when the message is actually processed. If the application is shut down while there are still messages waiting for free workers, those messages are lost, because they exist only in memory.
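The loss mechanism can be illustrated without a broker at all. The sketch below uses plain java.util.concurrent, not amqp-client; the class name WorkPoolAnalogy and all the numbers are made up for illustration. A single worker thread drains an unbounded in-memory task queue, like one channel's dispatch, and an abrupt shutdown discards whatever is still buffered, just as buffered auto-acked messages are lost when the consumer stops.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogy, not amqp-client code: a single worker thread
// with an unbounded in-memory task queue, like one Channel's dispatch.
public class WorkPoolAnalogy {

    /** Returns {processed, dropped} after an abrupt shutdown. */
    static int[] runSimulation() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger processed = new AtomicInteger();

        // "Deliver" 10 messages at once; each takes 100 ms to process.
        for (int i = 0; i < 10; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                processed.incrementAndGet();
            });
        }

        Thread.sleep(250); // let only a couple of tasks finish

        // Abrupt shutdown: tasks still queued in memory are discarded,
        // exactly like buffered auto-acked messages on consumer shutdown.
        int dropped = pool.shutdownNow().size();
        return new int[]{processed.get(), dropped};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = runSimulation();
        System.out.println("processed=" + r[0] + ", dropped=" + r[1]);
    }
}
```

Most of the ten tasks are still sitting in the queue when shutdownNow() runs, so they are simply thrown away.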

Using explicit acknowledgement to avoid lost messages

An obvious way to fix this problem is to turn off automatic acknowledgement:

import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class ConcurrentRecv {
    private static final Logger logger = LoggerFactory.getLogger(ConcurrentRecv.class);

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        final Connection connection = connectionFactory.newConnection();
        final Channel channel = connection.createChannel();

        logger.info(" [*] Waiting for messages. To exit press CTRL+C");

        final boolean autoAck = false;

        registerConsumer(channel, autoAck, 500);
    }

    private static void registerConsumer(final Channel channel, final boolean autoAck, final int timeout)
            throws IOException {
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                logger.info(String.format("Received (channel %d) %s",
                        channel.getChannelNumber(),
                        new String(body)));

                try {
                    Thread.sleep(timeout);
                } catch (InterruptedException e) {
                    // Restore the interrupt status instead of swallowing it
                    Thread.currentThread().interrupt();
                }

                if (!autoAck) {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

Notice that we do 2 things differently:

  1. Set autoAck to false when doing consumer subscription.
  2. Explicitly send acknowledgement at the end of message processing.

This causes the broker not to remove a message from the queue until it has actually been processed.
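With manual acknowledgements it's also worth bounding how many unacknowledged messages the broker pushes to a consumer at once, via the channel's prefetch count. A sketch of the relevant channel setup (the prefetch value 10 is an arbitrary choice for illustration):

```java
// Limit the broker to 10 unacknowledged messages per consumer:
// further deliveries are held on the broker side (not in client
// memory) until outstanding messages are acked.
channel.basicQos(10);
channel.basicConsume(QUEUE_NAME, false /* manual ack */, consumer);
```

This keeps the client-side backlog small, so an unexpected shutdown can interrupt at most a handful of in-flight messages.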

And now if we run the very same test with 100 messages in the queue we see the difference:

Management plugin screenshot: no auto ack

Now the number of messages doesn't drop to 0 immediately but decreases gradually as they are processed. We can even do some math to explain what happens. The top chart shows about 40 messages removed from the queue in 20 seconds, roughly 2 messages/second, which matches the processing time of 0.5 seconds per message. Then, starting from 00:38:10, there's a horizontal line at ~60: that's the moment we stopped ConcurrentRecv. The messages are still in the RabbitMQ queue, so not surprisingly, when we launch the consumer again it picks up where it left off.

And the logs show it as well. Here are the last few log messages before ConcurrentRecv was stopped:

00:38:05,610 [pool-1-thread-5] ConcurrentRecv - Received (channel 1) Hello world40
00:38:06,111 [pool-1-thread-5] ConcurrentRecv - Received (channel 1) Hello world41
00:38:06,612 [pool-1-thread-5] ConcurrentRecv - Received (channel 1) Hello world42

And here are the first log messages after the relaunch:

00:38:43,134 [main] ConcurrentRecv -  [*] Waiting for messages. To exit press CTRL+C
00:38:43,167 [pool-1-thread-3] ConcurrentRecv - Received (channel 1) Hello world42
00:38:43,668 [pool-1-thread-3] ConcurrentRecv - Received (channel 1) Hello world43
00:38:44,169 [pool-1-thread-3] ConcurrentRecv - Received (channel 1) Hello world44

If you give ConcurrentRecv enough time, it will consume all the remaining messages. And as you can see, nothing was lost.

There's one caveat though. If you look at the output carefully, you'll notice that the message "Hello world42" appears twice:

  • As the last line before shutdown
  • As the first line after relaunch

That's easy to explain: ConcurrentRecv was shut down in the middle of processing the message, before the ack was sent to the broker, so the broker delivered it a second time. It's therefore a good idea to make your consumer idempotent, so that such redeliveries don't cause any inconsistencies.
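One simple way to sketch such idempotency is to remember which messages have already been handled. The helper below is hypothetical and purely in-memory; a real consumer would typically key on a message id from the message properties and persist the set, since a HashSet doesn't survive a restart.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: remembers which message ids were already
// processed, so a redelivered message (like "Hello world42") is
// handled only once.
public class IdempotentHandler {
    private final Set<String> seen = Collections.synchronizedSet(new HashSet<>());

    /** Runs the action once per id; returns false for duplicates. */
    public boolean handle(String messageId, Runnable action) {
        if (!seen.add(messageId)) {
            return false; // duplicate delivery, skip
        }
        action.run();
        return true;
    }
}
```

Inside handleDelivery you would then wrap the actual processing in handle(...) before sending the ack, making a second delivery of the same message a no-op.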