What is my concurrent or parallel Raku program doing?

 

Raku makes it easy to introduce a bit of parallelism into a program – at least, when the solution lends itself to that. Its asynchronous programming capabilities also shine when building concurrent applications. During the last year, I’ve enjoyed employing both of these in my work.

However, I also discovered that something was missing. It was relatively straightforward to build parallel and concurrent things what I wanted in Raku. However, once they were built, I found it hard to reason about what they were doing. For example:

  • For data parallel operations, what degree of parallelism was being achieved?
  • What tasks happened in parallel, and were there missed opportunities for further task parallelism?
  • Where was time being spent in different stages of an asynchronous workflow? For example, if a HTTP request could trigger background work, which later led to a message being sent over a WebSocket, where did the time go between these steps?
  • Given a Cro web application is also an asynchronous pipeline, where is time going on request processing? Is some piece of middleware surprisingly eating up a lot of time?

At first, I started putting in bits of timing code that wrote results out to the console. Of course, I either had to remove it again afterwards, or guard it with an environment variable if I wanted to keep it in the code. And even then, the data it produced was quite hard to interpret This all felt rather inefficient.

Thus, Log::Timeline was born. Using it, I learned a lot about my application’s behavior. Best of all, I didn’t just get answers to the questions I had, but also some that I hadn’t even considered asking. In this advent post, I’ll show how you can use this module also.

Trying it out

The first way we might make use of Log::Timeline doesn’t actually involve using the module at all, but rather using some library that already uses it to do logging. Since Cro::HTTP has done this (since 0.8.1), we can get our very first glimpse at Log::Timeline in action by building a Cro HTTP application.

The second thing we need is some way to view the logs. At present, there are two modes of Log::Timeline output: writing a file in JSONLines format or sending them over a socket. The second of these is used by the log viewer in Comma (a Raku IDE), meaning that we can view the output live as our application runs.

Thus, assuming we already installed Cro, we can:

  1. Create a new Cro Web Application project in Comma (works much like using cro stub at the command line)
  2. Create a “Cro Service” Run Configuration (choose Edit Configurations on the Run menu)
  3. On the Run Menu, now choose “Run ‘service.p6’ and show Timeline”

