Thomas Leonard's blog

Asynchronous Python vs OCaml

I've now migrated the asynchronous download logic in 0install from Python to OCaml + Lwt. This post records my experiences using Lwt, plus some comparisons with Python's coroutines. As usual, the examples will be based on the real-world case of 0install, rather than on idealised text-book examples.

Table of Contents

The problem

What happens when you download a program using 0install? To make this concrete, let's look at the downloads that happen when you enter the command:

$ 0launch http://simamo.de/0install/armagetronad.xml

to make this happen:

If the software is cached, we run immediately. If not, we need to download some things first. The steps are (you don't need to remember this!):

  1. Download http://simamo.de/0install/armagetronad.xml. This feed just points us at various sub-feeds that say where to get implementations for each platform.
  2. I'm on Linux (64 bit), so on my computer we next download (concurrently) the four suggested sub-feeds:
  3. Once we have these, we find a dependency on the library http://simamo.de/0install/armagetronad-libs-Linux-x86_64.xml, so we download that XML file too.
  4. Once all the XML downloads have finished, we select a version of Armagetron (0.2.8.3.2 in my case) and its (single) declared library (version 0.3-pre0.1570 here).
  5. We download these two tar.bz2 files (concurrently) and unpack them to the cache.
  6. Finally, we run (as described in previous posts).

That's the overall process. It's not totally trivial, but in fact some of the steps are complex in themselves. For example, to download a single feed (XML) file:

  1. We download the feed from the given URL.
  2. We check the GPG signature on the feed. If we don't have the GPG key, we must download that next.
  3. Once we've checked that the signature is valid, we need to decide whether to trust it. We download information from the key information server.
  4. Depending on the user's configuration and the key information server's response, we may show a confirmation dialog to the user.

In addition:

  1. If the primary site (simamo.de) fails, we try the mirror site https://roscidus.com/0mirror/ instead.
  2. If the primary site is slow, we ask the mirror too. If the primary then succeeds, we cancel the mirror download. If the mirror succeeds first, we use the information we got from it to find more required downloads, but also continue waiting for the primary (which may have more up-to-date information).
  3. If the key information server is slow, we display the dialog to the user anyway, but update the display if the information arrives while the user is pondering.
  4. We never use more than 2 HTTP connections per site at the same time. Further requests are queued.

In other words, there's quite a bit of logic here (and there's still the archive downloads too...). How can we make sure all these operations happen at the right time and that errors are handled correctly?

Solutions

Note that the challenge here is not to use multiple CPUs in parallel to perform some calculation faster, but to schedule and manage multiple concurrent operations. The effects of concurrency will be visible (i.e. the behaviour of the code, such as whether we decide to contact the mirror server or not, depends on how quickly things happen). Therefore, some non-determinism is unavoidable. However, we want to minimise it.

Most languages provide some kind of low-level preemptive multi-threading support, e.g. Python's threading.create, Haskell's forkIO, OCaml's Thread.create, Java's java.lang.Thread and Go's go. In these cases, all threads always run in parallel by default. If two threads access a shared or global variable without appropriate locking, the program will occasionally fail in ways that are hard to reproduce or diagnose.

Of course, these languages provide mutexes, channels, etc to make correct code possible, but this style is unsafe by default. For example, if a multi-threaded program uses some library from multiple threads, and the author of the library was only thinking about single-threaded use, then you likely have a subtle, hard-to-trigger bug.

Let's consider a simplified example: we want to fetch information from the key information server, parse it, and confirm the key with the user (this server says things like "This key belongs to a registered Debian developer"). Within each thread, we might do something like this:

1
2
3
4
5
def confirm_key(feed):
	data = download_key_info(feed.sig)
	info = parse_key_info(data)
	ok = confirm_with_gui(feed, info)
	if ok: ...

Probably this code will crash if two keys are downloaded close together, because the graphical toolkit library used to show the GUI isn't thread-safe. But there could be similar issues with any code we call (is parse_key_info thread-safe, for example? What about the XML parser it uses? etc).

So how can we avoid these problems? Rust uses its linear types to prevent concurrent access to mutable state, which looks very useful. For other languages, we can use cooperative multi-threading.

The idea here is that instead of running threads in parallel by default and remembering to add locks wherever necessary, we run only one thread at a time, switching between threads only at explicitly marked points.

The two schemes have different failure modes. If you forget the locking in preemptive code, you get subtle bugs. If you forget to allow task switching in a cooperative system, the program may run slower (waiting when it could be getting on with something). For an application like 0install, cooperative makes far more sense. Just making downloads and GUI interaction alone concurrent is really all we need.

