Day 9 – HTTP and Web Sockets with Cro

It’s not only Christmas time when gifts are given. This summer at the Swiss Perl Workshop – beautifully situated up in the Alps – I had the pleasure of revealing Cro. Cro is a set of libraries for building services in Perl 6, together with a little development tooling to stub, run, and trace services. Cro is intially focused on building services with HTTP (including HTTP/2.0) and web sockets, but early support for ZeroMQ is available, and a range of other options are planned for the future.

Reactive pipelines

Cro follows the Perl design principle of making the easy things easy, and the hard things possible. Much like Git, Cro can be thought of as having porcelain (making the easy things easy) and plumbing (making the hard things possible). The plumbing level consists of components that are composed to form pipelines. The components come in different shapes, such as sources, transforms, and sinks. Here’s a transform that turns a HTTP request into a HTTP response:

use Cro;
use Cro::HTTP::Request;
use Cro::HTTP::Response;
class MuskoxApp does Cro::Transform {
method consumes() { Cro::HTTP::Request }
method produces() { Cro::HTTP::Response }
method transformer(Supply $pipeline --> Supply) {
supply whenever $pipeline -> $request {
given Cro::HTTP::Response.new(:$request, :200status) {
.append-header: "Content-type", "text/html";
.set-body: "Muskox Rocks!\n".encode('ascii');
.emit;
}
}
}
}

Now, let’s compose it with a TCP listener, a HTTP request parser, and a HTTP response serializer:

use Cro::TCP;
use Cro::HTTP::RequestParser;
use Cro::HTTP::ResponseSerializer;
my $server = Cro.compose:
Cro::TCP::Listener.new(:port(4242)),
Cro::HTTP::RequestParser.new,
MuskoxApp,
Cro::HTTP::ResponseSerializer;

That gives back a Cro::Service, which we can now start, and stop upon Ctrl+C:

$server.start;
react whenever signal(SIGINT) {
$server.stop;
exit;
}

Run it. Then curl it.

$ curl http://localhost:4242/
Muskox Rocks!

Not bad. But what if we wanted a HTTPS server? Provided we’ve got key and certificate files handy, that’s just a case of replacing the TCP listener with a TLS listener:

use Cro::TLS;
my $server = Cro.compose:
Cro::TLS::Listener.new(
:port(4242),
:certificate-file('certs-and-keys/server-crt.pem'),
:private-key-file('certs-and-keys/server-key.pem')
),
Cro::HTTP::RequestParser.new,
MuskoxApp,
Cro::HTTP::ResponseSerializer;

Run it. Then curl -k it.

$ curl -k https://localhost:4242/
Muskox Rocks!

And middleware? That’s just another component to compose into the pipeline. Or, seen another way, with Cro everything is middleware. Even the request parser or response serializer can be easily replaced, should the need arise (which sounds like an odd thing to need, but that’s effectively what implementing FastCGI would involve).

So, that’s how Cro is plumbed. It also requires an amount of boilerplate to work at this level. Bring in the porcelain!

HTTP server, the easy way

The Cro::HTTP::Server class gets rid of the boilerplate of building the HTTP processing pipeline. The example from earlier becomes just:

use Cro;
use Cro::HTTP::Server;
class MuskoxApp does Cro::Transform {
method consumes() { Cro::HTTP::Request }
method produces() { Cro::HTTP::Response }
method transformer(Supply $pipeline --> Supply) {
supply whenever $pipeline -> $request {
given Cro::HTTP::Response.new(:$request, :200status) {
.append-header: "Content-type", "text/html";
.set-body: "Muskox Rocks!\n".encode('ascii');
.emit;
}
}
}
}
my $server = Cro::HTTP::Server.new: :port(4242), :application(MuskoxApp);
$server.start;
react whenever signal(SIGINT) {
$server.stop;
exit;
}

There’s no magic here; it really is just a more convenient way to compose a pipeline. And while that’s only so much of a saving for HTTP/1.*, a HTTP/2.0 pipeline involves some more components, and a pipeline that supports both is a bit more involved still. By comparison, it’s easy to configure Cro::HTTP::Server to do HTTPS with support for both HTTP/1.1 and HTTP/2.0:

my %tls =
:certificate-file('certs-and-keys/server-crt.pem'),
:private-key-file('certs-and-keys/server-key.pem');
my $server = Cro::HTTP::Server.new: :port(4242), :application(MuskoxApp),
:%tls, :http<1.1 2>;

The route to happiness

A web application in Cro is ultimately always a transform that turns a HTTP request into a HTTP response. It’s very rare to want to process all requests in exactly the same way, however. Typically, different URLs should be routed to different handlers. Enter Cro::HTTP::Router:

use Cro::HTTP::Router;
use Cro::HTTP::Server;
my $application = route {
get -> {
content 'text/html', 'Do you like dugongs?';
}
}
my $server = Cro::HTTP::Server.new: :port(4242), :$application;
$server.start;
react whenever signal(SIGINT) {
$server.stop;
exit;
}

The object returned by a route block does the Cro::Transform role, meaning it would work just fine to use it with Cro.compose(...) plumbing too. It’s a good bit more convenient to write an application using the router, however! Let’s look at the get call a little more closely:

get -> {
content 'text/html', 'Do you like dugongs?';
}

Here, get is saying that this handler will only deal with HTTP GET requests. The empty signature of the pointy block means no URL segments are expected, so this route only applies to /. Then, instead of having to make a response object instance, add a header, and encode a string, the content function does it all.

The router is built to take advantage of Perl 6 signatures, and also to behave in a way that will feel natural to Perl 6 programmers. Route segments are set up by declaring parameters, and literal string segments match literally:

get -> 'product', $id {
content 'application/json', {
id => $id,
name => 'Arctic fox photo on canvas'
}
}

A quick check with curl shows that it takes care of serializing the JSON for us also:

$ curl http://localhost:4242/product/42
{"name": "Arctic fox photo on canvas","id": "42"}

The JSON body serializer is activated by the content type. It’s possible, and pretty straightforward, to implement and plug in your own body serializers.

Want to capture multiple URL segments? Slurpy parameters work too, which is handy in combination with static for serving static assets, perhaps multiple levels of directories deep:

get -> 'css', *@path {
static 'assets/css', @path;
}

Optional parameters work for segments that may or may not be provided. Using subset types to constrain the allowed values work too. And Int will only accept requests where the value in the URL segment parses as an integer:

get -> 'product', Int $id {
content 'application/json', {
id => $id,
name => 'Arctic fox photo on canvas'
}
}

Named parameters are used to receive query string arguments:

get -> 'search', :$query {
content 'text/plain', "You searched for $query";
}

Which would be populated in a request like this:

$ curl http://localhost:4242/search?query=llama
You searched for llama

These too can be type constrained and/or made required (named parameters are optional by default in Perl 6). The Cro router tries to help you do HTTP well by giving a 404 error for failure to match a URL segments, 405 (method not allowed) when segments would match but the wrong method is used, and 400 when the method and segments are fine, but there’s a problem with the query string. Named parameters, through use of the is header and is cookie traits, can also be used to accept and/or constrain headers and cookies.

Rather than chugging through the routes one at a time, the router compiles all of the routes into a Perl 6 grammar. This means that routes will be matched using an NFA, rather than having to chug through them one at a time. Further, it means that the Perl 6 longest literal prefix rules apply, so:

get -> 'product', 'index' { ... }
get -> 'product', $what { ... }

Will always prefer the first of those two for a request to /product/index, even if you wrote them in the opposite order:

get -> 'product', $what { ... }
get -> 'product', 'index' { ... }

Middleware made easier

It’s fun to say that HTTP middleware is just a Cro::Transform, but it’d be less fun to write if that was all Cro had to offer. Happily, there are some easier options. A route block can contain before and after blocks, which will run before and after any of the routes in the block have been processed. So, one could add HSTS headers to all responses:

my $application = route {
after {
header 'Strict-transport-security', 'max-age=31536000; includeSubDomains';
}
# Routes here...
}

Or respond with a HTTP 403 Forbidden for all requests without an Authorization header:

my $application = route {
before {
unless .has-header('Authorization') {
forbidden 'text/plain', 'Missing authorization';
}
}
# Routes here...
}

Which behaves like this:

$ curl http://localhost:4242/
Missing authorization
$ curl -H"Authorization: Token 123" http://localhost:4242/
<strong>Do you like dugongs?</strong>

It’s all just a Supply chain

All of Cro is really just a way of building up a chain of Perl 6 Supply objects. While the before and after middleware blocks are convenient, writing middleware as a transform provides access to the full power of the Perl 6 supply/whenever syntax. Thus, should you ever need to take a request with a session token and make an asynchronous call to a session database, and only then either emit the request for further processing (or do a redirection to a login page), it’s possible to do it – in a way that doesn’t block other requests (including those on the same connection).