The service will start up, and display the URL it is running at in the console (probably http://localhost:20000). If we make a request to it, then flip to the Timeline tab, we’ll see the request logged (in fact, do it in a browser then probably two requests are logged, since a favicon.ico is automatically requested). The requests can be expanded to see how the processing time within them is spent.

cro-app-1

Cro can process requests to HTTP application in parallel. In fact, it’s both concurrent (thanks to use of asynchronous I/O) and parallel (all steps of request processing are run on the Raku thread pool). So, if we use the Apache Benchmark program to send 100 requests, 3 at a time, we’d hope to see an indication that up to 3 requests were being processed in parallel. Here’s the command:

ab -n 100 -c 3 http://localhost:20000/

And we do, indeed, see the parallelism:

cro-app-2

Similarly, if we jack it up to 10 concurrent requests:

ab -n 100 -c 10 http://localhost:20000/

Then we will see something like this:

cro-app-3

Looking a little more closely, we see that the “Process Request” task is logged as part of the HTTP Server category of the Cro module. However, that’s not all: tasks (things that happen over a period of time) can also have data logged with them. For example, the HTTP request method and target are logged too:

data

We might also notice that requests are logged in alternating shades of color; this is to allow us to differentiate two tasks if they occur “back to back” with no time between them (or at least, none visible at our zoom level).

Adding Log::Timeline support to an application

What if we want to add Log::Timeline support to our own an application, so we can understand more about its behavior? To illustrate this, let’s add it to jsonHound. This is an application that looks through JSON files, and ensures that they are in compliance with a set of rules (its was built for checking the security of router configurations, but in principle could be used for much more).

When jsonHound is run, there are two steps:

  1. Loading the rules, which are just written in Raku
  2. Checking each specified JSON file against the rules; if there is more than one file, they will be checked in parallel

We’ll create a Log::Timeline task for each of these. These go into a JsonHound::LogTimelineSchema module. The code in the module looks like this:

unit module JsonHound::Logging;

use Log::Timeline;

class LoadRules does Log::Timeline::Task["jsonHound", "Run", "Load Rules"] {
}

class CheckFile does Log::Timeline::Task["jsonHound", "Run", "Check File"] {
}

First, we use the Log::Timeline module. Then we create a class for each task that does the Log::Timeline::Task role (there is also an Event role, which can be used to log events that happen at a particular moment in time).

Next, we need to modify the program to use them. First, in the code we want to add logging to, we need to use our task schema:

use JsonHound::LogTimelineSchema;

And now we can go to work. Loading the ruleset looks like this:

my $*JSON-HOUND-RULESET = JsonHound::RuleSet.new;
my $rule-file = $validations.IO;
CompUnit::RepositoryRegistry.use-repository:
        CompUnit::RepositoryRegistry.repository-for-spec(
                $rule-file.parent.absolute);
require "$rule-file.basename()";

We wrap it up as follows:

JsonHound::Logging::LoadRules.log: {
    my $rule-file = $validations.IO;
    CompUnit::RepositoryRegistry.use-repository:
            CompUnit::RepositoryRegistry.repository-for-spec(
                    $rule-file.parent.absolute);
    require "$rule-file.basename()";
}

When we’re not running with any logging output, the block is just executed as normal. If, however, we were to have the socket output, then it would send a message out over the socket when the task begins, and another when it ends.

The per-file analysis looks like this:

.($reporter-object) for @json-files.race(:1batch).map: -> $file {
    # Analysis code here
}

That is, we take the JSON files, and then map over them in parallel. Each produces a result, which we then invoke with the reporter. The exact details matter little; all we really need to do is wrap the analysis in our task:

.($reporter-object) for @json-files.race(:1batch).map: -> $file {
    JsonHound::Logging::CheckFile.log: {
        # Analysis code here
    }
}

Handily, the log method passes along the return value of the block.

What we’ve done so far will work, but we can go one step better. If we look at the log output, we might see some input that takes a long time to process, but not be sure which one. We can annotate the log entry with whatever data we choose, simply by passing named arguments.

.($reporter-object) for @json-files.race(:1batch).map: -> $file {
    JsonHound::Logging::CheckFile.log: :$file, {
        # Analysis code here
    }
}

And with that, we’re ready! After adding a run configuration in Comma, and running it with the timeline viewer in, I get a chart like this:

jsonhound

The future

While Log::Timeline can already offer some interesting insights, there’s still plenty more functionality to come. Current work in progress uses a new MoarVM API to subscribe to GC events and send those over the timeline. This means it will be possible to visualize when GC runs and how much time is spent in it – and to correlate it with other events that are taking place.

I’m also interested in exposing various Rakudo-level events that could be interesting to see plotted on the timeline. With this, it may be possible to provide information on, for example, lock waiting times, or await times, or Supply contention times. Other ideas are plotting the times files or other resources are opened and closed, which may in turn help spot resource leaks.

Of course, just because there may be a wide range of things we could log does not mean they are all useful, and logging carries its own overhead. The use of the LogTimelineSchema naming convention looks forward to a further feature: being able to introspect the full set of events and tasks available. In Comma, we’ll then provide a UI to select them.

No small amount of the time we spend solving problems with our programs goes on finding out what is happening as they run. Good tooling can provide a window into the program’s behavior, and in some cases point out things that we might not even have considered. Log::Timeline is a pretty small module, and the visualizer didn’t take terribly long to implement either. However, the payback in terms of useful information makes it one of the most worthwhile thing I’ve built this year. I hope you might find it useful too.

Day 22 – Testing Cro HTTP APIs

A good amount of my work time this year has been spent on building a couple of Perl 6 applications. After a decade of contributing to Perl 6 compiler and runtime development, it feels great to finally be using it to deliver production solutions solving real-world problems. I’m still not sure whether writing code in an IDE I founded, using a HTTP library I designed, compiled by a compiler I implemented large parts of, and running on a VM that I play architect for, makes me one of the world’s worst cases of “Not Invented Here”, or just really Full Stack.

Whatever I’m working on, I highly value automated testing. Each passing test is something I know works – and something that I won’t break as I evolve the software in question. Even with automated tests, bugs happen, but adding a test to cover the bug at least means I’ll make different bugs in the future, which is perhaps a bit more forgivable.
Continue reading “Day 22 – Testing Cro HTTP APIs”

Day 9 – HTTP and Web Sockets with Cro

It’s not only Christmas time when gifts are given. This summer at the Swiss Perl Workshop – beautifully situated up in the Alps – I had the pleasure of revealing Cro. Cro is a set of libraries for building services in Perl 6, together with a little development tooling to stub, run, and trace services. Cro is intially focused on building services with HTTP (including HTTP/2.0) and web sockets, but early support for ZeroMQ is available, and a range of other options are planned for the future.

Reactive pipelines

Cro follows the Perl design principle of making the easy things easy, and the hard things possible. Much like Git, Cro can be thought of as having porcelain (making the easy things easy) and plumbing (making the hard things possible). The plumbing level consists of components that are composed to form pipelines. The components come in different shapes, such as sources, transforms, and sinks. Here’s a transform that turns a HTTP request into a HTTP response:

use Cro;
use Cro::HTTP::Request;
use Cro::HTTP::Response;
class MuskoxApp does Cro::Transform {
method consumes() { Cro::HTTP::Request }
method produces() { Cro::HTTP::Response }
method transformer(Supply $pipeline --> Supply) {
supply whenever $pipeline -> $request {
given Cro::HTTP::Response.new(:$request, :200status) {
.append-header: "Content-type", "text/html";
.set-body: "Muskox Rocks!\n".encode('ascii');
.emit;
}
}
}
}

Now, let’s compose it with a TCP listener, a HTTP request parser, and a HTTP response serializer:

use Cro::TCP;
use Cro::HTTP::RequestParser;
use Cro::HTTP::ResponseSerializer;
my $server = Cro.compose:
Cro::TCP::Listener.new(:port(4242)),
Cro::HTTP::RequestParser.new,
MuskoxApp,
Cro::HTTP::ResponseSerializer;

That gives back a Cro::Service, which we can now start, and stop upon Ctrl+C:

$server.start;
react whenever signal(SIGINT) {
$server.stop;
exit;
}

Run it. Then curl it.

$ curl http://localhost:4242/
Muskox Rocks!

Not bad. But what if we wanted a HTTPS server? Provided we’ve got key and certificate files handy, that’s just a case of replacing the TCP listener with a TLS listener:

use Cro::TLS;
my $server = Cro.compose:
Cro::TLS::Listener.new(
:port(4242),
:certificate-file('certs-and-keys/server-crt.pem'),
:private-key-file('certs-and-keys/server-key.pem')
),
Cro::HTTP::RequestParser.new,
MuskoxApp,
Cro::HTTP::ResponseSerializer;

Run it. Then curl -k it.

$ curl -k https://localhost:4242/
Muskox Rocks!

And middleware? That’s just another component to compose into the pipeline. Or, seen another way, with Cro everything is middleware. Even the request parser or response serializer can be easily replaced, should the need arise (which sounds like an odd thing to need, but that’s effectively what implementing FastCGI would involve).

So, that’s how Cro is plumbed. It also requires an amount of boilerplate to work at this level. Bring in the porcelain!

HTTP server, the easy way

The Cro::HTTP::Server class gets rid of the boilerplate of building the HTTP processing pipeline. The example from earlier becomes just:

use Cro;
use Cro::HTTP::Server;
class MuskoxApp does Cro::Transform {
method consumes() { Cro::HTTP::Request }
method produces() { Cro::HTTP::Response }
method transformer(Supply $pipeline --> Supply) {
supply whenever $pipeline -> $request {
given Cro::HTTP::Response.new(:$request, :200status) {
.append-header: "Content-type", "text/html";
.set-body: "Muskox Rocks!\n".encode('ascii');
.emit;
}
}
}
}
my $server = Cro::HTTP::Server.new: :port(4242), :application(MuskoxApp);
$server.start;
react whenever signal(SIGINT) {
$server.stop;
exit;
}

There’s no magic here; it really is just a more convenient way to compose a pipeline. And while that’s only so much of a saving for HTTP/1.*, a HTTP/2.0 pipeline involves some more components, and a pipeline that supports both is a bit more involved still. By comparison, it’s easy to configure Cro::HTTP::Server to do HTTPS with support for both HTTP/1.1 and HTTP/2.0:

my %tls =
:certificate-file('certs-and-keys/server-crt.pem'),
:private-key-file('certs-and-keys/server-key.pem');
my $server = Cro::HTTP::Server.new: :port(4242), :application(MuskoxApp),
:%tls, :http<1.1 2>;

The route to happiness

A web application in Cro is ultimately always a transform that turns a HTTP request into a HTTP response. It’s very rare to want to process all requests in exactly the same way, however. Typically, different URLs should be routed to different handlers. Enter Cro::HTTP::Router:

use Cro::HTTP::Router;
use Cro::HTTP::Server;
my $application = route {
get -> {
content 'text/html', 'Do you like dugongs?';
}
}
my $server = Cro::HTTP::Server.new: :port(4242), :$application;
$server.start;
react whenever signal(SIGINT) {
$server.stop;
exit;
}

The object returned by a route block does the Cro::Transform role, meaning it would work just fine to use it with Cro.compose(...) plumbing too. It’s a good bit more convenient to write an application using the router, however! Let’s look at the get call a little more closely:

get -> {
content 'text/html', 'Do you like dugongs?';
}

Here, get is saying that this handler will only deal with HTTP GET requests. The empty signature of the pointy block means no URL segments are expected, so this route only applies to /. Then, instead of having to make a response object instance, add a header, and encode a string, the content function does it all.

The router is built to take advantage of Perl 6 signatures, and also to behave in a way that will feel natural to Perl 6 programmers. Route segments are set up by declaring parameters, and literal string segments match literally:

get -> 'product', $id {
content 'application/json', {
id => $id,
name => 'Arctic fox photo on canvas'
}
}

A quick check with curl shows that it takes care of serializing the JSON for us also:

$ curl http://localhost:4242/product/42
{"name": "Arctic fox photo on canvas","id": "42"}

The JSON body serializer is activated by the content type. It’s possible, and pretty straightforward, to implement and plug in your own body serializers.

Want to capture multiple URL segments? Slurpy parameters work too, which is handy in combination with static for serving static assets, perhaps multiple levels of directories deep:

get -> 'css', *@path {
static 'assets/css', @path;
}

Optional parameters work for segments that may or may not be provided. Using subset types to constrain the allowed values work too. And Int will only accept requests where the value in the URL segment parses as an integer:

get -> 'product', Int $id {
content 'application/json', {
id => $id,
name => 'Arctic fox photo on canvas'
}
}

Named parameters are used to receive query string arguments:

get -> 'search', :$query {
content 'text/plain', "You searched for $query";
}

Which would be populated in a request like this:

$ curl http://localhost:4242/search?query=llama
You searched for llama

These too can be type constrained and/or made required (named parameters are optional by default in Perl 6). The Cro router tries to help you do HTTP well by giving a 404 error for failure to match a URL segments, 405 (method not allowed) when segments would match but the wrong method is used, and 400 when the method and segments are fine, but there’s a problem with the query string. Named parameters, through use of the is header and is cookie traits, can also be used to accept and/or constrain headers and cookies.

Rather than chugging through the routes one at a time, the router compiles all of the routes into a Perl 6 grammar. This means that routes will be matched using an NFA, rather than having to chug through them one at a time. Further, it means that the Perl 6 longest literal prefix rules apply, so:

get -> 'product', 'index' { ... }
get -> 'product', $what { ... }

Will always prefer the first of those two for a request to /product/index, even if you wrote them in the opposite order:

get -> 'product', $what { ... }
get -> 'product', 'index' { ... }

Middleware made easier

It’s fun to say that HTTP middleware is just a Cro::Transform, but it’d be less fun to write if that was all Cro had to offer. Happily, there are some easier options. A route block can contain before and after blocks, which will run before and after any of the routes in the block have been processed. So, one could add HSTS headers to all responses:

my $application = route {
after {
header 'Strict-transport-security', 'max-age=31536000; includeSubDomains';
}
# Routes here...
}

Or respond with a HTTP 403 Forbidden for all requests without an Authorization header:

my $application = route {
before {
unless .has-header('Authorization') {
forbidden 'text/plain', 'Missing authorization';
}
}
# Routes here...
}

Which behaves like this:

$ curl http://localhost:4242/
Missing authorization
$ curl -H"Authorization: Token 123" http://localhost:4242/
<strong>Do you like dugongs?</strong>

It’s all just a Supply chain

All of Cro is really just a way of building up a chain of Perl 6 Supply objects. While the before and after middleware blocks are convenient, writing middleware as a transform provides access to the full power of the Perl 6 supply/whenever syntax. Thus, should you ever need to take a request with a session token and make an asynchronous call to a session database, and only then either emit the request for further processing (or do a redirection to a login page), it’s possible to do it – in a way that doesn’t block other requests (including those on the same connection).

In fact, Cro is built entirely in terms of the higher level Perl 6 concurrency features. There’s no explicit threads, and no explicit locks. Instead, all concurrency is expressed in terms of Perl 6 Supply and Promise, and it is left to the Perl 6 runtime library to scale the application over multiple threads.

Oh, and WebSockets?

It turns out Perl 6 supplies map really nicely to web sockets. So nicely, in fact, that Cro was left with relatively little to add in terms of API. Here’s how an (overly) simple chat server backend might look:

my $chat = Supplier.new;
get -> 'chat' {
# For each request for a web socket...
web-socket -> $incoming {
# We start this bit of reactive logic...
supply {
# Whenever we get a message on the socket, we emit it into the
# $chat Supplier
whenever $incoming -> $message {
$chat.emit(await $message.body-text);
}
# Whatever is emitted on the $chat Supplier (shared between all)
# web sockets), we send on this web socket.
whenever $chat -> $text {
emit $text;
}
}
}
}

Note that doing this needs a use Cro::HTTP::Router::WebSocket; to import the module providing the web-socket function.

In summary

This is just a glimpse at what Cro has to offer. There wasn’t space to talk about the HTTP and web socket clients, the cro command line tool for stubbing and running projects, the cro web tool that provides a web UI for doing the same, or that if you stick CRO_TRACE=1 into your environment you get lots of juicy debugging details about request and response processing.

To learn more, check out the Cro documentation, including a tutorial about building a Single Page Application. And if you’ve more questions, there’s also a recently-created #cro IRC channel on Freenode.

Day 17 – Testing in virtual time

Over the last month, most of my work time has been spent building a proof of concept for a project that I’ll serve as architect for next year. When doing software design, I find spikes (time-boxed explorations of problems) and rapid prototyping really useful ways to gain knowledge of new problem spaces that I will need to work in. Finding myself with a month to do this before the “real” start of the project has been highly valuable. Thanks to being under NDA, I can’t say much about the problem domain itself. I can, however, say that it involves a reasonable amount of concurrency: juggling different tasks that overlap in time.

Perl’s “whipuptitude” – the ability to quickly put something together – is fairly well known. Figuring that Perl 6’s various built-in concurrency constructs would allow me to whip up concurrent things rapidly, I decided to build the proof of concept in Perl 6. I’m happy to report that the bet paid off pretty well, and by now the proof of concept has covered all of the areas I hoped to explore – and some further ones that turned out to matter but that weren’t obvious at the start.

To me, building rapid prototypes explicitly does not mean writing crappy code. For sure, simplifications and assumptions of things not critical to the problem space are important. But my prototype code was both well tested and well structured. Why? Because part of rapid prototyping is being able to evolve the prototype quickly. That means being able to refactor rapidly. Decent quality, well-structured, well-tested code is important to that. In the end, I had ~2,500 lines of code covered by ~3,500 lines of tests.

So, I’ve spent a lot of time testing concurrent code. That went pretty well, and I was able to make good use of Test::Mock in order to mock components that returned a Promise or Supply also. The fact that Perl 6 has, from the initial language release, had ways to express asynchronous values (Promise) or asynchronous streams of values (Supply) is in itself a win for testing. Concurrent APIs expressed via these standard data structures are easy to fake, since you can put anything you want behind a Promise or a Supply.

My work project didn’t involve a huge amount of dealing with time, but in the odd place it did, and I realized that testing this code effectively would be a challenge. That gave me the idea of writing about testing time-based code for this year’s Perl 6 advent, which in turn gave me the final nudge I needed to write a module that’s been on my todo list all year. Using it, testing things involving time can be a lot more pleasant.

Today’s example: a failover mechanism

Timeouts are one of the most obvious and familiar places that time comes up in fairly “everyday” code. To that end, let’s build a simple failover mechanism. It should be used as follows:

my $failover-source = failover($source-a, $source-b, $timeout);

Where:

  • $source-a is a Supply
  • $source-b is a Supply
  • $timeout is a timeout in seconds (any Real number)
  • The result, assigned to $failover-source, is also Supply

And it should function as follows:

  • The Supply passed as $source-a is immediately tapped (which means it is requested to do whatever is needed to start producing values)
  • If it produces its first value before $timeout seconds, then we simply emit every value it produces to the result Supply and ignore $source-b
  • Otherwise, after $timeout seconds, we also tap $source-b
  • Whichever source then produces a value first is the one that we “latch” on to; any results from the other should be discarded

Consider, for example, that $source-a and $source-b are supplies that, when tapped, will send the same query to two different servers, which will stream back results over time. Normally we expect the first result within a couple of seconds. However, if the server queried by $source-a is overloaded or has other issues, then we’d like to try using the other one, $source-b, to see if it can produce results faster. It’s a race, but where A gets a head start.

Stubbing stuff in

So, in a Failover.pm6, let’s stub in the failover sub as follows:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
    }
}

