Using MongoDB as Publish Subscribe middleware

08/31/2012

Yesterday I mentioned on Twitter that I was playing with the MongoDB pub/sub features and that they worked quite well for my needs.

What I didn’t mention was that the documentation and blog posts were a bit all over the show, and the Ruby examples I saw didn’t actually do what they said they did. So in this post I’ll show working code and some basic approaches I took to deal with per-consumer destinations and so on.

Why?


So why would anyone want to use MongoDB as a queue, or indeed use MongoDB at all, since everyone knows it’s unusable, never saves any data and kills kittens?

Actually MongoDB is a really nice database, but as with most NoSQL databases the thing to know about it is which shortcuts it takes with your data to do its magic. Knowing this, you have to evaluate its suitability for your specific problem, and if it’s not suitable, don’t use it.

It’s fast and has a flexible query system for searching over arbitrarily structured JSON data. Yes, it has some interesting ideas about data durability, but these are well known by now, and if your needs match its features it’s not bad.

For shops whose needs are well suited to MongoDB and who might want to add some queueing ability, it can be daunting to bring in new tech like RabbitMQ or ActiveMQ: it brings new unknowns and requires an investment in more monitoring, training and learning by making mistakes. If you already have a Mongo instance and know its quirks, using it for a queue might not be such a terrible thing.

Additionally, MongoDB is really easy to get going, and I generally find that for my workloads it just works with little maintenance required.

So my interest in its queueing abilities lies in providing a simpler ‘getting started’ experience for MCollective. Newer MCollective releases have pluggable discovery, which works really well when discovering against a MongoDB cache of registration data, so it would be nice if a simple starter-edition setup could include both the queue and the discovery data in one simple bit of software.

There are other options of course, like Redis, and I’ll evaluate them, but of the various options MongoDB is the only one that comes with both pub/sub and searching/querying capabilities that do what I need, isn’t written in Java and has OS packages easily available for most distros.

Background


In MongoDB, when you do a find on a collection the returned result set is a Cursor. Cursors can have a number of modes or flags associated with them. MongoDB also has something called Capped Collections, which are fixed in size and rotate old data out when they fill up.

The combination of some of these Cursor flags with a Capped Collection (tailable cursors only work on capped collections) enables a kind of tail -f behavior that works like a queue.
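To make the tail -f idea concrete, here is a small pure-Ruby model of it. It is not MongoDB or driver code, and CappedCollection and TailCursor are hypothetical names. The collection keeps at most max_docs documents and drops the oldest on insert, while the cursor starts at the current end of the collection and returns anything newer than what it last saw, or nil once it has caught up. Real capped collections are limited mainly by size in bytes rather than by a document count, which this sketch simplifies.

```ruby
# A pure-Ruby model of a capped collection read through a tailing cursor.
# This is NOT the MongoDB driver; it only illustrates the tail -f semantics.
class CappedCollection
  attr_reader :docs

  def initialize(max_docs)
    @max_docs = max_docs
    @docs = []   # [seq, document] pairs in insertion order
    @seq = 0     # monotonically increasing insertion counter
  end

  # Append a document, evicting the oldest ones once the collection is full.
  def insert(doc)
    @seq += 1
    @docs << [@seq, doc]
    @docs.shift while @docs.size > @max_docs
    @seq
  end

  # Sequence number of the newest document, or 0 when empty.
  def last_seq
    @docs.empty? ? 0 : @docs.last[0]
  end
end

class TailCursor
  # Start at the current end of the collection, i.e. ignore old documents.
  def initialize(coll)
    @coll = coll
    @last_seen = coll.last_seq
  end

  # Return the next document newer than the last one seen, or nil if caught up.
  def next_document
    entry = @coll.docs.find { |seq, _doc| seq > @last_seen }
    return nil unless entry
    @last_seen = entry[0]
    entry[1]
  end
end

coll = CappedCollection.new(3)
coll.insert("old message")
cursor = TailCursor.new(coll)
p cursor.next_document   # prints nil: nothing new since the cursor was opened
coll.insert("hello")
coll.insert("world")
p cursor.next_document   # prints "hello"
p cursor.next_document   # prints "world"
p cursor.next_document   # prints nil: caught up again
```

In this model a slow reader silently skips documents that were rotated out before it read them; a real tailable cursor whose position gets overwritten is invalidated by the server instead, so a consumer has to be ready to re-create its cursor.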

Normally, when you reach the end of a cursor’s results, asking for the next document returns nil, as can be seen here:

>> coll = db.collection('commands')
=> Mongo::DB:0x7fa1ae005f58 ....>
>> cursor = coll.find()
=> Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.skip(cursor.count)
=> Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.next_document
=> nil

Here we opened a collection and did a find. We moved to the end of the results and fetched the next document, which immediately returned nil, indicating there’s nothing new.

Let’s see how we can change the behavior of this cursor so that, instead of returning immediately, it blocks for a while waiting for a new document and returns nil after a timeout if nothing new arrives:

>> cursor = coll.find()
=> Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.skip(cursor.count)
=> Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.add_option(Mongo::Cursor::OP_QUERY_TAILABLE)
=> 2
>> cursor.add_option(Mongo::Cursor::OP_QUERY_AWAIT_DATA)
=> 34
>> loop { puts "#{Time.now}> Tailing...."; p cursor.next_document }
Fri Aug 31 13:40:19 +0100 2012> Tailing....
nil
Fri Aug 31 13:40:21 +0100 2012> Tailing....
nil
Fri Aug 31 13:40:23 +0100 2012> Tailing....
nil

Now, instead of immediately returning nil, it waits 2 to 3 seconds at the end of the collection in case new data arrives.
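The 2 and 34 that add_option printed in the session above look like the cursor’s accumulated query flags. In the MongoDB wire protocol’s OP_QUERY message, TailableCursor is bit 1 (value 2) and AwaitData is bit 5 (value 32), so setting both gives 2 | 32 = 34. The constants below are local stand-ins mirroring those values, not the driver’s own definitions.

```ruby
# Local stand-ins for the OP_QUERY flag bits (values from the wire protocol).
TAILABLE   = 1 << 1   # 2
AWAIT_DATA = 1 << 5   # 32

options = 0
options |= TAILABLE     # like cursor.add_option(Mongo::Cursor::OP_QUERY_TAILABLE)
puts options            # prints 2
options |= AWAIT_DATA   # like cursor.add_option(Mongo::Cursor::OP_QUERY_AWAIT_DATA)
puts options            # prints 34

# Each flag can be tested independently with a bitwise AND.
puts((options & AWAIT_DATA) != 0)   # prints true
```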

So this is your consumer for the queue, here called commands; anyone who saves data into the collection is a producer. It’s quite light on resources on both the client and the MongoDB server: on a fairly low-spec VM I was easily able to run 50+ consumers, a MongoDB instance and some producers.

MongoDB calls this feature Tailable Cursors. What the Ruby docs don’t tell you, and what the Ruby library doesn’t do for you, is set the Mongo::Cursor::OP_QUERY_AWAIT_DATA option as above. Without this option next_document still returns nil immediately, and the example code adds a sleep to avoid an infinite, resource-hungry loop. That sleep makes it completely pointless as a high-performance queue, but the Mongo::Cursor::OP_QUERY_AWAIT_DATA option sorts that out.
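The difference between the two approaches is whether the reader is woken when data arrives or only finds it after its next sleep expires. The pure-Ruby model below imitates the await behaviour with a Mutex and ConditionVariable; AwaitingChannel is a hypothetical name, and its timeout stands in for the server’s own wait, which the client does not control in the real setup.

```ruby
# A pure-Ruby imitation of a tailable cursor with AWAIT_DATA semantics:
# next_document blocks until a document arrives or the timeout expires.
class AwaitingChannel
  def initialize
    @docs = []
    @lock = Mutex.new
    @arrived = ConditionVariable.new
  end

  # Producer side: store the document and wake any waiting reader.
  def insert(doc)
    @lock.synchronize do
      @docs << doc
      @arrived.broadcast
    end
  end

  # Consumer side: return the next document, or nil after `timeout` seconds.
  def next_document(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      while @docs.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @arrived.wait(@lock, remaining)   # releases the lock while waiting
      end
      @docs.shift
    end
  end
end

channel = AwaitingChannel.new
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
producer = Thread.new { sleep 0.2; channel.insert("payload") }
doc = channel.next_document(2)   # returns as soon as the producer inserts
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
producer.join
puts doc
puts "woken after #{waited.round(2)}s rather than the full 2s timeout"
p channel.next_document(0.1)     # nothing else arrives, so nil after ~0.1s
```

A sleep-based poller with the same 2 second interval would, on average, notice the message about a second late, which is the cost the await option removes.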

A simple message structure


In my use case I have to be able to send messages to all consumers, or sometimes just to a specific consumer. In other middleware you do this with different queue names, or perhaps headers, and then do selective subscribes to the queue, picking off just the messages you are interested in.

I chose to use a single capped collection and use a structure similar to middleware headers to identify message targets:

{"headers" : {"target" : "all"},
 "payload" : "data"}

{"headers" : {"target" : "some.consumer"},
 "payload" : "data"}

The two examples show different target headers: in the first I am targeting everyone consuming the queue, and in the second just a specific consumer. The payload can be anything: text, hashes, whatever your needs are.

Let’s look at a consumer that has a consumer name and is interested in messages directed at it or at all consumers:

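# Written against the 1.x mongo gem (Mongo::Connection API)
require 'mongo'

@consumer_identity = "example.com"
@database = "queue"
@collection = "commands"

# Return the commands collection, reconnecting to MongoDB until it is reachable
def get_collection
  @db ||= Mongo::Connection.new().db(@database)

  until @db.connection.active?
    puts ">>> Retrying database connection..."
    @db.connection.close
    @db.connection.connect

    sleep 0.5 unless @db.connection.active?
  end

  # tailable cursors need a capped collection, so create one if it is missing
  # (collection_names returns names; collections returns Collection objects)
  unless @db.collection_names.include?(@collection)
    coll = @db.create_collection(@collection, :capped => true, :size => 10240)
  else
    coll = @db.collection(@collection)
  end

  coll
end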
 
loop do
  begin
    # select only messages addressed to us or to everyone
    cursor = get_collection.find({"headers.target" => {"$in" => [@consumer_identity, "all"]}})

    # ignore old stuff
    cursor.skip(cursor.count)

    # blocking tail reads
    cursor.add_option(Mongo::Cursor::OP_QUERY_TAILABLE)
    cursor.add_option(Mongo::Cursor::OP_QUERY_AWAIT_DATA)

    begin
      # fetch all the docs forever; next_document returns nil when the wait times out
      loop do
        if doc = cursor.next_document
          p doc["payload"]
        end
      end
    rescue Mongo::OperationFailure => e
      puts ">>> Cursor closed: %s (%s)" % [e.to_s, e.class]
    end
  rescue Mongo::ConnectionFailure
    puts ">>> DB connection failed"
    sleep 0.5 # back off so an unreachable server doesn't cause a busy retry loop
  end
end

The find call at the top of the outer loop sets up a Cursor for all messages targeted at either “all” or our identity. You can now simply publish data with the correct headers to target specific consumers or all consumers. The two loops retry a failed database connection forever and keep reading whatever new messages arrive once connected.
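To see which messages the {"headers.target" => {"$in" => [identity, "all"]}} selector picks out for each consumer, here is a pure-Ruby stand-in for that filter. target_filter and matches? are hypothetical helpers, and matches? only understands this one $in shape on a dotted path; it is not a general MongoDB query evaluator.

```ruby
# Build the same selector the consumer passes to find().
def target_filter(identity)
  { "headers.target" => { "$in" => [identity, "all"] } }
end

# Evaluate that selector against a document in plain Ruby.
# Only supports {"dotted.path" => {"$in" => [...]}}, enough for this example.
def matches?(doc, filter)
  filter.all? do |path, condition|
    value = path.split(".").reduce(doc) { |node, key| node.is_a?(Hash) ? node[key] : nil }
    condition.fetch("$in").include?(value)
  end
end

messages = [
  { "headers" => { "target" => "all" },               "payload" => "ping" },
  { "headers" => { "target" => "example.com" },       "payload" => "restart" },
  { "headers" => { "target" => "other.example.com" }, "payload" => "upgrade" }
]

["example.com", "other.example.com"].each do |identity|
  received = messages.select { |m| matches?(m, target_filter(identity)) }
  puts "#{identity}: #{received.map { |m| m["payload"] }.join(", ")}"
end
```

Run as-is, it prints one line per consumer: example.com receives ping and restart, while other.example.com receives ping and upgrade. The broadcast message reaches both; each directed message reaches only its target.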

Using this method it’s really easy to come up with all kinds of addressing modes for your queue. For example, you can give each kind of work a job name and combine it with the target header to create sets of named consumers that each receive only the commands matching the work they’re able to do.
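A sketch of that job-name idea, under assumptions: the job header and the job_filter and wanted? helpers are hypothetical, not something MCollective or my connector defines. Each consumer declares the jobs it can do, and the selector adds a second $in condition on headers.job; MongoDB ANDs top-level conditions together, so a message must match both. The plain-Ruby check beside it shows which messages each consumer would pick up.

```ruby
# Hypothetical selector: messages for this consumer (or everyone) AND for a job it can do.
def job_filter(identity, jobs)
  {
    "headers.target" => { "$in" => [identity, "all"] },
    "headers.job"    => { "$in" => jobs }
  }
end

# Plain-Ruby equivalent of that selector, for illustration only.
def wanted?(msg, identity, jobs)
  headers = msg.fetch("headers", {})
  [identity, "all"].include?(headers["target"]) && jobs.include?(headers["job"])
end

messages = [
  { "headers" => { "target" => "all",  "job" => "package" }, "payload" => "install nginx" },
  { "headers" => { "target" => "all",  "job" => "service" }, "payload" => "restart sshd" },
  { "headers" => { "target" => "web1", "job" => "service" }, "payload" => "reload nginx" }
]

consumers = { "web1" => ["service"], "db1" => ["package", "service"] }

consumers.each do |identity, jobs|
  picked = messages.select { |m| wanted?(m, identity, jobs) }.map { |m| m["payload"] }
  puts "#{identity} (#{jobs.join('/')}): #{picked.inspect}"
end

# This hash is what such a consumer would pass to find()
selector = job_filter("web1", ["service"])
puts selector.keys.join(" AND ")   # prints headers.target AND headers.job
```

Here web1 only does service work, so it gets the broadcast restart and its own reload but not the package install, while db1 takes both broadcast jobs and ignores the message aimed at web1.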

Results


As I said initially, I did all this to test an MCollective connector that uses MongoDB as middleware. It worked surprisingly well, and I have both broadcast and directed modes working:

$ mco ping
.
.
---- ping statistics ----
15 replies max: 57.94 min: 48.56 avg: 54.72

I’ll try out some other options for a small-site or starter-edition middleware and post follow-up blog posts.

I’ll say I’ve been very surprised by how well this worked, though. The connector is a bit complex and awkward because of how thread safety is handled in the MongoDB Ruby drivers, but overall it hasn’t been a big problem to solve a pretty complex use case with this.

Specifically, I noted that performance didn’t degrade hugely with 50 nodes connected or with larger payloads, which is very nice.