In fact, Cro is built entirely in terms of the higher level Perl 6 concurrency features. There’s no explicit threads, and no explicit locks. Instead, all concurrency is expressed in terms of Perl 6 Supply and Promise, and it is left to the Perl 6 runtime library to scale the application over multiple threads.

Oh, and WebSockets?

It turns out Perl 6 supplies map really nicely to web sockets. So nicely, in fact, that Cro was left with relatively little to add in terms of API. Here’s how an (overly) simple chat server backend might look:

my $chat = Supplier.new;
get -> 'chat' {
# For each request for a web socket...
web-socket -> $incoming {
# We start this bit of reactive logic...
supply {
# Whenever we get a message on the socket, we emit it into the
# $chat Supplier
whenever $incoming -> $message {
$chat.emit(await $message.body-text);
}
# Whatever is emitted on the $chat Supplier (shared between all)
# web sockets), we send on this web socket.
whenever $chat -> $text {
emit $text;
}
}
}
}

Note that doing this needs a use Cro::HTTP::Router::WebSocket; to import the module providing the web-socket function.

In summary

This is just a glimpse at what Cro has to offer. There wasn’t space to talk about the HTTP and web socket clients, the cro command line tool for stubbing and running projects, the cro web tool that provides a web UI for doing the same, or that if you stick CRO_TRACE=1 into your environment you get lots of juicy debugging details about request and response processing.

To learn more, check out the Cro documentation, including a tutorial about building a Single Page Application. And if you’ve more questions, there’s also a recently-created #cro IRC channel on Freenode.

Day 17 – Testing in virtual time

Over the last month, most of my work time has been spent building a proof of concept for a project that I’ll serve as architect for next year. When doing software design, I find spikes (time-boxed explorations of problems) and rapid prototyping really useful ways to gain knowledge of new problem spaces that I will need to work in. Finding myself with a month to do this before the “real” start of the project has been highly valuable. Thanks to being under NDA, I can’t say much about the problem domain itself. I can, however, say that it involves a reasonable amount of concurrency: juggling different tasks that overlap in time.

Perl’s “whipuptitude” – the ability to quickly put something together – is fairly well known. Figuring that Perl 6’s various built-in concurrency constructs would allow me to whip up concurrent things rapidly, I decided to build the proof of concept in Perl 6. I’m happy to report that the bet paid off pretty well, and by now the proof of concept has covered all of the areas I hoped to explore – and some further ones that turned out to matter but that weren’t obvious at the start.

To me, building rapid prototypes explicitly does not mean writing crappy code. For sure, simplifications and assumptions of things not critical to the problem space are important. But my prototype code was both well tested and well structured. Why? Because part of rapid prototyping is being able to evolve the prototype quickly. That means being able to refactor rapidly. Decent quality, well-structured, well-tested code is important to that. In the end, I had ~2,500 lines of code covered by ~3,500 lines of tests.

So, I’ve spent a lot of time testing concurrent code. That went pretty well, and I was able to make good use of Test::Mock in order to mock components that returned a Promise or Supply also. The fact that Perl 6 has, from the initial language release, had ways to express asynchronous values (Promise) or asynchronous streams of values (Supply) is in itself a win for testing. Concurrent APIs expressed via these standard data structures are easy to fake, since you can put anything you want behind a Promise or a Supply.

My work project didn’t involve a huge amount of dealing with time, but in the odd place it did, and I realized that testing this code effectively would be a challenge. That gave me the idea of writing about testing time-based code for this year’s Perl 6 advent, which in turn gave me the final nudge I needed to write a module that’s been on my todo list all year. Using it, testing things involving time can be a lot more pleasant.

Today’s example: a failover mechanism

Timeouts are one of the most obvious and familiar places that time comes up in fairly “everyday” code. To that end, let’s build a simple failover mechanism. It should be used as follows:

my $failover-source = failover($source-a, $source-b, $timeout);

Where:

  • $source-a is a Supply
  • $source-b is a Supply
  • $timeout is a timeout in seconds (any Real number)
  • The result, assigned to $failover-source, is also Supply

And it should function as follows:

  • The Supply passed as $source-a is immediately tapped (which means it is requested to do whatever is needed to start producing values)
  • If it produces its first value before $timeout seconds, then we simply emit every value it produces to the result Supply and ignore $source-b
  • Otherwise, after $timeout seconds, we also tap $source-b
  • Whichever source then produces a value first is the one that we “latch” on to; any results from the other should be discarded

Consider, for example, that $source-a and $source-b are supplies that, when tapped, will send the same query to two different servers, which will stream back results over time. Normally we expect the first result within a couple of seconds. However, if the server queried by $source-a is overloaded or has other issues, then we’d like to try using the other one, $source-b, to see if it can produce results faster. It’s a race, but where A gets a head start.

Stubbing stuff in

So, in a Failover.pm6, let’s stub in the failover sub as follows:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
    }
}

A t/failover.t then starts off as:

use Failover;
use Test;

# Tests will go here

done-testing;

And we’re ready to dig in to the fun stuff.

The first test

The simplest possible case for failover is when $source-a produces its first value in time. In this case, $source-b should be ignored totally. Here’s a test case for that:

subtest 'When first Supply produces a value in time, second not used', {
    my $test-source-a = supply {
        whenever Promise.in(1) {
            emit 'a 1';
        }
        whenever Promise.in(3) {
            emit 'a 2';
        }
    }
    my $test-source-b = supply {
        die "Should never be used";
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    is $output.receive, 'a 1', 'Received first value from source A';
    is $output.receive, 'a 2', 'Received second value from source A';
}

Here, we set up $test-source-a as a Supply that, when tapped, will emit a 1 after 1 second, and a 2 after 3 seconds. If $test-source-b is ever tapped it will die. We expect that if this wrongly happens, it will be at the 2 second mark, which is why a 2 is set to be emitted after 3 seconds. We then obtain a Channel from the resulting $failover-supply, which we can use to pull values from at will and check we got the right things. (On coercing a Supply to a Channel, the Supply is tapped, starting the flow of values, and each result value is fed into the Channel. Both completion and errors are also conveyed.)

Making it pass

There are a couple of ways that we might make this test pass. The absolute easiest one would be:

sub failover(Supply $source-a, Supply $source-b, Real $timeout) is export {
    return $source-a;
}

Which feels like cheating, but in TDD the code that passes the first test case almost always does. (It sometimes feels pointless to write said tests. More than once, they’ve ended up saving me when – while making a hard thing work – I ended up breaking the trivial thing.)

An equivalent, more forward-looking solution would be:

sub failover(Supply $source-a, Supply $source-b, Real $timeout) is export {
    supply {
        whenever $source-a {
            .emit;
        }
    }
}

Which is the identity operator on a Supply (just spit out everything you get). For those not familiar with supplies, it’s worth noting that this supply block does 3 useful things for you for free:

  • Passes along errors from $source-a
  • Passes along completion from $source-a
  • Closes the tap on $source-a – thus freeing up resources – if the tap on the supply we’re defining here is closed

Subscription management and error management are two common places for errors in asynchronous code; the supply/whenever syntax tries to do the Right Thing for you on both fronts. It’s more than just a bit of tinsel on the Christmas callback.

When the timeout…times out

So, time for a more interesting test case. This one covers the case where the $source-a fails to produce a value by the timeout. Then, $source-b produces a value within 1 second of being tapped – meaning its value should be relayed. We also want to ensure that even if $test-source-a were to emit a value a little later on, we’d disregard it. Here’s the test:

subtest 'When timeout reached, second Supply is used instead if it produces value first', {
    my $test-source-a = supply {
        whenever Promise.in(4) {
            emit 'a 1';
        }
    }
    my $test-source-b = supply {
        whenever Promise.in(1) { # start time + 2 (timeout) + 1
            emit 'b 1';
        }
        whenever Promise.in(3) { # start time + 2 (timeout) + 3
            emit 'b 2';
        }
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    is $output.receive, 'b 1', 'Received first value from source B';
    is $output.receive, 'b 2', 'Received second value from source B';
}

We expect a 1 to be ignored, because we chose $source-b. So, how can we make this pass? Here goes:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
        my $emitted-value = False;

        whenever $source-a {
            $emitted-value = True;
            .emit;
        }

        whenever Promise.in($timeout) {
            unless $emitted-value {
                whenever $source-b {
                    .emit;
                }
            }
        }
    }
}

Will this pass the test? Both subtests?

Think about it…