A t/failover.t then starts off as:

use Failover;
use Test;

# Tests will go here

done-testing;

And we’re ready to dig in to the fun stuff.

The first test

The simplest possible case for failover is when $source-a produces its first value in time. In this case, $source-b should be ignored totally. Here’s a test case for that:

subtest 'When first Supply produces a value in time, second not used', {
    my $test-source-a = supply {
        whenever Promise.in(1) {
            emit 'a 1';
        }
        whenever Promise.in(3) {
            emit 'a 2';
        }
    }
    my $test-source-b = supply {
        die "Should never be used";
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    is $output.receive, 'a 1', 'Received first value from source A';
    is $output.receive, 'a 2', 'Received second value from source A';
}

Here, we set up $test-source-a as a Supply that, when tapped, will emit a 1 after 1 second, and a 2 after 3 seconds. If $test-source-b is ever tapped it will die. We expect that if this wrongly happens, it will be at the 2 second mark, which is why a 2 is set to be emitted after 3 seconds. We then obtain a Channel from the resulting $failover-supply, which we can use to pull values from at will and check we got the right things. (On coercing a Supply to a Channel, the Supply is tapped, starting the flow of values, and each result value is fed into the Channel. Both completion and errors are also conveyed.)

Making it pass

There are a couple of ways that we might make this test pass. The absolute easiest one would be:

sub failover(Supply $source-a, Supply $source-b, Real $timeout) is export {
    return $source-a;
}

Which feels like cheating, but in TDD the code that passes the first test case almost always does. (It sometimes feels pointless to write said tests. More than once, they’ve ended up saving me when – while making a hard thing work – I ended up breaking the trivial thing.)

An equivalent, more forward-looking solution would be:

sub failover(Supply $source-a, Supply $source-b, Real $timeout) is export {
    supply {
        whenever $source-a {
            .emit;
        }
    }
}

Which is the identity operator on a Supply (just spit out everything you get). For those not familiar with supplies, it’s worth noting that this supply block does 3 useful things for you for free:

  • Passes along errors from $source-a
  • Passes along completion from $source-a
  • Closes the tap on $source-a – thus freeing up resources – if the tap on the supply we’re defining here is closed

Subscription management and error management are two common places for errors in asynchronous code; the supply/whenever syntax tries to do the Right Thing for you on both fronts. It’s more than just a bit of tinsel on the Christmas callback.

When the timeout…times out

So, time for a more interesting test case. This one covers the case where the $source-a fails to produce a value by the timeout. Then, $source-b produces a value within 1 second of being tapped – meaning its value should be relayed. We also want to ensure that even if $test-source-a were to emit a value a little later on, we’d disregard it. Here’s the test:

subtest 'When timeout reached, second Supply is used instead if it produces value first', {
    my $test-source-a = supply {
        whenever Promise.in(4) {
            emit 'a 1';
        }
    }
    my $test-source-b = supply {
        whenever Promise.in(1) { # start time + 2 (timeout) + 1
            emit 'b 1';
        }
        whenever Promise.in(3) { # start time + 2 (timeout) + 3
            emit 'b 2';
        }
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    is $output.receive, 'b 1', 'Received first value from source B';
    is $output.receive, 'b 2', 'Received second value from source B';
}

We expect a 1 to be ignored, because we chose $source-b. So, how can we make this pass? Here goes:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
        my $emitted-value = False;

        whenever $source-a {
            $emitted-value = True;
            .emit;
        }

        whenever Promise.in($timeout) {
            unless $emitted-value {
                whenever $source-b {
                    .emit;
                }
            }
        }
    }
}

