[MLton-commit] r5445

Stephen Weeks sweeks at mlton.org
Sun Mar 18 15:25:24 PST 2007


Added Event, Channel, and Mailbox.  The implementation of channels is
new, using a pair of streams and a dedicated helper that looks for
enabled handlers to pair up.  It should easily generalize to n-way
synchronization.  I'm not sure I like it better than the old
implementation, though.

----------------------------------------------------------------------

U   mltonlib/trunk/com/sweeks/async/unstable/async.sig
U   mltonlib/trunk/com/sweeks/async/unstable/async.sml

----------------------------------------------------------------------

Modified: mltonlib/trunk/com/sweeks/async/unstable/async.sig
===================================================================
--- mltonlib/trunk/com/sweeks/async/unstable/async.sig	2007-03-18 14:55:33 UTC (rev 5444)
+++ mltonlib/trunk/com/sweeks/async/unstable/async.sig	2007-03-18 23:25:23 UTC (rev 5445)
@@ -1,17 +1,42 @@
 signature ASYNC = sig
+   exception Closed
+   exception Full
    val runHandlers: Unit.t -> Unit.t
    structure Deferred: sig
       type 'a t
 
       val upon: 'a t * ('a -> Unit.t) -> Unit.t
    end
+   structure Event: sig
+      type 'a t
+
+      val always: 'a -> 'a t
+      val any: 'a t List.t -> 'a t
+      val commit: 'a t -> 'a Deferred.t
+      val never: Unit.t -> 'a t
+   end
+   structure Channel: sig
+      type 'a t
+
+      val give: 'a t * 'a -> Unit.t Event.t
+      val new: Unit.t -> 'a t
+      val take: 'a t -> 'a Event.t
+   end
    structure Ivar: sig
       type 'a t
 
       val fill: 'a t * 'a -> Unit.t
+      (* may raise Full *)
       val new: Unit.t -> 'a t
       val read: 'a t -> 'a Deferred.t
    end
+   structure Mailbox: sig
+      type 'a t
+
+      val new: Unit.t -> 'a t
+      val send: 'a t * 'a -> Unit.t
+      val take: 'a t -> 'a Event.t
+   end
    structure Stream: sig
       type 'a t
 
@@ -27,8 +52,10 @@
       type 'a t
 
       val close: 'a t -> Unit.t
+      (* may raise Closed *)
       val new: Unit.t -> 'a t
       val reader: 'a t -> 'a Stream.t
       val send: 'a t * 'a -> Unit.t
+      (* may raise Closed *)
    end
 end

Modified: mltonlib/trunk/com/sweeks/async/unstable/async.sml
===================================================================
--- mltonlib/trunk/com/sweeks/async/unstable/async.sml	2007-03-18 14:55:33 UTC (rev 5444)
+++ mltonlib/trunk/com/sweeks/async/unstable/async.sml	2007-03-18 23:25:23 UTC (rev 5445)
@@ -1,14 +1,55 @@
 structure Async: ASYNC = struct
+   exception Closed
    exception Full
 
-   val todo = ref []
+   structure Queue:> sig
+      type 'a t
 
-   fun schedule (f, v) = todo := (fn () => f v) :: !todo
+      val deque: 'a t -> 'a Option.t
+      val enque: 'a t -> 'a -> Unit.t
+      val new: Unit.t -> 'a t
+   end = struct
+      structure Node = struct
+         datatype 'a t = T of ('a * 'a t) Option.t Ref.t
+            
+         fun new () = T (ref None)
+      end
+   
+      datatype 'a t = T of {back: 'a Node.t Ref.t,
+                            front: 'a Node.t Ref.t}
 
+      fun new () =
+         let
+            val n = Node.new ()
+         in
+            T {back = ref n, front = ref n}
+         end
+
+      fun enque (T {back, ...}) = fn a =>
+         let
+            val Node.T r = !back
+            val n = Node.new ()
+         in
+            r := Some (a, n);
+            back := n
+         end
+
+      fun deque (T {front, ...}) =
+         let
+            val Node.T r = !front
+         in
+            Option.map (!r, fn (a, n) => (front := n; a))
+         end
+   end
+
+   val todo = Queue.new ()
+
+   fun schedule (f, v) = Queue.enque todo (fn () => f v)
+
    fun runHandlers () =
