Day 13 – Audio Streaming done Completely Wrong

Starting out

I made the audio streaming source client Audio::Libshout about a year and a half ago, it works quite well: with the speed improvements in Rakudo I have been able to stream 320kb/s MP3 without a problem, but it always annoyed me that it was difficult to test properly and even to test it at all required an Icecast server. Even with an icecast server that I could stream to, it would be necessary to actually listen to the stream in order to determine whether the stream was actually functioning correctly.

This all somewhat came to a head earlier in the year when I discovered that even the somewhat rudimentary tests that I had been using for the asynchronous streaming support had failed to detect that the stream wasn’t being fed at all. What I really needed was a sort of dumb streaming server that could act in place of the real icecast and could be instrumented to determine whether a connection was being made and that the correct audio data was being received. How hard could it be? After all it was just a special kind of web server.

I should have known from experience that this was a slippery slope, but hey.

A little aside about Audio Streaming

An icecast streaming server is basically an HTTP server that feeds a continuous stream of encoded audio data to the client listeners who connect with an HTTP GET request, the data to be streamed is typically provided by a source client which will connect to the server, probably authenticate using HTTP Authentication, and start sending the data at a steady rate that is proportional to the bitrate of the stream. libshout connects with a custom request method of SOURCE which is inherited from its earlier shoutcast origins, though icecast itself understands PUT as well for the source client. Because it is the responsibility of the listening client to supply the decoded audio data to the soundcard at exactly the right rate and the encoded data contains the bitrate of the stream as transmitted from the source the timing demands on the server are not too rigorous: it just has to be consistent and fast enough that a buffer on the client can be kept sufficiently full to supply the audio data to the sound card. Icecast does a little more in detail to, for instance, adjust for clients that don’t seem to be reading fast enough and so forth, but in principle it’s all relatively simple.

Putting it together

As might be obvious by now, an audio streaming server differs from a typical HTTP server in that rather than serving some content from disk or generated by the program itself for example, it needs to share data received on one client connection with one or more other client connections. In the simplest C implementation one might have a set of shared buffers, one of which is being populated from the source connection at any given time, whilst the others are being consumed by the client connections, alternating on filling and depletion. Whether the implementation settles on a non-blocking or threaded source possibly the most critical part of the code will be the synchronisation between the source writer and the client readers to ensure that a buffer is not being read from and written to at the same time.

In Perl 6 of course you’d hope you didn’t need to worry about these kind of annoying details as there are well thought out concurrency features that abstract away most of the nasty details.

From one to many

Perhaps the simplest program that illustrates how easy this might be would be this standalone version of the old Unix service chargen :

use v6.c;

my $supplier = Supplier.new;

start {
    loop {
        for ( 33 ... 126 ).map( { Buf.new($_) }) -> $c {
            sleep 0.05;
            $supplier.emit($c);
        }
    }
}

my $sig = signal(SIGPIPE).tap( -> $v {
    $sig.close;
});

react {
    whenever IO::Socket::Async.listen('localhost', 3333) -> $conn {
        my $p = Promise.new;
        CATCH {
            when /'broken pipe'/ {
                if $p.status !~~ Kept {
                    $p.keep: "done";
                }
            }
        }

        my $write = $supplier.Supply.tap( -> $v {
            if $p.status ~~ Planned {
                $conn.write: $v;
            }
            else {
                $write.close;
            }
        });
    }
}

In this the Supplier is being fed asynchronously to stand in for the possible source client in our streaming server, and each client that connects on port 3333 will receive the same stream of characters – if you connect with two clients (telnet or netcat for instance,) you will see they are getting the same data at roughly the same time.

The Supplier provides a shared sequence of data of which there can be many consumers, so each connection provided by the IO::Socket::Async will be fed the data emitted to the Supplier starting at the point the client connected.

The CATCH here is to deal with the client disconnecting, as the first our code will know about this is when we try to write to the connection, we’re not expecting any input from the client we can check and besides the characters becoming available to write may happen sooner than the attempt to read may register the close, so, while it may seem like a bit of a hack, it’s the most reliable and simple way of doing this: protecting from further writes with a Promise. If, by way of experiment, you were to omit the CATCH you would find that the server would quit without warning the first time the first client disconnected.

I’ll gloss over the signal outside the react as that only seems necessary in the case where we didn’t get any input data on the first connection.

Making something nearly useful

The above example is almost all we need to make something that you might be able to use, all we need is for it to handle an HTTP connection from the clients and get a source of actual MP3 data into the Supply and we’re good. To handle the HTTP parts we’ll just use the handy HTTP::Server::Tiny which will conveniently take a Supply that will feed the output data, so in fact we end up with quite a tiny program:

use HTTP::Server::Tiny;