Will this pass the test? Both subtests?

Think about it…

Well, no, it won’t. Why? Because it doesn’t do anything about disregarding $source-a after it has started spitting out values from $source-b. It needs to commit to one or the other. Didn’t spot that? Good job we have tests! So, here’s a more complete solution that makes both subests pass:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
        my enum Committed ;
        my $committed = None;

        whenever $source-a -> $value {
            given $committed {
                when None {
                    $committed = A;
                    emit $value;
                }
                when A {
                    emit $value;
                }
            }
        }

        whenever Promise.in($timeout) {
            if $committed == None {
                whenever $source-b -> $value {
                    $committed = B;
                    emit $value;
                }
            }
        }
    }
}

So tired of waiting

You’d think I’d be happy with this progress. Two passing test cases. Surely the end is in sight! Alas, development is getting…tedious. Yes, after just two test cases. Why? Here’s why:

$ time perl6-m -Ilib t/failover-bad.t
    ok 1 - Received first value from source A
    ok 2 - Received second value from source A
    1..2
ok 1 - When first Supply produces a value in time, second not used
    ok 1 - Received first value from source B
    ok 2 - Received second value from source B
    1..2
ok 2 - When timeout reached, second Supply is used instead if it produces value first
1..2

real    0m8.694s
user    0m0.600s
sys     0m0.072s

Every time I run my tests I’m waiting around 9 seconds now. And when I add more tests? Even longer! Now imagine I was going to write a whole suite of these failover and timeout routines, as a nice module. Or I was testing timeouts in a sizable app and would have dozens, even hundreds, of such tests.

Ouch.

Maybe, though, I could just make the timeouts smaller. Yes, that’ll do it! Here is how the second test looks now, for example:

subtest 'When timeout reached, second Supply is used instead if it produces value first', {
    my $test-source-a = supply {
        whenever Promise.in(0.04) {
            emit 'a 1';
        }
    }
    my $test-source-b = supply {
        whenever Promise.in(0.01) { # start time + 2 (timeout) + 1
            emit 'b 1';
        }
        whenever Promise.in(0.03) { # start time + 2 (timeout) + 3
            emit 'b 2';
        }
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 0.02);
    my $output = $failover-supply.Channel;

    is $output.receive, 'b 1', 'Received first value from source B';
    is $output.receive, 'b 2', 'Received second value from source B';
}

You want it faster? Divide by 100! Job done.

Of course, anybody who has actually done this knows precisely what comes next. The first 3 times I ran my tests after this change, all was well. But guess what happened on the forth time?

ok 1 - When first Supply produces a value in time, second not used
    not ok 1 - Received first value from source B

    # Failed test 'Received first value from source B'
    # at t/failover-short.t line 41
    # expected: 'b 1'
    #      got: 'a 1'
    not ok 2 - Received second value from source B

    # Failed test 'Received second value from source B'
    # at t/failover-short.t line 42
    # expected: 'b 2'
    #      got: 'b 1'
    1..2
    # Looks like you failed 2 tests of 2
not ok 2 - When timeout reached, second Supply is used instead if it produces value first

Mysteriously…it failed. Why? Bad luck. My computer is a busy little machine. It can’t just give my test programs all the CPU all of the time. It needs to decode that music I’m listening to, check if I need to install the 10th set of security updates so far this month, and cope with my web browser wanting to do stuff because somebody tweeted something or emailed me. And so, once in a while, just after the clock hits 0.01 seconds and a thread grabs the whenever block to work on, that thread will be dragged off the CPU. Then, before it can get back on again, the one set to run at 0.04 seconds gets to go, and spits out its value first.

Sufficiently large times mean slow tests. Smaller values mean unreliable tests. Heck, suspend the computer in the middle of running the test suite and even a couple of seconds is too short for reliable tests.

Stop! Virtual time!

This is why I wrote Test::Scheduler. It’s an implementation of the Perl 6 Scheduler role that virtualizes time. Let’s go back to our test code and see if we can do better. First, I’ll import the module:

use Test::Scheduler;

Here’s the first test, modified to use Test::Scheduler:

subtest 'When first Supply produces a value in time, second not used', {
    my $*SCHEDULER = Test::Scheduler.new;

    my $test-source-a = supply {
        whenever Promise.in(1) {
            emit 'a 1';
        }
        whenever Promise.in(3) {
            emit 'a 2';
        }
    }
    my $test-source-b = supply {
        die "Should never be used";
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    $*SCHEDULER.advance-by(3);
    is $output.receive, 'a 1', 'Received first value from source A';
    is $output.receive, 'a 2', 'Received second value from source A';
}

Perhaps the most striking thing is how much hasn’t changed. The changes amount to two additions:

  1. The creation of a Test::Scheduler instance and the assignment to the $*SCHEDULER variable. This dynamic variable is used to specify the current scheduler to use, and overriding it allows us to swap in a different one, much like you can declare a $*OUT to do stuff like capturing I/O.
  2. A line to advance the test scheduler by 3 seconds prior to the two assertions.

The changes for the second test are very similar:

subtest 'When timeout reached, second Supply is used instead if it produces value first', {
    my $*SCHEDULER = Test::Scheduler.new;

    my $test-source-a = supply {
        whenever Promise.in(4) {
            emit 'a 1';
        }
    }
    my $test-source-b = supply {
        whenever Promise.in(1) { # start time + 2 (timeout) + 1
            emit 'b 1';
        }
        whenever Promise.in(3) { # start time + 2 (timeout) + 3
            emit 'b 2';
        }
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    $*SCHEDULER.advance-by(6);
    is $output.receive, 'b 1', 'Received first value from source B';
    is $output.receive, 'b 2', 'Received second value from source B';
}

And what difference does this make to the runtime of my tests? Here we go:

$ time perl6-m -Ilib t/failover-good.t
    ok 1 - Received first value from source A
    ok 2 - Received second value from source A
    1..2
ok 1 - When first Supply produces a value in time, second not used
    ok 1 - Received first value from source B
    ok 2 - Received second value from source B
    1..2
ok 2 - When timeout reached, second Supply is used instead if it produces value first
1..2

real    0m0.679s
user    0m0.628s
sys     0m0.060s

From 9 seconds to sub-second – and much of that will be fixed overhead rather than the time running the tests.

One more test

Let’s deal with the final of the requirements, just to round off the test writing and get to a more complete solution to the original problem. The remaining test we need is for the case where the timeout is reached, and we tap $source-b. Then, before it can produce a value, $source-a emits its first value. Therefore, we should latch on to $source-a.

subtest 'When timeout reached, and second Supply tapped, first value still wins', {
    my $*SCHEDULER = Test::Scheduler.new;

    my $test-source-a = supply {
        whenever Promise.in(3) {
            emit 'a 1';
        }
        whenever Promise.in(4) {
            emit 'a 2';
        }
    }
    my $test-source-b = supply {
        whenever Promise.in(2) { # start time + 2 (timeout) + 2
            emit 'b 1';
        }
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    $*SCHEDULER.advance-by(4);
    is $output.receive, 'a 1', 'Received first value from source A';
    is $output.receive, 'a 2', 'Received second value from source A';
}

This fails, because the latch logic wasn’t included inside of the whenever block that subscribes to $source-b. Here’s the easy fix for that:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
        my enum Committed ;
        my $committed = None;

        whenever $source-a -> $value {
            given $committed {
                when None {
                    $committed = A;
                    emit $value;
                }
                when A {
                    emit $value;
                }
            }
        }

        whenever Promise.in($timeout) {
            if $committed == None {
                whenever $source-b -> $value {
                    given $committed {
                        when None {
                            $committed = B;
                            emit $value;
                        }
                        when B {
                            emit $value;
                        }
                    }
                }
            }
        }
    }
}

The easy thing is just a little bit repetitive, however. It would be nice to factor out the commonality into a sub. Here goes:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
        my enum Committed ;
        my $committed = None;

        sub latch($onto) {
            given $committed {
                when None {
                    $committed = $onto;
                    True
                }
                when $onto {
                    True
                }
            }
        }

        whenever $source-a -> $value {
            emit $value if latch(A);
        }

        whenever Promise.in($timeout) {
            if $committed == None {
                whenever $source-b -> $value {
                    emit $value if latch(B);
                }
            }
        }
    }
}