Well, no, it won’t. Why? Because it doesn’t do anything about disregarding $source-a after it has started spitting out values from $source-b. It needs to commit to one or the other. Didn’t spot that? Good job we have tests! So, here’s a more complete solution that makes both subests pass:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
        my enum Committed ;
        my $committed = None;

        whenever $source-a -> $value {
            given $committed {
                when None {
                    $committed = A;
                    emit $value;
                }
                when A {
                    emit $value;
                }
            }
        }

        whenever Promise.in($timeout) {
            if $committed == None {
                whenever $source-b -> $value {
                    $committed = B;
                    emit $value;
                }
            }
        }
    }
}

So tired of waiting

You’d think I’d be happy with this progress. Two passing test cases. Surely the end is in sight! Alas, development is getting…tedious. Yes, after just two test cases. Why? Here’s why:

$ time perl6-m -Ilib t/failover-bad.t
    ok 1 - Received first value from source A
    ok 2 - Received second value from source A
    1..2
ok 1 - When first Supply produces a value in time, second not used
    ok 1 - Received first value from source B
    ok 2 - Received second value from source B
    1..2
ok 2 - When timeout reached, second Supply is used instead if it produces value first
1..2

real    0m8.694s
user    0m0.600s
sys     0m0.072s

Every time I run my tests I’m waiting around 9 seconds now. And when I add more tests? Even longer! Now imagine I was going to write a whole suite of these failover and timeout routines, as a nice module. Or I was testing timeouts in a sizable app and would have dozens, even hundreds, of such tests.

Ouch.

Maybe, though, I could just make the timeouts smaller. Yes, that’ll do it! Here is how the second test looks now, for example:

subtest 'When timeout reached, second Supply is used instead if it produces value first', {
    my $test-source-a = supply {
        whenever Promise.in(0.04) {
            emit 'a 1';
        }
    }
    my $test-source-b = supply {
        whenever Promise.in(0.01) { # start time + 2 (timeout) + 1
            emit 'b 1';
        }
        whenever Promise.in(0.03) { # start time + 2 (timeout) + 3
            emit 'b 2';
        }
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 0.02);
    my $output = $failover-supply.Channel;

    is $output.receive, 'b 1', 'Received first value from source B';
    is $output.receive, 'b 2', 'Received second value from source B';
}

You want it faster? Divide by 100! Job done.

Of course, anybody who has actually done this knows precisely what comes next. The first 3 times I ran my tests after this change, all was well. But guess what happened on the forth time?

ok 1 - When first Supply produces a value in time, second not used
    not ok 1 - Received first value from source B

    # Failed test 'Received first value from source B'
    # at t/failover-short.t line 41
    # expected: 'b 1'
    #      got: 'a 1'
    not ok 2 - Received second value from source B

    # Failed test 'Received second value from source B'
    # at t/failover-short.t line 42
    # expected: 'b 2'
    #      got: 'b 1'
    1..2
    # Looks like you failed 2 tests of 2
not ok 2 - When timeout reached, second Supply is used instead if it produces value first

Mysteriously…it failed. Why? Bad luck. My computer is a busy little machine. It can’t just give my test programs all the CPU all of the time. It needs to decode that music I’m listening to, check if I need to install the 10th set of security updates so far this month, and cope with my web browser wanting to do stuff because somebody tweeted something or emailed me. And so, once in a while, just after the clock hits 0.01 seconds and a thread grabs the whenever block to work on, that thread will be dragged off the CPU. Then, before it can get back on again, the one set to run at 0.04 seconds gets to go, and spits out its value first.

Sufficiently large times mean slow tests. Smaller values mean unreliable tests. Heck, suspend the computer in the middle of running the test suite and even a couple of seconds is too short for reliable tests.

Stop! Virtual time!

This is why I wrote Test::Scheduler. It’s an implementation of the Perl 6 Scheduler role that virtualizes time. Let’s go back to our test code and see if we can do better. First, I’ll import the module:

use Test::Scheduler;

Here’s the first test, modified to use Test::Scheduler:

subtest 'When first Supply produces a value in time, second not used', {
    my $*SCHEDULER = Test::Scheduler.new;

    my $test-source-a = supply {
        whenever Promise.in(1) {
            emit 'a 1';
        }
        whenever Promise.in(3) {
            emit 'a 2';
        }
    }
    my $test-source-b = supply {
        die "Should never be used";
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    $*SCHEDULER.advance-by(3);
    is $output.receive, 'a 1', 'Received first value from source A';
    is $output.receive, 'a 2', 'Received second value from source A';
}

Perhaps the most striking thing is how much hasn’t changed. The changes amount to two additions:

  1. The creation of a Test::Scheduler instance and the assignment to the $*SCHEDULER variable. This dynamic variable is used to specify the current scheduler to use, and overriding it allows us to swap in a different one, much like you can declare a $*OUT to do stuff like capturing I/O.
  2. A line to advance the test scheduler by 3 seconds prior to the two assertions.

The changes for the second test are very similar:

subtest 'When timeout reached, second Supply is used instead if it produces value first', {
    my $*SCHEDULER = Test::Scheduler.new;

    my $test-source-a = supply {
        whenever Promise.in(4) {
            emit 'a 1';
        }
    }
    my $test-source-b = supply {
        whenever Promise.in(1) { # start time + 2 (timeout) + 1
            emit 'b 1';
        }
        whenever Promise.in(3) { # start time + 2 (timeout) + 3
            emit 'b 2';
        }
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    $*SCHEDULER.advance-by(6);
    is $output.receive, 'b 1', 'Received first value from source B';
    is $output.receive, 'b 2', 'Received second value from source B';
}

And what difference does this make to the runtime of my tests? Here we go:

$ time perl6-m -Ilib t/failover-good.t
    ok 1 - Received first value from source A
    ok 2 - Received second value from source A
    1..2
ok 1 - When first Supply produces a value in time, second not used
    ok 1 - Received first value from source B
    ok 2 - Received second value from source B
    1..2
ok 2 - When timeout reached, second Supply is used instead if it produces value first
1..2

real    0m0.679s
user    0m0.628s
sys     0m0.060s

From 9 seconds to sub-second – and much of that will be fixed overhead rather than the time running the tests.

One more test

Let’s deal with the final of the requirements, just to round off the test writing and get to a more complete solution to the original problem. The remaining test we need is for the case where the timeout is reached, and we tap $source-b. Then, before it can produce a value, $source-a emits its first value. Therefore, we should latch on to $source-a.

subtest 'When timeout reached, and second Supply tapped, first value still wins', {
    my $*SCHEDULER = Test::Scheduler.new;

    my $test-source-a = supply {
        whenever Promise.in(3) {
            emit 'a 1';
        }
        whenever Promise.in(4) {
            emit 'a 2';
        }
    }
    my $test-source-b = supply {
        whenever Promise.in(2) { # start time + 2 (timeout) + 2
            emit 'b 1';
        }
    }
    my $failover-supply = failover($test-source-a, $test-source-b, 2);
    my $output = $failover-supply.Channel;

    $*SCHEDULER.advance-by(4);
    is $output.receive, 'a 1', 'Received first value from source A';
    is $output.receive, 'a 2', 'Received second value from source A';
}

This fails, because the latch logic wasn’t included inside of the whenever block that subscribes to $source-b. Here’s the easy fix for that:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
        my enum Committed ;
        my $committed = None;

        whenever $source-a -> $value {
            given $committed {
                when None {
                    $committed = A;
                    emit $value;
                }
                when A {
                    emit $value;
                }
            }
        }

        whenever Promise.in($timeout) {
            if $committed == None {
                whenever $source-b -> $value {
                    given $committed {
                        when None {
                            $committed = B;
                            emit $value;
                        }
                        when B {
                            emit $value;
                        }
                    }
                }
            }
        }
    }
}

The easy thing is just a little bit repetitive, however. It would be nice to factor out the commonality into a sub. Here goes:

sub failover(Supply $source-a, Supply $source-b, Real $timeout --> Supply) is export {
    supply {
        my enum Committed ;
        my $committed = None;

        sub latch($onto) {
            given $committed {
                when None {
                    $committed = $onto;
                    True
                }
                when $onto {
                    True
                }
            }
        }

        whenever $source-a -> $value {
            emit $value if latch(A);
        }

        whenever Promise.in($timeout) {
            if $committed == None {
                whenever $source-b -> $value {
                    emit $value if latch(B);
                }
            }
        }
    }
}

And in under a second, the tests can now assure us that this was indeed a successful refactor. Note that this does not yet cancel a discarded request, perhaps saving duplicate work. I’ll leave that as an exercise for the reader.

Safety and realism

One thing you might wonder about here is whether this code is really thread safe. The default Perl 6 scheduler will schedule code across a bunch of threads. What if $source-a and $source-b emit their first value almost simultaneously?

