[MLton-user] simple, portable, asynchronous programming in SML

Stephen Weeks sweeks@sweeks.com
Wed, 19 Jul 2006 15:08:14 -0700


Lately I've been thinking about asynchronous programming in SML
(e.g. network or GUI programming), where computations occur in
response to external events that can occur at any time, possibly after
some long delay (e.g. waiting for input from a socket or for keyboard
input).  An obvious answer to the question of how to write such code
is to use something like Concurrent ML (CML), which has first-class
events and preemptive threads.  This allows one to write code in a
natural style, where a thread blocks until an event becomes enabled
and other threads can continue in the interim.

Unfortunately, there are several problems with using CML.  First and
foremost, programming with preemptive threads is significantly more
challenging than single-threaded programming.  Second, while MLton has
some support for CML, the basis library has not yet been updated to be
thread safe.  Doing this would be difficult, and would be further
complicated by the desire to not impose a performance hit on
single-threaded code.  Third, CML is not implemented with portable SML
code -- its implementation requires (at least) primitives for grabbing
the current thread (or continuation).  These primitives are available
in MLton and SML/NJ but not in other SML implementations.

It would be nice to have a portable SML library for asynchronous
programming that lets one keep the simple reasoning of single-threaded
programming.  In addition to simplifying user code,
single-threadedness would enable the use of existing library code
unchanged, including the MLton basis library.  What follows is an
explanation of such a library.

This is my first write-up of this idea and code, so there are surely
errors and places for improvement.  Please send your thoughts.  My
hope for sending this note is to get good feedback, and then to polish
the code and explanation, adding them to the wiki and possibly the
MLton library.  The code is under the MLton license, so feel free to
try it out in your own projects.


If you are familiar with CML, the basic idea is derived from the
following equation:

  CML = Events + Threads + Preemption

The idea is to directly implement just the "event" part of CML in
portable SML, leaving threads and preemption as (non-portable) add-ons.
Almost all of the CML abstractions (channel, ivar, mvar, mailbox,
multicast) make perfect sense without threads and preemption, and can
easily be implemented in portable SML.  As a separable decision, if
the SML implementation supports threads (or continuations), one can
trivially use them with events (the key operation be Cml.sync).
Further, as a separable decision, if the SML implementation supports
preemption (e.g. via a timer signal), one can choose to use that.  But
a big benefit of this approach is to avoid preemption, and to live
with just events, or possibly events plus non-preemptive threads.

Here is the signature I have in mind for events.

--------------------------------------------------------------------------------
signature EVENT =
  sig
     type 'a t

     val always: 'a -> 'a t
     val choose: 'a t list -> 'a t
     val channel: unit -> {get: 'a t, put: 'a -> unit t}
     val every: 'a t * ('a -> unit) -> unit
     val ivar: unit -> {get: 'a t, put: 'a -> unit}
     val mailbox: unit -> {get: 'a t, put: 'a -> unit}
     val multicast: unit -> {get: unit -> 'a t, put: 'a -> unit}
     val mvar: unit -> {get: 'a t, put: 'a -> unit}
     val never: unit -> 'a t
     val runHandlers: unit -> unit
     val when: 'a t * ('a -> unit) -> unit
     val wrap: 'a t * ('a -> 'b) -> 'b t
  end
--------------------------------------------------------------------------------

If you're familiar with CML, this should look somewhat, although not
completely, familiar.  In addition to throwing out parts of CML, I've
also MLton-ized the signature and cleaned up things.  In any case, I
will explain from the ground up.

An event of type 'a t is a first-class object that represents the
possibility of communicating a value of type 'a from one part of the
program to another.  At any point in time, an event is either enabled
or disabled, depending on whether it is able to supply a value.

Events naturally express asynchronicity.  The simplest possible
example might be a timeout event that becomes enabled at some future
time (and remains enabled thereafter).  As another example, one might
have an event that represents the bytes coming over a socket.  The
event is enabled when bytes are available, and disabled if not.  As
another example, one might have an event that represents keyboard
input, and becomes enabled with the key value of each keypress.

The key operation on events that allows one to consume the value
supplied by an event is "when".

  val when: 'a t * ('a -> unit) -> unit