And in under a second, the tests can now assure us that this was indeed a successful refactor. Note that this does not yet cancel a discarded request, perhaps saving duplicate work. I’ll leave that as an exercise for the reader.

Safety and realism

One thing you might wonder about here is whether this code is really thread safe. The default Perl 6 scheduler will schedule code across a bunch of threads. What if $source-a and $source-b emit their first value almost simultaneously?

The answer is that supply (and react) blocks promise Actor-like semantics, processing just one message at a time. So, if we’re inside of the whenever block for $source-a, and $source-b emits a message on another thread, then it will be queued up for processing afterwards.

One interesting question that follows on from this is whether the test scheduler somehow serializes everything onto the test thread in order to do its job. The answer is that no, it doesn’t do that. It wraps the default ThreadPoolScheduler and always delegates to it to actually run code. This means that, just as with the real scheduler, the code will run across multiple threads and on the thread pool. This helps to avoid a couple of problems. Firstly, it means that testing code that relies on having real threads (by doing stuff that really blocks a thread) is possible. Secondly, it means that Test::Scheduler is less likely to hide real data race bugs that may exist in the code under test.

Of course, it’s important to keep in mind that virtual time is still very much a simulation of real time. It doesn’t account for the fact that running code takes time; virtual time stands still while code runs. At the same time, it goes to some amount of effort to get the right sequencing when a time-based event triggered in virtual time leads to additional time-based events being scheduled. For example, imagine we schedule E1 in 2s and E2 in 4s, and then advance the virtual time by 4s. If the triggering of E1 schedules E3 in 1s (so, 3s relative to the start point), we need to have it happen before E2. To have this work means trying to identify when all the consequences of E1 have been shaken out before proceeding (which is easier said than done). Doing this will, however, prevent some possible overlaps that could take place in real time.

In summary…

Unit tests for code involving time can easily end up being slow and/or unreliable. However, if we can virtualize time, it’s possible to write tests that are both fast and reliable – as good unit tests should be. The Test::Scheduler module provides a way to do this in Perl 6. At the same time, virtual time is not a simulation of the real thing. The usual rules apply: a good unit test suite will get you far, but don’t forget to have some integration tests too!

Day 14 – A nice supplies: syntactic relief for working with asynchronous data

Asynchronous data is all around us. What is it? It’s data that arrives whenever it pleases, as opposed to when we ask for it. Some common examples are:

  • GUI events
  • Data arriving over an async socket
  • Messages arriving from a message queue
  • Ticks of a timer
  • File change notifications
  • Signals

These contrast with synchronous data, which we get when we ask for it. For example, when we query a database and iterate the resulting rows, or iterate through file system entries in a directory, we block until the data we wish to have is available.

Our programming languages tend to be pretty good about synchronous data, giving us a range of syntactic constructs for producing and processing it. For example, in Perl 6 we have for loops for consuming streams of synchronously produced values, and gather blocks for producing them – perhaps doing so by in turn using for loops to consume other synchronous data sources. Inside for and gather, we’re free to use state (kept in variables), and use our comfortable range of conditionals (if, when, etc.) That is to say, we’re free to act like normal, down to earth, imperative programmers in dealing with our synchronous data. Sure, sometimes something just looks far nicer using the functional style, and we map, grep, zip, and reduce our way to the solution. But some problems are just unnatural – at least for most of us, me included – to see in that way.

In Perl 6, we have a common interface for talking about things that produce values over time – that is, synchronous data. These things do the Iterable role, and work in terms of Iterator objects. You never really see an Iterator in day to day Perl 6, and instead see the Seq type. These Seqs are the things gather blocks produce, and for loops can consume.

In the last years, we introduced supplies to Perl 6. Supplies are our common interface for talking about asynchronous data. These have, from the beginning, been rather well received. With supplies, you can grep UI events, map file system notifications, and zip whatever you fancied with ticks of a timer. And that’s just great when it’s easy to think about your problem in functional terms. But what about the rest of the time?

In the last year, we’ve also added supply/react blocks, along with the whenever asynchronous looping construct. In this post, we’ll take a look at how they might be used.

STOMP!

As I wrote the list of sources of asynchronous data at the start of this post, I realized that there was only one of them that I’d not yet done in Perl 6: working with message queues. Hmm. Two hours until midnight. Advent post needs completing before I sleep. Challenge accepted!

10 minutes later, and I’ve RabbitMQ happily installed and am reading the STOMP specification. STOMP is a simple text-oriented message protocol – that is, a text-based way to deal with message queues. Within an hour, I’d got a working STOMP client handling the absolute basics of sending messages to a queue and subscribing to a queue to get messages. So, let’s take a look at how I did it.

First, I sketched in a Stomp::Client class. I figured it would need the host and port we want to connect to, the connection credentials (yes, yes, this is not the height of security engineering), and something to hold the socket representing the connection to the server.

class Stomp::Client {
    has Str $.host is required;
    has Int $.port is required;
    has Str $.login = 'guest';
    has Str $.password = 'guest';
    has $!connection;
}

So, let’s get connected. The connect method will contain a start block, because we want it to function asynchronously. This means the method will return a Promise that the caller can await. To connect to a stomp server, you establish a TCP connection and send a CONNECT frame:

method connect() {
    start {
        my $conn = await IO::Socket::Async.connect($!host, $!port);
        await $conn.print: qq:to/FRAME/;
            CONNECT
            accept-version:1.2
            login:$!login
            passcode:$!password
            
            \0
            FRAME
        True
    }
}

But…then what? Well, then you wait for the server to either return a CONNECTED frame or an ERROR frame. Hm, seems we’ve some work to do. First is to translate the BNF from the STOMP specification into a Perl 6 grammar:

my grammar StompMessage {
    token TOP {
        <command> \n
        [<header> \n]*
        \n
        <body>
        \n*
    }
    token command {
        < CONNECTED MESSAGE RECEIPT ERROR >
    }
    token header {
        <header-name> ":" <header-value>
    }
    token header-name {
        <-[:\r\n]>+
    }
    token header-value {
        <-[:\r\n]>*
    }
    token body {
        <-[\x0]>* )> \x0
    }
}