Callbacks

The simplest scheme to implement uses callbacks. You tell the system to start an operation, and give it a function to call on success:

1
2
3
4
5
6
7
8
9
def confirm_key(feed):
	download_key_info(feed.sig, key_info_downloaded)

def key_info_downloaded(data):
	info = parse_key_info(data)
	confirm_with_gui(feed, info, trust_confirmed)

def trust_confirmed(ok):
	if ok: ...

Here, we don't know what other functions may be called in the time between us calling download_key_info and the key_info_downloaded callback, but while our code is executing we know that we have complete control. For example, it's not a problem if parse_key_info here only supports single threading.

Callbacks have two major problems:

  • They make the code messy and hard to read.
  • They handle exceptions poorly.

Imagine that download_key_info has succeeded. It calls the key_info_downloaded callback. That calls parse_key_info, which throws an exception. The exception gets returned to download_key_info which can't do anything useful with it. Probably, it gets logged and the program hangs, waiting for a call to trust_confirmed that will never happen.

Promises

Promises are a nice alternative to callbacks. When you start an operation, you get a promise for the result. A promise is a place-holder for a result that will arrive in the future. Without any special syntax, using promises might look something like this:

1
2
3
4
5
6
7
8
9
10
11
def confirm_key(feed):
	data_promise = download_key_info(feed.sig)
	return data_promise.when_fulfilled(key_info_downloaded)

def key_info_downloaded(data):
	info = parse_key_info(data)
	confirmation_promise = confirm_with_gui(feed, info)
	return confirmation_promise.when_fulfilled(trust_confirmed)

def trust_confirmed(ok):
	if ok: ...

The function promise.when_fulfilled(callback) immediately returns a new promise for the (future) result of the callback.

Internally, a promise initially contains a queue for callbacks. When the promise is (eventually) resolved to a value, the callbacks are all run and the queue is replaced by the value. Attempting to attach any further callbacks just runs them immediately on the value.

Promises have a number of advantages over callbacks. For example, you can store promises of results in lists, pass them to other functions, etc. One particular advantage is exception handling. Consider our previous example:

  1. confirm_key returns a promise for the result of key_info_downloaded.
  2. download_key_info downloads the data successfully, fulfilling data_promise.
  3. key_info_downloaded is called (it was attached to data_promise as a callback).
  4. parse_key_info throws an exception, which is caught by the promise system.
  5. This "breaks" the promise returned by confirm_key.
  6. Whoever was waiting for confirm_key gets notified of the exception.

Notice that instead of propagating uncaught exceptions backwards (to download_key_info), we propagate them forwards (to whoever is waiting for the result). The result is that, as in synchronous programming, an exception is not lost just because someone in the chain doesn't handle it.

A natural next step is to introduce some simpler syntax for this...

OCaml Lwt

OCaml provides a couple of libraries for handling promises - Lwt and Jane Street's Async. I've only looked at Lwt, although they seem fairly similar.

The terminology I introduced above I learnt from E (which also has sophisticated distributed promises). I find the E terms more natural, but here's a conversion table:

E term Lwt term
Promise Thread
Fulfilled promise Returned thread
Broken promise Failed thread
Unresolved promise Sleeping thread
Resolver Waker

In particular, while a Lwt thread is still working to produce a result, the thread is said to be "sleeping", which I find rather awkward. A resolver/waker is the object used by the maker of the promise to resolve it.

Anyway, switching to OCaml and using Lwt without the syntax extensions, we get this:

1
2
3
4
5
6
7
8
9
let confirm_key feed =
  let data_promise = download_key_info feed.signature in 
  Lwt.bind data_promise (fun data ->
    let info = parse_key_info data in
    let confirmation_promise = confirm_with_gui feed info in
    Lwt.bind confirmation_promise (fun ok ->
      if ok then ...
    )
  )

Here, Lwt.bind promise callback is like our previous promise.when_fulfilled(callback). Again, confirm_key returns a promise (thread) for the final result.

To make things more convenient, you can enable the Lwt syntax extension. This provides thread-aware alternatives to several built-in OCaml keywords:

1
2
3
4
5
let confirm_key feed =
  lwt data = download_key_info feed.signature in 
  let info = parse_key_info data in
  lwt ok = confirm_with_gui feed info in
  if ok then ...

As if by magic, our asynchronous code now reads like the original synchronous code! lwt is the new way to do Lwt.bind, by analogy with the ordinary let construct. We just have to remember that we give up control between evaluating the right-hand side of the assignment (getting a thread/promise for the data) and assigning the actual data on the left-hand side. For example, another function might change a global variable while we're waiting for the promise to resolve.