If "e" is an event of type 'a t, and h: 'a -> unit, then

  when (e, h)

says that the next time "e" becomes enabled with value "a", for the
handler "h" to consume "a", i.e. run "h a".  "h" is referred to as a
"handler", since it is handling a value supplied by "e".  When does "h
a" actually run?  Since there are no threads or preemption, it doesn't
run immediately or in its own thread.  Rather, the handler is
"scheduled" to run at some later date.  This simply stores the handler
in some hidden state of the event module.  It is up to clients of the
event module to call "runHandlers"

  val runHandlers: unit -> unit

to run all scheduled handlers.  The expectation is not that user code
is littered with calls to runHandlers.  Rather, the expectation is
that the code that interfaces with the low-level asynchronous system
stuff (select, GUI callbacks, etc.) calls runHandlers whenever system
events enable new handlers.  At that point, a single call to
runHandlers suffices to propagate all the effects of the low-level
event throughout the SML world.

The benefit of this approach to handlers is preservation of the
non-preemptive programming model and the immensely easier reasoning
about programs that goes along with it.  Code that installs handlers,
i.e. calls "when", doesn't have to worry about the handlers running
while it is running and interfering with its state.  Handlers
themselves are assured to run to completion without interruption from
other handlers.  There is no unexpected context switching due to
preemption or due to blocking.  Code doesn't block -- it just installs
handlers.

The basic operation that allows two parts of the program to
communicate using events is a FIFO channel.

  val channel: unit -> {get: 'a t, put: 'a -> unit t}

If one constructs a channel with

  val {get, put} = channel ()

then "put" and "get" correspond to the two sides, input and output, of
the channel.  "get" is an event that is enabled whenever some other
part of the program is putting a value on the other side of the
channel, while "put a" is enabled whenever some other part of the
program is getting a value from the channel.  For example, suppose
that one does the following:

  when (put a, h1)
  when (get, h2)
  when (get, h3)

Then, "h1" is waiting for "put a" to be enabled, i.e. waiting for some
other part of the program to "get" from the channel.  Fortunately,
both "h2" and "h3" are waiting for a put.  Since both sides of the
channel are enabled, a communication happens.  Whether h2 or h3
receives "a" is unspecified; supposing it is h2, then the handler "h1
()" will run, as will "h2 a".  The term for this kind of communication
is "synchronous rendezvous" because both sides of the communication
wait until the other is enabled, and then both are simultaneously
committed to the communication occurring.  In this example, "h3" would
still be waiting for another value to be put on the channel.

In addition to channels, there are several event combinators for
creating basic events and building more complicated events from
simpler ones.

  val always: 'a -> 'a t
  val never: unit -> 'a t
  val choose: 'a t list -> 'a t
  val wrap: 'a t * ('a -> 'b) -> 'b t

"always a" returns an event that is always enabled with value "a".
"never ()", returns an event that is never enabled.  If "e1", ...,
"en" are events, then

  choose [e1, ..., en]

is a new event that is enabled whenever any of its constituent events
ei.  "choose" implements the idea of selective communication.  

If "e" is an event of type 'a t and f: 'a -> 'b, then

  wrap (e, f)

is enabled with value "f a" whenever "e" is enabled with value "a".
The computation "f a" only occurs, however, if and when the event is
committed to.

For events that repeatedly fire, a useful construct is "every"

  val every: 'a t * ('a -> unit) -> unit

The idea is that "every (e, h)" waits until "e" is enabled with value
"a", at which point it handles "a" with "h", and then repeats, waiting
for "e".  That is, "every (e, h)" is equivalent to the following
infinite expression E:

  E = when (e, fn a => (h a; E))

"every" is easily defined using recursion and "when".

  fun every (e, h) =
     let
        fun loop () = when (e, fn a => (h a; loop ()))
     in
        loop ()
     end

Finally, there are several other ways to communicate using events (all
of these appear in CML): mailboxes, ivars, mvars, and multicast
channels.

A mailbox is exactly like a channel except that "put" is buffered.

  val mailbox: unit -> {get: 'a t, put: 'a -> unit}

