Faisal's Interactions


Beanstalk – Part 2

Posted in Uncategorized by f10i on November 3, 2009
Tags: , ,

Now that we have setup the beanstalk server, it’s time to start using it. First thing we need to do is to install a beanstalk client. Check this list for a client appropriate to your programming language. I am using ruby, so I installed my client using rubygems:

gem install beanstalk-client

All following code is run using ruby’s interactive environment, irb. Feel free to use a different environment, or even write a script file and run that. I prefer the interactive environment as it gives me immediate feedback of my work.

First thing you need to do, is to connect to the beanstalk server:
bs = Beanstalk::Pool.new('127.0.0.1:11300')

You can check the status of your beanstalk server:
bs.stats

To add a new message to the queue, you simply “put” it:
bs.put('Hello World!')

Or, if you are like me, whenever I am sending complex data (anything other than a single string), I use Hashes. Beanstalk offers the “yput” convenience method that converts the input data into a YAML string.
bs.yput({:name => 'Faisal', :blog => 'f10i.wordpress.com'})

Queues are a First-In-First-Out (FIFO) data structure. That means that messages are fetched in the same order they were placed in the queue. You can modify that behavior by assigning priority to messages. A message with high priority will be fetched before messages with lower priority, regardless of its position in the queue.

Priority is indicated by an integer number, the smaller the number the higher the priority. By default all messages are added with priority 65536. Messages that have the same priority number are fetched according to normal FIFO.
bs.put("Message with high priority", 100)
bs.put("Message with low priority", 500)
bs.put("Message with lowest priority")
bs.put("Message with highest priority", 1)
bs.put("Another message with low priority", 500)

The above messages would be fetched in the following order: 4 – 1 – 2 – 5 – 3

You can also delay a message from entering the queue for a specified amount of time. By default all messages have delay 0, and are immediately available for consumption
bs.put("This message will not be available until 20 seconds pass", 65536, 20)

The final option to adding a message to the queue is to assign it a time-to-run value. What this does is that once a client “reserves” a message (explained below), the client would have time-to-run seconds to manipulate the message and either delete, release or bury the message within that time (those actions are explained below). If the client fails to take any of those actions within the time-to-run, the server would automatically release the job, and make it available again in the queue. By default, all messages have 120 seconds as time-to-run.
bs.put("Message with high priority (1), delayed 0 seconds, and have 1 hour time to run", 1, 0, 3600)

Now that we know how to add messages to the queue, let’s get that information out of the queue. You fetch a message by “reserving” it so that no other client can get that message while your client is processing it.
message = bs.reserve

Note that reserve is a BLOCKING method call. What this means is that when you call reserve it’ll try to fetch a message from the queue. If no message is available, it will block, and keep waiting for a message on the queue. Your code won’t progress until a message is retrieved. If what you want is to check for a message and return regardless if a message exists or not, you need to specify a timeout to the reserve method call:
message = bs.reserve(2)

The code will now try to fetch a message from the queue. If no message is available, it will wait for 2 seconds, before throwing a timeout exception.

You can check the message status, using the “stats” method:
message.stats

Now that we fetched a message from the queue, call the “body” method to get the message contents. If you used “yput” to place a YAML representation of your message, you can call the “ybody” method to unserialize the YAML string.
bs.put("Hello There")
message = bs.reserve
message.body -> Hello There
bs.yput({:name => 'Faisal', :blog => 'f10i.wordpress.com'})
message = bs.reserve
message.ybody -> {:name => 'Faisal', :blog => 'f10i.wordpress.com'}

Once you reserve a message, you have time-to-run seconds (default 120) to process this message and issue one of the following commands:
message.delete -> Deletes the message
message.release -> Releases the message back into the queue and becomes available for fetching again
message.bury -> Didn't actually use this command, so don't know much about it

If you do not take one of those actions within the time-to-run, the server would automatically release the message.

If message processing is taking too long, and you do not want to take any of those actions, you can “touch” the message to reset the time-to-run counter.
message.touch

One last thing you can do with a message is to “put_back” in the queue.
message.put_back

This is similar to “release” in the sense that it puts the message back in the queue. However some major differences do exist:

  1. Whereas “release” method releases the message back into the queue, the “put_back” method creates a new identical message and puts the new message in the queue. The old message is not modified or manipulated at all.
  2. The “put_back” action is not considered one of the actions to be done within the time-to-run, and running it does not remove the necessity to take one of the actions mentioned earlier

And that’s about it for beanstalk clients. In my next article, I’ll explore beanstalk queues, or tubes, which allows you to have multiple simultaneous separate queues to add and consume messages.


Follow

Get every new post delivered to your Inbox.