-      case !todo of
-         [] => ()
-       | t :: ts => (todo := ts; t (); runHandlers ())
+      case Queue.deque todo of
+         None => ()
+       | Some t => (t (); runHandlers ())
 
    structure Deferred = struct
       datatype 'a v = Filled of 'a | Unfilled of ('a -> Unit.t) List.t
@@ -40,7 +81,7 @@
 
       fun new () = T (Ivar.new ())
 
-      fun read (T d) = Ivar.read d
+      fun read (T i) = Ivar.read i
 
       val recur = fn (t, b, done, step) =>
          recur ((t, b), fn ((t, b), loop) =>
@@ -60,7 +101,7 @@
 
       fun fill (T i, v) = Ivar.fill (i, v)
 
-      fun close t = fill (t, None)
+      fun close t = fill (t, None) handle Full => raise Closed
 
       fun extend (t, v) = let
          val t' = new ()
@@ -81,6 +122,8 @@
    structure Tail = struct
       datatype 'a t = T of 'a Stream.t Ref.t
 
+      fun toStream (T r) = !r
+
       fun new () = T (ref (Stream.new ()))
 
       fun extend (t as T r, v) = r := Stream.extend (!r, v)
@@ -95,4 +138,113 @@
 
       val send = extend
    end
+
+   structure Handler: sig
+      type 'a t
+
+      val ignore: Unit.t -> 'a t
+      val isScheduled: 'a t -> Bool.t
+      val new: ('a -> Unit.t) -> 'a t
+      val maybeSchedule: 'a t * 'a -> Unit.t
+      val precompose: 'a t * ('b -> 'a) -> 'b t
+      (* It is an error to call Handler.schedule h if Handler.isScheduled h.
+       *)
+      val schedule: 'a t * 'a -> Unit.t
+   end = struct
+      datatype 'a t = T of {handler: 'a -> Unit.t,
+                            isScheduled: Bool.t Ref.t}
+
+      fun new f =
+         T {handler = f,
+            isScheduled = ref false}
+
+      val ignore = fn () => new ignore
+
+      fun isScheduled (T {isScheduled = h, ...}) = !h
+
+      val schedule = fn (T {handler, isScheduled}: 'a t, a: 'a) =>
+         if !isScheduled then
+            die "Handler.schedule of handler that isScheduled"
+         else
+            (isScheduled := true; schedule (handler, a))
+
+      fun maybeSchedule (h, a) = if isScheduled h then () else schedule (h, a)
+
+      fun precompose (T {handler, isScheduled}, f) =
+         T {handler = handler o f,
+            isScheduled = isScheduled}
+   end
+
+   structure Event = struct
+      datatype 'a t = T of 'a Handler.t -> Unit.t
+      (* Invariant: we never pass a Handler that isScheduled *)
+
+      fun send (T f, h) = f h
+
+      fun commit t = let
+         val i = Ivar.new ()
+         val () = send (t, Handler.new (fn v => Ivar.fill (i, v)))
+      in
+         Ivar.read i
+      end
+
+      fun any ts =
+         T (fn h =>
+            List.recur (ts, (), ignore, fn (t, (), k) =>
+                        if Handler.isScheduled h then
+                           ()
+                        else
+                           (send (t, h); k ())))
+
+      fun always a = T (fn h => Handler.maybeSchedule (h, a))
+         
+      fun never () = T ignore
+
+      fun map (t, f) = T (fn h => send (t, Handler.precompose (h, f)))
+   end
+
+   structure Channel = struct
+      datatype 'a t = T of {givers: ('a * Unit.t Handler.t) Tail.t,
+                            takers: 'a Handler.t Tail.t}
+
+      fun 'a new () = let
+         val givers: ('a * Unit.t Handler.t) Tail.t = Tail.new ()
+         val takers = Tail.new ()
+         fun loop (gs, ts) =
+            upon (Stream.read gs, fn opt =>
+                  Option.for (opt, fn (g, gs) => loopG (g, gs, ts)))
+         and loopG (g, gs, ts) =
+            upon (Stream.read ts, fn opt =>
+                  Option.for (opt, fn (t, ts) => loopGT (g, gs, t, ts)))
+         and loopT (gs, t, ts) =
+            upon (Stream.read gs, fn opt =>
+                  Option.for (opt, fn (g, gs) => loopGT (g, gs, t, ts)))
+         and loopGT (g as (a, gh), gs, t, ts) =
+            case (Handler.isScheduled gh, Handler.isScheduled t) of
+               (false, false) =>
+                  (Handler.schedule (gh, ());
+                   Handler.schedule (t, a);
+                   loop (gs, ts))
+             | (false, true) => loopG (g, gs, ts)
+             | (true, false) => loopT (gs, t, ts)
+             | (true, true) => loop (gs, ts)
+         val () = loop (Tail.toStream givers, Tail.toStream takers)
+      in
+         T {givers = givers,
+            takers = takers}
+      end
+
+      fun give (T {givers, ...}, a) =
+         Event.T (fn h => Tail.extend (givers, (a, h)))
+
+      fun take (T {takers, ...}) =
+         Event.T (fn h => Tail.extend (takers, h))
+   end
+
+   structure Mailbox = struct
+      open Channel
+
+      fun send (T {givers, ...}, a) =
+         Tail.extend (givers, (a, Handler.ignore ()))
+   end
 end




More information about the MLton-commit mailing list