For a mailbox, "get" is exactly the same as for channel -- it is
enabled when some value is in the mailbox.  The difference between
channel and mailbox is in the type of "put", which returns "unit t"
for channels and "unit" for mailboxes.  For a mailbox, put immediately
commits to putting a value, whereas for a channel, put returns an
event that will be enabled only when the getter is there.  Mailboxes
are easily implemented in terms of channels and "when".

  fun mailbox () =
     let
        val {get, put} = channel ()
     in
        {get = get,
         put = fn a => when (put a, ignore)}
     end

An ivar is a write-once variable.  Once it is written to, subsequent
writes fail and all reads succeed.

  val ivar: unit -> {get: 'a t, put: 'a -> unit}

An ivar is easily implemented in terms of a channel.  The idea is that
"put a" starts a loop that repeatedly puts "a" on the channel.

  fun ivar () =
     let
        val {get, put} = channel ()
        val isSet = ref false
        val put =
           fn a =>
           if !isSet then
              die "ivar"
           else
              every (put a, ignore)
     in
        {get = get,
         put = put}
     end

An mvar is a cell that is either empty or full, in which case it
holds a single value.  It is like an "'a option ref". 

  val mvar: unit -> {get: 'a t, put: 'a -> unit}

"put" stores a value in the cell.  It is an error to call put if the
cell is full.  "get" is enabled with value "a" when the cell is full,
holding value "a".  Handling a "get" removes the value from the cell.

A multicast is a FIFO channel to which one writes a sequence of
messages that any number of readers can see.  It is like a mailbox in
terms of put, but where each reader gets his own private output
channel to read all the messages.

  val multicast: unit -> {get: unit -> 'a t, put: 'a -> unit}

If one constructs a multicast channel with
  
  val {get, put} = multicast ()

then "put" adds another value to the sequence of messages.  A call to
"get" at time t returns an event that will receive (in order) every
value put after time t.

That covers all of the event module.  I've appended complete code
implementing it, less than 400 lines, to the end of this message.  The
code also includes a space-safe version of mutable queues, where enque
is curried so that one can hold on to the back of the queue without
holding on to the entire queue.

The main CML operation that is not implemented is "sync", which blocks
a thread on an event until the event becomes enabled.

  val sync: 'a t -> 'a

It is impossible to implement sync without support from the
compiler/runtime.  A simple way such support might be provided is via
a pause function.

  val pause: (('a -> unit) -> unit) -> 'a

The idea is that "pause f" pauses the currently running thread and
passes to "f" a function "g" such that "g a" will re-enable the thread
to continue with value "a".  With pause, sync can be easily
implemented.

  fun sync e = pause (fn k => when (e, k))

One could add sync like this, and program with non-preemptive threads
without changing the model much.  The main problem with sync is that
whether a function blocks or not is no longer reflected in the type
system.  Without sync, the only equivalent of blocking is for a
function to return an event, which is then reflected in its type.  The
problem with blocking is that it allows other threads/handlers to run,
and so complicates reasoning about a piece of code.  So, either one
must trust the documentation for a function when it says it doesn't
block, or one must assume that it does block and take into account all
possible interactions with other threads/handlers.  Thus, I think it
is much better to program without sync if possible.

If the compiler/runtime support timer signals, it is possible to use
them to add preemption to the above, but that seems like a bad idea to
me.  The simplicity of non-preemptive reasoning is gone, and the
implementation below is almost certainly broken, as it is not thread
safe.

Code follows.

--------------------------------------------------------------------------------

signature EVENT =
  sig
     type 'a t

     val always: 'a -> 'a t
     val choose: 'a t list -> 'a t
     val channel: unit -> {get: 'a t, put: 'a -> unit t}
     val every: 'a t * ('a -> unit) -> unit
     val ivar: unit -> {get: 'a t, put: 'a -> unit}
     val mailbox: unit -> {get: 'a t, put: 'a -> unit}
     val multicast: unit -> {get: unit -> 'a t, put: 'a -> unit}
     val mvar: unit -> {get: 'a t, put: 'a -> unit}
     val never: unit -> 'a t
     val runHandlers: unit -> unit
     val when: 'a t * ('a -> unit) -> unit
     val wrap: 'a t * ('a -> 'b) -> 'b t
  end