sub MAIN(Str $file) {

    my $supplier = Supplier.new;

    my $handle = $file.IO.open(:bin);

    my $control-promise = start {
        while !$handle.eof {
            my $c = $handle.read(1024);
            $supplier.emit($c);
        }
    }

    sub stream(%env) {
        return 200, [ Content-Type => 'audio/mpeg', Pragma => 'no-cache', icy-name => 'My First Streaming Server'], $supplier.Supply;
    }
    HTTP::Server::Tiny.new(port => 3333).run(&stream);
}

The HTTP::Server::Tiny will run the stream subroutine for every request and we need to do is return the status code, some headers, and a supply from which the output data will be read, the client connection will be closed when the Supply is done (that is when the done method is called on the source Supplier.) It couldn’t really be much more
simple.

Just start the program with the path to a file containing MP3 audio and then point your favourite streaming client at port 3333 on your localhost and you should get the stream, I say should as it makes no attempt to regulate the rate at which the audio data is fed to the client. But for constant bit-rate MP3 data and a client that will buffer as much as it
can get it works.

Of course a real streaming server would read the data frame by frame and adjust the rate according to the bit-rate of each frame. I actually have made (but not yet released,) Audio::Format::MP3::Frame to help do this, but it would be over-kill for this example.

Relaying a stream

Of course the original intent of this streaming server was to be able to test a streaming source client, so we are going to have to add another part that will recognise a source connection, read from that and relay it to the normal clients in a similar way to the above.

You’ll recall that the libshout client library will connect with a request method of SOURCE so we can adjust the file streaming example to identify the source connect and feed the Supplier with the data read from the connection:

use HTTP::Server::Tiny;

sub MAIN() {

    my $supplier = Supplier.new;

    my Bool $got-source = False;

    sub stream(%env) {
        if %env<REQUEST_METHOD> eq 'GET' {
            if $got-source {
                return 200, [ Content-Type => 'audio/mpeg', Pragma => 'no-cache', icy-name => 'My First Streaming Server'], $supplier.Supply;
            }
            else {
                return 404, [ Content-Type => 'text/plain'], "no stream connected";
            }
        }
        elsif %env<REQUEST_METHOD> eq 'SOURCE' {
            my $connection = %env<p6sgix.io>;

            my $finish-promise = Promise.new;

            $connection.Supply(:bin).tap(-> $v {
                $supplier.emit($v);
            }, done => -> { $finish-promise.keep: "done" });

            $got-source = True;
            return 200, [ Content-Type => 'audio/mpeg' ], supply { whenever $finish-promise { done; } };
        }
    }
    HTTP::Server::Tiny.new(port => 3333).run(&stream);
}

You’ll see immediately that nearly all of the action is happening in the request handler subroutine: the GET branch is almost unchanged from the previous example (except that it will bail with a 404 if the source isn’t connected,) the SOURCE branch replaces the reading of the file previously. HTTP::Server::Tiny makes reading the streamed data from the source client really easy as it provides the connected IO::Socket::Async to the handler in the p6sgix.io (which I understand was originally primarily to support the WebSocket module,) theSupply of which is tapped to feed the shared Supplier that
conveys the audio data to the clients. All else that is necessary is to return with a Supply that is not intended to actually provide any data but just to close when the client closes their connection.

Now all you have to do is run the script and feed it with some source client like the following:

use Audio::Libshout;

multi sub MAIN(Str $file, Int $port = 3333, Str $password = 'hackme', Str $mount = '/foo') {

    my $shout = Audio::Libshout.new(:$port, :$password, :$mount, format => Audio::Libshout::Format::MP3);
    $shout.open;
    my $fh = $file.IO.open(:bin);

    while not $fh.eof {
        my $buf = $fh.read(4096);
        say $buf.elems;
        $shout.send($buf);
        $shout.sync;
    }

    $fh.close;
    $shout.close;
}

(Providing the path to some MP3 file again,) then if you connect a streaming audio player you will be getting some audio again.

You might notice that the script can take a password and a mount which aren’t used in this case, this is because Audio::Libshout requires them and also because this is basically the script that I have been using to test streaming servers for the last year or so.

Surprisingly this tiny streaming server works quite well, in testing I found that it ran out of threads before it got too bogged down handling the streams with multiple clients, showing how relatively efficient and well thought out the Perl 6 asynchronous model is. And how simple it is to put together a program that would probably require a lot more
code in many other languages.

Where do we go from here

I’m pretty sure that you wouldn’t want to use this code to serve up a popular radio station, but it would definitely be sufficient for my original testing purposes with a little additional instrumentation.

Of course I couldn’t just stop there so I worked up the as yet unreleased Audio::StreamThing which uses the same basic design with shared supplies, but works more like Icecast in having multiple mounts for individual streams, provision for
authentication and better exception handling.

If you’d find it useful I might even release it.

Postscript

I’d just like to get a mention in for the fabulous DJ Mike Stern as I always use his recorded
sets for testing this kind of stuff for some reason.

Have fun and make more noise.