As mentioned in a previous blog post, at Signal we use RabbitMQ for all our queuing needs. Though the AMQP protocol supports the concept of priority, RabbitMQ does not yet implement that feature. However, with a little bit of creative client side code, we’ve been able to approximate priority in our application in a way that will allow us to seamlessly remove that workaround once RabbitMQ implements priority internally.
AMQP’s priority support
The AMQP protocol supports up to 10 levels of priority, starting at zero. 0 has the lowest priority and 9 has the highest. The priority of a message is set by the publisher using the priority header. The consumer will then have messages pushed to it in priority order.
Brokers that say they fully conform to the spec must implement at least 2 levels of priority. In that case, the 10 levels of priority are broken up into 2 ranges: 0-4 and 5-9. Each range is then treated as a single priority level.
Dealing with it on the client
To workaround the lack of priority support in RabbitMQ, we need to split one logical queue into several physical queues based on priority. The consumer pulls messages from all queues, but gives preference to the higher priority queues if work is available.
At Signal, we made this nuance transparent to our application by adding a layer of indirection. Our application code publishes using the logical queue name and priority through a common function. That function then determines the physical queue to publish to based on the priority (eg. 0-4 goes to the slow queue, 5-9 goes to the fast queue). Consumers subscribe to both queues, but always work the higher priority queues first.
Our first implementation had two physical queues (fast & slow), and consumers popped messages off those queues in priority order. First, they would try to pop a message off the fast queue. If a message was returned, they processed it and then tried popping from the fast queue again. If no message was on the fast queue, tried popping from the slow queue. If a message was on the slow queue, they processed it and then tried popping from the fast queue immediately. If no message was on the slow queue, we would sleep for a short while to ensure that we don’t overwhelm the broker with polling.
This approach worked well enough, but when queues had a large backlog, we found RabbitMQ would eat up a bunch of CPU processing the polling logic. Clearly, we needed to switch to a subscription model, so RabbitMQ will push messages to us with less overhead.
Dealing with the subscription flood
Switching to an AMQP subscription model presents a new set of issues: namely, subscriptions in AMQP push messages to consumers as quickly the network interface is serviced. Since we process work on a different thread than EventMachine’s reactor thread (to leave it free to publish new messages), messages will be pulled off the network interface and into memory faster than we can process them. Without an approach to managing the flood of messages coming into the consumer, we would be unable to guarantee a priority order processing of messages nor keep our process from dying due to memory usage.
To manage the influx of messages for standard AMQP subscriptions, we lean on prefetch counts and explicit client acks. The prefetch count, set on a channel, determines how many unacked messages a channel may be sent before the broker will wait for acknowledgments.
To service multiple subscriptions in priority order, we subscribe to each queue and put incoming messages on an in memory priority queue. Each subscription is on its own channel, with an appropriate prefetch count: higher prefetch counts for higher priority queues. The sizing of the prefetch counts are crucial. If they are too low, low priority messages that are in the worker’s memory will be processed ahead of higher priority messages that need to be sent from the server. In our application, we found that a good rule of thumb is to double the prefetch count for each higher level of priority.
As messages are received from RabbitMQ, we put them on an in memory priority queue. A background worker thread pulls messages from that queue (blocking if empty) and processes them. After a message is processed, an ack is sent to RabbitMQ to signal that a new message of that same priority can be sent.
Add a third queue
With 2 levels of priority approximation, we were able to ensure high priority outbound SMS messages (eg. responding to a STOP text message) were processed before lower priority messages (eg. a scheduled blast to a subscription list). This approach worked great until we started having clients who’s lists were large enough to saturate our outbound connections for long periods of time. Our solution was to add a third priority queue and send really large blasts with the lowest priority and smaller blasts with the middle priority level. With all the plumbing in place, adding this extra queue was little more than adding the third queue and adjusting the priority ranges.
After living with this priority approximation for over 2 years, we are very pleased. Other than the few incremental changes, we’ve had no issues supporting this solution in production. If you’re evaluating RabbitMQ and lack of proper priority has you down, I’d suggest implementing this solution.