structure List =
   struct
      fun foreach (l, f) = List.app f l
   end

structure Util =
  struct
     fun const c _ = c

     fun die s = raise Fail s

     fun pass a f = f a

     fun recur (a, f) =
        let
           fun loop a = f (a, loop)
        in
           loop a
        end
  end

structure Queue:>
  sig
     type 'a t

     val deque: 'a t -> 'a option
     val enque: 'a t -> 'a -> unit
     val make: unit -> 'a t
  end =
  struct
     structure Node =
        struct
           datatype 'a t = T of ('a * 'a t) option ref

           fun make () = T (ref NONE)
        end

     datatype 'a t = T of {back: 'a Node.t ref,
                           front: 'a Node.t ref}

     fun make () =
        let
           val n = Node.make ()
        in
           T {back = ref n, front = ref n}
        end

     fun enque (T {back, ...}) = fn a =>
        let
           val Node.T r = !back
           val n = Node.make ()
        in
           r := SOME (a, n);
           back := n
        end

     fun deque (T {front, ...}) =
        let
           val Node.T r = !front
        in
           case !r of
              NONE => NONE
            | SOME (a, n) => (front := n; SOME a)
        end
  end

structure Event:> EVENT =
struct

open Util

val handlers: (unit -> unit) Queue.t = Queue.make ()

val scheduleHandler: (unit -> unit) -> unit = Queue.enque handlers

fun runHandlers () =
  recur ((), fn ((), loop) =>
         case Queue.deque handlers of
            NONE => ()
          | SOME h => (h (); loop ()))

structure Handler:
  sig
     type 'a t

     val hasBeenScheduled: 'a t -> bool
     val make: ('a -> unit) -> 'a t
     val precompose: 'a t * ('b -> 'a) -> 'b t
     (* It is an error to call Handler.schedule h if Handler.hasBeenScheduled h.
      *)
     val schedule: 'a t -> 'a -> unit
  end =
  struct
     datatype 'a t = T of {handler: 'a -> unit,
                           hasBeenScheduled: bool ref}

     fun make f =
        T {handler = f,
           hasBeenScheduled = ref false}

     fun hasBeenScheduled (T {hasBeenScheduled = h, ...}) = !h

     fun schedule (T {handler, hasBeenScheduled}) =
        if !hasBeenScheduled then
           die "Handler.schedule of handler that hasBeenScheduled"
        else
           let
              val () = hasBeenScheduled := true
           in
              fn a => scheduleHandler (fn () => handler a)
           end

     fun precompose (T {handler, hasBeenScheduled}, f) =
        T {handler = handler o f,
           hasBeenScheduled = hasBeenScheduled}
  end