The other short-cuts are try_lwt, for_lwt, while_lwt and match_lwt, which do what you'd expect. As a bonus, try_lwt also adds a finally construct and for_lwt adds iteration over sequences - both are missing for the core OCaml language.

There are plenty of functions for combining or creating threads in various ways, including:

  • let thread, waker = Lwt.wait () explicitly creates a promise and a resolver for it.
  • Lwt.return value evaluates to a returned thread, which is useful if something needs a thread type but you already have the value.
  • Lwt.choose threads waits until one of the given threads is ready.
  • Lwt.join threads returns a single thread that returns when all of the given threads have returned.
  • Lwt_list.map_s fn items applies fn to each item, waiting for the resulting thread to resolve before doing the next item.
  • Lwt_list.map_p fn items as above, but runs all the threads in parallel.

Python generators

Python has an unusual solution to the problem, using its generator functions.

A generator is any function which contains a yield. Running such a function gets you an iterator. Each time you ask for a value from the iterator, the generator runs until the next yield to produce the result. It is suspended until the next call. Generators were originally just an easy way to produce sequences, for example:

1
2
3
4
5
6
7
8
def count():
  x = 0
  while True:
    yield x
    x += 1

for x in count():
  print(x)

However, this ability to suspend and resume functions is obviously useful for cooperative multi-threading too and, like many other people, I used them to create a such a system (back in 2004). The version used in the Python version of 0install was designed for Python 2.3, but since then Python has added many useful new features so I'll describe the recent Tulip/asyncio system rather than my own, even though I haven't actually used it much.

The idea is that every time you need to wait, you yield the promise ("future" in Python terminology) you're waiting for. When it's ready, the scheduler will resume your generator function with the result:

1
2
3
4
5
def confirm_key(feed):
	data = yield from download_key_info(feed.sig)
	info = parse_key_info(data)
	ok = yield from confirm_with_gui(feed, info)
	if ok: ...

Examples

Both systems (OCaml Lwt and Python generators) work very well in general. Here are some (slightly simplified) examples from 0install.

Following a recipe

Some downloads require collecting files from several places (e.g. an upstream tarball and some files to patch it with). We want to download the files in parallel, but execute the steps (e.g. unpacking downloads into the target directory) in series. My solution is that each download is a thread that performs the download and then returns a lazy thunk that applies it:

1
2
3
4
5
6
7
8
9
10
  (* Start all the downloads in parallel. *)
  let downloads = steps |> List.map do_step in

  (* Now iterate over the steps in series. *)
  downloads |> Lwt_list.iter_s (fun unpack ->
    (* Wait for download *)
    lwt unpack = unpack in
    (* Apply download to directory *)
    Lazy.force unpack
  )

Note that we start unpacking as soon as possible; we only wait when the next thing to unpack isn't downloaded yet.

This was my first attempt at a Python version with asyncio:

1
2
3
4
5
6
7
8
9
10
# Start all downloads in parallel
downloads = [do_step(step) for step in steps]

# Wait for all downloads to complete
tasks, _ = yield from asyncio.wait(downloads)

# Unpack each download in sequence
for task in tasks:
	unpack = task.result()
	yield from unpack()

An interesting difference is that OCaml threads, once started, continue to run by themselves even if no-one is waiting for the result. When the OCaml code is waiting for the first download to complete, the other downloads are still going on. But if we yield from just the first download in Python, only that download makes progress. In the code above, therefore, the Python waits for all downloads to complete before it starts unpacking.

You can fix this by wrapping the future with async:

1
2
3
4
5
6
7
8
9
# Start all downloads in parallel
downloads = [asyncio.async(do_step(step)) for step in steps]

# Now iterate over the steps in series.
for d in downloads:
	# Wait for download
	unpack = yield from d
	# Apply download to directory
	yield from unpack()

Downloading with libcurl

libcurl doesn't provide Lwt support. However, it is thread-safe. We can therefore use the Lwt_preemptive module to run each download in a real operating system thread and get a promise for its completion. In addition, we use a Lwt_pool to keep up to two Curl connections per site (queuing further requests).

