I recently needed to write a Java RabbitMQ client application which processes messages on a RabbitMQ queue in parallel on multiple threads. Unfortunately the RabbitMQ documentation is very unclear on how to do this, and how threads are actually handled in the RabbitMQ Java client library. I did some experiments, and stepped into the RabbitMQ Java client library code using a debugger; here is what I found out…
Queues and Exchanges
A RabbitMQ queue is an ordered list of messages. A message may only be read once from a queue - ie each message is processed by exactly one “consumer” (though a consumer might fail, resulting in the message being placed back on the queue). A RabbitMQ queue thus acts rather like an in-memory Deque data structure. Broadcast of messages (a single message going to multiple recipients) is done via exchanges.
Code which publishes new messages does not write directly to a queue; instead the message is sent to an exchange which then places the message on zero or more of the queues which are “bound” to that exchange.
See this article for more information.
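The relationship between publishers, exchanges and queues can be illustrated with a small in-memory model. This is only a sketch of the idea, not RabbitMQ code: the class FanoutExchangeModel and its methods are hypothetical names, and it assumes the simplest routing rule (every bound queue receives a copy).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of a fanout-style exchange; not RabbitMQ code.
class FanoutExchangeModel {
    // Queues currently bound to this exchange.
    private final List<Deque<String>> boundQueues = new ArrayList<>();

    // Create a queue, bind it to the exchange, and return it.
    Deque<String> bindNewQueue() {
        Deque<String> queue = new ArrayDeque<>();
        boundQueues.add(queue);
        return queue;
    }

    // Publishing never targets a queue directly: the exchange copies the
    // message to every bound queue. Returns the number of queues reached.
    int publish(String message) {
        for (Deque<String> queue : boundQueues) {
            queue.addLast(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        FanoutExchangeModel exchange = new FanoutExchangeModel();
        // With no bound queues the message reaches nobody.
        System.out.println(exchange.publish("lost"));
        Deque<String> a = exchange.bindNewQueue();
        Deque<String> b = exchange.bindNewQueue();
        exchange.publish("hello");
        // Each queue holds its own copy; reading removes it from that queue only.
        System.out.println(a.pollFirst() + " " + b.pollFirst() + " " + a.pollFirst());
    }
}
```

Reading from queue a does not affect queue b, which is why broadcast has to be done at the exchange level rather than by having several consumers read the same queue.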
An application uses the RabbitMQ client library to establish a connection to the RabbitMQ server cluster using ConnectionFactory.newConnection(...). A connection is actually a single TCP socket to a single RabbitMQ server instance. If that server instance fails, the client library can reconnect to another server within the cluster, provided that automatic connection recovery is enabled and the addresses of the other servers were supplied when the connection was created.
An application can open multiple connections in parallel if it wishes; the RabbitMQ server(s) treats each connection as an independent client.
Method ‘newConnection’ can take an ExecutorService object as a parameter; this is used only for processing callbacks associated with “subscriptions” to a queue, ie due to calls to method basicConsume.
Each time the RabbitMQ client library opens a connection, it creates a dedicated thread for reading data sent to it from the server over that socket. This thread is not related to the ExecutorService object passed to method newConnection.
The client and server sides of a connection send data packets to each other whenever they have a need to do so. There is no “master side” of a connection; each side simply “pushes” packets of data over the socket and the other end detects and reacts to them. Such data packets are called “data frames” within RabbitMQ. Often a logical block of data is transmitted as multiple data frames and the receiver needs to assemble them to obtain the full set of data for a logical operation. In particular, when the server allocates a message on a queue to a consumer in some client application, it pushes the message over the associated connection (TCP socket) as a sequence of 2 or 3 data frames; the RabbitMQ client library assembles the sequence of packets into a “message” then queues it for local execution.
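The assembly step can be sketched as a small state machine keyed by channel id. This is a simplified model rather than the library's implementation: the split into a method frame, a header frame declaring the body size, and body frames is an assumption based on AMQP 0-9-1 framing, and FrameAssemblerModel is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Simplified model of per-channel frame assembly; not the library's code.
class FrameAssemblerModel {
    enum FrameType { METHOD, HEADER, BODY }

    // State of a message that has been only partially received.
    private static final class Pending {
        long expectedBodySize = -1; // unknown until the header frame arrives
        final ByteArrayOutputStream body = new ByteArrayOutputStream();
    }

    private final Map<Integer, Pending> pendingByChannel = new HashMap<>();

    // Feed one frame. Returns the complete message body once every frame for
    // that channel's message has arrived, otherwise null.
    byte[] onFrame(int channelId, FrameType type, long declaredBodySize, byte[] payload) {
        if (type == FrameType.METHOD) {
            pendingByChannel.put(channelId, new Pending());
            return null;
        }
        Pending pending = pendingByChannel.get(channelId);
        if (pending == null) {
            throw new IllegalStateException("no delivery in progress on channel " + channelId);
        }
        if (type == FrameType.HEADER) {
            pending.expectedBodySize = declaredBodySize;
        } else {
            pending.body.write(payload, 0, payload.length);
        }
        if (pending.expectedBodySize >= 0 && pending.body.size() >= pending.expectedBodySize) {
            pendingByChannel.remove(channelId);
            return pending.body.toByteArray();
        }
        return null;
    }

    public static void main(String[] args) {
        FrameAssemblerModel assembler = new FrameAssemblerModel();
        byte[] hello = "hello".getBytes(StandardCharsets.UTF_8);
        // Frames for channels 1 and 2 arrive interleaved on the same socket.
        assembler.onFrame(1, FrameType.METHOD, 0, null);
        assembler.onFrame(2, FrameType.METHOD, 0, null);
        assembler.onFrame(1, FrameType.HEADER, hello.length, null);
        byte[] empty = assembler.onFrame(2, FrameType.HEADER, 0, null); // 2 frames suffice
        byte[] full = assembler.onFrame(1, FrameType.BODY, 0, hello);   // 3 frames needed
        System.out.println(empty.length + " " + new String(full, StandardCharsets.UTF_8));
    }
}
```

In this model an empty message completes after two frames and a message with a small body after three, which is consistent with the "2 or 3 data frames" observed above.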
A single connection has up to 64K logical channels associated with it.
Client RabbitMQ code must create channels, and any operation performed via the RabbitMQ client library must be done over some specific channel. Every data packet sent from a client to a RabbitMQ server over a connection includes the channel id within the data frame, and likewise each message from the server to a client includes the relevant channel id.
Channels are effectively single threaded from the point of view of code using the RabbitMQ client library; an operation performed while some other thread is operating on the same channel will generally block. There is more information on channels and threading later.
Channels are light-weight, so allocating dozens or hundreds of channels is not a problem.
For each connection, channel zero is an “admin” channel used for internal purposes, including creating other channels.
Active vs Reactive Processing
RabbitMQ clients can operate in two ways: actively call the server and wait for responses, or subscribe to messages/events and wait to be called. Setting up a client is always done actively, but acquiring/processing messages can be done in either manner.
Active fetching of messages is done via method basicGet. In this approach, there should definitely only be one thread in the client application accessing a specific channel. The ExecutorService parameter to method newConnection is unused in this approach.
Passive processing of messages is done by calling basicConsume, passing a consumer object, and waiting for callbacks to be invoked on the consumer. As noted above, each call to basicConsume is done via a channel. Multiple consumers can be registered for the same queue or different queues via a single channel if desired. However RabbitMQ will only ever execute one callback at a time for a specific channel. The callbacks are invoked via the ExecutorService associated with the connection - ie usually a threadpool. However there is no “dedicated thread” for a channel; each callback may be made via any of the threads from the ExecutorService.
Server-side Allocation of Messages to Consumers
A RabbitMQ server (or cluster) keeps track of the set of currently-connected consumer objects for each queue (see basicConsume). As messages arrive on the queue, they are actively pushed to the client application over the associated connection (TCP socket) as a sequence of data frames. The data frames are labelled with the channel and consumer tag of the specific consumer object that the RabbitMQ server has chosen to handle that message. The role of the RabbitMQ client library is then just to deliver that specific message to that specific consumer object.
In the RabbitMQ Java client library, a thread is allocated for each connection (TCP socket) as it is opened. This thread is responsible for listening for new data frames transmitted over the socket, assembling them into complete datastructures, and invoking any channel-specific logic to handle that data frame/datastructure. However this thread never calls into application code.
In the case where a RabbitMQ server has allocated a message to a registered consumer object, and proactively pushed a message over a TCP socket, this client-side connection thread extracts the “channel id” from the data-frame and then invokes the “assembler” object for that channel. When the assembler determines that it has received all data-frames for a complete operation, then it looks at whether the operation is an “async” delivery of a message to a consumer, or a response to a synchronous RPC-style request initiated earlier from this client. Here the first async case is discussed; see below for information on the sync case.
Now that the connection-handler thread has determined it has a message to be delivered to a consumer on a specific channel, it extracts the consumer tag from the message header to see exactly which consumer it is (basicConsume can be called multiple times on the same channel). A reference to the specific Consumer instance for that channel with that consumer tag is retrieved; a Runnable object is then created which is able to invoke method handleDelivery on that specific consumer instance, passing the new message, and this Runnable object is placed in a channel-specific queue of work to be done. Finally, the thread checks whether there is an existing Runnable object scheduled or running for that channel; if not then the Runnable object is registered with the ExecutorService for execution. When there is already a callback registered with the ExecutorService then nothing is done; once the ExecutorService has completed processing of the already-scheduled callback then it will look for and register the next one. This approach ensures that only one Runnable object for each channel is on the ExecutorService workqueue at a time, and thus no concurrent callbacks for the same channel will occur.
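The scheduling scheme just described (a per-channel work queue, with at most one Runnable per channel handed to the shared ExecutorService at any time) can be modelled in a few lines. This is a sketch of the technique as I understand it from the debugger, not the library's actual code; ChannelWorkModel and runDemo are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Model of one channel's callback queue on top of a shared thread pool.
class ChannelWorkModel {
    private final ExecutorService executor;
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean drainScheduled = false;

    ChannelWorkModel(ExecutorService executor) {
        this.executor = executor;
    }

    // Called by the (modelled) connection thread for each assembled delivery.
    synchronized void enqueue(Runnable callback) {
        pending.add(callback);
        if (!drainScheduled) {
            drainScheduled = true;
            executor.execute(this::runNext);
        }
    }

    // Runs on any pool thread: execute one callback, then reschedule if more wait.
    private void runNext() {
        Runnable next;
        synchronized (this) {
            next = pending.poll();
        }
        try {
            if (next != null) {
                next.run();
            }
        } finally {
            synchronized (this) {
                if (pending.isEmpty()) {
                    drainScheduled = false;
                } else {
                    executor.execute(this::runNext);
                }
            }
        }
    }

    // Demo helper: push `count` callbacks through one channel and record the
    // execution order and the peak number of callbacks running at once.
    static List<Integer> runDemo(int count, int poolSize, AtomicInteger peak) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        ChannelWorkModel channel = new ChannelWorkModel(pool);
        List<Integer> order = new CopyOnWriteArrayList<>();
        AtomicInteger running = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            final int id = i;
            channel.enqueue(() -> {
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                order.add(id);
                running.decrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return order;
    }

    public static void main(String[] args) {
        AtomicInteger peak = new AtomicInteger();
        List<Integer> order = runDemo(20, 4, peak);
        System.out.println("order=" + order + " peak=" + peak.get());
    }
}
```

Even with four pool threads available, callbacks for the one channel run strictly one after another and in arrival order, although successive callbacks may land on different threads.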
The RabbitMQ server may proactively “push” multiple messages to a client application; these get buffered in memory in the client until the consumer callbacks can be invoked. Proactive pushing can significantly improve performance, but of course such messages are then not available to other processes which may also be connected to RabbitMQ so it should not be too aggressive. The number of such “prefetched” (actually, pre-pushed) messages can be tuned per-consumer or per-channel with the QOS (quality-of-service) methods. See later for more information on QOS.
Most RabbitMQ operations initiated from the client (eg queueDeclare, basicGet, basicConsume) are synchronous.
Each call from a thread in the client application is made via a channel object. Only one synchronous operation is allowed per channel at a time; the calling thread waits within the RabbitMQ client library until it obtains an exclusive lock on the channel. It then builds the data frames necessary for the synchronous request, obtains an exclusive lock on the RabbitMQ connection (tcp socket), sends the dataframes, and releases the lock on the connection. It then blocks, waiting to be notified that a response for the synchronous operation has been received.
The single connection-handler thread eventually receives a data frame from the server labelled with channel=N and type=synchronous; it passes this to the “assembler” object associated with the specified channel. When the assembler has received all data frame objects necessary to form a complete response, it places the response in a field of the channel object and notifies the blocked client thread. The connection-handler thread then returns to waiting for new data frames from the server, while the client thread retrieves the response for its synchronous operation from the channel object and returns back into the calling code. At this point, another thread waiting to perform a synchronous operation on the same channel may obtain the channel lock and start transmission.
Because only one synchronous call is active within a channel at a time, the “response” data frames from the server do not need to be tagged with a “correlation id” linking them to a specific request (as is done in some other messaging systems); the channel-id is sufficient to identify the call the response belongs to.
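The interplay between the calling thread, the channel lock and the connection-handler thread can be sketched as follows. This is a model under stated assumptions, not the library's code: SyncRpcChannelModel is a hypothetical name, a single-thread executor stands in for the connection-handler thread, and the "server" simply echoes the request with an "ok:" prefix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Model of synchronous request/response over one channel.
class SyncRpcChannelModel {
    private final ReentrantLock channelLock = new ReentrantLock();
    private final ExecutorService connectionThread; // stands in for the socket reader thread
    private volatile CompletableFuture<String> outstanding; // at most one per channel

    SyncRpcChannelModel(ExecutorService connectionThread) {
        this.connectionThread = connectionThread;
    }

    // Called by application threads; blocks until the response arrives.
    String rpc(String request) {
        channelLock.lock(); // only one synchronous operation per channel
        try {
            CompletableFuture<String> reply = new CompletableFuture<>();
            outstanding = reply;
            // "Send" the request; the fake server answers on the connection thread.
            connectionThread.execute(() -> onResponse("ok:" + request));
            return reply.join();
        } finally {
            outstanding = null;
            channelLock.unlock();
        }
    }

    // Called on the connection thread. No correlation id is needed: whatever
    // call is outstanding on this channel is the one being answered.
    private void onResponse(String response) {
        CompletableFuture<String> target = outstanding;
        if (target != null) {
            target.complete(response);
        }
    }

    // Demo helper: several threads share one channel; count correct replies.
    static int countCorrectReplies(int callers) {
        ExecutorService reader = Executors.newSingleThreadExecutor();
        SyncRpcChannelModel channel = new SyncRpcChannelModel(reader);
        AtomicInteger correct = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < callers; i++) {
            String request = "req" + i;
            Thread t = new Thread(() -> {
                if (("ok:" + request).equals(channel.rpc(request))) {
                    correct.incrementAndGet();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        reader.shutdown();
        return correct.get();
    }

    public static void main(String[] args) {
        System.out.println(countCorrectReplies(8));
    }
}
```

Because the channel lock serialises the callers, every thread receives the reply to its own request even though the responses carry no request identifier.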
The documentation for class Channel states rather vaguely that “some methods should not be called from multiple threads”. As far as I can see, the whole class is indeed thread-safe; however it makes little sense to call synchronous operations from multiple threads when only one outbound call may be active at a time; better to have a 1-thread:1-channel model for calls from application code to a specific channel. There is one exception: consumer callbacks invoked by RabbitMQ due to an earlier call to basicConsume are done in the context of a thread allocated from the ExecutorService specified when the connection was created. This thread may be different on each callback, but it is safe (and necessary) to invoke synchronous operations on the associated channel from this thread - not only because Channel really is thread-safe, but also because RabbitMQ never invokes multiple consumer callbacks for the same channel concurrently.
Quality of Service
RabbitMQ servers track the number of unacknowledged messages delivered on each channel, and to each consumer on that channel. When this value reaches the configured limit then RabbitMQ stops delivering new messages to that consumer or channel until some already-delivered messages have been acknowledged.
The client-library method basicAck is used to send acknowledgement messages to the server. This method is asynchronous; no reply from the server is expected and therefore the caller does not block. The caller thread does take some locks (first on the channel and then on the connection) so can wait for a small amount of time, but then simply sends the ack data-frame over the outgoing socket and returns to the caller.
The client library method basicQos allows the threshold for unacknowledged messages to be tuned. The RabbitMQ server attempts to push all messages it receives on a queue to subscribers immediately; as the default QOS limit is infinite, this means that server-side RabbitMQ queues tend to remain at length zero with large numbers of messages being buffered in client applications which have subscribed to those queues. In general it is therefore advisable for client code to set a finite prefetch limit for each channel via basicQos - somewhere in the range 20 to 50 is a reasonable starting point.
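The effect of the unacknowledged-message limit can be shown with a small model of the server-side bookkeeping. This is an illustrative sketch only: PrefetchWindowModel is a hypothetical name, and treating a limit of 0 as "unlimited" is an assumption chosen to match the infinite default described above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Model of how a prefetch limit throttles delivery to one consumer.
class PrefetchWindowModel {
    private final int prefetchLimit; // 0 is treated as "no limit"
    private final Deque<String> serverQueue = new ArrayDeque<>();
    private int unacked = 0;
    private int deliveredCount = 0;

    PrefetchWindowModel(int prefetchLimit) {
        this.prefetchLimit = prefetchLimit;
    }

    // A message arrives on the server-side queue.
    void publish(String message) {
        serverQueue.addLast(message);
        pump();
    }

    // The client acknowledges one delivered message, freeing a slot.
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
        pump();
    }

    // Push messages to the client while the window has room.
    private void pump() {
        while (!serverQueue.isEmpty() && (prefetchLimit == 0 || unacked < prefetchLimit)) {
            serverQueue.pollFirst();
            unacked++;
            deliveredCount++;
        }
    }

    int delivered() { return deliveredCount; }
    int waitingOnServer() { return serverQueue.size(); }

    public static void main(String[] args) {
        PrefetchWindowModel limited = new PrefetchWindowModel(2);
        PrefetchWindowModel unlimited = new PrefetchWindowModel(0);
        for (int i = 0; i < 5; i++) {
            limited.publish("m" + i);
            unlimited.publish("m" + i);
        }
        // Limited: 2 pushed to the client, 3 still on the server.
        System.out.println(limited.delivered() + " " + limited.waitingOnServer());
        // Unlimited: everything is buffered in the client, server queue empty.
        System.out.println(unlimited.delivered() + " " + unlimited.waitingOnServer());
    }
}
```

With a finite limit, messages that the client has not yet had room for stay on the server, where other consumers of the same queue can still receive them.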
When registering a consumer on a queue via method Channel.basicConsume, there are two options which may not be entirely obvious:
noLocal is defined by AMQP for the case where a single connection is used for both publishing and consuming messages: when it is set, the server should not deliver to this consumer any messages that were published over the same connection. Note that published messages always travel to the server; there is no client-side “quick path” from publisher to consumer. The RabbitMQ server does not support this flag, so setting it has no effect there.
The exclusive flag specifies whether this consumer should be the only one reading from the queue; if exclusive is true and another consumer is already registered for the queue then the call fails. Once an exclusive consumer is registered, any other attempt to call basicConsume on the same queue (whether exclusive or not) will not be allowed. This ensures “single threaded” access to the queue.
Note that an exclusive flag is also available when creating a queue. This is completely different from the “exclusive consumer” setting; an exclusive queue is a temporary queue which is removed when the connection over which it was defined is closed. One use for such a temporary queue is to bind it to an exchange of type “fanout”; that queue will then receive a copy of every message written to the exchange - but only while the original connection is open.
Processing Messages in Parallel
Now that so many topics have been addressed, it is finally possible to answer the original question: how can messages from a single queue be processed in parallel?
When using the “polling” approach (basicGet) then the maximum parallelism is fairly obvious. Only one synchronous call is permitted per channel, so it is necessary to allocate one channel per thread. basicGet is a synchronous operation and only retrieves one message at a time from the server (no internal batching at all). It therefore has much poorer performance than the consumer (subscriber aka basicConsume) approach, and should be avoided for high-performance systems. It also does not wait for messages to arrive: if the queue is empty it returns null immediately, so the calling thread must poll repeatedly (ideally with a pause between attempts) and is best dedicated to message processing. The call does block while waiting for the server's reply; that wait can be interrupted by closing the associated channel.
When using the “consumer” approach (basicConsume, ie RabbitMQ invokes application code when a message is available) then the maximum parallelism is:
min(nchannels, threadpoolsize)
where the threadpool is the one associated with the ExecutorService passed to newConnection.
The callbacks registered via basicConsume for different channels will be invoked in parallel; callbacks for multiple calls to basicConsume for the same channel are not. The thread used for each callback may be different on each call - the callbacks are done via the ExecutorService.
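The min(nchannels, threadpoolsize) bound can be checked with a small experiment. Since each channel has at most one callback in flight, the sketch below models every channel as a single long-running task on a fixed-size pool and measures how many run at once. ConsumerParallelismModel and peakConcurrency are hypothetical names, and this is a model of the behaviour rather than a test against a real broker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Measures peak callback concurrency for N channels on a pool of P threads.
class ConsumerParallelismModel {
    static int peakConcurrency(int channels, int poolSize) {
        int expected = Math.min(channels, poolSize);
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch started = new CountDownLatch(expected);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(channels);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        // One in-flight callback per channel, each blocking until released.
        for (int c = 0; c < channels; c++) {
            pool.execute(() -> {
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
                finished.countDown();
            });
        }
        try {
            // Wait until as many callbacks as possible are running together.
            started.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
        }
        try {
            finished.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println(peakConcurrency(3, 8)); // limited by channel count
        System.out.println(peakConcurrency(8, 3)); // limited by pool size
    }
}
```

In practice this means that to process one queue on K threads, the application needs at least K channels each with its own consumer, and an ExecutorService with at least K threads.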
When a consumer has been registered, the server can pro-actively push messages to the client, which improves performance (a kind of server-side “prefetch”).
Each processed message must be “acked” at some time. Method basicAck does provide an option to perform “multiple acks” but there does not seem to be any reasonable way to take advantage of this when using a consumer, ie each message must simply be acked after it is processed. As discussed in the section on quality-of-service, acks are asynchronous (“fire and forget” type operations), so sending a separate ack per message is not likely to be a performance bottleneck.
A channel may specify mode “auto-acknowledge” when calling basicGet or when subscribing to a queue with basicConsume. This is the most efficient way of dealing with message acknowledgements, but can also lead to data loss in the case of a client crash or a message-handling method throwing an unexpected exception.
The alternative is for client code to call basicAck explicitly. As already mentioned, basicAck is an asynchronous method (the server sends no response), so is fairly efficient to call from client code.
It is important not to forget acknowledgements; otherwise RabbitMQ will eventually stop delivering messages (assuming a reasonable QOS has been defined). And on client application shutdown, all non-acknowledged messages will be placed back on the original queue and redelivered, possibly to a different client.
There is a “multiple ack” option to acknowledge multiple messages in a single basicAck call; however I am not sure how this is supposed to be used. When using basicGet, the synchronous one-by-one retrieval of messages will be the performance bottleneck, not the acknowledgements. And when using basicConsume, the message-handling thread never knows when the next message will be received - if the queue is empty then it can wait a very long time for the next call. It therefore seems necessary to acknowledge each message as it is processed; if the acknowledgement is deliberately postponed until later (to batch acknowledgements), I see no way in which such a batched acknowledgement can be sent within a suitably bounded time-period. There is one possible condition where it makes sense: a single-threaded “exclusive” consumer for a queue can count the number of messages waiting on the queue and therefore know that it will receive more messages and can therefore postpone acks until a later message has been processed. However I expect this is an unusual case, and that sending an ack for each message is the necessary approach in most cases.
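For reference, the semantics of the “multiple” flag can be modelled as follows: an ack with multiple=true acknowledges every outstanding delivery on the channel up to and including the given delivery tag. The sketch below models the server-side bookkeeping only; MultipleAckModel is a hypothetical name, and the assumption that delivery tags are per-channel counters starting at 1 reflects my understanding of the protocol rather than anything verified in this article.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Model of per-channel unacknowledged deliveries and the "multiple" ack flag.
class MultipleAckModel {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    // The server delivers a message and remembers its tag as unacknowledged.
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Mirrors the shape of basicAck(deliveryTag, multiple).
    // Returns how many deliveries this single ack settled.
    int ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            NavigableSet<Long> upTo = unacked.headSet(deliveryTag, true);
            int settled = upTo.size();
            upTo.clear(); // the view is backed by the set, so this removes them
            return settled;
        }
        return unacked.remove(deliveryTag) ? 1 : 0;
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        MultipleAckModel channel = new MultipleAckModel();
        int ackFrames = 0;
        // Process 10 messages but only send an ack after every 5th one.
        for (int i = 0; i < 10; i++) {
            long tag = channel.deliver();
            if (tag % 5 == 0) {
                channel.ack(tag, true);
                ackFrames++;
            }
        }
        System.out.println(ackFrames + " acks, " + channel.unackedCount() + " unacked");
    }
}
```

The catch is the one described above: if only 7 messages had arrived, tags 6 and 7 would remain unacknowledged until either a later message or some timer-driven flush triggered the next ack.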
RabbitMQ and AMQP
AMQP is a wire-level standard for messaging; it allows messages to be passed from one message-broker type to another. It is also used for the communication between client library and broker: the RabbitMQ Java client speaks AMQP 0-9-1 to the server.
In AMQP 1.0 terminology, a RabbitMQ channel corresponds roughly to an AMQP session, and a RabbitMQ Ack message to an AMQP disposition frame.
AMQP is primarily used for:
- defining the data-format for message headers (so headers can be interpreted by any broker)
- defining a set of default headers (eg time-to-live, priority)
- defining the control-packets transferred (transfer, disposition, etc)
JMS is a Java-specific API, where client-side libraries for different brokers can be plugged in. The “wire protocol” is broker-specific but the client-side code to use it is identical. In contrast, AMQP is a wire-protocol, allowing client libraries to work with any AMQP broker, regardless of which API they offer client-side code.
The AMQP Specification provides the full details.
The AMQP specification strongly influences the RabbitMQ API design. In AMQP 1.0 terms, a RabbitMQ Channel object corresponds to an AMQP “session” consisting of a pair of AMQP channels (one on client, one on server). A subscription to a queue (ie a call to basicConsume) corresponds to an AMQP link belonging to a specific AMQP session.
The Java application I used when verifying the content of this article is here.
The Maven pomfile used to build the above is here