The answer is that supply (and react) blocks promise Actor-like semantics, processing just one message at a time. So, if we’re inside of the whenever block for $source-a, and $source-b emits a message on another thread, then it will be queued up for processing afterwards.

One interesting question that follows on from this is whether the test scheduler somehow serializes everything onto the test thread in order to do its job. The answer is that no, it doesn’t do that. It wraps the default ThreadPoolScheduler and always delegates to it to actually run code. This means that, just as with the real scheduler, the code will run across multiple threads and on the thread pool. This helps to avoid a couple of problems. Firstly, it means that testing code that relies on having real threads (by doing stuff that really blocks a thread) is possible. Secondly, it means that Test::Scheduler is less likely to hide real data race bugs that may exist in the code under test.

Of course, it’s important to keep in mind that virtual time is still very much a simulation of real time. It doesn’t account for the fact that running code takes time; virtual time stands still while code runs. At the same time, it goes to some amount of effort to get the right sequencing when a time-based event triggered in virtual time leads to additional time-based events being scheduled. For example, imagine we schedule E1 in 2s and E2 in 4s, and then advance the virtual time by 4s. If the triggering of E1 schedules E3 in 1s (so, 3s relative to the start point), we need to have it happen before E2. To have this work means trying to identify when all the consequences of E1 have been shaken out before proceeding (which is easier said than done). Doing this will, however, prevent some possible overlaps that could take place in real time.

In summary…

Unit tests for code involving time can easily end up being slow and/or unreliable. However, if we can virtualize time, it’s possible to write tests that are both fast and reliable – as good unit tests should be. The Test::Scheduler module provides a way to do this in Perl 6. At the same time, virtual time is not a simulation of the real thing. The usual rules apply: a good unit test suite will get you far, but don’t forget to have some integration tests too!

Day 14 – A nice supplies: syntactic relief for working with asynchronous data

Asynchronous data is all around us. What is it? It’s data that arrives whenever it pleases, as opposed to when we ask for it. Some common examples are:

  • GUI events
  • Data arriving over an async socket
  • Messages arriving from a message queue
  • Ticks of a timer
  • File change notifications
  • Signals

These contrast with synchronous data, which we get when we ask for it. For example, when we query a database and iterate the resulting rows, or iterate through file system entries in a directory, we block until the data we wish to have is available.

Our programming languages tend to be pretty good about synchronous data, giving us a range of syntactic constructs for producing and processing it. For example, in Perl 6 we have for loops for consuming streams of synchronously produced values, and gather blocks for producing them – perhaps doing so by in turn using for loops to consume other synchronous data sources. Inside for and gather, we’re free to use state (kept in variables), and use our comfortable range of conditionals (if, when, etc.) That is to say, we’re free to act like normal, down to earth, imperative programmers in dealing with our synchronous data. Sure, sometimes something just looks far nicer using the functional style, and we map, grep, zip, and reduce our way to the solution. But some problems are just unnatural – at least for most of us, me included – to see in that way.

In Perl 6, we have a common interface for talking about things that produce values over time – that is, synchronous data. These things do the Iterable role, and work in terms of Iterator objects. You never really see an Iterator in day to day Perl 6, and instead see the Seq type. These Seqs are the things gather blocks produce, and for loops can consume.

In the last years, we introduced supplies to Perl 6. Supplies are our common interface for talking about asynchronous data. These have, from the beginning, been rather well received. With supplies, you can grep UI events, map file system notifications, and zip whatever you fancied with ticks of a timer. And that’s just great when it’s easy to think about your problem in functional terms. But what about the rest of the time?

In the last year, we’ve also added supply/react blocks, along with the whenever asynchronous looping construct. In this post, we’ll take a look at how they might be used.

STOMP!

As I wrote the list of sources of asynchronous data at the start of this post, I realized that there was only one of them that I’d not yet done in Perl 6: working with message queues. Hmm. Two hours until midnight. Advent post needs completing before I sleep. Challenge accepted!

10 minutes later, and I’ve RabbitMQ happily installed and am reading the STOMP specification. STOMP is a simple text-oriented message protocol – that is, a text-based way to deal with message queues. Within an hour, I’d got a working STOMP client handling the absolute basics of sending messages to a queue and subscribing to a queue to get messages. So, let’s take a look at how I did it.

First, I sketched in a Stomp::Client class. I figured it would need the host and port we want to connect to, the connection credentials (yes, yes, this is not the height of security engineering), and something to hold the socket representing the connection to the server.

class Stomp::Client {
    has Str $.host is required;
    has Int $.port is required;
    has Str $.login = 'guest';
    has Str $.password = 'guest';
    has $!connection;
}

So, let’s get connected. The connect method will contain a start block, because we want it to function asynchronously. This means the method will return a Promise that the caller can await. To connect to a stomp server, you establish a TCP connection and send a CONNECT frame:

method connect() {
    start {
        my $conn = await IO::Socket::Async.connect($!host, $!port);
        await $conn.print: qq:to/FRAME/;
            CONNECT
            accept-version:1.2
            login:$!login
            passcode:$!password
            
            \0
            FRAME
        True
    }
}

But…then what? Well, then you wait for the server to either return a CONNECTED frame or an ERROR frame. Hm, seems we’ve some work to do. First is to translate the BNF from the STOMP specification into a Perl 6 grammar:

my grammar StompMessage {
    token TOP {
        <command> \n
        [<header> \n]*
        \n
        <body>
        \n*
    }
    token command {
        < CONNECTED MESSAGE RECEIPT ERROR >
    }
    token header {
        <header-name> ":" <header-value>
    }
    token header-name {
        <-[:\r\n]>+
    }
    token header-value {
        <-[:\r\n]>*
    }
    token body {
        <-[\x0]>* )> \x0
    }
}

Note it’s lexically scoped inside of our STOMP::Client class. So is the following little message data structure:

my class Message {
    has $.command;
    has %.headers;
    has $.body;
}

An IO::Socket::Async connection exposes incoming data as a Supply. There are a few things to consider:

  • A message may be spread over multiple packets
  • Two messages may be in one packet
  • The next packet may arrive while we’re processing the current one, and since Perl 6 delivers I/O notifications on the thread pool – like various other languages – we might worry about data races

In fact, the third point is a non-problem: Perl 6 promises that, unless you go out of your way to avoid it, supplies are serial, and will only have you processing one message at a time on a given message pipeline. Supplies are a tool for taming concurrency rather than introducing it.

Now, if I was writing this synchronously, and I wanted to expose a Seq of the incoming messages, I’d use a gather block, keep myself a buffer, recv each incoming chunk of data from the socket, and try to parse a new message at the start of the buffer. Whenever I got a message I’d take it. What about with supplies? Here’s how it looks:

method !process-messages($incoming) {
    supply {
        my $buffer = '';
        whenever $incoming -> $data {
            $buffer ~= $data;
            while StompMessage.subparse($buffer) -> $/ {
                $buffer .= substr($/.chars);
                if $<command> eq 'ERROR' {
                    die ~$<body>;
                }
                else {
                    emit Message.new(
                        command => ~$<command>,
                        headers => $<header>
                            .map({ ~.<header-name> => ~.<header-value> })
                            .hash,
                        body => ~$<body>
                    );
                }
            }
        }
    }
}

Taking it from the top, $incoming will be the Supply of data arriving from the socket, decoded to strings. We wrap the code in this method up in a supply block, since we’d like to return a new Supply. We declare $buffer, to hold data we’ve yet to parse. In this case, our thread safety is fairly trivial: $incoming will deliver a message at a time. But what if we were going to bring together data from many supplies? We’d still be fine: a supply block promises that, per tapping of the Supply, only one thread will ever be allowed in the code inside of the supply block at a time.

The whenever construct is a asynchronous loop. It taps the supply we specify, and the loop body will run whenever a value is emitted by that supply. In our case, that is every time we get data from the socket. We concatenate the incoming $data onto the $buffer. We then try to parse a message at the start of the buffer (subparse means that we don’t mind if we don’t reach the end of the buffer, unlike parse which will complain if it doesn’t completely parse the incoming string). When we do parse data, we lop it off the start of the buffer.

For each message we parse, we first see if it’s an ERROR frame; if so, we simply die with the message. A die inside a whenever block will propagate the error to whatever tapped the supply, so errors will not be lost. Otherwise, we’ve some kind of more interesting message; we turn the parsed data into a Message instance and emit it.

A supply block gives an “on-demand” supply. Each tap performed on the block will run the supply block, which in turn will tap each of the whenevers. That’s the right thing in most cases, but for my Stomp::Client I’d like to have various bits of code tap the same messages, to look for interesting things. One of these bits of code will look for a single CONNECTED message. So, I’ll add a $!incoming attribute to my class:

has $!incoming;

And then, after establishing the connection to the message queue server, do this:

$!incoming = self!process-messages($conn.Supply).share;

That is, I share the result of processing messages out amongst everything that is interested. And interested things will tap the $!incoming supply and filter out what they care about. For example, here’s how I create a Promise that will be kept when we receive a CONNECTED frame:

my $connected = $!incoming
    .grep(*.command eq 'CONNECTED')
    .head(1)
    .Promise;

Here is the connect method in full:

method connect() {
    start {
        my $conn = await IO::Socket::Async.connect($!host, $!port);
        $!incoming = self!process-messages($conn.Supply).share;
        
        my $connected = $!incoming
            .grep(*.command eq 'CONNECTED')
            .head(1)
            .Promise;
        await $conn.print: qq:to/FRAME/;
            CONNECT
            accept-version:1.2
            login:$!login
            passcode:$!password
            
            \0
            FRAME
        await $connected;
        $!connection = $conn;
        
        True
    }
}

Sending messages is easy:

method send($topic, $body) {
    self!ensure-connected;
    $!connection.print: qq:to/FRAME/;
        SEND
        destination:/queue/$topic
        content-type:text/plain
 
        $body\0
        FRAME
}

And subscription is implemented using another supply block, and a bit of simple filtering on the incoming message:

method subscribe($topic) {
    self!ensure-connected;
    state $next-id = 0;
    supply {
        my $id = $next-id++;
        
        $!connection.print: qq:to/FRAME/;
            SUBSCRIBE
            destination:/queue/$topic
            id:$id
 
            \0
            FRAME
        
        whenever $!incoming {
            if .command eq 'MESSAGE' && .headers<subscription> == $id {
                emit .body;
            }
        }
    }
}

And with that, we have a working STOMP client. Let’s try it:

my $queue = Stomp::Client.new(host => 'localhost', port => 61613);
await $queue.connect();
await $queue.send("test", "hello world");
react {
    whenever $queue.subscribe('test') {
        .say;
    }
}

Here, for the first time, we encounter the react block. What’s that? Well, if you imagine that supply blocks are a little like gather – that is, used to create something that can produce values – a react block is for the places you might normally use a for loop when processing synchronous data. However, in the asynchronous world you’re usually interested in a range of things that might happen. A react block runs, sets up all the whenevers, and then blocks until either all of the supplies tapped by whenever blocks are done, or the “done” control operator is used somewhere inside of react.

A little example

Suppose that we wanted to build a simple monitoring system. Workers “sign on”, and must send us a ping every 10 seconds. If they fail to do so, then a monitor will report this. Workers can sign off when finished. Here is the worker process:

sub MAIN($name) {
    my $queue = Stomp::Client.new(host => 'localhost', port => 61613);
    await $queue.connect();
    $queue.send('signon', $name);
    react {
        whenever Supply.interval(10, 5) {
            if <y y n>.pick eq 'y' {
                $queue.send('ping', $name);
            }
        }
    
        whenever signal(SIGINT) {
            $queue.send('signoff', $name);
            done;
        }
    }
}

We start it from the command line with a name. Every 10 seconds it sends a ping – or at least, two thirds of the time it will. And if we hit Ctrl+C the worker signs off.

Next, here’s the monitor:

my $queue = Stomp::Client.new(host => 'localhost', port => 61613);
await $queue.connect();
react {
    my %last-active;
 
    whenever $queue.subscribe('signon') -> $name {
        %last-active{$name} = now;
    }
  
    whenever $queue.subscribe('ping') -> $name {
        %last-active{$name} = now;
    }
  
    whenever $queue.subscribe('signoff') -> $name {
        %last-active{$name}:delete;
    }
 
    whenever Supply.interval(10) {
        for %last-active.kv -> $name, $last-ping {
            if now - $last-ping > 11 {
                say "No ping from $name";
            }
        }
    }
}

Note that the react block, like supply block, enforces that only a single thread at a time may be operating across the various whenever blocks, so the %last-active hash is safe. We have three subscriptions to queues, and every ten seconds look for missing pings. We give an extra second’s grace on those.

Notice how, by exposing the message queue in terms of supplies, we are able to effortlessly compose this asynchronous data with both signals and time. This is where the real power of a uniform interface to asynchronous data comes in. And, with a little syntactic support, we are no longer required to put our functional hat on to wield that power.

Day 11 – So, what does MoarVM do with your Perl 6 code?

The first day of the advent calendar remarked that during 2014, MoarVM became the de facto standard virtual machine to run Perl 6 on. And, with Perl 6 on course to hit 6.0.0 in 2015, MoarVM is what most people adopting Perl 6 at this point will be working with. In this post, we’ll take a look at what MoarVM brings to the table, and where it’s heading from here.

But first – why a VM?

Many modern languages depend on a good deal of runtime support. Perl 6 is very much in this camp, and the same goes for Perl 5. A VM is in the business of providing that support. With Perl 5, the word “interpreter” tends to be used over VM, but the goal is the same: provide the things needed to execute programs in the language across disparate hardware and operating system environments. The difference is mostly that we tend to use VM when there’s a clean separation of the language compiler and the runtime support – meaning that the VM might also be used to execute other languages. This has happened with the JVM, for example. MoarVM is squarely aimed at being a good host for the Rakudo Perl 6 compiler, and the NQP subset of Perl 6 that Rakudo is mostly implemented in. If any other language finds it useful, that’s fine, but it’s not a primary design consideration.

So why care so much over a clean interface between compiler and runtime? Because, in a project the size of “implement Perl 6”, managing implementation complexity is crucial. Establishing a clean boundary between the compiler and the runtime, and knowing what each side is responsible for, is one strategy for doing that. (Rakudo’s compilation process is also broken up into a pipeline of stages that only communicate by passing well-defined data structures from one to the next, which is another form of architectural boundary.)

But why interpreters and VMs at all? Why not go straight down to the metal? For any language where we can reasonably figure out a lot about the program statically, that is a fine strategy. The information needed to eliminate many forms of late binding, checks, and duplicate work is to hand at compile time, and a decent optimizer will be able to use that information. Of course, this approach restricts us to what can be seen at compile time. Some compilers as a result support profile-guided optimization, where we run the program on typical workloads, collect information about where time is spent and what code-paths are common and uncommon, and then compile the program again taking that data into account.

Profile-guided optimization hints at a strategy for efficiently executing languages where we can know rather less at compile time: put off most optimization until we observe the program running, and dynamically replace parts of the program with optimized versions. This isn’t a new idea, and there has been a massive amount of work done in this area. At its heart is the general principle that most programs are, if I may pun on a term from the database field, “eventually static”. Even though a Perl 6 program may have a huge number of points of potential dynamism and genericity, in most programs just a handful of them actually ever exploit it. Of course, different runs of the program on different inputs may tickle different bits of that dynamism, not to mention that as a program grows and evolves, these flexible points will be exploited here and there to (hopefully cleanly) add new functionality.

A modern VM aiming to support a language like Perl 6 is not, therefore, just there to run the code and provide services like garbage collection, OS abstraction, and so on. Its job is to take a program in all its dynamic glory, watch what code paths it actually takes, see what types really show up – and then produce optimized versions of carefully selected parts of the program, with the unused dynamism ruthlessly torn out, and much of the remaining dynamism massaged to more restricted and faster forms.

Bytecode specialization

So what exactly does MoarVM do with a program? First, it needs to find code worth considering optimizing. This is done by seeking the hot parts of a program: routines that are called again and again, and loops that are doing a lot of iterations. Since time to optimize a routine is proportional to the size of the routine, the threshold for “it’s worth considering” goes with code size. A small routine – such as a simple operator or an accessor method – has rather little code and will become hot quickly. A larger routine heats up more slowly. It’s just like our kettle: a little water boils quickly, but fill it up and we’re going to be waiting a bit for that cuppa. This is in part about risk management: we would prefer to avoid investing time optimizing code that’s never going to give a positive return on investment. We can’t predict the future, but some cheap, simple heuristics are at least a good start.

So, we’ve got some hot code. What now? Well, if it’s a call, we start by looking at the callsites that are invoking it. The callsite tells us how many arguments are being passed, whether they are native values or objects, and – for named arguments – what their names are. We can then start to produce bytecode specialized by callsite. Know that the object parameter is passed an object argument? Then replace the op that checks if we need to box the incoming argument with a simpler op that just copies the pointer. Know that an optional parameter actually doesn’t get passed? Then strip out the code to look for it, toss the branch, and just always run the code that sets the default value. Know that the named parameter “config” is always the first passed named argument? Then retrieve it by indexed access (so a cheap pointer copy), rather than messing about with strings.