structure Handlers:
  sig
     type ('a, 'b) t

     val add: ('a, 'b) t -> 'b -> 'a Handler.t -> unit
     val make: unit -> ('a, 'b) t
     val scheduleAll: ('a, unit) t -> 'a -> unit
     val scheduleOne: ('a, 'b) t -> (('a -> unit) * 'b) option
  end =
  struct
     datatype ('a, 'b) t = T of {extra: 'b,
                                 handler: 'a Handler.t} Queue.t

     fun make () = T (Queue.make ())

     fun add (T q) =
        let
           val enque = Queue.enque q
        in
           fn e => fn h => enque {extra = e, handler = h}
        end

     fun scheduleAll (T q) =
        let
           val hs =
              recur
              ([], fn (hs, loop) =>
               case Queue.deque q of
                  NONE => hs
                | SOME {handler = h, ...} =>
                     loop (if Handler.hasBeenScheduled h then
                              hs
                           else
                              Handler.schedule h :: hs))
        in
           fn a => List.foreach (hs, pass a)
        end

     fun scheduleOne (T q) =
        recur ((), fn ((), loop) =>
               case Queue.deque q of
                  NONE => NONE
                | SOME {extra, handler} =>
                     if Handler.hasBeenScheduled handler then
                        loop ()
                     else
                        SOME (Handler.schedule handler, extra))
  end

structure PollResult =
  struct
     datatype 'a t =
        Enabled of 'a
        (* Invariant: We never pass a handler that hasBeenScheduled to a
         * NotEnabled.
         *)
      | NotEnabled of 'a Handler.t -> unit
  end
datatype z = datatype PollResult.t

datatype 'a t = T of unit -> 'a PollResult.t

fun always a = T (const (Enabled a))

fun never () = T (const (NotEnabled ignore))

fun poll (T p) =
  case p () of
     Enabled a => SOME a
   | NotEnabled _ => NONE

fun addHandler (T p, h) =
  if Handler.hasBeenScheduled h then
     ()
  else
     case p () of
        Enabled a => Handler.schedule h a
      | NotEnabled f => f h

fun when (d, g) = addHandler (d, Handler.make g)

fun every (e, f) =
  recur ((), fn ((), loop) => when (e, fn a => (f a; loop ())))

fun wrap (T p, g) =
  T (fn () =>
     NotEnabled
     (fn h =>
      let
         val h = Handler.precompose (h, g)
      in
         case p () of
            Enabled a =>
               Handler.schedule (Handler.precompose (h, const a)) ()
          | NotEnabled f =>
               f h
      end))

fun ivar () =
  let
     val getters = Handlers.make ()
     val r = ref (NotEnabled (Handlers.add getters ()))
     val get = T (fn () => !r)
     fun put a =
        case !r of
           NotEnabled _ =>
              (r := Enabled a;
               Handlers.scheduleAll getters a)
         | _ => die "ivar put"
  in
     {get = get, put = put}
  end

fun mvar () =
  let
     val getters = Handlers.make ()
     val add = Handlers.add getters ()
     val r = ref NONE
     fun put a =
        if isSome (!r) then
           die "mvar put"
        else
           case Handlers.scheduleOne getters of
              NONE => r := SOME a
            | SOME (h, ()) => h a
     val get =
        T (fn () =>
           case !r of
              NONE => NotEnabled add
            | SOME a => (r := NONE; Enabled a))
  in
     {get = get, put = put}
  end

fun 'a channel () =
  let
     val getters: ('a, unit) Handlers.t = Handlers.make ()
     val putters: (unit, 'a) Handlers.t = Handlers.make ()
     val add = Handlers.add getters ()
     val get =
        T (fn () =>
           case Handlers.scheduleOne putters of
              NONE => NotEnabled add
            | SOME (h, a) => (h (); Enabled a))
     val add = Handlers.add putters
     fun put a =
        T (fn () =>
           case Handlers.scheduleOne getters of
              NONE => NotEnabled (add a)
            | SOME (h, ()) => (h a; Enabled ()))
  in
     {get = get, put = put}
  end

fun mailbox () =
  let
     val {get, put} = channel ()
  in
     {get = get,
      put = fn a => when (put a, ignore)}
  end

fun 'a multicast () =
  let
     datatype 'a n = T of 'a * 'a n t
     local
        val {get, put} = ivar ()
     in
        val getters: 'a n t ref = ref get
        val putter: ('a n -> unit) ref = ref put
     end
     fun put a =
        let
           val {get = g, put = p} = ivar ()
           val () = !putter (T (a, g))
           val () = getters := g
           val () = putter := p
        in
           ()
        end
     fun get () =
        let
           val getters = !getters
           val {get, put} = channel ()
           val () =
              recur (getters, fn (g, loop) =>
                     when (g, fn T (a, g) =>
                           when (put a, fn () => loop g)))
        in
           get
        end
  in
     {get = get, put = put}
  end

fun choose es =
  T (fn () =>
     recur
     ((es, []), fn ((es, fs), loop) =>
      case es of
         [] =>
            NotEnabled
            (fn h =>
             recur (fs, fn (fs, loop) =>
                    case fs of
                       [] => ()
                     | f :: fs =>
                          (f h;
                           if Handler.hasBeenScheduled h then
                              ()
                           else
                              loop fs)))
       | T p :: es =>
            case p () of
               Enabled a => Enabled a
             | NotEnabled f => loop (es, f :: fs)))

end