Note it’s lexically scoped inside of our STOMP::Client class. So is the following little message data structure:

my class Message {
    has $.command;
    has %.headers;
    has $.body;
}

An IO::Socket::Async connection exposes incoming data as a Supply. There are a few things to consider:

  • A message may be spread over multiple packets
  • Two messages may be in one packet
  • The next packet may arrive while we’re processing the current one, and since Perl 6 delivers I/O notifications on the thread pool – like various other languages – we might worry about data races

In fact, the third point is a non-problem: Perl 6 promises that, unless you go out of your way to avoid it, supplies are serial, and will only have you processing one message at a time on a given message pipeline. Supplies are a tool for taming concurrency rather than introducing it.

Now, if I was writing this synchronously, and I wanted to expose a Seq of the incoming messages, I’d use a gather block, keep myself a buffer, recv each incoming chunk of data from the socket, and try to parse a new message at the start of the buffer. Whenever I got a message I’d take it. What about with supplies? Here’s how it looks:

method !process-messages($incoming) {
    supply {
        my $buffer = '';
        whenever $incoming -> $data {
            $buffer ~= $data;
            while StompMessage.subparse($buffer) -> $/ {
                $buffer .= substr($/.chars);
                if $<command> eq 'ERROR' {
                    die ~$<body>;
                }
                else {
                    emit Message.new(
                        command => ~$<command>,
                        headers => $<header>
                            .map({ ~.<header-name> => ~.<header-value> })
                            .hash,
                        body => ~$<body>
                    );
                }
            }
        }
    }
}

Taking it from the top, $incoming will be the Supply of data arriving from the socket, decoded to strings. We wrap the code in this method up in a supply block, since we’d like to return a new Supply. We declare $buffer, to hold data we’ve yet to parse. In this case, our thread safety is fairly trivial: $incoming will deliver a message at a time. But what if we were going to bring together data from many supplies? We’d still be fine: a supply block promises that, per tapping of the Supply, only one thread will ever be allowed in the code inside of the supply block at a time.

The whenever construct is a asynchronous loop. It taps the supply we specify, and the loop body will run whenever a value is emitted by that supply. In our case, that is every time we get data from the socket. We concatenate the incoming $data onto the $buffer. We then try to parse a message at the start of the buffer (subparse means that we don’t mind if we don’t reach the end of the buffer, unlike parse which will complain if it doesn’t completely parse the incoming string). When we do parse data, we lop it off the start of the buffer.

For each message we parse, we first see if it’s an ERROR frame; if so, we simply die with the message. A die inside a whenever block will propagate the error to whatever tapped the supply, so errors will not be lost. Otherwise, we’ve some kind of more interesting message; we turn the parsed data into a Message instance and emit it.

A supply block gives an “on-demand” supply. Each tap performed on the block will run the supply block, which in turn will tap each of the whenevers. That’s the right thing in most cases, but for my Stomp::Client I’d like to have various bits of code tap the same messages, to look for interesting things. One of these bits of code will look for a single CONNECTED message. So, I’ll add a $!incoming attribute to my class:

has $!incoming;

And then, after establishing the connection to the message queue server, do this:

$!incoming = self!process-messages($conn.Supply).share;

That is, I share the result of processing messages out amongst everything that is interested. And interested things will tap the $!incoming supply and filter out what they care about. For example, here’s how I create a Promise that will be kept when we receive a CONNECTED frame:

my $connected = $!incoming
    .grep(*.command eq 'CONNECTED')
    .head(1)
    .Promise;

Here is the connect method in full:

method connect() {
    start {
        my $conn = await IO::Socket::Async.connect($!host, $!port);
        $!incoming = self!process-messages($conn.Supply).share;
        
        my $connected = $!incoming
            .grep(*.command eq 'CONNECTED')
            .head(1)
            .Promise;
        await $conn.print: qq:to/FRAME/;
            CONNECT
            accept-version:1.2
            login:$!login
            passcode:$!password
            
            \0
            FRAME
        await $connected;
        $!connection = $conn;
        
        True
    }
}

Sending messages is easy:

method send($topic, $body) {
    self!ensure-connected;
    $!connection.print: qq:to/FRAME/;
        SEND
        destination:/queue/$topic
        content-type:text/plain
 
        $body\0
        FRAME
}

And subscription is implemented using another supply block, and a bit of simple filtering on the incoming message:

method subscribe($topic) {
    self!ensure-connected;
    state $next-id = 0;
    supply {
        my $id = $next-id++;
        
        $!connection.print: qq:to/FRAME/;
            SUBSCRIBE
            destination:/queue/$topic
            id:$id
 
            \0
            FRAME
        
        whenever $!incoming {
            if .command eq 'MESSAGE' && .headers<subscription> == $id {
                emit .body;
            }
        }
    }
}

And with that, we have a working STOMP client. Let’s try it:

my $queue = Stomp::Client.new(host => 'localhost', port => 61613);
await $queue.connect();
await $queue.send("test", "hello world");
react {
    whenever $queue.subscribe('test') {
        .say;
    }
}

Here, for the first time, we encounter the react block. What’s that? Well, if you imagine that supply blocks are a little like gather – that is, used to create something that can produce values – a react block is for the places you might normally use a for loop when processing synchronous data. However, in the asynchronous world you’re usually interested in a range of things that might happen. A react block runs, sets up all the whenevers, and then blocks until either all of the supplies tapped by whenever blocks are done, or the “done” control operator is used somewhere inside of react.

A little example

Suppose that we wanted to build a simple monitoring system. Workers “sign on”, and must send us a ping every 10 seconds. If they fail to do so, then a monitor will report this. Workers can sign off when finished. Here is the worker process:

sub MAIN($name) {
    my $queue = Stomp::Client.new(host => 'localhost', port => 61613);
    await $queue.connect();
    $queue.send('signon', $name);
    react {
        whenever Supply.interval(10, 5) {
            if <y y n>.pick eq 'y' {
                $queue.send('ping', $name);
            }
        }
    
        whenever signal(SIGINT) {
            $queue.send('signoff', $name);
            done;
        }
    }
}

We start it from the command line with a name. Every 10 seconds it sends a ping – or at least, two thirds of the time it will. And if we hit Ctrl+C the worker signs off.

Next, here’s the monitor:

my $queue = Stomp::Client.new(host => 'localhost', port => 61613);
await $queue.connect();
react {
    my %last-active;
 
    whenever $queue.subscribe('signon') -> $name {
        %last-active{$name} = now;
    }
  
    whenever $queue.subscribe('ping') -> $name {
        %last-active{$name} = now;
    }
  
    whenever $queue.subscribe('signoff') -> $name {
        %last-active{$name}:delete;
    }
 
    whenever Supply.interval(10) {
        for %last-active.kv -> $name, $last-ping {
            if now - $last-ping > 11 {
                say "No ping from $name";
            }
        }
    }
}

Note that the react block, like supply block, enforces that only a single thread at a time may be operating across the various whenever blocks, so the %last-active hash is safe. We have three subscriptions to queues, and every ten seconds look for missing pings. We give an extra second’s grace on those.

Notice how, by exposing the message queue in terms of supplies, we are able to effortlessly compose this asynchronous data with both signals and time. This is where the real power of a uniform interface to asynchronous data comes in. And, with a little syntactic support, we are no longer required to put our functional hat on to wield that power.

Day 11 – So, what does MoarVM do with your Perl 6 code?

The first day of the advent calendar remarked that during 2014, MoarVM became the de facto standard virtual machine to run Perl 6 on. And, with Perl 6 on course to hit 6.0.0 in 2015, MoarVM is what most people adopting Perl 6 at this point will be working with. In this post, we’ll take a look at what MoarVM brings to the table, and where it’s heading from here.

But first – why a VM?