Next up: types. If we are passed arguments, what types are they? If we look up constants, what types to those have? Knowing these is relatively easy: we just peek at the args buffer to see what’s being passed, or look up the constant and see what type it is. However, knowing them also presents us with a huge range of opportunities. See that method call? It doesn’t need to look up the method in a table each call any more, since we know where it’s going. And that attribute access? Since we know the object layout, it can just become a pointer offset from the object’s memory address. That type check? We often know the answer statically now – and can eliminate entire branches that won’t be taken. Oh, and that multiple dispatch? Let’s just resolve it once now we know the types we’re dealing with, not every single call. The list goes on.

Specialization works at the bytecode level, because that’s what the VM gets. And a specialization of a given piece of bytecode comes with a set of guards: conditions that must be met for it to be applicable. Those can constrain it by callsite and argument types. And we can produce multiple specializations for a given piece of original bytecode.

Of course, choosing the appropriate specialization is a cost too. It would eat into our winnings if we had to do that on every call. Thankfully, there are ways to avoid that. If we are optimizing a call to something and know the argument types being passed, we can pick the specialization of the callee and optimize the caller to always call that specialization, rather than having to hunt for the right one each time. And, if the callee is rather small, we can often do away with the call entirely and simply inline the callee into the caller – eliminating the need to create a callframe and keeping the code tighter, which CPU caches like.

Optimization speculation

So, all this is nice, but we want MOAR MOAR MOAR. Because there are lots of other things that we can’t statically figure out the types of, but that may end up having very stable types come runtime. Some examples are lexical variables that we close over, object attributes, and return values of our callees. So how do we get hold of these types?

When code becomes hot, we don’t actually optimize it right away. Well, we do for the simple callsite-related transformations. But before going and doing the type-driven stuff, we take a closer look at what’s actually going on in the program. The specializer quickly produces an instrumented version of the program that records what types show up where. This runs for the next bunch of invocations, and logs what it sees. After a threshold is hit, we have a look at the types that showed up and do a little analysis. If a given logging site consistently logs the same type, we’re onto something. If we see more than one type show up there, we consider that part of the program as being typically dynamic and leave it be.

The thing is, we didn’t actually prove a property of the program here, just profiled it and figured out what tends to happen. So how can we use this information? The trick is to insert a guard into the specialized program that cheaply asserts that we got the expected type. Beyond that guard, we can optimize assuming we really did. (We also make sure that we actually do an optimization based upon what the guard checks, and don’t bother to insert it otherwise.)

Which is all well and good when the guard is met, but what about when the assertion fails? In fact, this is a more general problem. Even the far simpler type-based optimizations on incoming parameters assume that an object’s type will not change – and of course, thanks to mixins, they can. All of these situations trigger deoptimization: replacing the optimized code we’re in the midst of running with the unoptimized, naive, but safe version that is dynamic enough to handle all the cases.

Deoptimization is quite a pain to get right. For one, if we were inside of inlined code, we have to go back and create all the callframes we skipped. Since inlining can be many levels deep, that can be a bunch of callframes to wire up. Then we have to make sure that our optimizations don’t throw away code that produces a value the deoptimized code relies on being there, but the optimized code would otherwise not. Of course, we still need to do dead code elimination in general; it’s just that it has to be aware of deoptimization.

Therefore, it’s not only important that MoarVM can optimize code. It’s also critical to its ability to do so that it can also deoptimize, falling back to slow paths when needed.

Down to machine code

Interpreting specialized bytecode can yield, in some programs, a significant improvement. The simple program:

my $i = 0; while ++$i <= 100000000 { }

Got a factor of 2.5 improvement with what we had several months ago, and there are still a lot of opportunities left (some of which we’ve already explored, and with others yet to go). However, interpreting – that is, looping over an instruction stream and choosing what to do for each instruction – has its overhead. Before the specialization process eliminates a lot of the dynamism, interpretation is only so bad; some ops have some checks to do, and so they cost a bit. But specialized code tends to have a lot of very cheap operations that just play with pointers – and then the overhead of interpreting can quickly come to account for the majority of the execution time.

Therefore, on x64, we can often take the further step of producing machine code to run, instead of specialized bytecode to interpret. Of course, we have to be ready to deoptimize from that too. I’ll save talking about the machine code part too much, since the guy who worked on it is, I believe, plotting an advent post about it in the near future. Needless to say, it makes a dramatic difference; for the benchmark above it more than halved the time again relative to the specialized bytecode.

We often referred to the “produce machine code” part of the work as “the JIT”. But really, the entire set of optimizations described in this article are part of the typical work a JIT compiler does. Producing machine code is just one small part, and it’s interesting to note that the win from eliminating dynamism was a bigger one than eliminating interpretation overhead.

What next?

The work so far has had a dramatic impact on performance. However, there’s still plenty of areas for improvement.

For one, the number of specializations produced and how we choose them can certainly use some tweaking. It’s possible for a routine to saturate its number of allowed specializations with ones that, in the long run, are not entirely strategic. Or we may be able to reasonably argue that it should be allowed more. At the same time, specialized bytecode is a notable memory overhead at this point. So, we need to produce both more and less specializations. Of course, what that really means is we need a better strategy for picking what is worth doing.

Another clear issue is that some speculative optimizations turn out to be bad bets, and we have no way of recovering from that. We can deoptimize pretty fast (in fact, some trade-offs have been made to have it be quite affordable). But it’s still a cost, and it’s possible a gamble that looked good based on available data may lead to code that has to be deoptimized a majority of the time – and thus run slower than if we’d never bothered. Again, it’s about being smarter and keeping on learning as we run the program.

Then there’s a bunch of things that we don’t specialize or compile to machine code in a good way yet. Array access is an obvious one, and big integer operations – which are at the heart of Perl 6’s Int type – are another. The specializer is also not as aware as it really should be of closures. These are all areas where we could do a whole lot better – and they are nice, incremental advances on what we already have.

A further interesting area for exploration is doing escape analysis, and being able to eliminate a bunch of GC allocations in favor of allocating the memory as part of the call frame, because we know it can never end up being used beyond it. This is a fair amount of work, but also highly relevant to Perl 6: many scalar containers for lexical variables would be great candidates.

Then there’s the whole idea of trace JIT. MoarVM so far has a routine-level JIT, often called a method JIT. There, we optimize at routine level first, and may do some inlining to flatten away the call tree. This is a pretty decent strategy for Perl 6, in so far as we’re often very interested in inlining primitive operators that, naively interpreted in the original dynamic program, would be multiple dispatch calls. Trace JITs, by contrast, just follow the flow of instructions, over however many callframes, and every single conditional branch becomes a guard. Their strength is loops, which today we cope with using On Stack Replacement (detect we’re in a hot loop, compile an optimized version of it, and hot-swap it). While it’s easy to talk about trace JIT vs. method JIT as a “pick one”, I’m much more interested in the “pick both” track. They both, after all, share a lot of infrastructural needs – and they have their strengths and weaknesses. There’s more than one way to dynamically optimize it, and, this being Perl, we might as well just do all the ways – spotting all the nice unifications and re-use opportunities along the way.

Day 19 – Perl 6 Supplies Reactive Programming

Several days back, we took a look at promises and channels. Promises provided a synchronization mechanisms for asynchronous things that produced a single result, while channels were ideal for setting up producer/consumer style workflows, with producers and consumers able to work in parallel. Today we’ll take a look at a third mechanism aimed at introducing asynchrony and coping with concurrency: supplies!

Synchronous = Pull, Asynchronous = Push

One of the most important features of promises is the then method. It enables one or more things to be run whenever the asynchronous work the promise represents is completed. In a sense, it’s like a publish/subscribe mechanism: you call then to subscribe, and when the work is done then the notification is published. This happens asynchronously. You don’t sit around waiting for the promise to complete, but instead just say what to do when this takes place. Thinking about the way data flows, values are pushed along to the next step of the process.

This is rather different to things like sub and method calls. There, we make the call, then we block until it has completed. Iterating over a lazy list is the same: you can’t progress with the iteration until the next value is available. Thus, iteration is really about pulling stuff from a source. So, if a promise can be thought of as something that can push a single values out as it becomes available, do we have something that can push a whole stream of values outwards, as they are produced over time?

Supplies! We do!

A while back, it was realized that the observer pattern is the mathematical dual of the iterator pattern. Why is this exciting? Quite simply, because it means that all the things you can sensibly do with something you can iterate (map, grep, zip, etc.), you can also sensibly do with something you can observe. Out of this was born reactive programming, and the Rx (Reactive Extensions) library, which has now been ported to many platforms. In Perl 6, we’re providing support for this in core.

The basics

Let’s start out simple. First, we create a new Supply:

my $measurements = Supply.new;

