{"id":2766,"date":"2012-08-19T13:23:01","date_gmt":"2012-08-19T12:23:01","guid":{"rendered":"http:\/\/www.devco.net\/?p=2766"},"modified":"2012-08-19T16:23:48","modified_gmt":"2012-08-19T15:23:48","slug":"mcollective-async-result-handling","status":"publish","type":"post","link":"https:\/\/www.devco.net\/archives\/2012\/08\/19\/mcollective-async-result-handling.php","title":{"rendered":"MCollective Async Result Handling"},"content":{"rendered":"

This is a post in a series of posts I am doing about MCollective 2.0 and later.

<H3>Overview</H3>

The kind of application I tend to show with MCollective is very request-response orientated: you request some info from nodes and it shows you the data as they reply. This is not the typical thing people do with middleware though; more often they create receivers for event streams, processing those into databases or using the middleware as a job queue.

The MCollective libraries can be used to build similar applications and today I’ll show a basic use case for this. It’s generally quite easy to create a consumer for a job queue using middleware, as covered in my recent series of blog posts. It’s much harder when you want to support multiple middleware brokers, pluggable payload encryption and different serializers, and add some Authentication, Authorization and Auditing into the mix; it soon becomes a huge undertaking.

MCollective already has a rich set of plugins for all of this, so it would be great if you could reuse these to save yourself some time.

<H3>Request, but reply elsewhere</H3>

One of the features we added in 2.0.0 brings more awareness of the classical reply-to behaviour common to middleware brokers into the core MCollective libraries. Every request now specifies a reply-to target and the nodes send their replies there; this is how we get replies back from nodes, and where the broker supports it this is typically done using temporary private queues.

But it’s not restricted to this; let’s see how you can use this feature from the command line. First we’ll set up a listener on a specific queue using my stomp-irb application.


<pre>
% stomp-irb -s stomp -p 6163
Interactive Ruby shell for STOMP

info> Attempting to connect to stomp://rip@stomp:6163
info> Connected to stomp://rip@stomp:6163

Type 'help' for usage instructions

>> subscribe :queue, "mcollective.nagios_passive_results"
Current Subscriptions:
        /queue/mcollective.nagios_passive_results

=> nil
>>
</pre>


We’re now receiving all messages on <em>/queue/mcollective.nagios_passive_results</em>; let’s see how we get all our machines to send some data there:


<pre>
% mco rpc nrpe runcommand command=check_load --reply-to=/queue/mcollective.nagios_passive_results
Request sent with id: 61dcd7c8c4a354198289606fb55d5480 replies to /queue/mcollective.nagios_passive_results
</pre>


Note the client recognised that you’re never going to get replies, so it just publishes the request(s) and shows you the outcome. It’s really quick and doesn’t wait for or care about the results.

And over in our stomp-irb we should see many messages like this one:


<pre>
<> BAh7CzoJYm9keSIB1QQIewg6CWRhdGF7CToNZXhpdGNvZGVpADoMY29tbWFu
ZCIPY2hlY2tfbG9hZDoLb3V0cHV0IihPSyAtIGxvYWQgYXZlcmFnZTogMC44
MiwgMC43NSwgMC43MToNcGVyZmRhdGEiV2xvYWQxPTAuODIwOzEuNTAwOzIu
MDAwOzA7IGxvYWQ1PTAuNzUwOzEuNTAwOzIuMDAwOzA7IGxvYWQxNT0wLjcx
MDsxLjUwMDsyLjAwMDswOyA6D3N0YXR1c2NvZGVpADoOc3RhdHVzbXNnIgdP
SzoOcmVxdWVzdGlkIiU2MWRjZDdjOGM0YTM1NDE5ODI4OTYwNmZiNTVkNTQ4
MDoMbXNndGltZWwrBwjRMFA6DXNlbmRlcmlkIgl0d3AxOgloYXNoIgGvbVdV
V0RXaTd6a04xRWYrM0RRUWQzUldsYjJINTltMUdWYkRBdWhVamJFaEhrOGJl
Ykd1Q1daMnRaZ3VBCmx3MW5DeXhtT2xWK3RpbzlCNFBMbnhoTStvV3Z6OEo4
SVNiYTA4a2lzK3BVTVZ0cGxiL0ZPRVlMVWFPRQp5K2QvRGY3N2I2TTdGaGtJ
RUxtR2hONHdnZTMxdU4rL3hlVHpRenE0M0lJNE5CVkpRTTg9CjoQc2VuZGVy
YWdlbnQiCW5ycGU=
</pre>


What you’re looking at is a Base64 encoded, serialized MCollective reply message. In this case the message is signed using an SSL key for authenticity and contains the whole MCollective reply.
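
If you’re curious, you can unpack such a blob by hand. Below is a minimal sketch, assuming the default Marshal serializer and a hypothetical <em>blob.txt</em> file holding the captured Base64 text; the exact keys you see depend on your security plugin:

<pre>
require 'base64'
require 'pp'

# read the captured blob and undo the Base64 transport encoding
blob = File.read("blob.txt")
envelope = Marshal.load(Base64.decode64(blob))

# the envelope carries the SSL signature in :hash while :body holds
# the still-serialized reply payload
pp envelope.keys

# with the Marshal serializer the body is itself another Marshal dump
pp Marshal.load(envelope[:body])
</pre>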

<H3>MCollective to Nagios Passive Check bridge</H3>

As you might have guessed from the use of the NRPE plugin and the queue name I chose, the next step is to connect the MCollective NRPE results to Nagios using its passive check interface:


<pre>
require 'mcollective'

# where the nagios command socket is
NAGIOSCMD = "/var/log/nagios/rw/nagios.cmd"

# to mcollective this is a client, load the client config and
# inform the security system we are a client
MCollective::Applications.load_config
MCollective::PluginManager["security_plugin"].initiated_by = :client

# connect to the middleware and subscribe
connector = MCollective::PluginManager["connector_plugin"]
connector.connect
connector.connection.subscribe("/queue/mcollective.nagios_passive_results")

# consume all the things...
loop do
  # get a MCollective Message object and configure it as a reply
  work = connector.receive
  work.type = :reply

  # decode it, this will go via the MCollective security system
  # and validate the SSL signature etc
  work.decode!

  # now we have the NRPE result, just save it to nagios
  result = work.payload
  data = result[:body][:data]

  if data[:perfdata] == ""
    output = data[:output]
  else
    output = "%s|%s" % [data[:output], data[:perfdata]]
  end

  passive_check = "[%d] PROCESS_SERVICE_CHECK_RESULT;%s;%s;%d;%s" % [result[:msgtime], result[:senderid], data[:command].gsub("check_", ""), data[:exitcode], output]

  begin
    File.open(NAGIOSCMD, "w") {|nagios| nagios.puts passive_check }
  rescue => e
    puts "Could not write to #{NAGIOSCMD}: %s: %s" % [e.class, e.to_s]
  end
end
</pre>


This code connects to the middleware using the MCollective Connector Plugin, subscribes to the specified queue and consumes the messages.

You’ll note there is very little here that’s actually middleware related; we’re just using the MCollective libraries. The beauty of this code is that if we later wish to employ a different middleware, a different security system, or configure our middleware connections to use TLS to ActiveMQ, nothing has to change here. All the hard stuff is done in MCollective config and libraries.
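
For example, moving this setup onto a TLS protected ActiveMQ connection would be a change to the MCollective configuration files rather than to the code above; a rough sketch, with the host name, port and pool layout being placeholder values:

<pre>
# client.cfg / server.cfg fragment, illustrative values only
connector = activemq
plugin.activemq.pool.size = 1
plugin.activemq.pool.1.host = stomp
plugin.activemq.pool.1.port = 6164
plugin.activemq.pool.1.ssl = 1
</pre>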

In this specific case I am using the SSL plugin for MCollective, so the message is signed and no-one can tamper with the results in a MITM attack on the monitoring system. This came for free simply by using MCollective; I didn’t have to write any code here to get this ability.
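
The signing comes from the security plugin selection in those same config files; on the client side it looks roughly like this, with the key paths being examples only:

<pre>
securityprovider = ssl
plugin.ssl_server_public = /etc/mcollective/ssl/server-public.pem
plugin.ssl_client_private = /home/rip/.mcollective.d/rip-private.pem
plugin.ssl_client_public = /home/rip/.mcollective.d/rip.pem
</pre>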

<H3>Scheduling Nagios Checks and scaling them with MCollective</H3>

Now that we have a way to receive check results from the network, let’s look at how we can initiate checks. I’ll use the very awesome Rufus Scheduler Gem for this.

I want to create something simple that reads a config file of checks and repeatedly requests my nodes, possibly matching MCollective filters, to run NRPE checks. Here’s a sample checks file:


<pre>
nrpe "check_load", "1m", "monitored_by=monitor1"
nrpe "check_swap", "1m", "monitored_by=monitor1"
nrpe "check_disks", "1m", "monitored_by=monitor1"
nrpe "check_bacula_main", "6h", "bacula::node monitored_by=monitor1"
</pre>


This will check load, swap and disks on all machines monitored by this monitoring box, and do a Bacula backup check on machines that have the <em>bacula::node</em> class included via Puppet.

Here’s a simple bit of code that takes the above file and schedules the checks:


<pre>
require 'rubygems'
require 'mcollective'
require 'rufus/scheduler'

# (ab)use the mcollective logger...
Log = MCollective::Log

class Scheduler
  include MCollective::RPC

  def initialize(destination, checks)
    @destination = destination
    @jobs = []

    @scheduler = Rufus::Scheduler.start_new
    @nrpe = rpcclient("nrpe")

    # this is where the magic happens, send all the results to the receiver...
    @nrpe.reply_to = destination

    instance_eval(File.read(checks))
  end

  # helper to schedule checks, this will create rufus jobs that do NRPE requests
  def nrpe(command, interval, filter=nil)
    options = {:first_in => "%ss" % rand(Rufus.parse_time_string(interval)),
               :blocking => true}

    Log.info("Adding a job for %s every %s matching '%s', first in %s" % [command, interval, filter, options[:first_in]])

    @jobs << @scheduler.every(interval.to_s, options) do
      Log.info("Publishing request for %s with filter '%s'" % [command, filter])

      @nrpe.reset_filter
      @nrpe.filter = parse_filter(filter)
      @nrpe.runcommand(:command => command.to_s)
    end
  end

  def parse_filter(filter)
    new_filter = MCollective::Util.empty_filter

    return new_filter unless filter

    filter.split(" ").each do |item|
      begin
        fact_parsed = MCollective::Util.parse_fact_string(item)
        new_filter["fact"] << fact_parsed
      rescue
        new_filter["cf_class"] << item
      end
    end

    new_filter
  end

  def join
    @scheduler.join
  end
end

s = Scheduler.new("/queue/mcollective.nagios_passive_results", "checks.txt")
s.join
</pre>


When I run it I get:


<pre>
% ruby schedule.rb
info 2012/08/19 13:06:46: activemq.rb:96:in `on_connecting' TCP Connection attempt 0 to stomp://nagios@stomp:6163
info 2012/08/19 13:06:46: activemq.rb:101:in `on_connected' Conncted to stomp://nagios@stomp:6163
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_load every 1m matching 'monitored_by=monitor1', first in 36s
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_swap every 1m matching 'monitored_by=monitor1', first in 44s
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_disks every 1m matching 'monitored_by=monitor1', first in 43s
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_bacula_main every 6h matching 'bacula::node monitored_by=monitor1', first in 496s
info 2012/08/19 13:07:22: schedule.rb:28:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'
info 2012/08/19 13:07:29: schedule.rb:28:in `nrpe' Publishing request for check_disks with filter 'monitored_by=monitor1'
info 2012/08/19 13:07:30: schedule.rb:28:in `nrpe' Publishing request for check_swap with filter 'monitored_by=monitor1'
info 2012/08/19 13:08:22: schedule.rb:28:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'
</pre>


All the checks are loaded and splayed a bit so they don't cause a thundering herd, and you can see the schedule is honoured. In my Nagios logs I can see the passive results being submitted by the receiver.

<H3>MCollective NRPE Scaler</H3>

Taking these ideas, I've knocked up a project that does this with some better code than the above; it's still in progress and I'll blog about it later. For now you can check out the code on GitHub. It includes all of the above, but integrated better, and should serve as a more complete example than I can realistically post in a blog post.

There are many advantages to this method that come specifically from combining MCollective and Nagios. The Nagios scheduler visits hosts one by one, meaning you get a moving view of status over a 5 minute resolution. Using MCollective to request the check on all your hosts means you get a 1 second resolution: all the load averages Nagios sees are from the same narrow time period. Receiving results on a queue has scaling benefits, and the MCollective libraries are already multi-broker aware and support failover to standby brokers, which means this isn't a single point of failure.
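
The failover behaviour mentioned above is again just connector configuration; a sketch assuming two hypothetical brokers named stomp1 and stomp2:

<pre>
plugin.activemq.pool.size = 2
plugin.activemq.pool.1.host = stomp1
plugin.activemq.pool.1.port = 6163
plugin.activemq.pool.2.host = stomp2
plugin.activemq.pool.2.port = 6163
</pre>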

<H3>Conclusion</H3>

We've seen that we can reuse much of the MCollective internals and plugin system to set up a dedicated receiver of MCollective produced data, and I've shown a simple use case where we're requesting data from our managed nodes.

What I showed today kept the request-response model but split the traditional MCollective client in two: one part scheduling requests and another part processing results. These parts could even be on different machines.
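
Nothing ties the two halves together except the queue name, so assuming the receiver above was saved as a hypothetical <em>receiver.rb</em>, running them on separate machines is just:

<pre>
nagios1%  ruby receiver.rb     # on the Nagios server, feeding nagios.cmd
monitor1% ruby schedule.rb     # anywhere with a MCollective client config
</pre>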

We can take this further and simply connect two bits of code together, flowing arbitrary data between them while securing the communications using the MCollective protocol. A follow up blog post will look at that.
