Day 17: Something about messaging (but I couldn’t think of a snappier title.)

Why messaging?

When I first started thinking about writing an Advent article this year I reflected that I hadn’t really written a great deal of Perl 6 in the past twelve months, in comparison to previous years when I appear to have written a large number of modules. What I have been doing (in my day job at least) is thinking about and implementing applications that make heavy use of some messaging system. So I thought it would be interesting to bring some of those ideas to Perl 6.

Perl has always had a strong reputation as a “glue language” and Perl 6 has features that take that and run with it, most prominently the reactive and concurrent features, making it ideally suited to creating message-based integration services.

What messaging?

At my feet right now is the excellent Enterprise Integration Patterns, which I’d recommend to anyone who has an interest in (or works in) the field, despite it being nearly 15 years old now. However it is a weighty tome (literally, it weighs in at nearly one and a half kilograms in hardback) so I’m using it as a reminder to myself not to attempt to be exhaustive on the subject, lest this turn into a book itself.

There are quite a large number of managed messaging systems both free and commercial, using a range of protocols both open and proprietary, but I am going to limit myself to RabbitMQ which I know quite well and is supported in Perl 6 by Net::AMQP.

If you want to try the examples yourself you will need to have access to a RabbitMQ broker. It is available as a package for most operating system distributions, or you can use the Docker image, which appears to work quite well.

You will also need to install Net::AMQP which can be done with:

zef install Net::AMQP

In the examples I will be using the default connection details for the RabbitMQ server (that is, the broker is running on localhost and the default guest user is active.) If you need to supply differing details then you can alter the constructor for Net::AMQP to reflect the appropriate values:

my $n = Net::AMQP.new(
  host => 'localhost',
  port => 5672,
  login => 'guest',
  password => 'guest',
  vhost => '/'
);

A couple of the examples may require other modules but I’ll introduce them as I go along.

Obligatory Hello, World

RabbitMQ implements the rich broker architecture that is described by the AMQP v0.9 specification. The more recent v1.0 specification, as implemented by ActiveMQ, does away with much of the prescribed broker semantics, to the extent that it is basically a different protocol that shares a similar wire format.

Possibly the simplest example of sending a message (a producer) would be:

    use Net::AMQP;

    my $n = Net::AMQP.new;

    await $n.connect;
    my $channel = $n.open-channel(1).result;
    my $exchange = $channel.exchange.result;
    $exchange.publish(routing-key => "hello", body => "Hello, World".encode);
    await $n.close("", "");

This demonstrates most of the core features of RabbitMQ and Net::AMQP.

Firstly you will notice that many of the methods return a Promise that will mostly be kept with the actual returned value. This reflects the asynchronous nature of the broker, which sends (in most cases but not all) a confirmation message (a method in AMQP parlance) when the operation has been completed on the server.

The connect here establishes the network connection to the broker and negotiates certain parameters, returning a Promise which will be kept with a true value if successful, or broken if the network connection fails, the supplied credentials are incorrect or the server declines the connection for some other reason.

The open-channel opens a logical broker communication channel in which the messages are exchanged; you may use more than one channel in an application. The returned Promise will be kept with an initialised Net::AMQP::Channel object when confirmed by the server.

The exchange method on the channel object returns a Promise that will be kept with a Net::AMQP::Exchange object. In the AMQP model all messages are published to an exchange, from which the broker may route the message to one or more queues depending on the definition of the exchange, and from those queues the message may be consumed by another client. In this simple example we are going to use the default exchange (named amq.default.)

The publish method is called on the exchange object. It has no return value as it is simply fire and forget: the broker doesn’t confirm the receipt, and the delivery or otherwise to a queue is decoupled from the act of publishing the message. The routing-key parameter is, as the name suggests, used by the broker to determine which queue (or queues) to route the message to. In the case of the default exchange used in this example the type of the exchange is direct, which basically means that the message is delivered to exactly one consumer of a queue whose name matches the routing key. The body is always a Buf and can be of an arbitrary length; in this case we are using an encoded string but it could equally be encoded JSON, MessagePack or a BSON blob, whatever suits the consuming application. You can in fact supply content-type and content-encoding parameters which will be passed on with the message delivered to a consumer if the design of your application requires it, but the broker itself is totally agnostic to the content of the payload. There are other optional parameters but none are required in this example.

Of course we also need something to read the messages that we are publishing (a consumer):

use Net::AMQP;

my $n = Net::AMQP.new;

my $connection = $n.connect.result;

react {
    whenever $n.open-channel(1) -> $channel {
        whenever $channel.declare-queue("hello") -> $queue {
            $queue.consume;
            whenever $queue.message-supply.map( -> $v { $v.body.decode }) -> $message {
                say $message;
                $n.close("", "");
                done();
            }
        }
    }
}

Here, rather than operating on an exchange as we did in the producer, we are using a named queue. declare-queue will cause the queue to be created if it doesn’t already exist and the broker will, by default, bind this queue to the default exchange. “Binding” essentially means that messages sent to the exchange can be routed to the queue depending on the exchange type, the routing key of the messages and possibly other metadata from the message. In this case the “direct” type of the default exchange will cause the messages to be routed to a queue whose name matches the routing key (if one exists; the message will be silently dropped if it doesn’t.)
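
If it helps to see that routing rule spelled out, here is a rough sketch of it as a toy model. It is written in Python rather than Perl 6 only so that it can be run without a broker. The class and method names (DefaultExchangeModel, declare_queue, publish) are invented for illustration and are not part of RabbitMQ or Net::AMQP, and a real broker obviously does a great deal more than this.

```python
from collections import deque


class DefaultExchangeModel:
    """Toy model of the default (direct) exchange: route by queue name."""

    def __init__(self):
        self.queues = {}   # queue name -> deque of message bodies
        self.dropped = []  # bodies that had nowhere to go (kept only for the demo)

    def declare_queue(self, name):
        # Declaring is idempotent: an existing queue is returned unchanged.
        return self.queues.setdefault(name, deque())

    def publish(self, routing_key, body):
        queue = self.queues.get(routing_key)
        if queue is None:
            # No queue with that name: the message is silently discarded.
            self.dropped.append(body)
            return False
        queue.append(body)
        return True


broker = DefaultExchangeModel()
broker.publish("hello", b"too early")      # no queue yet, so it is dropped
hello = broker.declare_queue("hello")
broker.publish("hello", b"Hello, World")   # routed to the "hello" queue
print(list(hello))      # [b'Hello, World']
print(broker.dropped)   # [b'too early']
```

The practical consequence is the one the model shows: with the default exchange it is safest to start the consumer (or otherwise declare the queue) before the producer sends anything.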

The consume method is called when you are ready to start receiving messages, it returns a Promise that will be kept with the “consumer tag” that uniquely identifies the consumer to the server but, as we don’t need it here, we can ignore it.

Once we have called consume (and the broker has sent the confirmation) the messages that are routed to our queue will be emitted to the Supply returned by message-supply as Net::AMQP::Queue::Message objects. As we aren’t interested in the message metadata in this example, map is used to create a new Supply with the decoded bodies of the messages. This is safe where, as in this case, you can guarantee that you will be receiving UTF-8 encoded data, but in a real world application you may want to be somewhat more robust about handling the body if you aren’t in control of the sender (which is often the case when integrating with third party applications.) The content-type and content-encoding as supplied when publishing the message are available in the headers attribute (a Hash) of the Message object, but they aren’t required to be set, so you may want to consider an alternative scheme as suitable for your application.

In this example the connection is closed and the react exited after the first message is received, but in reality you may want to remove the lines:
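
As a rough idea of what “somewhat more robust” might look like, here is a minimal sketch of a tolerant decoder. It is in Python purely so it can be run standalone, and decode_body is a hypothetical helper rather than anything provided by Net::AMQP. It also assumes the sender puts a character set name in content-encoding, which is a convention you would have to agree with the sender rather than something the broker enforces.

```python
def decode_body(body, headers=None, default="utf-8"):
    """Decode a message body, tolerating a missing or wrong declared encoding."""
    headers = headers or {}
    # Assumption of this sketch: "content-encoding" carries a charset name.
    declared = headers.get("content-encoding") or default
    for charset in (declared, default):
        try:
            return body.decode(charset)
        except (LookupError, UnicodeDecodeError):
            continue  # unknown charset name, or bytes that don't fit it
    # Last resort: keep going, marking undecodable bytes with U+FFFD.
    return body.decode(default, errors="replace")


latin = "caf\u00e9".encode("latin-1")
print(decode_body("Hola Mundo".encode()))                      # Hola Mundo
print(decode_body(latin, {"content-encoding": "latin-1"}))     # café
print(decode_body(b"hi", {"content-encoding": "no-such-charset"}))  # hi
```

Whether replacing bad bytes is better than rejecting the message depends on the application; for something like a log viewer it is usually preferable to dying in the middle of a react block.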

$n.close("", "");
done();

from the inner whenever and, if you want to exit on a signal for example, add:

whenever signal(SIGINT) {
    $n.close("", "");
    done();
}

within the top level of the react block. However you choose to exit your program you should always call close on the connection object, as not doing so will cause a warning message in the broker logs that might upset the person administering the server.

We could of course have used the react syntax in the producer example in a similar way, but it would have added verbosity for little benefit. However in a larger program where you may for instance be processing messages from, say, a Supply, it can work quite nicely:

    use Net::AMQP;
      
    my $supply = Supply.from-list("Hello, World", "Bonjour le monde", "Hola Mundo");
    my $n = Net::AMQP.new;

    react {
        whenever $n.connect {
            whenever $n.open-channel(1) -> $channel {
                whenever $channel.exchange -> $exchange {
                    whenever $supply.map(-> $v { $v.encode }) -> $body {
                        $exchange.publish(routing-key => "hello", :$body );
                        LAST {
                            $n.close("", "");
                            done();
                        }
                    }
                }
            }
        }
    }

Something a bit more useful

You’re probably thinking “that’s all very well, but that’s nothing I couldn’t do with, say, an HTTP client and a small web-server”. Well, you’re getting reliable queuing, persistence of unread messages and so forth, but yes, it could be overkill for a simple application, until you add a requirement to send the messages to multiple, possibly unknown, consumers for example. This kind of pattern is a use of the “fanout” exchange type, which will deliver a message to all queues that are bound to the exchange.

In this example we will need to declare our own exchange, in order that we can specify the type, but the producer doesn’t become much more complicated:

use Net::AMQP;

my $n = Net::AMQP.new;
my $con =  await $n.connect;
my $channel = $n.open-channel(1).result;
my $exchange = $channel.declare-exchange('logs', 'fanout').result;
$exchange.publish(body => 'Hello, World'.encode);
await $n.close("", "");

The only major difference here is that we use declare-exchange rather than exchange on the channel to obtain the exchange to which we send the message. This has the advantage of causing the exchange to be created on the broker with the specified type if it doesn’t already exist, which is useful here as we don’t need to rely on the exchange being created beforehand (with the command line tool rabbitmqctl or via the web management interface.) Like exchange, it returns a Promise that will be kept with the exchange object. You probably also noticed that here the routing-key is not being passed to the publish method; this is because for a fanout exchange the routing key is ignored and the messages are delivered to all the queues that are bound to the exchange.

The consumer code is likewise not dissimilar to our original consumer:

use Net::AMQP;

my $n = Net::AMQP.new;

my $connection = $n.connect.result;

react {
    whenever $n.open-channel(1) -> $channel {
        whenever $channel.declare-exchange('logs', 'fanout') -> $exchange {
            whenever $channel.declare-queue() -> $queue {
                whenever $queue.bind('logs') {
                    $queue.consume;
                    whenever $queue.message-supply.map( -> $v { $v.body.decode }) -> $message {
                        say $*PID ~ " : " ~ $message;
                    }
                }
                whenever signal(SIGINT) {
                    say $*PID ~ " exiting";
                    $n.close("", "");
                    done();
                }

            }
        }
    }
}

The exchange is declared in the same way that it was declared in the producer example. This is really a convenience so you don’t have to worry about which order to start the programs in: the first one run will create the exchange. However if you run the producer before the consumer is started the messages sent will be dropped, as there is nowhere by default to route them. Here we are also declaring a queue without providing a name, which creates an “anonymous” queue (the name is made up by the broker) because the name of the queue doesn’t play a part in the routing of the messages in this case.

You could provide a queue name, but if several consumers declare a queue with the same name they will all be consuming from the one queue, and each message will be delivered to only one of them on a “first come, first served” basis, which is possibly not the expected behaviour (though it is possible and may have a use.)
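
To make the “only one of them” part concrete, here is a tiny model of consumers sharing a single queue. It is Python, not Perl 6, and share_queue is an invented name. It assumes the broker simply takes turns between the consumers, which is a simplification of what a real broker does when consumers are slow or have prefetch limits set.

```python
from itertools import cycle


def share_queue(messages, consumers):
    """Deal messages out to the consumers of one queue, one consumer per message."""
    received = {name: [] for name in consumers}
    # cycle() keeps handing out consumer names in turn until messages run out.
    for message, name in zip(messages, cycle(consumers)):
        received[name].append(message)
    return received


print(share_queue(["m1", "m2", "m3"], ["a", "b"]))
# {'a': ['m1', 'm3'], 'b': ['m2']}
```

This is the behaviour you want for a pool of workers, but not for the log example here, where every consumer is supposed to see every message.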

Also in this case the queue has to be explicitly bound to the exchange we have declared. In the first example the binding to the default exchange was performed by the broker automatically, but in most other cases you will have to use bind on the queue with the name of the exchange. bind, like many of the methods, returns a Promise that will be kept when the broker confirms that the operation has been completed (though in this case the value isn’t important.)

You should be able to start as many of the consumers as you want and they will all receive all the messages in the same order that they are sent. Of course in a real world application the consumers may be completely different programs written in a variety of different languages.
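
The fanout behaviour described in this section can be sketched as a toy model too. Again this is Python so it runs without a broker, and FanoutModel with its bind and publish methods is made up for illustration; it is not how RabbitMQ or Net::AMQP is implemented.

```python
class FanoutModel:
    """Toy model of a fanout exchange: every bound queue gets every message."""

    def __init__(self):
        self.bound = []  # the queues (plain lists here) bound to the exchange

    def bind(self):
        # Stands in for declaring an anonymous queue and binding it.
        queue = []
        self.bound.append(queue)
        return queue

    def publish(self, body, routing_key=""):
        # The routing key is accepted but deliberately ignored.
        for queue in self.bound:
            queue.append(body)
        return len(self.bound)  # how many queues received a copy


logs = FanoutModel()
print(logs.publish(b"nobody is listening"))   # 0
first, second = logs.bind(), logs.bind()
logs.publish(b"one", routing_key="ignored")
logs.publish(b"two")
print(first == second == [b"one", b"two"])    # True
```

The first publish reaches no queue at all, which is the “messages sent will be dropped” case from earlier, while both queues bound afterwards see both later messages in the order they were sent.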

Keeping on Topic

A common pattern is a set of consumers that are only interested in some of the messages published to a particular exchange, a classic example of this might be a logging system where there are consumers specialised to different log levels for instance. AMQP provides a topic exchange type that allows for the routing of the messages to a particular queue by pattern matching on the producer supplied routing key.

The simplest producer might be:

	use Net::AMQP;

	multi sub MAIN(Str $message = 'Hello, World', Str $level = 'application.info') {
		my $n = Net::AMQP.new;
		my $con =  await $n.connect;
		my $channel = $n.open-channel(1).result;
		my $exchange = $channel.declare-exchange('topic-logs', 'topic').result;
		$exchange.publish(routing-key => $level, body => $message.encode);
		await $n.close("", "");
	}

This should now be fairly clear from the previous examples, except in this case we declare the exchange as the topic type and also provide the routing key that will be used by the broker to match the consuming queues.

The consumer code itself is again fairly similar to the previous examples, except it will take a list of patterns on the command line that will be used to match the routing key sent to the exchange:

use Net::AMQP;

multi sub MAIN(*@topics ) {
    my $n = Net::AMQP.new(:debug);
    unless @topics.elems {
        say "will be displaying all the messages";
        @topics.push: '#';
    }
    my $connection = $n.connect.result;
    react {
        whenever $n.open-channel(1) -> $channel {
            whenever $channel.declare-exchange('topic-logs', 'topic') -> $exchange {
                whenever $channel.declare-queue() -> $queue {
                    for @topics -> $topic {
                        await $queue.bind('topic-logs', $topic);
                    }
                    $queue.consume;
                    my $body-supply = $queue.message-supply.map( -> $v { [ $v.routing-key, $v.body.decode ] }).share;
                    whenever $body-supply -> ( $topic , $message ) {
                            say $*PID ~ " : [$topic]  $message";
                    }
                }
            }
        }
    }
}

Here essentially the only difference from the previous consumer example (aside from the type supplied to the exchange declaration) is that the topic is supplied to the bind method. The topic can be a simple pattern: the routing key is treated as a list of words separated by dots, a # will match zero or more words (so a binding of just # matches any routing key and the behaviour will be the same as a fanout exchange) and a * can be used in any part of the binding topic as a wild card that matches exactly one word. So in this example application.* will match messages sent with the routing key application.info or application.debug, but not application.db.error.

If there is more than one queue bound with the same pattern, they too will behave as if they were bound to a fanout exchange. If the bound pattern contains neither a hash nor an asterisk character then the queue will behave as if it was bound to a direct exchange as a queue with that name (that is to say it will have the messages delivered on a first come, first served basis.)
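
If you want to experiment with binding patterns without a broker to hand, the matching rule can be sketched in a few lines. This is Python rather than Perl 6, topic_matches is a hypothetical helper of my own, and it follows the usual AMQP topic rules (words separated by dots, * for exactly one word, # for zero or more words) rather than being lifted from any broker’s source.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""

    def match(p, k):
        if not p:
            return not k          # pattern used up: key must be used up too
        head, rest = p[0], p[1:]
        if head == "#":
            # "#" may swallow zero or more words of the key.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False          # pattern still expects a word, key has none
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


for pattern in ("#", "application.*", "application.#", "application.info"):
    print(pattern, topic_matches(pattern, "application.db.error"))
# # True
# application.* False
# application.# True
# application.info False
```

A pattern with no wild cards only ever matches the identical routing key, which is why such a binding ends up behaving like one on a direct exchange.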

But there’s more to life than just AMQP

Of course. The beauty of the Perl 6 reactive model is that various sources feeding Supplies can be integrated into your producer code as touched on above and similarly a consumer can push a message to another transport mechanism.

I was delighted to discover while I was thinking about the examples for this that the following just works:

	use EventSource::Server;
	use Net::AMQP;
	use Cro::HTTP::Router;
	use Cro::HTTP::Server;

	my $supply = supply { 
		my $n = Net::AMQP.new;
		my $connection = $n.connect.result;
		whenever $n.open-channel(1) -> $channel {
			whenever $channel.declare-queue("hello") -> $queue {
				$queue.consume;
				whenever $queue.message-supply.map( -> $v { $v.body.decode }) -> $data {
					emit EventSource::Server::Event.new(type => 'hello', :$data);
				}
			}
		}
	};

	my $es = EventSource::Server.new(:$supply);

	my $application = route {
		get -> 'greet', $name {
			content 'text/event-stream; charset=utf-8', $es.out-supply;
		}
	}
	my Cro::Service $hello = Cro::HTTP::Server.new:
		:host('127.0.0.1'), :port(10000), :$application;
	$hello.start;

	react whenever signal(SIGINT) { $hello.stop; exit; }

This is a variation of the example in the EventSource::Server documentation; you could of course alter it to use any of the exchange types discussed above. It should work fine with the producer code from the first example. And (if you were so persuaded) you could consume the events with a small piece of node.js code (or with some browser oriented JavaScript):

	var EventSource = require('eventsource');

	var event = process.argv[2] || 'message';

	console.info(event);
	var v = new EventSource('http://127.0.0.1:10000');

	v.addEventListener(event, function(e) {
		console.info(e);

	}, false);

Wrapping it up

I concluded after typing the first paragraph of this that I would never be able to do this subject justice in a short article, so I hope you will consider this an appetizer. I don’t think I’ll ever find the time to write the book that it probably deserves. But I do have all the examples based on the RabbitMQ tutorials, so check those out and feel free to contribute.

 