We can then tap the supply, passing a closure that should be called whenever a value is made available:

$measurements.tap(-> $value {
    say "Measured: $value";
});

Finally, we produce some values:

$measurements.more(1.5);
$measurements.more(2.3);
$measurements.more(4.6);

On each of these calls, the closure we tapped the supply with is invoked. Just as we can call then many times, so we can tap many times too:

$measurements.tap(-> $value {
    say "Also measured: $value";
});

Now, when we produce a value:

$measurements.more(2.8);

Both of the closures tapping the supply will be called. Note that tap returns an object which can be used to express you’re no longer interested in the supply, essentially turning that tap off.

Note that we didn’t introduce any asynchrony so far. However, supplies are built for it. You can safely have multiple threads supplying values. By default, the supplying thread is used to execute the taps.

Enter the combinators!

Since supplies are essentially a thread-safe observer implementation, we can define many of the same things on them as we’re used to having on lists. For example, imagine we just wanted to tap high measurements. Well, we just re-use knowledge from how we’d filter a list: using grep!

$measurements.grep(* > 4).tap(-> $value {
    say "HIGH: $value";
});

Calling grep on a supply produces another supply, just as calling grep on a list gives another list. We could, if we wished, store it in a variable and tap this derived supply many times, grep it again, map it, etc.

Supply factories

There are ways to get supplies besides simply creating them directly. The Supply class has various factory methods that create various interesting kinds of supply, while introducing asynchrony. For example, interval gives a supply that, when tapped, will produce an ascending integer once per time interval.

my $secs = Supply.interval(1);
$secs.tap(-> $s { say "Started $s seconds ago" });
sleep 10;

Factories can also help map between paradigms. The Supply.for method produces a supply that, when tapped, will iterate the specified (potentially lazy) list and push the values out to the tapper. It does the iteration asynchronously. While it’s not implemented yet, we’ll be able to define a similar mechanism for taking a Channel and tapping each value that is received.

Crossing the streams

Some of the most powerful – and tricky to implement – combinators are those that involve multiple supplies. For example, merge gives a single supply whose values are those of the two other supplies it tapped, and zip pairs together values from two different supplies. These are tricky to implement because it’s entirely possible that two different threads will be supplying values. Thankfully, though, we just need to manage this once inside of the supplies implementation, and save those using them from worrying about the problem! In a sense, combinators on lists factor out flow control, while combinators on supplies factor out both flow control and synchronization. Both let us program in a more declarative style, getting the imperative clutter out of our code.

Let’s bring all of this together with an example from one of my recent presentations. We simulate a situation where we have two sets of readings coming in: first, measurements from a belt, arriving in batches of 100, which we need to calculate the mean of, and second another simpler value arriving once every 5 seconds. We want to label them, and get a single supply merging these two streams of readings together. Here’s how it can be done:

my $belt_raw = Supply.interval(1).map({ rand xx 100 });
my $belt_avg = $belt_raw.map(sub (@values) {
    ([+] @values) / @values
});
my $belt_labeled = $belt_avg.map({ "Belt: $_" });
my $samples = Supply.interval(5).map({ rand });
my $samples_labeled = $samples.map({ "Sample: $_" });
my $merged = $belt_labeled.merge($samples_labeled);
$merged.tap(&say);
sleep 20;

Notice how it’s not actually that much harder than code that maps and greps lists we already had – and yet we’re managing to deal with both time and concurrently arriving data.

The future

Supplies are one of the most recently implemented things in Rakudo, and what’s done so far works on Rakudo on the JVM. With time, we’ll flesh out the range of combinators and factories, keep growing our test coverage, and deliver this functionality on Rakudo on MoarVM too, for those who don’t want to use the JVM.

Day 14 – Asynchronous Programming: Promises and Channels

Some of the most exciting progress in Perl 6 over the last year has been in the area of asynchronous, concurrent and parallel programming. In this post, we’ll take a look at two of the language features that relate to this: promises and channels. But first…

A Little Design Philosophy

Threads and locks are the assembly language of parallel programming. In the spirit of “make the hard things possible”, Perl 6 does let you spawn a thread and provide you with a Lock primitive. But these are absolutely aimed at those doing the hard things. I’ve written, code-reviewed and taught parallel programming in languages where these were the primary primitives for a while. Doing code reviews was often a fairly depressing affair. It’s not just that there were bugs, it’s that often it felt like the approach taken by the code’s author was, “just throw locks all over the place and all will be well”.

In this post, I’ll focus on the things we have in Perl 6 to help make the easy things easy. They are designed around a number of principles:

  • The paradigms we provide should have a strong focus on being composable, to make it easy to extend, re-use and refactor code
  • Furthermore, it should be easy to compose the various paradigms together, as well as having ways to move between the synchronous and asynchronous worlds where needed
  • Both asynchrony and synchronization should be explicit, happen at clearly defined boundaries, and be done at a fairly high level

In general, the Perl 6 approach is that you achieve concurrency by decomposing a problem into many pieces, communicating through the provided synchronization mechanisms (those in the language, and no doubt a bunch of extra ones that will be provided by the module ecosystem over time). The approach is not about mutating shared memory. That’s decidedly in the “hard things possible” category. The fact that it’s really hard to get right is the main problem, but from a performance perspective, lots of threads competing to write to the same bit of memory is the worst case for CPU caches – which really matter these days.

Promises

A promise is a synchronization primitive for a piece of asynchronous work that will produce a single result at some point in the future, or fail to do so because something went wrong. Different languages have evolved different terms for this idea, or use the terms with different nuances. Both “future” and “task” are often used.

The easiest way to create a promise is:

my $p10000 = start {
    (1..Inf).grep(*.is-prime)[9999]
}

This schedules the work in the block to be done. By default, this means it will be scheduled to run on a pool of threads. Thus, start introduces asynchrony into a program. We continue by executing the next line of code, and the work we specified will be done on another thread. If it runs to completion and produces a result, we say that the promise was kept. If, by contrast, it dies by throwing an exception, then we say the promise was broken.

So, what can you do with a promise? Well, you can ask it for the result:

say $p10000.result;

This blocks until the promise is kept or broken. If it is kept, the value it produced is returned. If it’s broken, the exception is thrown. There’s a neater way to write this:

say await $p10000;

This may take many promises, and so you can do things like:

my @quotes = await @currency_exchanges.map(-> $ex { start { $ex.get_quote($val) } });

Although this will throw an exception if any of them fail. Thus, we may wish to wait on all of them, then just extract those that produced a result:

my @getting = @currency_exchanges.map(-> $ex { start { $ex.get_quote($val) } });
await Promise.allof(@getting);
my @quotes = @getting.grep(*.status == Kept).map(*.result);

There’s something a little interesting in there: allof. This is an example of a promise combinator: something that takes one or more promises as its arguments and creates some kind of composite promise that relates to them. And this brings us to the next interesting and important thing: a promise need not be backed by a piece of asynchronously executing code! For example, we can create a promise that will be kept after a certain amount of time has elapsed:

my $kept_in_10 = Promise.in(10);

Thus, we might provide a basic timeout mechanism, making sure any exchange that doesn’t give us a result in 5 seconds doesn’t get blocked on:

my @getting = @currency_exchanges.map(-> $ex { start { $ex.get_quote($val) } });
await Promise.anyof(Promise.allof(@getting), Promise.in(5));
my @quotes = @getting.grep(*.status == Kept).map(*.result);

Of course, sitting around and waiting for results is just one thing we can do with a promise. We can also provide things that should be done upon the promise being completed. These will also be scheduled and run asynchronously. For example:

my $p10000 = start {
    (1..Inf).grep(*.is-prime)[9999]
}
my $base16 = $p10000.then(sub ($res) {
    $res.result.base(16)
});
my $pwrite = $base16.then(sub ($res) {
    spurt 'p10000.txt', $res.result;
    return 'p10000.txt';
});

Here, we use then in order to specify something that should be done after the promise is kept or broken. This also returns a promise, meaning you can chain another operation into the process. And you can call then multiple times on one promise too, giving a kind of one-off publish/subscribe mechanism (see a future article on supplies for a much richer way to do this kind of thing, however). Note that promise takes care internally to make sure races work out OK (for example, the work being done in the promise is already completed by the time we call then).

You can also create your own promises, keeping or breaking them as you desire. This is as simple as:

# Create the promise.
my $p = Promise.new;

# Take the "vow" object, used to keep/break it.
my $v = $p.vow;

# Later, one of...
$v.keep($result);
$v.break($exception_or_message);

Thus, you can write your own promise factories and combinators too.

Channels

A promise is OK for conveying a single result, but what about producer/consumer scenarios where the producer will produce many values over time, and the consumer will process them as they are available? This is where a channel can come in useful.