When it's our turn to run, we also start a five second timer if the caller wanted to be notified if the download is slow. This is used when downloading the small XML metadata files so the mirror can be tried in parallel (for archives, we only try the mirror if the download actually fails).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
let make_site max_per_site =
  let create_connection () = Lwt.return (Curl.init ()) in
  let pool = Lwt_pool.create max_per_site create_connection in

  object
    method schedule_download ?if_slow channel url =
      Lwt_pool.use pool (fun connection ->
	let timeout = if_slow |> pipe_some (fun if_slow ->
	  let timeout = Lwt_timeout.create 5 if_slow in
	  Lwt_timeout.start timeout;
	  Some timeout;
	) in

	let download () =
	  download_in_thread connection channel url in

	try_lwt
	  Lwt_preemptive.detach download ()
	finally
	  timeout |> if_some Lwt_timeout.stop;
	  Lwt.return ()
    end

The download_in_thread function also needs to send progress notifications to back to Lwt, which it does using Lwt_preemptive.run_in_main.

Update: note that recent versions of ocurl support Lwt directly.

Python provides the ThreadPoolExecutor, which combines pooling and preemptive threading. This makes it a bit harder to start the timer (which should happen cooperatively), so we need to use call_soon_threadsafe, which is like Lwt's run_in_main. Python doesn't seem to provide a way to manage the HTTP connections with the pool - I guess you have to do that manually.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Site:
    def __init__(self, max_per_site=2):
        self.pool = futures.ThreadPoolExecutor(max_per_site)

    def schedule_download(self, channel, url, if_slow = None):
        thread_ready = asyncio.Future()
        def timer():
            # Wait for an executor to be ready...
            yield from thread_ready
            # Wait until 5 seconds into the download
            yield from asyncio.sleep(5)
            # Notify that the download is slow
            if_slow()

        def run_in_thread():
            connection = ...
            if if_slow:
                loop.call_soon_threadsafe(thread_ready.set_result,
					  True)
            download_in_thread(connection, channel, url)

        download = loop.run_in_executor(self.pool, run_in_thread)

        t = asyncio.async(timer())
	try:
	    yield from download
	finally:
	    t.cancel()

Error handling in key look-ups

Lwt does have a gotcha for error handling. Consider this code:

1
2
3
4
5
6
7
8
9
let confirm_key feed =
  lwt data =
    try
      download_key_info feed.signature
    with Failure msg ->
      log_warning "Failed to download key info: %s" msg;
      Lwt.return []
  in
  let info = parse_key_info data in

If querying the key info server fails, we want to log the error but continue with the confirmation, just with an empty list of hints.

Something I really dislike is code that looks right, compiles without warnings, works when you test it, and then fails in the field. Unfortunately, this code does just that. Even if you unit-test the error case!

The bug occurs because we accidentally used try rather than try_lwt. download_key_info successfully returns a promise for the information, so the with clause isn't triggered and we exit the try block. Then Lwt waits for the promise to resolve so it can set data. When unit-testing, you'll probably raise the test exception immediately and so the with block does get called.

By contrast, Python's generators have no such problems:

1
2
3
4
5
6
7
8
9
def confirm_key(feed):
	try:
		data = yield from download_key_info(feed.sig)
	except Exception as ex:
		logging.warning("Failed to download key info: %s", ex)
		data = []
	info = parse_key_info(data)
	ok = yield from confirm_with_gui(feed, info)
	if ok: ...

The other Lwt constructs don't have this problem because the type-system will detect the error (e.g. if you use match instead of match_lwt), but with try and try_lwt the type signatures are the same.

Failing over to a mirror

We start downloading each XML feed from its primary site, but trigger a timeout task if it takes too long. The timeout starts a download from the mirror, which happens in parallel with the original download attempt. We don't want to start the timer immediately because the download might get queued due to the rate limiting code, so we just pass the if_slow trigger to the download system (see above).

Because we need to report intermediate results (e.g. we have downloaded a possibly-slightly-old version from the mirror), we return a pair of the new result and a promise for the next update (or None if this is the last). In a similar way, we return errors as a pair of the current error (e.g. "mirror failed") and a promise for the other result.

Lwt.choose selects the result of the first task from a list to resolve. When choosing between the primary and the mirror however we ignore the result and test explicitly, because we need to know which one it was.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
let timeout_task, timeout_waker = Lwt.wait () in
let if_slow () = Lwt.wakeup timeout_waker `timeout in

let primary = do_primary_download ~if_slow feed in

(* Download just the upstream feed, unless it takes too long... *)
match_lwt Lwt.choose [primary; timeout_task] with
| `ok result ->
    `update (result, None) |> Lwt.return
| `problem msg ->
    let mirror = do_mirror_download () in
    `problem (msg, Some (wait_for_mirror mirror)) |> Lwt.return
| `timeout ->
    (* OK, maybe it's just being slow... *)
    log_info "Feed download from %s is taking a long time." feed_url;

    (* Start downloading from mirror... *)
    let mirror = do_mirror_download () in

    (* Wait for a result from either *)
    lwt _ = Lwt.choose [primary; mirror] in

    match Lwt.state primary with
    | Lwt.Fail msg -> raise msg
    | Lwt.Sleep ->
	(* The mirror finished first *)
	begin match_lwt mirror with
	| `ok result ->
	    log_info "Mirror succeeded, but will continue to wait for primary";
	    `update (result, Some (wait_for_primary primary)) |> Lwt.return
	| `problem msg ->
	    log_info "Mirror download failed: %s" msg;
	    wait_for_primary primary end
    | Lwt.Return v ->
	(* The primary returned first *)
	match v with
	| `ok result ->
	    Lwt.cancel mirror;
	    `update (result, None) > Lwt.return
	| `problem msg ->
	    `problem (msg, Some (wait_for_mirror mirror)) |> Lwt.return

I'm too lazy to translate this into modern Python, but I think it's clear that a direct translation would be easy enough.

The main problem would be losing OCaml's type checking, which ensures that we handle all the possible error conditions. I simplified the outcomes into just ok and problem above, but in the real code we also distinguish replay_attack, aborted_by_user and no_trusted_keys, and handle them differently. For example, a "replay attack" from the mirror (where the mirror gives us a version older than one we've already seen) is ignored, whereas it's reported if it comes from the primary.

Switches

The Lwt_switch module provides a way to group a set of activities together so you can stop them all at once. You create a switch and pass it to all the various setup functions you call. When you're done, call Lwt_switch.turn_off to kill everything. For example, each download goes to a temporary file. To ensure they're all deleted afterwards:

1
2
3
4
5
6
let switch = Lwt_switch.create () in
try_lwt
  let tmpfile = download ~switch url in
  ...
finally
  Lwt_switch.turn_off switch

It's easy to attach whatever finalisation code you want to a switch, e.g.

1
2
3
4
5
6
7
let download ~switch url =
  let tmpfile = make_temp_file () in
  Lwt_switch.add_hook
    (Some switch)
    (fun () -> Unix.unlink tmpfile; Lwt.return ());
  ...
  Lwt.return tmpfile

Then it doesn't matter whether we download successfully, raise an exception inside of download, raise an exception after calling download, etc; the file always gets deleted.

Perhaps this is bad API design and I shouldn't rely on download's caller to clean up the file if download fails, but it does seem convenient. To avoid mistakes, I used ~switch to force the caller to pass a switch instead of the more normal ?switch (where use of a switch is optional).

Parallel tasks

Regular OCaml lets you assign multiple variables at once using and, so that all the expressions are evaluated in a context where none of them is bound. For example, to switch the names of two variables:

1
2
3
let x = y
and y = x in
...

Lwt uses this syntax with lwt to create multiple tasks in parallel and then wait for all of them. For example, when we run a command we may want to collect the standard output and standard error separately but in parallel (if we did them in series, the process might get stuck trying to write its stderr while we were trying to read its stdout, if the Unix pipe gets full). With this syntax, we can get the two strings with just:

1
2
3
lwt stdout = Lwt_io.read child#stdout
and stderr = Lwt_io.read child#stderr in
...

Conclusions

0install needs to manage several fairly complex concurrent download activities, including error handling, timeouts and mirrors. Cooperative multi-threading allows us to support this easily with a low risk of race conditions.

Python and OCaml both provide powerful and easy-to-use cooperative threading support. I think Python's generators are slightly easier to understand for beginners, but I find both quite easy to use. I find Lwt's terminology a little confusing, but thinking of threads as promises seems to help. Both systems handle exceptions sensibly.

Comparing Python and OCaml code, they're pretty similar. Both make it easy to start and manage cooperative threads, to interact with pools of preemptively threaded code (e.g. libcurl) and to handle errors. Using OCaml variants for network errors rather than exceptions is useful; this ensures that all such errors are handled. If you rely on exceptions instead then things mostly work, but watch out for using try rather than try_lwt.

The old 0install Python code used a custom system built on top of Python's generators, but Python's new asyncio module provides a standardised replacement (asyncio will be added to the standard library in Python 3.4). Lwt has been around for a while and is already available from Linux distribution repositories.

Lwt also integrates with several other libraries, including GTK, OBus (D-BUS bindings) and React. Lwt seems very reliable. The only bug I found in Lwt so far was a pipe read failure on Windows, which they quickly fixed.