
Replicating NATS Streams between clusters

I've mentioned NATS before – the fast and lightweight message broker from nats.io – but I haven't yet covered the sister product NATS Streaming, so first some intro.

NATS Streaming is in the same space as Kafka: it's a stream processing system and, like NATS, it's super lightweight – delivered as a single binary with no need for anything like Zookeeper. It uses normal NATS for communication and on top of that builds streaming semantics. Like NATS – and because it uses NATS – it is not well suited to running over long cluster links, so you end up with LAN-local clusters only.

This presents a challenge since very often you wish to move data out of your LAN. I wrote a Replicator tool for NATS Streaming which I’ll introduce here.

Streaming?


First I guess it's worth covering what Streaming is. I should preface this by saying I am quite new to using Stream Processing tools, so I am not about to give you some kind of official answer, just what it means to me.

In a traditional queue like ActiveMQ or RabbitMQ, which I covered in my Common Messaging Patterns posts, you do have message storage, persistence and so forth, but those who consume a specific queue are effectively a single group of consumers: messages either go to all of them or are load shared between them, all at the same pace. You can't really go back and forth over the message store independently as a client. A message gets ack'd once, and once it's been ack'd it's done being processed.

In a Stream your clients each have their own view over the Stream: they each have their own progress and point in the Stream they are consuming, they can move backward and forward, and they can join a cluster of readers if they so wish and then load balance with the other group members. A single message can be ack'd many times, but once ack'd a specific consumer will not get it again.

This is to me the main difference between a Stream processing system and plain middleware, and it's a huge deal. Without it you will find it hard to build very different business tools centred around the same stream of data, since every message can be processed and ack'd many times rather than just once.

Additionally Streams tend to have well defined ordering behaviours and message delivery guarantees, and they support clustering etc. much like normal middleware does. There's a lot of similarity between streams and middleware, so it's sometimes a bit hard to see why you wouldn't just use your existing queueing infrastructure.
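To make the per-consumer view concrete, here is a minimal sketch using the Go NATS Streaming client (stan.go). The cluster ID, client ID, channel and durable names are all made up for illustration; the point is simply that two durable subscriptions on the same channel each keep their own position and ack independently.

package main

import (
	"log"

	stan "github.com/nats-io/stan.go"
)

func main() {
	// The cluster ID, client ID, channel and durable names are made up.
	sc, err := stan.Connect("dc1", "stream-demo", stan.NatsURL("nats://localhost:4222"))
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	// Two durable subscriptions on the same channel: each keeps its own
	// position, so one consumer acking a message has no effect on the other.
	_, err = sc.Subscribe("metadata", func(m *stan.Msg) {
		log.Printf("archiver saw sequence %d", m.Sequence)
	}, stan.DurableName("archiver"), stan.DeliverAllAvailable())
	if err != nil {
		log.Fatal(err)
	}

	_, err = sc.Subscribe("metadata", func(m *stan.Msg) {
		log.Printf("dashboard saw sequence %d", m.Sequence)
	}, stan.DurableName("dashboard"), stan.DeliverAllAvailable())
	if err != nil {
		log.Fatal(err)
	}

	select {}
}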

Replicating a NATS Stream


I am busy building a system that will move Choria registration data from regional data centres to a global store. The new Go based Choria daemon has a concept of a Protocol Adapter which can receive messages on the traditional NATS side of Choria, transform them into Stream messages and publish them.
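As a rough illustration of the adapter idea (and emphatically not the actual Choria Protocol Adapter code), a bridge like this could subscribe on the plain NATS side and republish onto a Stream channel. The URLs, subjects and cluster ID are assumptions for the sketch:

package main

import (
	"log"

	nats "github.com/nats-io/nats.go"
	stan "github.com/nats-io/stan.go"
)

func main() {
	// A plain NATS connection on the traditional Choria side...
	nc, err := nats.Connect("nats://choria-broker:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// ...and a NATS Streaming connection to the DC local Stream.
	sc, err := stan.Connect("dc1", "registration-adapter", stan.NatsURL("nats://stream-broker:4222"))
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	// Every registration message received on the NATS side gets republished
	// onto a Stream channel where many consumers can process it independently.
	_, err = nc.Subscribe("registration.updates", func(m *nats.Msg) {
		if err := sc.Publish("acme.cmdb", m.Data); err != nil {
			log.Printf("could not publish to the stream: %s", err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}

	select {}
}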

This gets my data from the high frequency, high concurrency updates of the Choria daemons into a Stream – but the Stream is local to the DC. In the DC I do want to process these messages to build a metadata store there, but I also want to process them for replication upward to my central location(s).

Hence the importance of the properties of Streams that I highlighted above – multiple consumers with multiple views of the Stream.

There are basically 2 options available:

  1. Pick a message from a topic, replicate it, pick the next one, one after the other in a single worker
  2. Have a pool of workers form a queue group and let them share the replication load

At the basic level the first option will retain the ordering of the messages – the order in the source queue will be the order in the target queue. NATS Streaming will redeliver a message whose delivery timed out and will not move on until that message is handled, so ordering is safe.

With the 2nd option, since you have multiple workers, you have no way to retain the ordering of the messages: workers will go at different rates and retries can happen in any order. It will be much faster though.

I can envision a 3rd option where multiple workers replicate data into a temporary store and on the other side messages are injected into the queue in order, but this seems super prone to failure, so I only support these 2 methods for now.
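For a feel of what the first, ordered mode can look like with the Go NATS Streaming client, here is a minimal sketch; it is not the Stream Replicator's actual implementation and all names and URLs are illustrative. A durable subscription with manual acks and MaxInflight(1) means the next message is only delivered once the previous one was published to the target and acked, which preserves order:

package main

import (
	"log"

	stan "github.com/nats-io/stan.go"
)

func main() {
	// Cluster IDs, URLs, topic and durable name are illustrative.
	source, err := stan.Connect("dc1", "cmdb-replicator-source", stan.NatsURL("nats://source1:4222,nats://source2:4222"))
	if err != nil {
		log.Fatal(err)
	}
	defer source.Close()

	target, err := stan.Connect("dc2", "cmdb-replicator-target", stan.NatsURL("nats://target1:4222,nats://target2:4222"))
	if err != nil {
		log.Fatal(err)
	}
	defer target.Close()

	// Manual acks plus MaxInflight(1): the next message is only delivered
	// once this one is acked, so the order in the target channel matches
	// the order in the source channel.
	_, err = source.Subscribe("acme.cmdb", func(m *stan.Msg) {
		if err := target.Publish("acme.cmdb", m.Data); err != nil {
			// leave it unacked so NATS Streaming redelivers it
			log.Printf("replication failed, waiting for redelivery: %s", err)
			return
		}

		if err := m.Ack(); err != nil {
			log.Printf("could not ack message %d: %s", m.Sequence, err)
		}
	}, stan.DurableName("cmdb_replicator"), stan.SetManualAckMode(), stan.MaxInflight(1), stan.DeliverAllAvailable())
	if err != nil {
		log.Fatal(err)
	}

	select {}
}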

Limiting the rate of replication


There is one last concern in this scenario: I might have 10s of data centres, each with 10s of thousands of nodes. At the DC level I can handle the rate of messages, but at the central location, with 10s of DCs times 10s of thousands of machines, replicating ALL the data at near real time speed would overwhelm the central repository pretty quickly.

Now in the case of machine metadata you probably want the first piece of metadata immediately, but from then on it'll be a lot of duplicated data with only small deltas over time. You could be clever and only publish deltas, but then should a delta publish go missing you end up with an inconsistent state – something that will happen in distributed systems.

So instead I let the replicator inspect your JSON: if your JSON has something like fqdn in it, the replicator can look at that, track it and only publish data for any single matching sender every hour – or whatever you configure.

This has the effect that this kind of highly duplicated data is handled continuously at the edge but only gets a snapshot replication upwards once an hour for any given node. This solves the problem neatly for me without any risk of deltas being lost, and it's also a lot simpler to implement.
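Here is a simplified sketch of that inspect and age idea, assuming the JSON carries a key like fqdn; the real replicator is more involved, this only shows the core decision:

package main

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"
)

// tracker remembers when a given sender was last replicated so highly
// duplicated data only gets a snapshot sent upward every so often.
type tracker struct {
	sync.Mutex
	age  time.Duration
	seen map[string]time.Time
}

func newTracker(age time.Duration) *tracker {
	return &tracker{age: age, seen: make(map[string]time.Time)}
}

// shouldReplicate inspects the JSON body, extracts the configured key
// (for example "fqdn") and returns true only if that sender has not
// been replicated within the configured age.
func (t *tracker) shouldReplicate(body []byte, key string) bool {
	var data map[string]interface{}
	if err := json.Unmarshal(body, &data); err != nil {
		return true // not JSON, replicate rather than drop
	}

	sender, ok := data[key].(string)
	if !ok {
		return true // key missing, replicate rather than drop
	}

	t.Lock()
	defer t.Unlock()

	if last, found := t.seen[sender]; found && time.Since(last) < t.age {
		return false // seen recently, skip this copy
	}

	t.seen[sender] = time.Now()

	return true
}

func main() {
	t := newTracker(time.Hour)
	msg := []byte(`{"fqdn":"web1.example.net","uptime":100}`)

	fmt.Println(t.shouldReplicate(msg, "fqdn")) // true, first time this sender is seen
	fmt.Println(t.shouldReplicate(msg, "fqdn")) // false, seen within the last hour
}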

Choria Stream Replicator


So finally I present the Choria Stream Replicator. It does all that was described above with a YAML configuration file, something like this:

debug: false                     # default
verbose: false                   # default
logfile: "/path/to/logfile"      # STDOUT default
state_dir: "/path/to/statedir"   # optional
topics:
    cmdb:
        topic: acme.cmdb
        source_url: nats://source1:4222,nats://source2:4222
        source_cluster_id: dc1
        target_url: nats://target1:4222,nats://target2:4222
        target_cluster_id: dc2
        workers: 10              # optional
        queued: true             # optional
        queue_group: cmdb        # optional
        inspect: host            # optional
        age: 1h                  # optional
        monitor: 10000           # optional
        name: cmdb_replicator    # optional

Please review the README document for full configuration details.

I’ve been running this in a test DC with 1k nodes for a week or so and I am really happy with the results, but be aware this is new software so due care should be given. It’s available as RPMs, has a Puppet module, and I’ll upload some binaries on the next release.

The Choria Emulator

In my previous posts I discussed what goes into load testing a Choria network – what connections are made, what subscriptions are made and so forth.

From this it’s obvious the things we should be able to emulate are:

  • Connections to NATS
  • Subscriptions – which implies number of agents and sub collectives
  • Message payload sizes

To make it realistically affordable to emulate many more machines than I have, I made an emulator that can start a number of Choria daemons on a single node.

I've been slowly rewriting the MCollective daemon side in Go, which means I already had all the networking and connectors available, so a daemon was written:

usage: choria-emulator --instances=INSTANCES [<flags>]
 
Emulator for Choria Networks
 
Flags:
      --help                 Show context-sensitive help (also try --help-long and --help-man).
      --version              Show application version.
      --name=""              Instance name prefix
  -i, --instances=INSTANCES  Number of instances to start
  -a, --agents=1             Number of emulated agents to start
      --collectives=1        Number of emulated subcollectives to create
  -c, --config=CONFIG        Choria configuration file
      --tls                  Enable TLS on the NATS connections
      --verify               Enable TLS certificate verifications on the NATS connections
      --server=SERVER ...    NATS Server pool, specify multiple times (eg one:4222)
  -p, --http-port=8080       Port to listen for /debug/vars

You can see here it takes a number of instances, agents and collectives. The instances will all respond with ${name}-${instance} on any mco ping or RPC commands. They can be discovered using the normal mc discovery, though only agent and identity filters are supported.

Every instance will be a Choria daemon with the exact same network connection and NATS subscriptions as a real one. Thus 50 000 emulated Choria daemons will put the exact same load on your NATS brokers as normal ones would. Performance wise, even with high concurrency the emulator does quite well – it's many orders of magnitude faster than the Ruby Choria client anyway, so it's real enough.

The agents they start are all copies of this one:

emulated0
=========
 
Choria Agent emulated by choria-emulator
 
      Author: R.I.Pienaar <rip@devco.net>
     Version: 0.0.1
     License: Apache-2.0
     Timeout: 120
   Home Page: http://choria.io
 
   Requires MCollective 2.9.0 or newer
 
ACTIONS:
========
   generate
 
   generate action:
   ----------------
       Generates random data of a given size
 
       INPUT:
           size:
              Description: Amount of text to generate
                   Prompt: Size
                     Type: integer
                 Optional: true
            Default Value: 20
 
 
       OUTPUT:
           message:
              Description: Generated Message
               Display As: Message

You can see this has a basic data generator action – you give it a desired size and it makes you a message of that size. It will run as many of these as you wish, all named emulated0 and so on.

It has an MCollective agent that goes with it; the idea is you create a pool of machines all with your normal MCollective on them plus this agent. Using that agent you then build up a new, separate MCollective network comprising the emulators, federation and NATS.

Here are some example commands – you'll see these again later when we talk about scenarios:

We download the dependencies onto all our nodes:

$ mco playbook run setup-prereqs.yaml --emulator_url=https://example.net/rip/choria-emulator-0.0.1 --gnatsd_url=https://example.net/rip/gnatsd --choria_url=https://example.net/rip/choria

We start NATS on our first node:

$ mco playbook run start-nats.yaml --monitor 8300 --port 4300 -I test1.example.net

We start the emulator with 1500 instances per node all pointing to our above NATS:

$ mco playbook run start-emulator.yaml --agents 10 --collectives 10 --instances 750 --monitor 8080 --servers 192.168.1.1:4300

You'll then set up a client config for the built network and can interact with it using normal mco stuff and the test suite I'll show later. Similarly there are playbooks to stop the various parts and so forth. The playbooks just interact with the MCollective agent, so you could use mco rpc directly too.

I found I can easily run 700 to 1000 instances on basic VMs – they need about 1.5GB of RAM – so it's fairly light. Using 400 nodes I managed to build a 300 000 node Choria network and could easily interact with it.

Finally I made an EC2 environment where you can stand up a Puppet Master, Choria, the emulator and everything you need and do load tests on your own dime. I was able to do many runs with 50 000 emulated nodes on EC2 and the whole lot cost me less than $20.

The code for this emulator is very much a work in progress as is the Go code for the Choria protocol and networking but the emulator is here if you want to take a peek.

What to consider when speccing a Choria network

In my previous post I talked about the need to load test Choria given that I now aim for much larger workloads. This post goes into a few of the things you need to consider when sizing the optimal network size.

Given that we now have the flexibility to build 50 000 node networks quite easily with Choria, the question is should we, and if yes, what is the right size? As we can now federate multiple Collectives together into one where each member Collective is a standalone network, we have the opportunity to optimise for the operability of the network rather than be forced to just build it as big as we can.

What do I mean when I say the operability of the network? Quite a lot of things:

  • What is your target response time on an unbatched mco rpc rpcutil ping command?
  • What is your target discovery time? You should use a discovery data source, but broadcast is useful, so how long do you want it to take?
  • If you are using a discovery source, how long do you want to wait for publishes to happen?
  • How many agents will you run? Each agent makes multiple subscriptions on the middleware and consumes resources there.
  • How many sub collectives do you want? Each sub collective multiplies the number of subscriptions.
  • How many federated networks will you run?
  • When you restart the entire NATS cluster, how long do you want to wait for the whole network to reconnect?
  • How many NATS servers do you need? One can run 50 000 nodes, but you might want a cluster for HA. Clustering introduces overhead in the middleware.
  • If you are federating a globally distributed network, what impact does the latency across the federation have and what is acceptable?

So you can see that to a large extent the answer here is related to your needs and not only to the needs of benchmarking Choria. I am working on a set of tools to allow anyone to run tests locally or on an EC2 network. The main workhorse is a Choria emulator that runs 1 000 or more Choria instances on a single node, so you can use a 50 node EC2 network to simulate a 50 000 node one.

Middleware Scaling Concerns


Generally for middleware brokers there are a few things that impact their scalability:

  • Number of TCP Connections – generally a thread/process is made for each
  • TLS or Plain text – huge overhead in TLS typically and it can put a lot of strain on single systems
  • Number of message targets – queues, topics, etc. Different types of target have different overheads. Often a thread/process for each.
  • Number of subscribers to each target
  • Cluster overhead
  • Persistence overheads like storage and ACKs etc

You can see quite a large number of variables go into this; anywhere that requires a thread or process per item is somewhere you should get worried, or at least be in a position to measure.

NATS uses one goroutine for each connection and no additional ones per subscription, so it's quite lightweight, but there are no hard and fast rules. Best to observe how it grows with your needs, something I'll include in my test suite.

How Choria uses NATS


It helps then to understand how Choria will use NATS and what connections and targets it makes.

A single Choria node will:

  • Maintain a single TCP+TLS connection to NATS
  • Subscribe to 1 queue unique to the node for every Subcollective it belongs to
  • For every agent – puppet, package, service, etc – subscribe to a broadcast topic for that agent, once in every Subcollective. Choria ships with 7 agents by default.

So if you have a node with 10 agents in 5 Subcollectives:

  • 50 broadcast subjects for agents
  • 5 queue subjects
  • 1 TCP+TLS connection

So 100 nodes will have 5 500 subscriptions, 550 NATS subjects and 100 TCP+TLS connections.
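To make the pattern concrete, here is a rough sketch of one node's subscriptions using the plain Go NATS client. The identity, agent and collective names and the subject layout are illustrative of the pattern only, not necessarily Choria's real naming scheme:

package main

import (
	"fmt"
	"log"

	nats "github.com/nats-io/nats.go"
)

func main() {
	// Names below are illustrative only.
	identity := "node1.example.net"
	collectives := []string{"mcollective", "uk", "de", "us", "jp"}
	agents := []string{"discovery", "rpcutil", "choria_util", "puppet", "package", "service", "filemgr", "nettest", "process", "shell"}

	// a single connection per node (TLS setup omitted in this sketch)
	nc, err := nats.Connect("nats://broker.example.net:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	handler := func(m *nats.Msg) { log.Printf("received %d bytes on %s", len(m.Data), m.Subject) }

	for _, collective := range collectives {
		// one queue subject per Subcollective, unique to this node
		if _, err := nc.Subscribe(fmt.Sprintf("%s.node.%s", collective, identity), handler); err != nil {
			log.Fatal(err)
		}

		// one broadcast subject per agent, once in every Subcollective,
		// shared with every other node running that agent
		for _, agent := range agents {
			if _, err := nc.Subscribe(fmt.Sprintf("%s.broadcast.agent.%s", collective, agent), handler); err != nil {
				log.Fatal(err)
			}
		}
	}

	// 5 Subcollectives x (1 node subject + 10 agents) = 55 subscriptions
	select {}
}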

Ruby based Federation brokers will maintain 1 subscription to a queue subject on the Federation side and the same on the Collective side. The upcoming Go based Federation Brokers will maintain 10 (configurable) connections to NATS on each side, each with these subscriptions.

Conclusion


This will give us a good input into designing a suite of tools to measure various things during the run time of a big test; check back later for details about such a tool.

You can read about the emulator I wrote in the next post.

An update on my Choria project

Some time ago I mentioned that I am working on improving the MCollective Deployment story.

I started a project called Choria that aimed to massively improve the deployment UX and yield a secure and stable MCollective setup for those using Puppet 4.

The aim is to make installation quick and secure. Towards that, it seems a typical end to end install from scratch by someone new to the project, using a clustered NATS setup, can take less than an hour – a huge improvement.

Further I’ve had really good user feedback, especially around NATS. One user reports 2000 nodes on a single NATS server consuming 300MB RAM and it being very performant, much more so than the previous setup.

It's been a few months, so here is what's changed:

  • The module now supports every OS AIO Puppet supports, including Windows.
  • Documentation is available on choria.io; installation should take about an hour at most.
  • The PQL language can now be used to do completely custom infrastructure discovery against PuppetDB.
  • Many bugs have been fixed, and many things have been streamlined and made easier to get going with better defaults.
  • Event Machine is not needed anymore.
  • A number of POC projects have been done to flesh out next steps, things like a very capable playbook system and a revisit of the generic RPC client; these are on GitHub issues.

Meanwhile I am still trying to get to a point where I can take over maintenance of MCollective again. At first Puppet Inc was very open to the idea, but I am afraid it's been 7 months and it's getting nowhere – calls for cooperation are simply being ignored. Unfortunately I think we're getting pretty close to a fork being the only productive next step.

For now though, I'd say the Choria plugin set is production ready and stable; anyone using Puppet 4 AIO should consider using it – it's about the only working way to get MCollective on FOSS Puppet now, given the state of the other installation options.

Puppet 4 Sensitive Data Types

You often need to handle sensitive data in manifests when using Puppet – private keys, passwords and so on. There has not been a native way to deal with these, so a cottage industry of community tools has sprung up.

To deal with data at rest, various Hiera backends like the popular hiera-eyaml exist; to deal with data on nodes, a rather interesting solution called binford2k-node_encrypt exists. There are many more, but less is more – these are good and widely used.

The problem is data leaks all over the show in Puppet – diffs, logs, reports, catalogs, PuppetDB – it’s not uncommon for this trusted data to show up all over the place. And dealing with this problem is a huge scope issue that will require adjustments to every component – Puppet, Hiera / Lookup, PuppetDB, etc.

But you have to start somewhere and Puppet is the right place, so let's look at the first step.

Sensitive[T]


Puppet 4.6.0 introduced – and 4.6.1 fixed – a new data type that decorates other data, telling the system it's sensitive. This data cannot accidentally be logged or leaked, since the type will only return a string indicating it's redacted.

It's important to note this is step one of a many step process towards having a unified, blessed way of dealing with Sensitive data everywhere, but let's take a quick look. The official specification for this feature lives here.

In the most basic case we can see how to make sensitive data and how it looks when logged or leaked by accident:

$secret = Sensitive("My Socrates Note")
notice($secret)

This prints out the following:

Notice: Scope(Class[main]): Sensitive [value redacted]

To unwrap this and gain access to the real original data:

$secret = Sensitive(hiera("secret"))
 
$unwrapped = $secret.unwrap |$sensitive| { $sensitive }
notice("Unwrapped: ${unwrapped}")
 
$secret.unwrap |$sensitive| { notice("Lambda: ${sensitive}") }

Here you can see how to assign it unwrapped to a new variable or just use it in a block. It's important to note you should never print these values like this, and ideally you'd only ever use them inside a lambda if you have to use them in .pp code. Puppet has no concept of private variables, so this $unwrapped variable could be accessed from outside of your classes; a lambda scope is temporary and private.

The output of above is:

Notice: Scope(Class[main]): Unwrapped: Too Many Secrets
Notice: Scope(Class[main]): Lambda: Too Many Secrets

So these are the basic operations, you can now of course pass the data around classes.

class mysql (
  Sensitive[String] $root_pass
) {
  # somehow set the password
}
 
class{"mysql":
  root_pass => Sensitive(hiera("mysql_root"))
}

Note here you can see the class specifically wants a String that is sensitive and not, let's say, a Number, using the Sensitive[String] markup. If you attempted to pass Sensitive(1) into it you'd get a type mismatch error.

Conclusion


So this appears to be quite handy. You can see that down the line lookup() might have an eyaml-like system and emit Sensitive data directly, and perhaps some providers and types will support this. But as I said, it's early days, so I doubt this is actually useful yet.

I mentioned how other systems like PuppetDB and so forth also need updates before this is useful and indeed today PuppetDB is oblivious to these types and stores the real values:

$ puppet query 'resources[parameters] { type = "Class" and title = "Test" }'
...
  {
    "parameters": {
      "string": "My Socrates Note"
    }
  },
...

So this really does not yet serve any purpose but as a step one it’s an interesting look at what will come.