Let’s say we want to read in a bunch of INI configuration files, parse each one using a grammar, and then flatten the configuration results into a single hash. There are three distinct steps here, in a producer/consumer relationship, which we can do in parallel. While the final result is a single value, and so a promise feels suitable, there are many files to read and parse. This is where channels come in. Let’s explore them using this example.

First, here is the top level of the program:

sub MAIN() {
    loop {
        my @files = prompt('Files: ').words;
        read_all(@files);
    }
}

This prompts the user for a bunch of filenames, then calls read_all. This is a little more interesting:

sub read_all(@files) {
    my $read = Channel.new;
    my $parsed = Channel.new;
    read_worker(@files, $read);
    parse_worker($read, $parsed) for 1..2;
    my %all_config = await config_combiner($parsed);
    say %all_config.perl;
}

This creates two channels, $read and $parsed. The $read channel will be used by read_worker in order to send the contents of each of the files it reads in along to the parse_worker. Here is read_worker:

sub read_worker(@files, $dest) {
    start {
        for @files -> $file {
            $dest.send(slurp($file));
        }
        $dest.close();
        CATCH { $dest.fail($_) }
    }
}

It uses the send method in order to send along the contents of each file it slurps. After slurping them all, it calls last on the channel to indicate there will be no more. The CATCH block calls fail on the channel to indicate that the producer failed. This will, when reached, throw an exception in the consumer. A channel that has had last or fail called on it can no longer be used to send values. Finally, the whole thing is wrapped in a start block so it is done on a thread in the thread pool.

The parse_worker is a little more interesting:

sub parse_worker($source, $dest) {
    my grammar INIFile {
        token TOP {
            ^
            <entries>
            <section>+
            $
        }

        token section {
            '[' ~ ']' <key> \n
            <entries>
        }

        token entries {
            [
            | <entry> \n
            | \n
            ]*
        }

        rule entry { <key> '=' <value> }

        token key   { \w+ }
        token value { \N+ }

        token ws { \h* }
    }

    my class INIFileActions {
        method TOP($/) {
            my %result;
            %result<_> = $<entries>.ast;
            for @<section> -> $sec {
                %result{$sec<key>} = $sec<entries>.ast;
            }
            make %result;
        }

        method entries($/) {
            my %entries;
            for @<entry> -> $e {
                %entries{$e<key>} = ~$e<value>;
            }
            make %entries;
        }
    }

    start {
        loop {
            winner $source {
                more $source {
                    if INIFile.parse($_, :actions(INIFileActions)) -> $parsed {
                        $dest.send($parsed.ast);
                    }
                    else {
                        $dest.fail("Could not parse INI file");
                        last;
                    }
                }
                done $source { last }
            }
        }
        $dest.close();
        CATCH { $dest.fail($_) }
    }
}

It starts off with a grammar and actions class for INI files. We then sit in a loop, watching the $source channel, which is the one that read_worker is placing results in. If a channel has one more value available, then the more block will be called. Inside it, $_ will contain the slurped contents of an INI file. We then parse it, and provided this worked out send along the hash of hashes representing the INI file’s content (sections at the top level, then key/value pairs). Again, we take care to call fail and last appropriately.

Finally, config_combiner takes each of those hash of hashes, and does the work to combine them into a single hash. It uses a promise to convey the final, single, result.

sub config_combiner($source) {
    my $p = Promise.new;
    my $v = $p.vow;
    start {
        my %result;
        loop {
            winner $source {
                more $source {
                    for %^content.kv -> $sec, %kvs {
                        for %kvs.kv -> $k, $v {
                            %result{$sec}{$k} = $v;
                        }
                    }
                }
                done $source { last }
            }
        }
        $v.keep(%result);
        CATCH { $v.break($_) }
    }
    return $p;
}

And there we have it: a program using promises and channels happily together, in a producer/consumer, map/reduce style.

Day 03 – Rakudo Perl 6 on the JVM

There have been a number of exciting developments for Perl 6 during 2013. In this post, we’ll take a look at one of them in some detail: running Perl 6 on the JVM (Java Virtual Machine).

Why the JVM?

There are many reasons for a language to have an implementation that targets the JVM. Here are some that drove us to bring Perl 6 to this platform.

  • The JVM is a stable, widely deployed,  trusted-in-the-enterprise platform. There are places where they don’t mind which language you write it, but they do care that it can run on the JVM.
  • The JVM has been very well optimized over the years. It most certainly isn’t fast to get started – but for long running things it typically performs well.
  • These days, the JVM is most certainly not just for Java. In fact, the commitment to run other languages – including those very different to Java – is serious. For example, there’s now a yearly JVM Language Summit, and the invokedynamic instruction and infrastructure was added in JDK7, and being improved in JDK8. Since Perl 6 is a gradually typed language, a VM that can play host to both static and dynamic languages is a good fit. Furthermore, every other major dynamic language is on the JVM. So, why not Perl too?
  • The JVM has widely used, well exercised support for concurrent, parallel and asynchronous programming. A wide range of primitives are available. Given that before this year, the Perl 6 story in these areas was also rather weak so far as implementation went, being on the JVM would provide an opportunity for fast prototyping and exploration, to help drive things forward.

But…another Perl 6 implementation?!

Implementing Perl 6 is a large undertaking – as those of us who got sucked into the process along the way have discovered. Many languages have got to the JVM by having a JVM-specific implementation of the language: JRuby, Jython, Nashorn, etc. For Perl 6, we’ve taken a different path.

The Rakudo Perl 6 compiler may only have targeted Parrot for much of its life, but those designing it have had VM portability in mind for a good while. Furthermore, the basic architecture has always been to have strongly isolated compilation stages, communicating by well-defined data structures. This put Rakudo in a good place to gain a JVM backend – at least, in theory. Over the course of the last year, what we hoped would work out well in theory has played out very nicely in practice.

The vast majority of the Rakudo codebase is not in any way VM-specific. Better still, the bits that need to change most often and that undergo most active development are almost always VM-independent. Many developers working on Rakudo test their changes against a single backend, and it’s relatively uncommon to find breakage on the other backend as a result. That said, we have automated daily spectest runs to catch any regressions.

Status

First, let’s consider the specification test suite. You might think at this point I’d mention how close Rakudo on JVM is to passing the number of specification tests that Rakudo on Parrot does. In fact, we need to do it the other way around these days: Rakudo on Parrot passes 99.64% of the spectests that Rakudo on the JVM does. “Huh,” you might think. “How’d the JVM backend come out ahead?” The answer is relatively simple: the JVM backend runs a bunch of concurrency tests that we don’t run on the Parrot backend. There actually are a small number of tests (tens rather than hundreds) that only pass on Rakudo on Parrot, largely due to “interesting” edge-case behaviors that have yet to be hunted down. However, these days the vast majority of programs run unmodified on both.

In the wider ecosystem, things are not quite so polished yet. Panda, the module installer, runs on Rakudo on the JVM. However, a number of modules depend on the NativeCall library, for calling into native code. The NativeCall porting effort is very much underway; last time I looked at it I could do basic things, like calling simple Win32 APIs. But it’s not all the way there yet. This is, however, really the last major missing piece. To say this time last year, we couldn’t run Perl 6 on the JVM at all, we’ve come a very long way.

Is it faster?

Well, it depends. For quick one liners and short-running scripts? No, startup will kill you. For something long running? Yes, usually it’s faster, and sometimes it’s significantly faster (perhaps five times or even forty times). And that’s before we’ve really done a great deal of optimization work on the JVM backend; the focus thus far has largely been “make it work”.

Can I call Java libraries?

Yes, but… :-) We do have some basic interop support in place already. Here’s an example:

use java::util::zip::CRC32:from<java>;

my $crc = CRC32.new();
for 'Hello, Java'.encode('utf-8') {
    $crc.'method/update/(B)V'($_);
}
say $crc.getValue();

It doesn’t look all that bad until you hit the method call in the loop. What’s that funny method/update/(B)V thing about? In Java you can statically overload methods. When there’s no overloading, we quite happily give you a short name. When there’s multiple, for now you need to use the JVM’s method descriptors to indicate the desired one. We’ll improve that, and many other aspects of interop, over the coming months. In summary, it’s often quite possible to call code from Java libraries today, it’s just not pleasant yet.

The future

Much has been done, yet of course there’s still plenty to do. Once NativeCall support is in shape, we’ll be able to add the JVM as an option to the Rakudo Star distribution release (for now, it’s only available in compiler releases – or fresh from Git, of course). Beyond that, the main areas of focus will be convergence, Java interop and performance. Given that this year took us from zero JVM support to Rakudo on JVM being the implementation passing the most spectests, it’s exciting to think where we’ll be in another year from now.