Yesterday I mentioned on Twitter that I was playing with the MongoDB pub/sub features and that it worked quite well for my needs.
What I didn’t mention is that the documentation and blog posts were a bit all over the show, and the Ruby examples I saw didn’t actually do what they claimed. In this post I’ll show working code and some of the basic approaches I took to deal with per-consumer destinations and so on.
Why?
So why would anyone want to use MongoDB as a queue, or indeed MongoDB at all, since everyone knows it’s unusable, never saves any data and kills kittens?
Actually MongoDB is a really nice database, but as with most NoSQL databases the thing to know is what shortcuts it takes with your data to do its magic. Knowing this, you have to evaluate its suitability to your specific problem, and if it’s not suitable, don’t use it.
It’s fast and has a flexible query system for searching over arbitrarily structured JSON data. Yes, it has some interesting ideas about data durability, but these are well known by now, and if your needs match its features it’s not bad.
For shops whose needs are well suited to MongoDB and who might want to add some queueing ability, it can be daunting to bring in new tech like RabbitMQ or ActiveMQ: it introduces new unknowns and requires an investment in more monitoring, training and learning by making mistakes. If you already have a Mongo instance and know its quirks, using it for a queue might not be such a terrible thing.
Additionally, MongoDB is really easy to get going, and generally I find that for my workloads it just works with little maintenance required.
So my interest in its queueing abilities lies in providing a simpler ‘getting started’ experience for MCollective. The new MCollective has pluggable discovery, which works really well when discovering against a MongoDB cache of registration data, so it would be nice if a simple starter-edition setup could include both the queue and the discovery data in one simple piece of software.
There are other options of course, like Redis, and I’ll evaluate them, but of the various options MongoDB is the only one that comes with both pub/sub and searching/querying capabilities, does what I need, isn’t written in Java and has OS packages readily available for most distros.
Background
In MongoDB, when you do a find on a collection the returned result set is a Cursor. Cursors can have a number of modes or flags associated with them. MongoDB also has something called Capped Collections, which are fixed in size and rotate old data out when they fill up.
The combination of some of these Cursor flags and Capped Collections enables a kind of tail -f behavior that works like a queue.
A cursor usually returns nil when you have reached the end of the results, as can be seen here:
```
>> coll = db.collection('commands')
=> #<Mongo::DB:0x7fa1ae005f58 ...>
>> cursor = coll.find()
=> #<Mongo::Cursor:0x3fd0d6f61184 ...>
>> cursor.skip(cursor.count)
=> #<Mongo::Cursor:0x3fd0d6f61184 ...>
>> cursor.next_document
=> nil
```
Here we opened a collection and did a find. We moved to the end of the results and fetched the next document, which immediately returned nil, indicating there’s nothing new.
Let’s see how we can change the behavior of this cursor so that, instead of returning immediately, it blocks for a while waiting for a new document and returns nil only after a timeout if nothing new was found:
```
>> cursor = coll.find()
=> #<Mongo::Cursor:0x3fd0d6f61184 ...>
>> cursor.skip(cursor.count)
=> #<Mongo::Cursor:0x3fd0d6f61184 ...>
>> cursor.add_option(Mongo::Cursor::OP_QUERY_TAILABLE)
=> 2
>> cursor.add_option(Mongo::Cursor::OP_QUERY_AWAIT_DATA)
=> 34
>> loop { puts "#{Time.now}> Tailing...."; p cursor.next_document }
Fri Aug 31 13:40:19 +0100 2012> Tailing....
nil
Fri Aug 31 13:40:21 +0100 2012> Tailing....
nil
Fri Aug 31 13:40:23 +0100 2012> Tailing....
nil
```
Now, instead of returning nil immediately, it waits 2 to 3 seconds at the end of the collection in case new data arrives.
So this is your consumer of the queue, called commands here; anyone who saves data into the collection is a producer. It’s quite light on resources on both the client and the MongoDB server: on a fairly low-spec VM I was easily able to run 50+ consumers, a MongoDB instance and some producers.
MongoDB calls this feature Tailable Cursors, and the thing the Ruby docs don’t tell you, and that the Ruby library does not do for you, is to set the Mongo::Cursor::OP_QUERY_AWAIT_DATA option as above. Without this option the cursor still returns nil immediately, and the example code works around the resulting busy loop with a sleep. That sleep makes it pointless as a high-performance queue, but the Mongo::Cursor::OP_QUERY_AWAIT_DATA option sorts it out.
A simple message structure
In my use case I have to be able to send messages to all consumers or sometimes just to a specific consumer. In other middleware you do this with different queue names or perhaps headers and then do selective subscribes to the queue picking off just the messages you are interested in.
I chose to use a single capped collection and use a structure similar to middleware headers to identify message targets:
```
{"headers" : {"target" : "all"}, "payload" : "data"}
{"headers" : {"target" : "some.consumer"}, "payload" : "data"}
```
These two examples show different target headers: in the first I am targeting every consumer of the queue, in the second just a specific consumer. The payload can be anything: text, hashes, whatever your needs are.
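As a sketch of the producer side, any payload can be wrapped in this header structure before being inserted into the capped collection. The make_message helper below is my own invention for illustration, not part of any driver API:

```ruby
# Builds a message document in the header structure shown above.
# NOTE: make_message is a hypothetical helper, not a driver method.
def make_message(payload, target = "all")
  {"headers" => {"target" => target}, "payload" => payload}
end

# broadcast to every consumer of the queue
broadcast = make_message("data")

# target one specific consumer
direct = make_message("data", "some.consumer")

# with a live connection you would then publish with something like:
#   coll = Mongo::Connection.new.db("queue").collection("commands")
#   coll.insert(broadcast)
```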
Let’s look at a consumer that has a consumer name and is interested in messages directed at it or at all consumers:
```ruby
@consumer_identity = "example.com"
@database = "queue"
@collection = "commands"

def get_collection
  @db ||= Mongo::Connection.new.db(@database)

  until @db.connection.active?
    puts ">>> Retrying database connection..."
    @db.connection.close
    @db.connection.connect
    sleep 0.5 unless @db.connection.active?
  end

  # create the capped collection on first use; note collection_names
  # returns strings, so the include? check compares like with like
  if @db.collection_names.include?(@collection)
    coll = @db.collection(@collection)
  else
    coll = @db.create_collection(@collection, :capped => true, :size => 10240)
  end

  coll
end

loop do
  begin
    cursor = get_collection.find({"headers.target" => {"$in" => [@consumer_identity, "all"]}})

    # ignore old stuff
    cursor.skip(cursor.count)

    # blocking tail reads
    cursor.add_option(Mongo::Cursor::OP_QUERY_TAILABLE)
    cursor.add_option(Mongo::Cursor::OP_QUERY_AWAIT_DATA)

    begin
      # fetch all the docs forever
      loop do
        if doc = cursor.next_document
          p doc["payload"]
        end
      end
    rescue Mongo::OperationFailure => e
      puts ">>> Cursor closed: %s (%s)" % [e.to_s, e.class]
    end
  rescue Mongo::ConnectionFailure
    puts ">>> DB connection failed"
  end
end
```
The find call sets up a Cursor for all messages matching either “all” or our identity, so you can now simply publish data with the correct headers to target specific consumers or all consumers. The two loops will forever attempt to reconnect to a failed database and forever read whatever new messages arrive after connecting.
Using this method it’s really easy to come up with all kinds of addressing modes for your queue. For example, you can give the work being done a job name and combine it with the target header to create sets of named consumers that will each receive only the commands matching the work they’re able to do.
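For illustration, such a selector might look like the sketch below. The headers.job field and the selector_for helper are assumptions extending the header scheme, not from the original post:

```ruby
# Builds a MongoDB find selector that matches messages addressed to this
# consumer (or to "all") AND tagged with one of the jobs it can perform.
# NOTE: "headers.job" is a hypothetical extension of the header scheme.
def selector_for(identity, jobs)
  {
    "headers.target" => {"$in" => [identity, "all"]},
    "headers.job"    => {"$in" => jobs}
  }
end

# a consumer that only handles "deploy" and "restart" work:
selector = selector_for("web1.example.com", ["deploy", "restart"])
# pass this to collection.find(selector) before adding the tailable options
```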
Results
As I said initially, I did all this to test an MCollective connector that uses MongoDB as middleware. It worked surprisingly well, and I have both broadcast and directed modes working:
```
$ mco ping
.
.

---- ping statistics ----
15 replies max: 57.94 min: 48.56 avg: 54.72
```
I’ll try out some other options for a small-site or starter-edition middleware and post follow-up blog posts.
I’ll say I’ve been very surprised by how well this worked. The connector is a bit complex and awkward because of how thread safety is handled in the MongoDB Ruby driver, but overall it hasn’t been a big problem to solve a pretty complex use case this way.
Specifically, I noted that performance didn’t degrade hugely with 50 nodes connected or with larger payloads, which is very nice.
Is it really a queue but not topic?
When several consumers subscribe to the same target all of them will get the message, or won’t they?
Indeed, everyone whose find params match will get it. This matches what is implied by “publish/subscribe”.
So you’re right, it’s not a queue in the sense of STOMP or JMS.
In my example I am forwarding to the latest message and ignoring old ones, which makes it even more topic-like.
I will rename the post to make it clearer.
Thanks for the practical post
There is one point where I have a problem with the design.
Afaiu the following will just skip all entries that matched the previous find query?
```
# ignore old stuff
cursor.skip(cursor.count)
```
Then it means that while you get a Mongo::OperationFailure or Mongo::ConnectionFailure and reconnect to the database (or even have a short full process restart), you’ll lose ALL of the messages/records inserted during the disconnect, even a very short one. That can be bad even in a pub/sub framework (not to speak of other use cases), since short disconnects are legitimate and you do not want the consumer (subscriber), for example an mcollective agent, to miss an important action. No matter how long a timeout you specify on the mco command, the agent (subscriber) will never pick up the messages/records it missed while it was disconnected from mongodb.
While this can be somehow tolerated in pub/sub, it means you cannot use this for queues, where you have a single consumer and cannot lose messages/records.
So I guess you need to somehow remember the offset in the capped collection, but how, I have no idea. Since you were moving a cursor over filtered (find) results, you have no absolute offset snapshot at which you stopped, to which you could rewind upon Mongo::OperationFailure or Mongo::ConnectionFailure or a process restart.
And afaiu even if you find a way to track the absolute offset, it does not help with a capped collection, as the start of the collection changes on every insert?
The only workaround I can think of is that the publisher re-sends the same message several times during the timeout period, and the consumer should react only to the first duplicate it receives.
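[A minimal sketch of that dedup idea, assuming the producer adds a msgid header; both the msgid header and the Deduper class are hypothetical, not from the post:]

```ruby
# Tracks which message ids have been seen, so a consumer can ignore the
# re-sent duplicates. NOTE: the "msgid" header is a hypothetical addition
# to the message structure described in the post.
class Deduper
  def initialize
    @seen = {}
  end

  # returns true only the first time a given msgid is observed
  def first_time?(doc)
    id = doc["headers"]["msgid"]
    return false if @seen[id]
    @seen[id] = true
    true
  end
end

# in the consumer loop you would then guard the handler:
#   p doc["payload"] if deduper.first_time?(doc)
```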
@piavlo I think the behaviour is exactly like pub/sub: even with a short disconnect, in most pub/sub middleware you only get what arrives after your new subscription.
JMS lets you declare a durable subscription to a topic, which would retain the stuff you missed, but you should probably use a queue if that’s what you want.
The behaviour accurately describes a pub/sub system. You could of course backtrack n messages in the capped collection to catch up on some recent events, but I think that would be incorrect behaviour. Certainly for mco that would be very undesirable: it’s not a ‘job’ system, it’s a real-time or near real-time RPC system.
As pointed out above it is indeed not a queue.
As for mcollective, well, mcollective has behaved this way all along; it was never a guaranteed-delivery system, it’s more like UDP than TCP. Since 2.0, though, the direct addressing mode uses a queue, and in that case, given a capable queue as middleware, mco will get messages it missed. But even then it will adhere to the TTL in those messages, which is only 60 seconds by default.
I think as a means of getting started and exploring the abilities of mcollective this isn’t a bad setup. Obviously, if you wish to scale or need actual queue-like behaviour, you need to invest in an actual queue. More importantly, if you need a persistent queue you really should not be using a database with MongoDB’s approach to storage. It’s all about evaluating it for your needs and deciding if it suits.
Why would you wanna do something like this, besides wasting your free time? is this a joke?
There are proven message brokers for every purpose: take RabbitMQ or ActiveMQ, let alone the commercial ones. Don’t tell me it’s fast; it wasn’t even working when I tried this last year, and the tickets I raised slipped through with no one responding.
Just get over the hype already.
@frat maybe you can’t read. The reasons are outlined above.
This has nothing to do with hype. I get how much Mongo sucks for most purposes, but in some cases it is a good solution, such as mine.