Many modern languages depend on a good deal of runtime support. Perl 6 is very much in this camp, and the same goes for Perl 5. A VM is in the business of providing that support. With Perl 5, the word “interpreter” tends to be used over VM, but the goal is the same: provide the things needed to execute programs in the language across disparate hardware and operating system environments. The difference is mostly that we tend to use VM when there’s a clean separation of the language compiler and the runtime support – meaning that the VM might also be used to execute other languages. This has happened with the JVM, for example. MoarVM is squarely aimed at being a good host for the Rakudo Perl 6 compiler, and the NQP subset of Perl 6 that Rakudo is mostly implemented in. If any other language finds it useful, that’s fine, but it’s not a primary design consideration.

So why care so much over a clean interface between compiler and runtime? Because, in a project the size of “implement Perl 6”, managing implementation complexity is crucial. Establishing a clean boundary between the compiler and the runtime, and knowing what each side is responsible for, is one strategy for doing that. (Rakudo’s compilation process is also broken up into a pipeline of stages that only communicate by passing well-defined data structures from one to the next, which is another form of architectural boundary.)

But why interpreters and VMs at all? Why not go straight down to the metal? For any language where we can reasonably figure out a lot about the program statically, that is a fine strategy. The information needed to eliminate many forms of late binding, checks, and duplicate work is to hand at compile time, and a decent optimizer will be able to use that information. Of course, this approach restricts us to what can be seen at compile time. Some compilers as a result support profile-guided optimization, where we run the program on typical workloads, collect information about where time is spent and what code-paths are common and uncommon, and then compile the program again taking that data into account.

Profile-guided optimization hints at a strategy for efficiently executing languages where we can know rather less at compile time: put off most optimization until we observe the program running, and dynamically replace parts of the program with optimized versions. This isn’t a new idea, and there has been a massive amount of work done in this area. At its heart is the general principle that most programs are, if I may pun on a term from the database field, “eventually static”. Even though a Perl 6 program may have a huge number of points of potential dynamism and genericity, in most programs just a handful of them actually ever exploit it. Of course, different runs of the program on different inputs may tickle different bits of that dynamism, not to mention that as a program grows and evolves, these flexible points will be exploited here and there to (hopefully cleanly) add new functionality.

A modern VM aiming to support a language like Perl 6 is not, therefore, just there to run the code and provide services like garbage collection, OS abstraction, and so on. Its job is to take a program in all its dynamic glory, watch what code paths it actually takes, see what types really show up – and then produce optimized versions of carefully selected parts of the program, with the unused dynamism ruthlessly torn out, and much of the remaining dynamism massaged to more restricted and faster forms.

Bytecode specialization

So what exactly does MoarVM do with a program? First, it needs to find code worth considering optimizing. This is done by seeking the hot parts of a program: routines that are called again and again, and loops that are doing a lot of iterations. Since time to optimize a routine is proportional to the size of the routine, the threshold for “it’s worth considering” goes with code size. A small routine – such as a simple operator or an accessor method – has rather little code and will become hot quickly. A larger routine heats up more slowly. It’s just like our kettle: a little water boils quickly, but fill it up and we’re going to be waiting a bit for that cuppa. This is in part about risk management: we would prefer to avoid investing time optimizing code that’s never going to give a positive return on investment. We can’t predict the future, but some cheap, simple heuristics are at least a good start.

So, we’ve got some hot code. What now? Well, if it’s a call, we start by looking at the callsites that are invoking it. The callsite tells us how many arguments are being passed, whether they are native values or objects, and – for named arguments – what their names are. We can then start to produce bytecode specialized by callsite. Know that the object parameter is passed an object argument? Then replace the op that checks if we need to box the incoming argument with a simpler op that just copies the pointer. Know that an optional parameter actually doesn’t get passed? Then strip out the code to look for it, toss the branch, and just always run the code that sets the default value. Know that the named parameter “config” is always the first passed named argument? Then retrieve it by indexed access (so a cheap pointer copy), rather than messing about with strings.

Next up: types. If we are passed arguments, what types are they? If we look up constants, what types to those have? Knowing these is relatively easy: we just peek at the args buffer to see what’s being passed, or look up the constant and see what type it is. However, knowing them also presents us with a huge range of opportunities. See that method call? It doesn’t need to look up the method in a table each call any more, since we know where it’s going. And that attribute access? Since we know the object layout, it can just become a pointer offset from the object’s memory address. That type check? We often know the answer statically now – and can eliminate entire branches that won’t be taken. Oh, and that multiple dispatch? Let’s just resolve it once now we know the types we’re dealing with, not every single call. The list goes on.

Specialization works at the bytecode level, because that’s what the VM gets. And a specialization of a given piece of bytecode comes with a set of guards: conditions that must be met for it to be applicable. Those can constrain it by callsite and argument types. And we can produce multiple specializations for a given piece of original bytecode.

Of course, choosing the appropriate specialization is a cost too. It would eat into our winnings if we had to do that on every call. Thankfully, there are ways to avoid that. If we are optimizing a call to something and know the argument types being passed, we can pick the specialization of the callee and optimize the caller to always call that specialization, rather than having to hunt for the right one each time. And, if the callee is rather small, we can often do away with the call entirely and simply inline the callee into the caller – eliminating the need to create a callframe and keeping the code tighter, which CPU caches like.

Optimization speculation

So, all this is nice, but we want MOAR MOAR MOAR. Because there are lots of other things that we can’t statically figure out the types of, but that may end up having very stable types come runtime. Some examples are lexical variables that we close over, object attributes, and return values of our callees. So how do we get hold of these types?

When code becomes hot, we don’t actually optimize it right away. Well, we do for the simple callsite-related transformations. But before going and doing the type-driven stuff, we take a closer look at what’s actually going on in the program. The specializer quickly produces an instrumented version of the program that records what types show up where. This runs for the next bunch of invocations, and logs what it sees. After a threshold is hit, we have a look at the types that showed up and do a little analysis. If a given logging site consistently logs the same type, we’re onto something. If we see more than one type show up there, we consider that part of the program as being typically dynamic and leave it be.

The thing is, we didn’t actually prove a property of the program here, just profiled it and figured out what tends to happen. So how can we use this information? The trick is to insert a guard into the specialized program that cheaply asserts that we got the expected type. Beyond that guard, we can optimize assuming we really did. (We also make sure that we actually do an optimization based upon what the guard checks, and don’t bother to insert it otherwise.)

Which is all well and good when the guard is met, but what about when the assertion fails? In fact, this is a more general problem. Even the far simpler type-based optimizations on incoming parameters assume that an object’s type will not change – and of course, thanks to mixins, they can. All of these situations trigger deoptimization: replacing the optimized code we’re in the midst of running with the unoptimized, naive, but safe version that is dynamic enough to handle all the cases.

Deoptimization is quite a pain to get right. For one, if we were inside of inlined code, we have to go back and create all the callframes we skipped. Since inlining can be many levels deep, that can be a bunch of callframes to wire up. Then we have to make sure that our optimizations don’t throw away code that produces a value the deoptimized code relies on being there, but the optimized code would otherwise not. Of course, we still need to do dead code elimination in general; it’s just that it has to be aware of deoptimization.

Therefore, it’s not only important that MoarVM can optimize code. It’s also critical to its ability to do so that it can also deoptimize, falling back to slow paths when needed.

Down to machine code

Interpreting specialized bytecode can yield, in some programs, a significant improvement. The simple program:

my $i = 0; while ++$i <= 100000000 { }

Got a factor of 2.5 improvement with what we had several months ago, and there are still a lot of opportunities left (some of which we’ve already explored, and with others yet to go). However, interpreting – that is, looping over an instruction stream and choosing what to do for each instruction – has its overhead. Before the specialization process eliminates a lot of the dynamism, interpretation is only so bad; some ops have some checks to do, and so they cost a bit. But specialized code tends to have a lot of very cheap operations that just play with pointers – and then the overhead of interpreting can quickly come to account for the majority of the execution time.

Therefore, on x64, we can often take the further step of producing machine code to run, instead of specialized bytecode to interpret. Of course, we have to be ready to deoptimize from that too. I’ll save talking about the machine code part too much, since the guy who worked on it is, I believe, plotting an advent post about it in the near future. Needless to say, it makes a dramatic difference; for the benchmark above it more than halved the time again relative to the specialized bytecode.

