<!DOCTYPE html>
<!--[if lt IE 7 ]> <html lang="en" class="no-js ie6"> <![endif]-->
<!--[if IE 7 ]>    <html lang="en" class="no-js ie7"> <![endif]-->
<!--[if IE 8 ]>    <html lang="en" class="no-js ie8"> <![endif]-->
<!--[if IE 9 ]>    <html lang="en" class="no-js ie9"> <![endif]-->
<!--[if (gt IE 9)|!(IE)]><!--> <html lang="en" class="no-js"> <!--<![endif]-->
  <head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">

    <!-- OpenID delegation -->
    <link rel="openid.server" href="http://pip.verisignlabs.com/server" />
    <link rel="openid.delegate" href="http://dougbarth.pip.verisignlabs.com" />
    <link rel="openid2.provider" href="http://pip.verisignlabs.com/server" />
    <link rel="openid2.local_id" href="http://dougbarth.pip.verisignlabs.com" />

    <meta http-equiv="X-XRDS-Location" content="http://pip.verisignlabs.com/user/dougbarth/yadisxrds" />
    <title>
      
        Rabbit on a Leash — Rate Limited AMQP subscriptions |
      
      Doug Barth
    </title>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link href="http://feeds.feedburner.com/DougBarth" rel="alternate" title="Doug Barth" type="application/atom+xml" />
    <link rel="stylesheet" href="/css/style.css">
<link rel="stylesheet" media="handheld" href="/css/handheld.css">
<link rel="stylesheet" media="screen" href="/css/pygments/default.css" />

<link  href="http://fonts.googleapis.com/css?family=PT+Serif+Caption:regular,italic" rel="stylesheet" type="text/css" >

 
    <script src="/js/libs/modernizr-1.7.min.js"></script>
    <script type="text/javascript">

  var _gaq = _gaq || [];
  _gaq.push(['_setAccount', 'UA-24007925-1']);
  _gaq.push(['_trackPageview']);
  _gaq.push(['_trackPageLoadTime']);

  (function() {
    var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
    ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
    var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
  })();

</script>

 
    <!-- SOPA & PIPA are horrible for the internet -->
    <script type="text/javascript">var a=new Date,b=a.getUTCHours();if(0==a.getUTCMonth()&&2012==a.getUTCFullYear()&&((18==a.getUTCDate()&&13<=b)||(19==a.getUTCDate()&&0>=b)))window.location="http://sopastrike.com/strike";</script>
  </head>
  <body>
    <img class="bg" src="/img/bg.jpg" />
    <div id="container">
      <header>
<h1><a href="/">Doug Barth</a></h1>
</header>


      <div id="main" role="main">
        <article>
  <header>
  <h1><a href="/2011/06/10/keeping-the-rabbit-on-a-leash.textile">Rabbit on a Leash — Rate Limited AMQP subscriptions</a></h1>
  <small>Posted <time pubdate datetime="2011-06-10 00:00:00 +0000">10 Jun 2011</time></small>
  </header>

  "RabbitMQ":http://www.rabbitmq.com is fast: "really fast":http://www.rabbitmq.com/faq.html#performance. Consuming messages from a queue is extremely efficient. Consumers declare the queues they are subscribing to, and the broker pushes messages to each consumer as soon as they are ready. The "AMQP protocol":http://www.amqp.org, which RabbitMQ implements, lets you limit how many unacknowledged messages a consumer can hold at once via the @prefetch_count@ and @no_ack@ settings, but it has no way to control the rate at which messages are delivered to consumers.

At "Signal":http://www.signalhq.com, we use RabbitMQ for all our queueing infrastructure needs. Outgoing SMS messages (MTs) are queued for a pool of workers to send to our aggregator. Our connection to the SMS aggregator requires us to limit the rate at which we send messages to their system. RabbitMQ might seem like a poor fit for that use case, but we've been able to support it using a bit of client-side code and the existing AMQP protocol.

Using AMQP's @prefetch_count@, client acks, and a blocking token bucket, it's possible to implement rate-controlled processing of queued messages.

h1. Token Buckets

A token bucket is an algorithm for controlling the rate at which data flows through a system[1]. Token buckets can be configured to allow traffic to burst to full speed, but they ensure that the average traffic processed is held at a configurable rate.

The concept of a token bucket is rather simple. Imagine the bucket in your freezer's ice maker. Cubes of ice are added to the bucket at a certain rate (say, one per second). The size of the bucket controls how many ice cubes (tokens) can be waiting in the bucket before the ice maker stops producing more.

In order for traffic to be processed, we need to take a token (or more) from that bucket. If the bucket is empty, that work cannot be processed until a new token arrives. The rate at which tokens are added to the bucket controls the average speed at which work is processed. Starting from an empty bucket, we could process work only as fast as cubes of ice are added. The size of the bucket controls how much work we can burst. If the bucket held 10 tokens, we could process 10 units of work at full speed before being rate limited.
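
The description above can be sketched in Ruby roughly as follows. This is a minimal illustration, not the implementation we run in production: the constructor arguments (rate, then capacity), the injectable @clock@ parameter, and the @try_take@ helper are assumptions made for the example.

```ruby
require 'thread'

# Hypothetical sketch of a blocking token bucket. The names and the
# constructor signature are assumptions made for illustration.
class TokenBucket
  # rate:     tokens added per second (the average processing rate)
  # capacity: maximum tokens held (the burst size)
  # clock:    callable returning the current time in seconds
  def initialize(rate, capacity,
                 clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    raise ArgumentError, 'rate must be positive' unless rate > 0
    @rate = rate.to_f
    @capacity = capacity.to_f
    @clock = clock
    @tokens = @capacity # start full so the first burst runs at full speed
    @last_refill = @clock.call
    @mutex = Mutex.new
    @refreshed = ConditionVariable.new
  end

  # Adds the tokens earned since the last refill and wakes blocked takers.
  def refresh
    @mutex.synchronize do
      refill
      @refreshed.broadcast
    end
  end

  # Non-blocking take: returns true if the tokens were available.
  def try_take(count = 1)
    @mutex.synchronize do
      refill
      return false if @tokens < count
      @tokens -= count
      true
    end
  end

  # Blocking take: waits until enough tokens have accumulated.
  def take(count = 1)
    raise ArgumentError, 'count exceeds capacity' if count > @capacity
    @mutex.synchronize do
      loop do
        refill
        break if @tokens >= count
        # Sleep until a refresh arrives or enough tokens should have accrued.
        @refreshed.wait(@mutex, (count - @tokens) / @rate)
      end
      @tokens -= count
    end
    nil
  end

  private

  # Caller must hold @mutex. Tokens never exceed the bucket's capacity.
  def refill
    now = @clock.call
    @tokens = [@capacity, @tokens + (now - @last_refill) * @rate].min
    @last_refill = now
  end
end
```

With this sketch, @TokenBucket.new(1, 5)@ averages one token per second and allows bursts of up to five. The bucket starts full, so a freshly started worker can burst immediately; start it empty instead if the very first messages must also be paced.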

h1. Putting it all together

With a correctly working token bucket, implementing fixed-rate processing is fairly straightforward. First, we set an explicit @prefetch_count@ on the channel, and we set @no_ack@ to false when subscribing to the queue. The @prefetch_count@ limits how many unacked messages RabbitMQ will deliver, and setting @no_ack@ to false lets us acknowledge each message once we've finished processing it. In our application, we size the @prefetch_count@ so there are a few seconds' worth of messages waiting in the worker's memory to be sent.
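
As a rough illustration of that sizing rule, the prefetch window is the send rate multiplied by the number of seconds of work to buffer. The helper name @prefetch_for@ is hypothetical. The floor of 1 matters because a @prefetch_count@ of 0 means "no limit" in AMQP, which would defeat the rate limiting.

```ruby
# Hypothetical helper: size the prefetch window to hold roughly
# buffer_seconds of work at the given send rate.
def prefetch_for(rate_per_second, buffer_seconds)
  # Round up, and never return 0: AMQP treats a prefetch count of 0
  # as "no limit".
  [(rate_per_second * buffer_seconds).ceil, 1].max
end

prefetch_for(1, 10)   # => 10
prefetch_for(0.5, 3)  # => 2
```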

We use the token bucket to control our rate of processing these messages from RabbitMQ. We need to take a token from the bucket before processing a message. If the bucket is empty, we block until a new token is added. 

<figure class="highlight"><pre><code class="language-ruby" data-lang="ruby"><span class="no">EM</span><span class="p">.</span><span class="nf">run</span> <span class="k">do</span>
  <span class="n">channel</span> <span class="o">=</span> <span class="no">AMQP</span><span class="o">::</span><span class="no">Channel</span><span class="p">.</span><span class="nf">new</span>

  <span class="c1"># Allow 10 unacked messages to be delivered to this worker.</span>
  <span class="n">channel</span><span class="p">.</span><span class="nf">prefetch</span><span class="p">(</span><span class="mi">10</span><span class="p">)</span>

  <span class="c1"># Configure this worker to send at 1 msg/s on average with occasional bursts</span>
  <span class="c1"># of up to 5 messages.</span>
  <span class="n">token_bucket</span> <span class="o">=</span> <span class="no">TokenBucket</span><span class="p">.</span><span class="nf">new</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">5</span><span class="p">)</span>

  <span class="c1"># Refresh the token bucket every second. The bucket is also refreshed</span>
  <span class="c1"># when the take method is called.</span>
  <span class="no">EM</span><span class="p">.</span><span class="nf">add_periodic_timer</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span> <span class="p">{</span> <span class="n">token_bucket</span><span class="p">.</span><span class="nf">refresh</span> <span class="p">}</span>
 
  <span class="c1"># We subscribe with explicit acknowledgments so we can signal to RabbitMQ</span>
  <span class="c1"># that more work should be delivered. Without this setting, RabbitMQ would</span>
  <span class="c1"># send work over to us as fast as possible.</span>
  <span class="n">channel</span><span class="p">.</span><span class="nf">queue</span><span class="p">(</span><span class="s1">'send_mt'</span><span class="p">).</span><span class="nf">subscribe</span><span class="p">(</span><span class="ss">:ack</span> <span class="o">=&gt;</span> <span class="kp">true</span><span class="p">)</span> <span class="k">do</span> <span class="o">|</span><span class="n">header</span><span class="p">,</span> <span class="n">message</span><span class="o">|</span>
    <span class="c1"># Defer the processing to a background thread since taking a token from</span>
    <span class="c1"># the bucket could potentially be a blocking operation and we don't want to</span>
    <span class="c1"># block the reactor.</span>
    <span class="no">EM</span><span class="p">.</span><span class="nf">defer</span><span class="p">(</span>
      <span class="nb">lambda</span> <span class="p">{</span>
        <span class="c1"># Takes 1 token from the bucket. If the bucket is empty, this</span>
        <span class="c1"># method will block.</span>
        <span class="n">token_bucket</span><span class="p">.</span><span class="nf">take</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>

        <span class="n">process_message</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>

        <span class="c1"># Acknowledge this message, allowing RabbitMQ to send more work.</span>
        <span class="n">header</span><span class="p">.</span><span class="nf">ack</span>
      <span class="p">}</span>
    <span class="p">)</span>
  <span class="k">end</span>
<span class="k">end</span></code></pre></figure>

fn1. "Token Buckets (Wikipedia)":http://en.wikipedia.org/wiki/Token_bucket

</article>

      </div>

      <footer class="clearfix">
<section id="hi">
<h1>Hi, I'm Doug</h1>
<p>
I'm an <a href="http://www.pagerduty.com/">Operations Engineer at
  PagerDuty</a>, where I'm responsible for our production infrastructure.
I used to work at <a href="http://www.signalhq.com">Signal</a> and <a href="http://www.orbitz.com">Orbitz</a>.
</p>
<p>
I'm also a husband &amp; father. I took this background picture while on vacation in Mexico.
</p>
</section>

<section id="where">
<h1>Where to Find Me</h1>
<ul>
  <li><a href="http://www.twitter.com/dougbarth">twitter.com/dougbarth</a></li>
  <li><a href="http://www.github.com/dougbarth">github.com/dougbarth</a></li>
  <li><a href="mailto:dougbarth@gmail.com">dougbarth@gmail.com</a></li>
</ul>
</section>
</footer>

    </div>

    <script src="//ajax.googleapis.com/ajax/libs/jquery/1.5.1/jquery.min.js"></script>
    <script>!window.jQuery && document.write(unescape('%3Cscript src="/js/libs/jquery-1.5.1.min.js"%3E%3C/script%3E'))</script>
    <!--[if lt IE 7 ]>
    <script src="/js/libs/dd_belatedpng.js"></script>
    <script> DD_belatedPNG.fix('img, .png_bg');</script>
    <![endif]-->
  </body>
</html>