We often referred to the “produce machine code” part of the work as “the JIT”. But really, the entire set of optimizations described in this article are part of the typical work a JIT compiler does. Producing machine code is just one small part, and it’s interesting to note that the win from eliminating dynamism was a bigger one than eliminating interpretation overhead.

What next?

The work so far has had a dramatic impact on performance. However, there’s still plenty of areas for improvement.

For one, the number of specializations produced and how we choose them can certainly use some tweaking. It’s possible for a routine to saturate its number of allowed specializations with ones that, in the long run, are not entirely strategic. Or we may be able to reasonably argue that it should be allowed more. At the same time, specialized bytecode is a notable memory overhead at this point. So, we need to produce both more and less specializations. Of course, what that really means is we need a better strategy for picking what is worth doing.

Another clear issue is that some speculative optimizations turn out to be bad bets, and we have no way of recovering from that. We can deoptimize pretty fast (in fact, some trade-offs have been made to have it be quite affordable). But it’s still a cost, and it’s possible a gamble that looked good based on available data may lead to code that has to be deoptimized a majority of the time – and thus run slower than if we’d never bothered. Again, it’s about being smarter and keeping on learning as we run the program.

Then there’s a bunch of things that we don’t specialize or compile to machine code in a good way yet. Array access is an obvious one, and big integer operations – which are at the heart of Perl 6’s Int type – are another. The specializer is also not as aware as it really should be of closures. These are all areas where we could do a whole lot better – and they are nice, incremental advances on what we already have.

A further interesting area for exploration is doing escape analysis, and being able to eliminate a bunch of GC allocations in favor of allocating the memory as part of the call frame, because we know it can never end up being used beyond it. This is a fair amount of work, but also highly relevant to Perl 6: many scalar containers for lexical variables would be great candidates.

Then there’s the whole idea of trace JIT. MoarVM so far has a routine-level JIT, often called a method JIT. There, we optimize at routine level first, and may do some inlining to flatten away the call tree. This is a pretty decent strategy for Perl 6, in so far as we’re often very interested in inlining primitive operators that, naively interpreted in the original dynamic program, would be multiple dispatch calls. Trace JITs, by contrast, just follow the flow of instructions, over however many callframes, and every single conditional branch becomes a guard. Their strength is loops, which today we cope with using On Stack Replacement (detect we’re in a hot loop, compile an optimized version of it, and hot-swap it). While it’s easy to talk about trace JIT vs. method JIT as a “pick one”, I’m much more interested in the “pick both” track. They both, after all, share a lot of infrastructural needs – and they have their strengths and weaknesses. There’s more than one way to dynamically optimize it, and, this being Perl, we might as well just do all the ways – spotting all the nice unifications and re-use opportunities along the way.

Day 19 – Perl 6 Supplies Reactive Programming

Several days back, we took a look at promises and channels. Promises provided a synchronization mechanisms for asynchronous things that produced a single result, while channels were ideal for setting up producer/consumer style workflows, with producers and consumers able to work in parallel. Today we’ll take a look at a third mechanism aimed at introducing asynchrony and coping with concurrency: supplies!

Synchronous = Pull, Asynchronous = Push

One of the most important features of promises is the then method. It enables one or more things to be run whenever the asynchronous work the promise represents is completed. In a sense, it’s like a publish/subscribe mechanism: you call then to subscribe, and when the work is done then the notification is published. This happens asynchronously. You don’t sit around waiting for the promise to complete, but instead just say what to do when this takes place. Thinking about the way data flows, values are pushed along to the next step of the process.

This is rather different to things like sub and method calls. There, we make the call, then we block until it has completed. Iterating over a lazy list is the same: you can’t progress with the iteration until the next value is available. Thus, iteration is really about pulling stuff from a source. So, if a promise can be thought of as something that can push a single values out as it becomes available, do we have something that can push a whole stream of values outwards, as they are produced over time?

Supplies! We do!

A while back, it was realized that the observer pattern is the mathematical dual of the iterator pattern. Why is this exciting? Quite simply, because it means that all the things you can sensibly do with something you can iterate (map, grep, zip, etc.), you can also sensibly do with something you can observe. Out of this was born reactive programming, and the Rx (Reactive Extensions) library, which has now been ported to many platforms. In Perl 6, we’re providing support for this in core.

The basics

Let’s start out simple. First, we create a new Supply:

my $measurements = Supply.new;

We can then tap the supply, passing a closure that should be called whenever a value is made available:

$measurements.tap(-> $value {
    say "Measured: $value";
});

Finally, we produce some values:

$measurements.more(1.5);
$measurements.more(2.3);
$measurements.more(4.6);

On each of these calls, the closure we tapped the supply with is invoked. Just as we can call then many times, so we can tap many times too:

$measurements.tap(-> $value {
    say "Also measured: $value";
});

Now, when we produce a value:

$measurements.more(2.8);

Both of the closures tapping the supply will be called. Note that tap returns an object which can be used to express you’re no longer interested in the supply, essentially turning that tap off.

Note that we didn’t introduce any asynchrony so far. However, supplies are built for it. You can safely have multiple threads supplying values. By default, the supplying thread is used to execute the taps.

Enter the combinators!

Since supplies are essentially a thread-safe observer implementation, we can define many of the same things on them as we’re used to having on lists. For example, imagine we just wanted to tap high measurements. Well, we just re-use knowledge from how we’d filter a list: using grep!

$measurements.grep(* > 4).tap(-> $value {
    say "HIGH: $value";
});

Calling grep on a supply produces another supply, just as calling grep on a list gives another list. We could, if we wished, store it in a variable and tap this derived supply many times, grep it again, map it, etc.

Supply factories

There are ways to get supplies besides simply creating them directly. The Supply class has various factory methods that create various interesting kinds of supply, while introducing asynchrony. For example, interval gives a supply that, when tapped, will produce an ascending integer once per time interval.

my $secs = Supply.interval(1);
$secs.tap(-> $s { say "Started $s seconds ago" });
sleep 10;

Factories can also help map between paradigms. The Supply.for method produces a supply that, when tapped, will iterate the specified (potentially lazy) list and push the values out to the tapper. It does the iteration asynchronously. While it’s not implemented yet, we’ll be able to define a similar mechanism for taking a Channel and tapping each value that is received.

Crossing the streams

Some of the most powerful – and tricky to implement – combinators are those that involve multiple supplies. For example, merge gives a single supply whose values are those of the two other supplies it tapped, and zip pairs together values from two different supplies. These are tricky to implement because it’s entirely possible that two different threads will be supplying values. Thankfully, though, we just need to manage this once inside of the supplies implementation, and save those using them from worrying about the problem! In a sense, combinators on lists factor out flow control, while combinators on supplies factor out both flow control and synchronization. Both let us program in a more declarative style, getting the imperative clutter out of our code.

Let’s bring all of this together with an example from one of my recent presentations. We simulate a situation where we have two sets of readings coming in: first, measurements from a belt, arriving in batches of 100, which we need to calculate the mean of, and second another simpler value arriving once every 5 seconds. We want to label them, and get a single supply merging these two streams of readings together. Here’s how it can be done:

my $belt_raw = Supply.interval(1).map({ rand xx 100 });
my $belt_avg = $belt_raw.map(sub (@values) {
    ([+] @values) / @values
});
my $belt_labeled = $belt_avg.map({ "Belt: $_" });
my $samples = Supply.interval(5).map({ rand });
my $samples_labeled = $samples.map({ "Sample: $_" });
my $merged = $belt_labeled.merge($samples_labeled);
$merged.tap(&say);
sleep 20;

Notice how it’s not actually that much harder than code that maps and greps lists we already had – and yet we’re managing to deal with both time and concurrently arriving data.

The future

Supplies are one of the most recently implemented things in Rakudo, and what’s done so far works on Rakudo on the JVM. With time, we’ll flesh out the range of combinators and factories, keep growing our test coverage, and deliver this functionality on Rakudo on MoarVM too, for those who don’t want to use the JVM.