[MLton] cvs commit: MAIL: Initial port of CML from SML/NJ to MLton.

Matthew Fluet fluet@mlton.org
Sat, 1 May 2004 13:16:52 -0700


fluet       04/05/01 13:16:51

  Added:       lib/cml  README TODO cml.cm
               lib/cml/cml-lib cml-lib.cm multicast.sig multicast.sml
                        result.sig result.sml simple-rpc.sig simple-rpc.sml
                        trace-cml.cm trace-cml.sig trace-cml.sml
               lib/cml/core-cml channel.sig channel.sml cml.sig cml.sml
                        core-cml.cm cvar.sig cvar.sml event.sig event.sml
                        mailbox.sig mailbox.sml rep-types.sml run-cml.sig
                        run-cml.sml running.sml scheduler-hooks.sig
                        scheduler-hooks.sml scheduler.sig scheduler.sml
                        sync-var.sig sync-var.sml thread-id.sig
                        thread-id.sml thread.sig thread.sml timeout.sig
                        timeout.sml trans-id.sig trans-id.sml version.sig
                        version.sml
               lib/cml/tests exit.cm exit.sml primes-multicast.cm
                        primes-multicast.sml primes.cm primes.sml
                        run-main.sml timeout.cm timeout.sml
               lib/cml/util assert.sig assert.sml critical.sig critical.sml
                        debug.sig debug.sml fun-priority-queue.fun
                        fun-priority-queue.sig fun-queue.sig fun-queue.sml
                        imp-queue.sig imp-queue.sml local-assert.fun
                        local-debug.fun timeit.sig timeit.sml util.cm
  Log:
  MAIL: Initial port of CML from SML/NJ to MLton.
  
  See cml/README for more details. (Which ought to be the first file in
  this commit message.)

Revision  Changes    Path
1.1                  mlton/lib/cml/README

Index: README
===================================================================

This is an initial port of CML from SML/NJ to MLton.

The implementation of CML in SML/NJ is built upon the first-class
continuations of the SMLofNJ.Cont module:
  type 'a cont
  val callcc : ('a cont -> 'a) -> 'a
  val throw : 'a cont -> 'a -> 'b
  val isolate : ('a -> unit) -> 'a cont

The implementation of CML in MLton is built upon the first-class
threads of the MLton.Thread module:
  type 'a t
  val new : ('a -> unit) -> 'a t
  val switch : ('a t -> 'b t * 'b) -> 'a

The port is relatively straightforward, because CML throws to a
continuation at most once.  Hence, an "abstract" implementation of
CML could be built upon first-class one-shot continuations, which map
equally well to SML/NJ's continuations and MLton's threads.

The "essence" of the port is to transform:
  ... callcc (fn k => ... throw k' v') ...
to
  ... switch (fn t => ... (t', v')) ...
which suffices for the vast majority of the CML implementation.
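
The "at most once" property can be illustrated with a toy model in plain
Standard ML.  This is only a sketch: the OneShot structure and its names
are hypothetical, and it uses neither SMLofNJ.Cont nor MLton.Thread.  It
shows only the discipline that lets a callcc/throw pair be replaced by a
switch: each captured resumption is used once, and a second use is an
error.

```sml
(* Toy model of a one-shot resumption (hypothetical; not part of the port). *)
structure OneShot =
struct
   exception AlreadyUsed

   datatype 'a t = T of {used : bool ref, resume : 'a -> unit}

   (* Package a resumption function together with a "used" flag. *)
   fun new (resume : 'a -> unit) : 'a t =
      T {used = ref false, resume = resume}

   (* Resume with v; any later attempt raises AlreadyUsed. *)
   fun throw (T {used, resume}) v =
      if !used
         then raise AlreadyUsed
      else (used := true; resume v)
end

val log : int list ref = ref []
val k = OneShot.new (fn v => log := v :: !log)

(* The first throw runs the resumption. *)
val () = OneShot.throw k 1

(* A second throw to the same resumption is rejected. *)
val secondFailed =
   (OneShot.throw k 2; false) handle OneShot.AlreadyUsed => true
```

A multi-shot continuation would allow the second throw; CML never needs
that, which is why one-shot threads are enough.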

There was only one complicated transformation: blocking multiple base
events.  In SML/NJ CML, the representation of base events is given by:
    datatype 'a event_status
      = ENABLED of {prio : int, doFn : unit -> 'a}
      | BLOCKED of {
	    transId : trans_id ref, 
            cleanUp : unit -> unit, 
            next : unit -> unit
	  } -> 'a
    type 'a base_evt = unit -> 'a event_status
When synchronizing on a set of base events, which are all blocked, we
must invoke each BLOCKED function with the same transId and cleanUp
(the transId is (checked and) set to CANCEL by the cleanUp function,
which is invoked by the first enabled event; this "fizzles" every
other event in the synchronization group that later becomes enabled).
However, each BLOCKED function is implemented by a callcc, so that
when the event is enabled, it throws back to the point of
synchronization.  Hence, the next function (which doesn't return) is
invoked by the BLOCKED function to escape the callcc and continue in
the thread performing the synchronization.  In SML/NJ this is
implemented as follows:
	  fun ext ([], blockFns) = callcc (fn k => let
		val throw = throw k
		val (transId, setFlg) = mkFlg()
		fun log [] = S.atomicDispatch ()
		  | log (blockFn :: r) =
		      throw (blockFn {
			  transId = transId,
			  cleanUp = setFlg,
			  next = fn () => log r
			})
		in
		  log blockFns; error "[log]"
		end)
(Note that S.atomicDispatch invokes the next continuation on the
ready queue.)  This doesn't map well to the MLton
thread model.  Although it follows the 
  ... callcc (fn k => ... throw k v) ...
model, the fact that blockFn will also attempt to do
  ... callcc (fn k' => ... next ()) ...
means that the naive transformation will result in nested switch-es.

We need to think a little more about what this code is trying to do.
Essentially, each blockFn wants to capture this continuation, hold on
to it until the event is enabled, and continue with next; when the
event is enabled, before invoking the continuation and returning to
the synchronization point, the cleanUp and other event specific
operations are performed.

To accomplish the same effect in the MLton thread implementation, we
have the following:
      datatype 'a status =
	 ENABLED of {prio : int, doitFn : unit -> 'a}
       | BLOCKED of {transId : trans_id,
		     cleanUp : unit -> unit,
		     next : unit -> rdy_thread} -> 'a
      type 'a base = unit -> 'a status 
and
	    fun ext ([], blockFns) : 'a =
	       S.atomicSwitch
               (fn (t : 'a S.thread) =>
		let
		   val (transId, cleanUp) = TransID.mkFlg ()
		   fun log blockFns : S.rdy_thread =
		      case blockFns of
			 [] => S.next ()
		       | blockFn::blockFns =>
			    S.new
			    (fn _ => fn () =>
			     let 
				val () = S.atomicBegin ()
				val x = blockFn {transId = transId,
						 cleanUp = cleanUp,
						 next = fn () => log blockFns}
			     in S.switch(fn _ => (t, x))
			     end)
		in
		   (log blockFns, ())
		end)
To avoid the nested switch-es, I run the blockFn in its own thread,
whose only purpose is to return to the synchronization point.  This
corresponds to the  throw (blockFn {...})  in the SML/NJ
implementation.  I'm worried that this implementation might be a
little expensive, starting a new thread for each blocked event (though
only when there are multiple blocked events in a synchronization group).
But, I don't see another way of implementing this behavior in the
MLton thread model.

Note that another way of thinking about what is going on is to
consider each blockFn as prepending a different set of actions to the
thread t.  It might be possible to give a MLton.Thread.unsafePrepend:
fun unsafePrepend (T r: 'a t, f: 'b -> 'a): 'b t =
   let
      val t =
	 case !r of
	    Dead => raise Fail "prepend to a Dead thread"
	  | New g => New (g o f)
	  | Paused (g, t) => Paused (fn h => g (f o h), t)
   in (* r := Dead; *)
      T (ref t)
   end
I have commented out the r := Dead, which would allow multiple
prepends to the same thread (i.e., not destroying the original thread
in the process).  Of course, only one of the threads could be run: if
the original thread were in the Paused state, then multiple threads
would share the underlying runtime/primitive thread.  Now, this
matches the "one-shot" nature of CML continuations/threads, but I'm
not comfortable with extending the MLton.Thread module with such an
unsafe operation.


Other than this complication with blocking multiple base events, the
port was quite routine.  (As a very pleasant surprise, the CML
implementation in SML/NJ doesn't use any SML/NJ-isms.)  There is a
slight difference in the way in which critical sections are handled in
SML/NJ and MLton; since MLton.Thread.switch _always_ leaves a critical
section, it is sometimes necessary to add additional atomicBegin/End-s
to ensure that we remain in a critical section after a thread switch.

While looking at virtually every file in the core-CML implementation,
I took the liberty of simplifying things where it seemed possible; in
terms of style, the implementation is about half-way between Reppy's
original and MLton's.  

Some changes of note:
 * util/ contains all pertinent data-structures: (functional and
     imperative) queues, (functional) priority queues.  Hence, it
     should be easier to switch in more efficient or real-time
     implementations.
 * core-cml/scheduler.sml: in both implementations, this is where most
     of the interesting action takes place.  I've made the connection
     between MLton.Thread.t-s and ThreadId.thread_id-s more abstract
     than it is in the SML/NJ implementation, and encapsulated all of
     the MLton.Thread operations in this module.
 * eliminated all of the "by hand" inlining
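
As an illustration of the kind of data structure kept in util/, here is
a minimal sketch of a functional queue built from two lists.  It is not
the code in fun-queue.sml; the structure name FunQueueSketch and its
function names are hypothetical.  Its operations are amortized constant
time rather than real-time.

```sml
(* Two-list functional queue (illustrative sketch only). *)
structure FunQueueSketch =
struct
   datatype 'a t = Q of {front : 'a list, back : 'a list}

   fun new () : 'a t = Q {front = [], back = []}

   (* New elements are consed onto the back list. *)
   fun enque (Q {front, back}, x) = Q {front = front, back = x :: back}

   (* Elements leave from the front list; when it is empty, the back
    * list is reversed to become the new front. *)
   fun deque (Q {front = x :: front, back}) =
          SOME (x, Q {front = front, back = back})
     | deque (Q {front = [], back = []}) = NONE
     | deque (Q {front = [], back}) =
          deque (Q {front = rev back, back = []})
end

val q =
   foldl (fn (x, q) => FunQueueSketch.enque (q, x))
         (FunQueueSketch.new ()) [1, 2, 3]

fun drain q =
   case FunQueueSketch.deque q of
      NONE => []
    | SOME (x, q') => x :: drain q'

(* Elements come out in the order they went in. *)
val drained = drain q
```

Keeping such a structure behind a small signature is what makes it easy
to swap in a different implementation later.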


All of the core CML functionality is present:
  signature CML; structure CML : CML
  signature CVAR; structure CVar : CVAR
  signature MAILBOX; structure Mailbox : MAILBOX
  signature MULTICAST; structure Multicast : MULTICAST
  signature SIMPLE_RPC; structure SimpleRPC : SIMPLE_RPC
A minimal RunCML : RUN_CML structure is present:
  signature RUN_CML =
     sig
        val isRunning : unit -> bool
        val doit : (unit -> unit) * Time.time option -> OS.Process.status
        val shutdown : OS.Process.status -> 'a
     end
This does not include all of the cleanup and logging operations of the
SML/NJ RunCML structure.  However, it does include the CML.timeOutEvt
and CML.atTimeEvt functions, and a preemptive scheduler that knows to
sleep when there are no ready threads and some threads blocked on time
events.

None of the Standard ML Basis Library has been made either
MLton.Thread or CML safe.  Many of the IO and OS structures have
event-based equivalents, which should be implemented.

For now, I've extended the CML signature with 
  val print : string -> unit
which executes in a critical region.  For now, this is the "right
thing" for interfacing with the Basis Library and the underlying
system calls.  (Using Posix.Error.SysCall, it is sufficient to execute
Basis Library code in a critical section; things that get interrupted
will automatically restart with signals blocked.)  It also has the
nice property that one can write a program with open CML whose only IO
(while executing RunCML.doit) is print, and it will compile under both
MLton and SML/NJ.


Some thoughts on future extensions.  The CML documentation says the
following:
      CML.joinEvt: thread_id -> unit event
          joinEvt tid
          creates an event value for synchronizing on the termination of
	  the thread with the ID tid.
	  There are three ways that a thread may terminate: the
          function that was passed to spawn (or spawnc) may return; it
	  may call the exit function, or it may have an uncaught
          exception.
	  Note that joinEvt does not distinguish between these cases;
          it also does not become enabled if the named thread
          deadlocks (even if it is garbage collected).
I believe that MLton.Finalizable might be able to relax that last
restriction.  Upon the creation of a 'a Scheduler.thread, we could
attach a finalizer to the underlying 'a MLton.Thread.t that enables
the joinEvt (in the associated ThreadID.thread_id) when the 'a
MLton.Thread.t becomes unreachable.


I don't know why CML doesn't have
      CML.kill: thread_id -> unit
which has a fairly simple implementation -- setting a kill flag in the
thread_id and adjusting the scheduler to discard any killed threads
that it takes off the ready queue.  The fairness of the scheduler
ensures that a killed thread will eventually be discarded.  The
semantics are a little murky for blocked threads that are killed,
though.  For example, consider a thread blocked on SyncVar.mTake mv
and a thread blocked on SyncVar.mGet mv.  If the first thread is
killed while blocked, and a third thread does SyncVar.mPut (mv, x),
then we might expect that we'll enable the second thread, and never
the first.  But, when only the ready queue is able to discard killed
threads, then the SyncVar.mPut could enable the first thread (putting
it on the ready queue, from which it will be discarded) and leave the
second thread blocked.  We could solve this by adjusting the
TransID.trans_id types and the "cleaner" functions to look for both
cancelled transactions and transactions on killed threads.
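
The simple version of this idea, where only the ready queue discards
killed threads, can be modelled in plain Standard ML.  This is a sketch
under loud assumptions: the thread record, readyQueue, kill, and next
below are hypothetical stand-ins, not the types or functions in
scheduler.sml, and CML itself has no kill.

```sml
(* Toy model: a "thread" is a kill flag plus a body to run. *)
type thread = {killed : bool ref, name : string, body : unit -> unit}

val readyQueue : thread list ref = ref []

fun enqueue (t : thread) = readyQueue := !readyQueue @ [t]

(* Killing only sets the flag; the thread stays where it is. *)
fun kill ({killed, ...} : thread) = killed := true

(* Take the next live thread off the ready queue, discarding killed ones. *)
fun next () : thread option =
   case !readyQueue of
      [] => NONE
    | (t as {killed, ...}) :: rest =>
         (readyQueue := rest
          ; if !killed then next () else SOME t)

fun runAll () =
   case next () of
      NONE => ()
    | SOME {body, ...} => (body (); runAll ())

val ran : string list ref = ref []
fun mk name : thread =
   {killed = ref false, name = name, body = fn () => ran := name :: !ran}

val a = mk "a" and b = mk "b" and c = mk "c"
val () = List.app enqueue [a, b, c]
val () = kill b
val () = runAll ()

(* Names of the threads that actually ran, in order. *)
val ranInOrder = rev (!ran)
```

The model has no notion of blocked threads, so it shows only the
ready-queue half; the murky case described above would need the same
check in the "cleaner" functions.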

Between CML.timeOutEvt and CML.kill, one could give an efficient
solution to the recent comp.lang.ml post about terminating a function
that doesn't complete in a given time:

  fun timeOut (f : unit -> 'a, t : Time.time) : 'a option =
    let
       val iv = SyncVar.iVar ()
       val tid = CML.spawn (fn () => SyncVar.iPut (iv, f ()))
    in
       CML.select 
       [CML.wrap (CML.timeOutEvt t, fn () => (CML.kill tid; NONE)),
        CML.wrap (SyncVar.iGetEvt iv, fn x => SOME x)]
    end




1.1                  mlton/lib/cml/TODO

Index: TODO
===================================================================

* Implement Okasaki real-time data-structures.
  + mailbox.sml -- needs functional queue
  + scheduler.sml -- needs imperative queue
  + timeout.sml -- needs functional priority queue



1.1                  mlton/lib/cml/cml.cm

Index: cml.cm
===================================================================
Group is
  core-cml/core-cml.cm
  cml-lib/cml-lib.cm



1.1                  mlton/lib/cml/cml-lib/cml-lib.cm

Index: cml-lib.cm
===================================================================
Group is
  multicast.sig
  multicast.sml
  simple-rpc.sig
  simple-rpc.sml




1.1                  mlton/lib/cml/cml-lib/multicast.sig

Index: multicast.sig
===================================================================
(* multicast.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* multicast-sig.sml
 *
 * COPYRIGHT (c) 1990 by John H. Reppy.  See COPYRIGHT file for details.
 *
 * Asynchronous multicast (one-to-many) channels.
 *)

signature MULTICAST =
   sig
      type 'a mchan
      type 'a port
      type 'a event = 'a CML.event
	  
      (* create a new multicast channel *)
      val mChannel : unit -> 'a mchan
      (* create a new output port on a channel *)
      val port : 'a mchan -> 'a port
      (* create a new output port on a channel that has the same state as the
       * given port.  I.e., the stream of messages seen on the two ports will
       * be the same.
       * NOTE: if two (or more) independent threads are reading from the
       * same port, then the copy operation may not be accurate.
       *)
      val copy : 'a port -> 'a port
      (* receive a message from a port *)
      val recv : 'a port -> 'a
      val recvEvt : 'a port -> 'a event
      (* send a message to all of the ports of a channel *)
      val multicast : ('a mchan * 'a) -> unit
   end



1.1                  mlton/lib/cml/cml-lib/multicast.sml

Index: multicast.sml
===================================================================
(* multicast.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* multicast.sml
 *
 * COPYRIGHT (c) 1994 AT&T Bell Laboratories.
 *
 * Asynchronous multicast (one-to-many) channels.  This implementation
 * is based on a condition variable implementation of multicast channels.
 * See Chapter 5 of "Concurrent Programming in ML" for details.
 *)

structure Multicast : MULTICAST =
   struct

      structure SV = SyncVar

      type 'a event = 'a CML.event

      datatype 'a request = 
	 Message of 'a 
       | NewPort 
      datatype 'a mc_state = MCState of ('a * 'a mc_state SV.ivar)
      datatype 'a port =
	 Port of (('a * 'a mc_state SV.ivar) CML.chan * 'a mc_state SV.ivar SV.mvar)
      datatype 'a mchan = 
	 MChan of ('a request CML.chan * 'a port CML.chan)

    fun mkPort cv = 
       let
	  val outCh = CML.channel()
	  val stateVar = SV.mVarInit cv
	  fun tee cv = 
	     let
		val (MCState(v, nextCV)) = SV.iGet cv
	     in
		CML.send (outCh, (v, nextCV))
		; tee nextCV
	     end
	  val _ = CML.spawn (fn () => tee cv)
       in
	  Port(outCh, stateVar)
       end
    
    fun mChannel () = 
       let
          val reqCh = CML.channel() 
	  and replyCh = CML.channel()
          fun server cv = 
	     case (CML.recv reqCh) of 
		NewPort => 
		   (CML.send (replyCh, mkPort cv)
		    ; server cv)
	      | (Message m) => 
		   let
		      val nextCV = SV.iVar()
		   in
		      SV.iPut (cv, MCState(m, nextCV))
		      ; server nextCV
		   end
	  val _ = CML.spawn (fn () => server (SV.iVar()))
       in
	  MChan(reqCh, replyCh)
       end

    fun multicast (MChan(ch, _), m) = CML.send (ch, Message m)

    fun port (MChan(reqCh, replyCh)) = 
       (CML.send (reqCh, NewPort)
	; CML.recv replyCh)
       
    fun copy (Port(_, stateV)) = mkPort(SV.mGet stateV)
       
    fun recvMsg stateV (v, nextCV) = 
       let val _ = SV.mSwap (stateV, nextCV)
       in v
       end
       
    fun recv (Port(ch, stateV)) = recvMsg stateV (CML.recv ch)
    fun recvEvt (Port(ch, stateV)) = CML.wrap(CML.recvEvt ch, recvMsg stateV)
   end




1.1                  mlton/lib/cml/cml-lib/result.sig

Index: result.sig
===================================================================
(* result.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* result.sml
 *
 * COPYRIGHT (c) 1996 AT&T Research.
 *
 *)

signature RESULT = 
   sig
      type 'a result
	 
      val result : unit -> 'a result
      val put    : ('a result * 'a) -> unit
      val putExn : ('a result * exn) -> unit
      val get    : 'a result -> 'a
      val getEvt : 'a result -> 'a CML.event
   end



1.1                  mlton/lib/cml/cml-lib/result.sml

Index: result.sml
===================================================================
(* result.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* result.sml
 *
 * COPYRIGHT (c) 1996 AT&T Research.
 *
 *)

structure Result :> RESULT =
   struct

      structure SV = SyncVar

      datatype 'a result_val = EXN of exn | RES of 'a

      type 'a result = 'a result_val SV.ivar

      fun result () = SV.iVar()
      fun put (iv, v) = SV.iPut(iv, RES v)
      fun putExn (iv, ex) = SV.iPut(iv, EXN ex)
      fun wrap (RES v) = v
	| wrap (EXN ex) = raise ex
      fun get iv = wrap(SV.iGet iv)
      fun getEvt iv = CML.wrap(SV.iGetEvt iv, wrap)
  end



1.1                  mlton/lib/cml/cml-lib/simple-rpc.sig

Index: simple-rpc.sig
===================================================================
(* simple-rpc.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* simple-rpc-sig.sml
 *
 * COPYRIGHT (c) 1997 AT&T Labs Research.
 *
 * Generators for simple RPC protocols.
 *)

signature SIMPLE_RPC = 
   sig
      type 'a event = 'a CML.event

      val mkRPC : ('a -> 'b) -> 
	 {call     : 'a -> 'b,
	  entryEvt : unit event}
	 
      val mkRPC_In : (('a * 'c) -> 'b) -> 
	 {call     : 'a -> 'b,
	  entryEvt : 'c -> unit event}

      val mkRPC_Out : ('a -> ('b * 'c)) -> 
	 {call     : 'a -> 'b,
	  entryEvt : 'c event}

      val mkRPC_InOut : (('a * 'c) -> ('b * 'd)) -> 
	 {call     : 'a -> 'b,
	  entryEvt : 'c -> 'd event}
  end



1.1                  mlton/lib/cml/cml-lib/simple-rpc.sml

Index: simple-rpc.sml
===================================================================
(* simple-rpc.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* simple-rpc.sml
 *
 * COPYRIGHT (c) 1997 AT&T Labs Research.
 *
 * Generators for simple RPC protocols.
 *)

structure SimpleRPC : SIMPLE_RPC =
   struct

      type 'a event = 'a CML.event

      fun call reqMB arg = 
	 let val replV = SyncVar.iVar()
	 in
	    Mailbox.send(reqMB, (arg, replV))
	    ; SyncVar.iGet replV
	 end

      fun mkRPC f = 
	 let
	    val reqMB = Mailbox.mailbox()
	    val entryEvt = 
	       CML.wrap 
	       (Mailbox.recvEvt reqMB, fn (arg, replV) => 
		SyncVar.iPut(replV, f arg))
	 in
	    {call = call reqMB, entryEvt = entryEvt}
	 end

      fun mkRPC_In f = 
	 let
	    val reqMB = Mailbox.mailbox()
	    val reqEvt = Mailbox.recvEvt reqMB
	    fun entryEvt state = 
	       CML.wrap 
	       (reqEvt, fn (arg, replV) => 
		SyncVar.iPut(replV, f(arg, state)))
	 in
	    {call = call reqMB, entryEvt = entryEvt}
	 end

      fun mkRPC_Out f = 
	 let
	    val reqMB = Mailbox.mailbox()
	    val reqEvt = Mailbox.recvEvt reqMB
	    val entryEvt = 
	       CML.wrap 
	       (reqEvt, fn (arg, replV) => 
		let val (res, state') = f arg
		in SyncVar.iPut(replV, res); state'
		end)
	 in
	    {call = call reqMB, entryEvt = entryEvt}
	 end

      fun mkRPC_InOut f = 
	 let
	    val reqMB = Mailbox.mailbox()
	    val reqEvt = Mailbox.recvEvt reqMB
	    fun entryEvt state = 
	       CML.wrap 
	       (reqEvt, fn (arg, replV) => 
		let val (res, state') = f(arg, state)
		in SyncVar.iPut(replV, res); state'
		end)
	 in
	    {call = call reqMB, entryEvt = entryEvt}
	 end
   end



1.1                  mlton/lib/cml/cml-lib/trace-cml.cm

Index: trace-cml.cm
===================================================================
(* trace-cml.cm
 *
 * COPYRIGHT (c) 1996 AT&T Research.
 *
 * The TraceCML library module needs access to CML internals, so we package
 * it up into a sub-group.
 *)
Group (cm-descr/trace-cml.cm)
  (* Notice that the "owner" specification above gets ignored by the old
   * CM.  Under the old CM clients use _this_ file to refer to the
   * trace library, under the new CM clients use the description file
   * in cm-descr/trace-cml.cm.
   * This is done to avoid a file-naming conflict for smlnj-lib.cm.
   * The conflict is caused by the old CM's path-search mechanism and
   * does not occur under the new CM. *)
  signature TRACE_CML
  structure TraceCML
is
#if defined (NEW_CM)
  $/smlnj-lib.cm
  $cml/basis.cm
  $cml/core-cml.cm
  $cml/cml.cm
#else
  smlnj-lib.cm
  ../src/basis.cm
  ../src/core-cml.cm
  ../src/cml.cm
#endif

  trace-cml-sig.sml
  trace-cml.sml



1.1                  mlton/lib/cml/cml-lib/trace-cml.sig

Index: trace-cml.sig
===================================================================
(* trace-cml-sig.sml
 *
 * COPYRIGHT (c) 1992 AT&T Bell Laboratories
 *
 * This module provides rudimentary debugging support in the form of mechanisms
 * to control debugging output, and to monitor thread termination.  This
 * version of this module is adapted from Cliff Krumvieda's utility for tracing
 * CML programs.  It provides three facilities: trace modules, for controlling
 * debugging output; thread watching, for detecting thread termination; and
 * a mechanism for reporting uncaught exceptions on a per thread basis.
 *)

signature TRACE_CML =
  sig

  (** Trace modules **
   *
   * The basic idea is that one defines a hierarchy of ``trace
   * modules,'' which provide valves for debugging output.
   *)

    type trace_module

  (* where to direct trace output to *)
    datatype trace_to
      = TraceToOut
      | TraceToErr
      | TraceToNull
      | TraceToFile of string
      | TraceToStream of TextIO.outstream

    val setTraceFile : trace_to -> unit
	(* Direct the destination of trace output.  Note: TraceToStream
	 * can only be specified as a destination if CML is running.
	 *)

    val traceRoot : trace_module
	(* the root module of the trace hierarchy *)

    exception NoSuchModule

    val traceModule : (trace_module * string) -> trace_module
    val nameOf : trace_module -> string
	(* return the name of the module *)
    val moduleOf : string -> trace_module
	(* return the module specified by the given string, or raise
	 * NoSuchModule if none exists.
	 *)

    val traceOn : trace_module -> unit
	(* turn tracing on for a module and its descendents *)
    val traceOff : trace_module -> unit
	(* turn tracing off for a module and its descendents *)
    val traceOnly : trace_module -> unit
	(* turn tracing on for a module (but not for its descendents) *)
    val amTracing : trace_module -> bool
	(* return true if this module is being traced *)

    val status : trace_module -> (trace_module * bool) list
	(* return a list of the registered modules dominated by the given
	 * module, and their status.
	 *)

    val trace : (trace_module * (unit -> string list)) -> unit
	(* conditionally generate tracing output *)


  (** Thread watching **)
  
    val watcher : trace_module
	(* controls printing of thread watching messages; the module's name
	 * is "/ThreadWatcher/"
	 *)
    val watch : (string * CML.thread_id) -> unit
	(* watch the given thread for unexpected termination *)
    val unwatch : CML.thread_id -> unit
	(* stop watching the named thread *)

  (** Uncaught exception handling **)

    val setUncaughtFn : ((CML.thread_id * exn) -> unit) -> unit
	(* this sets the default uncaught exception action. *)
    val setHandleFn : ((CML.thread_id * exn) -> bool) -> unit
	(* add an additional uncaught exception action.  If the action returns
	 * true, then no further action is taken.  This can be used to handle
	 * application specific exceptions.
	 *)
    val resetUncaughtFn : unit -> unit
	(* this resets the default uncaught exception action to the system default,
	 * and removes any layered actions.
	 *)

  end; (* TRACE_CML *)




1.1                  mlton/lib/cml/cml-lib/trace-cml.sml

Index: trace-cml.sml
===================================================================
(* trace-cml.sml
 *
 * COPYRIGHT (c) 1992 AT&T Bell Laboratories
 *
 * This module provides rudimentary debugging support in the form of mechanisms
 * to control debugging output, and to monitor thread termination.  This
 * version of this module is adapted from Cliff Krumvieda's utility for tracing
 * CML programs.  It provides three facilities: trace modules, for controlling
 * debugging output; thread watching, for detecting thread termination; and
 * a mechanism for reporting uncaught exceptions on a per thread basis.
 *)

structure TraceCML : TRACE_CML =
  struct

    structure SV = SyncVar

  (* where to direct trace output to *)
    datatype trace_to
      = TraceToOut
      | TraceToErr
      | TraceToNull
      | TraceToFile of string
      | TraceToStream of TextIO.outstream

    exception NoSuchModule

  (** Trace Modules **)
    datatype trace_module = TM of {
	full_name : string,
	label : string,
	tracing : bool ref,
	children : trace_module list ref
      }

    val traceRoot = TM{
	    full_name = "/",
	    label = "",
	    tracing = ref false,
	    children = ref []
	  }

    fun forAll f = let
	  fun for (tm as TM{children, ...}) = (f tm; forChildren(!children))
	  and forChildren [] = ()
	    | forChildren (tm::r) = (for tm; forChildren r)
	  in
	    for
	  end

    structure SS = Substring

    fun findTraceModule name = let
	  fun eq ss (TM{label, ...}) = (SS.compare(SS.all label, ss) = EQUAL)
	  fun find ([], tm) = SOME tm
	    | find (arc::rest, tm as TM{label, children, ...}) = let
		val eqArc = eq arc
		fun findChild [] = NONE
		  | findChild (c::r) =
		      if (eqArc c) then find(rest, c) else findChild r
		in
		  findChild (!children)
		end
	  in
	    find (
	      SS.tokens (fn #"/" => true | _ => false) (SS.all name),
	      traceRoot)
	  end

    fun traceModule' (TM parent, name) = let
	  fun checkChildren [] = let
		val tm = TM{
		        full_name = (#full_name parent ^ name),
		        label = name,
			tracing = ref(!(#tracing parent)),
		        children = ref []
		      }
		in
		  (#children parent) := tm :: !(#children parent);
		  tm
		end
	    | checkChildren((tm as TM{label, ...})::r) =
		if (label = name) then tm else checkChildren r
	  in
	    checkChildren (! (#children parent))
	  end

  (* return the name of the module *)
    fun nameOf (TM{full_name, ...}) = full_name

  (* return the module specified by the given string *)
    fun moduleOf' name = (case findTraceModule name
           of NONE => raise NoSuchModule
            | (SOME tm) => tm
          (* end case *))

  (* turn tracing on for a module and its descendents *)
    val traceOn' = forAll (fn (TM{tracing, ...}) => tracing := true)

  (* turn tracing off for a module and its descendents *)
    val traceOff' = forAll (fn (TM{tracing, ...}) => tracing := false)

  (* turn tracing on for a module (but not for its descendents) *)
    fun traceOnly' (TM{tracing, ...}) = tracing := true

  (* return true if this module is being traced *)
    fun amTracing (TM{tracing, ...}) = !tracing

  (* return a list of the registered modules dominated by the given
   * module, and their status.
   *)
    fun status' root = let
	  fun list (tm as TM{tracing, children, ...}, l) =
		listChildren (!children, (tm, !tracing)::l)
	  and listChildren ([], l) = l
	    | listChildren (c::r, l) = listChildren(r, list(c, l))
	  in
	    rev (list (root, []))
	  end

  (** Trace printing **)
    val traceDst = ref TraceToOut
    val traceCleanup = ref (fn () => ())

    fun setTraceFile'  t = traceDst := t

(** NOTE: there are bookkeeping bugs, when changing the trace destination
 ** from TraceToStream to something else (where the original destination 
 ** was TraceToFile).
 **)
    fun tracePrint s = let
	  fun output strm = (TextIO.output(strm, s); TextIO.flushOut strm)
	  in
	    case !traceDst
	     of TraceToOut => output TextIO.stdOut
	      | TraceToErr => output TextIO.stdErr
	      | TraceToNull => ()
	      | (TraceToFile fname) => let
		  val dst = let
			val strm = TextIO.openOut fname
			in
			  traceCleanup := (fn () => TextIO.closeOut strm);
			  TraceToStream strm
			end handle _ => (
			  Debug.sayDebug(concat[
			      "TraceCML: unable to open \"", fname,
			      "\", redirecting to stdout"
			    ]);
			  TraceToOut)
		  in
		    setTraceFile' dst;
		    tracePrint s
		  end
	        | (TraceToStream strm) => output strm
	    (* end case *)
	  end

  (** Trace server **)
    val traceCh : (unit -> string list) CML.chan = CML.channel()
    val traceUpdateCh : (unit -> unit) CML.chan = CML.channel()

    fun traceServer () = let
	  val evt = [
		  CML.wrap(CML.recvEvt traceCh, fn f => tracePrint(concat(f()))),
		  CML.wrap(CML.recvEvt traceUpdateCh, fn f => f())
		]
	  fun loop () = (CML.select evt; loop())
	  in
	    loop()
	  end (* traceServer *)

    fun tracerStart () = (CML.spawn traceServer; ())
    fun tracerStop () = ((!traceCleanup)(); traceCleanup := (fn () => ()))

    val _ = (
	  RunCML.logChannel ("TraceCML:trace", traceCh);
	  RunCML.logChannel ("TraceCML:trace-update", traceUpdateCh);
	  RunCML.logServer ("TraceCML:trace-server", tracerStart, tracerStop))

    local
      fun carefully f = if RunCML.isRunning()
	    then CML.send(traceUpdateCh, f)
	    else f()
      fun carefully' f = if RunCML.isRunning()
	      then let
	        val reply = SV.iVar()
	        in
	          CML.send (traceUpdateCh, fn () => (SV.iPut(reply, f())));
		  SV.iGet reply
	        end
	      else f()
    in
    fun traceModule arg = carefully' (fn () => traceModule' arg)
    fun moduleOf name = carefully' (fn () => moduleOf' name)
    fun traceOn tm = carefully (fn () => traceOn' tm)
    fun traceOff tm = carefully (fn () => traceOff' tm)
    fun traceOnly tm = carefully (fn () => traceOnly' tm)
    fun setTraceFile f = carefully (fn () => setTraceFile' f)
    fun status root = carefully' (fn () => status' root)
    end (* local *)

    fun trace (TM{tracing, ...}, prFn) =
	  if (RunCML.isRunning() andalso (!tracing))
	    then CML.send(traceCh, prFn)
	    else ()


  (** Thread watching **)

  (* controls printing of thread watching messages *)
    val watcher = traceModule (traceRoot, "ThreadWatcher")
    val _ = traceOn watcher

    datatype watcher_msg
      = WATCH of (CML.thread_id * unit CML.chan)
      | UNWATCH of (CML.thread_id * unit SV.ivar)

    val watcherMb : watcher_msg Mailbox.mbox = Mailbox.mailbox ()

  (* stop watching the named thread *)
    fun unwatch tid = let
	  val ackV = SV.iVar()
	  in
	    Mailbox.send(watcherMb, UNWATCH(tid, ackV));
	    SV.iGet ackV
	  end

  (* watch the given thread for unexpected termination *)
    fun watch (name, tid) = let
	  val unwatchCh = CML.channel()
	  fun handleTermination () = (
		trace (watcher, fn () => [
		    "WARNING!  Watched thread ", name, CML.tidToString tid,
		    " has died.\n"
		  ]);
		unwatch tid)
	  fun watcherThread () = (
		Mailbox.send (watcherMb, WATCH(tid, unwatchCh));
		CML.select [
		    CML.recvEvt unwatchCh,
		    CML.wrap (CML.joinEvt tid, handleTermination)
		  ])
	  in
	    CML.spawn (watcherThread); ()
	  end

    structure TidTbl = HashTableFn (
      struct
	type hash_key = CML.thread_id
	val hashVal = CML.hashTid
	val sameKey = CML.sameTid
      end)

  (* the watcher server *)
    fun startWatcher () = let
	  val tbl = TidTbl.mkTable (32, Fail "startWatcher")
	  fun loop () = (case (Mailbox.recv watcherMb)
		 of (WATCH arg) => TidTbl.insert tbl arg
		  | (UNWATCH(tid, ack)) => (
		    (* notify the watcher that the thread is no longer being
		     * watched, and then acknowledge the unwatch command.
		     *)
		      CML.send(TidTbl.remove tbl tid, ())
			handle _ => ();
		    (* acknowledge that the thread has been removed *)
		      SV.iPut(ack, ()))
		(* end case *);
		loop ())
	  in
	    CML.spawn loop; ()
	  end

    val _ = (
	  RunCML.logMailbox ("TraceCML:watcherMb", watcherMb);
	  RunCML.logServer ("TraceCML:watcher-server", startWatcher, fn () => ()))


  (** Uncaught exception handling **)

    fun defaultHandlerFn (tid, ex) = let
	  val raisedAt = (case (SMLofNJ.exnHistory ex)
		 of [] => ["\n"]
		  | l => [" raised at ", List.last l, "\n"]
		(* end case *))
	  in
	    Debug.sayDebug (concat ([
	        CML.tidToString tid, " uncaught exception ",
	        exnName ex, " [", exnMessage ex, "]"
	      ] @ raisedAt))
	  end

    val defaultHandler = ref defaultHandlerFn
    val handlers = ref ([] : ((CML.thread_id * exn) -> bool) list)

  (* this sets the default uncaught exception action. *)
    fun setUncaughtFn' action = defaultHandler := action

  (* add an additional uncaught exception action.  If the action returns
   * true, then no further action is taken.  This can be used to handle
   * application-specific exceptions.
   *)
    fun setHandleFn' action = handlers := action :: !handlers

  (* this resets the default uncaught exception action to the system default,
   * and removes any layered actions.
   *)
    fun resetUncaughtFn' () = (defaultHandler := defaultHandlerFn; handlers := [])

    val exnUpdateCh : (unit -> unit) CML.chan = CML.channel()

    fun exnServerStartup () = let
	  val errCh = Mailbox.mailbox()
	(* this function is installed as the default handler for threads;
	 * it sends the thread ID and uncaught exception to the ExnServer.
	 *)
	  fun threadHandler exn = Mailbox.send(errCh, (CML.getTid(), exn))
	(* invoke the handler actions on the uncaught exception *)
	  fun handleExn arg = let
		val hdlrList = !handlers and dfltHndlr = !defaultHandler
		fun loop [] = dfltHndlr arg
		  | loop (hdlr::r) = if (hdlr arg) then () else loop r
		in
		  CML.spawn (fn () => ((loop hdlrList) handle _ => (dfltHndlr arg)));
		  ()
		end
	  val event = [
		  CML.wrap (CML.recvEvt exnUpdateCh, fn f => f()),
		  CML.wrap (Mailbox.recvEvt errCh, handleExn)
		]
	  fun server () = (CML.select event; server())
	  in
	    Thread.defaultExnHandler := threadHandler;
	    CML.spawn server; ()
	  end

    val _ = (
	  RunCML.logChannel ("TraceCML:exnUpdateCh", exnUpdateCh);
	  RunCML.logServer ("TraceCML", exnServerStartup, fn () => ()))

    local
      fun carefully f = if RunCML.isRunning() then CML.send(exnUpdateCh, f) else f()
    in
    fun setUncaughtFn arg = carefully (fn () => setUncaughtFn' arg)
    fun setHandleFn arg = carefully (fn () => setHandleFn' arg)
    fun resetUncaughtFn arg = carefully (fn () => resetUncaughtFn' arg)
    end (* local *)
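
  (* Usage sketch (illustrative only, not part of the original sources).
   * A layered handler returns true to claim an exception and false to
   * pass it on to the next handler.  This assumes that the TRACE_CML
   * signature exports setHandleFn at the type used above; the exception
   * AppError is hypothetical.
   *
   *   exception AppError of string
   *
   *   val () =
   *      TraceCML.setHandleFn
   *      (fn (tid, AppError msg) =>
   *             (TextIO.print (concat [CML.tidToString tid, ": ", msg, "\n"])
   *              ; true)
   *        | _ => false)
   *
   * Because setHandleFn' conses onto the handler list, the most recently
   * installed handler is tried first; the default handler runs only when
   * every layered handler returns false.
   *)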

  end; (* TraceCML *)



1.1                  mlton/lib/cml/core-cml/channel.sig

Index: channel.sig
===================================================================
(* channel.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* channel-sig.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * The representation of synchronous channels.
 *)

signature CHANNEL =
   sig
      type 'a chan

      val channel : unit -> 'a chan	 
      val sameChannel : ('a chan * 'a chan) -> bool

      val send : ('a chan * 'a) -> unit
      val recv : 'a chan -> 'a
	 
      val sendEvt  : ('a chan * 'a) -> unit Event.event
      val recvEvt  : 'a chan -> 'a Event.event

      val sendPoll : ('a chan * 'a) -> bool
      val recvPoll : 'a chan -> 'a option
   end

signature CHANNEL_EXTRA =
   sig
      include CHANNEL
      val resetChan : 'a chan -> unit
   end
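
(* Usage sketch (illustrative only, not part of the ported sources).
 * Channels are synchronous: send blocks until a matching recv arrives,
 * and vice versa.  The sketch assumes the CML and RunCML structures of
 * this library, and that RunCML.doit takes a thunk together with an
 * optional time quantum.
 *
 *   fun main () =
 *      let
 *         val ch : int CML.chan = CML.channel ()
 *         val _ = CML.spawn (fn () => CML.send (ch, 42))
 *      in
 *         CML.print (Int.toString (CML.recv ch) ^ "\n")
 *      end
 *
 *   val _ = RunCML.doit (main, NONE)
 *
 * sendPoll and recvPoll are the non-blocking variants: they return
 * false and NONE, respectively, when no partner is already waiting.
 *)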


1.1                  mlton/lib/cml/core-cml/channel.sml

Index: channel.sml
===================================================================
(* channel.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* channel.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * The representation of synchronous channels.
 *
 * To ensure that we always leave the atomic region exactly once, we
 * require that the blocking operation be responsible for leaving the
 * atomic region (in the event case, it must also execute the clean-up
 * action).  The doitFn always transfers control to the blocked thread
 * without leaving the atomic region.  Note that the send (and sendEvt)
 * blockFns run using the receiver's thread ID.
 *)

structure Channel : CHANNEL_EXTRA =
   struct
      structure Assert = LocalAssert(val assert = true)
      structure Debug = LocalDebug(val debug = false)
	 
      structure Q = ImpQueue
      structure S = Scheduler
      structure E = Event
      fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
      fun debug' msg = debug (fn () => msg)
	 
      datatype trans_id = datatype TransID.trans_id
      datatype trans_id_state = datatype TransID.trans_id_state


      datatype 'a chan = 
	 CHAN of {prio : int ref,
		  inQ  : (trans_id * 'a S.thread) Q.t,
		  outQ : (trans_id * 'a S.thread S.thread) Q.t}

      fun resetChan (CHAN {prio, inQ, outQ}) = 
	 (prio := 1
	  ; Q.reset inQ
	  ; Q.reset outQ)

      fun channel () = CHAN {prio = ref 1, inQ = Q.new (), outQ = Q.new ()}

      (* sameChannel : ('a chan * 'a chan) -> bool *)
      fun sameChannel (CHAN {prio = prio1, ...}, CHAN {prio = prio2, ...}) =
	 prio1 = prio2


      (* bump a priority value by one, returning the old value *)
      fun bumpPriority (p as ref n) = (p := n+1; n)

      (* functions to clean channel input and output queues *)
      local
	 fun cleaner (TXID txst, _) = 
	    case !txst of CANCEL => true | _ => false
      in
	 fun cleanAndChk (prio, q) : int =
	    (Q.clean (q, cleaner)
	     ; if Q.empty q
		  then 0
		  else bumpPriority prio)
	 fun cleanAndDeque q =
	    Q.cleanAndDeque (q, cleaner)
	 fun enqueAndClean (q, item) =
	    Q.enqueAndClean (q, item, cleaner)
      end

      fun send (CHAN {prio, inQ, outQ}, msg) = 
	 let
	    val () = Assert.assertNonAtomic' "Channel.send"
	    val () = debug' "send(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Channel.send(1)"
	    val () = S.atomicBegin ()
	    val () = debug' "send(2)" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Channel.send(2)", SOME 1)
	    val () = 
	       case cleanAndDeque inQ of 
		  SOME (rtxid, rt) => 
		     let
			val () = debug' "send(3.1.1)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.send(3.1.1)", SOME 1)
			val () =
			   S.readyAndSwitch
			   (fn () =>
			    (prio := 1
			     ; TransID.force rtxid
			     ; (rt, msg)))
			val () = debug' "send(3.1.2)" (* NonAtomic *)
			val () = Assert.assertNonAtomic' "Channel.send(3.1.2)"
		     in
			()
		     end
		| NONE => 
		     let
			val () = debug' "send(3.2.1)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.send(3.2.1)", SOME 1)
			val rt = 
			   S.atomicSwitchToNext
			   (fn st => Q.enque (outQ, (TransID.mkTxId (), st)))
			val () = debug' "send(3.2.2)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.send(3.2.2)", SOME 1)
			val () = S.atomicReadyAndSwitch (fn () => (rt, msg))
			val () = debug' "send(3.2.3)" (* NonAtomic *)
			val () = Assert.assertNonAtomic' "Channel.send(3.2.3)"
		     in
			()
		     end
	    val () = debug' "send(4)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Channel.send(4)"
	 in
	    ()
	 end

      fun sendEvt (CHAN {prio, inQ, outQ}, msg) =
	 let
	    fun doitFn () = 
	       let
		  val () = Assert.assertAtomic' ("Channel.sendEvt.doitFn", NONE)
		  val (rtxid, rt) = valOf (Q.deque inQ)
		  val () = debug' "sendEvt(3.1.1)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Channel.sendEvt(3.1.1)", SOME 1)
		  val () =
		     S.readyAndSwitch
		     (fn () =>
		      (prio := 1
		       ; TransID.force rtxid
		       ; (rt, msg)))
		  val () = debug' "sendEvt(3.1.2)" (* NonAtomic *)
		  val () = Assert.assertNonAtomic' "Channel.sendEvt(3.1.2)"
	       in
		  ()
	       end
	    fun blockFn {transId, cleanUp, next} = 
	       let
		  val () = Assert.assertAtomic' ("Channel.sendEvt.blockFn", NONE)
		  val () = debug' "sendEvt(3.2.1)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.1)", SOME 1)
		  val rt = 
		     S.atomicSwitch
		     (fn st =>
		      (enqueAndClean (outQ, (transId, st))
		       ; (next (), ())))
		  val () = debug' "sendEvt(3.2.2)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Channel.sendEvt(3.2.2)", SOME 1)
		  val () = cleanUp ()
		  val () = S.atomicReadyAndSwitch (fn () => (rt, msg))
		  val () = debug' "sendEvt(3.2.3)" (* NonAtomic *)
		  val () = Assert.assertNonAtomic' "Channel.sendEvt(3.2.3)"
	       in
		  ()
	       end
	    fun pollFn () = 
	       let
		  val () = Assert.assertAtomic' ("Channel.sendEvt.pollFn", NONE)
		  val () = debug' "sendEvt(2)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Channel.sendEvt(2)", SOME 1)
	       in
		  case cleanAndChk (prio, inQ) of
		     0 => E.blocked blockFn
		   | prio => E.enabled {prio = prio, doitFn = doitFn}
	       end
	 in
	    E.bevt pollFn
	 end
      
      fun sendPoll (CHAN {prio, inQ, ...}, msg) = 
	 let
	    val () = Assert.assertNonAtomic' "Channel.sendPoll"
	    val () = debug' "sendPoll(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Channel.sendPoll(1)"
	    val () = S.atomicBegin ()
	    val () = debug' "sendPoll(2)" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Channel.sendPoll(2)", SOME 1)
	    val b = 
	       case cleanAndDeque inQ of 
		  SOME (rtxid, rt) => 
		     let
			val () = debug' "sendPoll(3.1.1)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.sendPoll(3.1.1)", SOME 1)
			val () =
			   S.readyAndSwitch
			   (fn () =>
			    (prio := 1
			     ; TransID.force rtxid
			     ; (rt, msg)))
			val b = true
			val () = debug' "sendPoll(3.1.2)" (* NonAtomic *)
			val () = Assert.assertNonAtomic' "Channel.sendPoll(3.1.2)"
		     in
			b
		     end
		| NONE => 
		     let
			val () = debug' "sendPoll(3.2.1)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.1)", SOME 1)
			val b = false
			val () = debug' "sendPoll(3.2.2)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.sendPoll(3.2.2)", SOME 1)
			val () = S.atomicEnd ()
			val () = debug' "sendPoll(3.2.3)" (* NonAtomic *)
			val () = Assert.assertNonAtomic' "Channel.sendPoll(3.2.3)"
		     in
			b
		     end
	    val () = debug' "sendPoll(4)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Channel.sendPoll(4)"
	 in
	    b
	 end
	 
      fun recv (CHAN {prio, inQ, outQ}) =
	 let
	    val () = Assert.assertNonAtomic' "Channel.recv"
	    val () = debug' "recv(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Channel.recv(1)"
	    val () = S.atomicBegin ()
	    val () = debug' "recv(2)" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Channel.recv(2)", SOME 1)
	    val msg = 
	       case cleanAndDeque outQ of
		  SOME (stxid, st) =>	
		     let
			val () = debug' "recv(3.1.1)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.recv(3.1.1)", SOME 1)
			val msg =
			   S.switch
			   (fn rt =>
			    (prio := 1
			     ; TransID.force stxid
			     ; (st, rt)))
			val () = debug' "recv(3.1.2)" (* NonAtomic *)
			val () = Assert.assertNonAtomic' "Channel.recv(3.1.2)"
		     in
			msg
		     end
		| NONE =>
		     let
			val () = debug' "recv(3.2.1)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.recv(3.2.1)", SOME 1)
			val msg =
			   S.atomicSwitchToNext
			   (fn rt => enqueAndClean (inQ, (TransID.mkTxId (), rt)))
			val () = debug' "recv(3.2.2)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.recv(3.2.2)", SOME 1)
			val () = S.atomicEnd ()
			val () = debug' "recv(3.2.3)" (* NonAtomic *)
			val () = Assert.assertNonAtomic' "Channel.recv(3.2.3)"
		     in
			msg
		     end
	    val () = debug' "recv(4)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Channel.recv(4)"
	 in
	    msg
	 end
	 
      fun recvEvt (CHAN {prio, inQ, outQ}) = 
	 let
	    fun doitFn () = 
	       let
		  val () = Assert.assertAtomic' ("Channel.recvEvt.doitFn", NONE)
		  val (stxid, st) = valOf (Q.deque outQ)
		  val () = debug' "recvEvt(3.1.1)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Channel.recvEvt(3.1.1)", SOME 1)
		  val msg =
		     S.switch
		     (fn rt =>
		      (prio := 1
		       ; TransID.force stxid
		       ; (st, rt)))
		  val () = debug' "recvEvt(3.1.2)" (* NonAtomic *)
		  val () = Assert.assertNonAtomic' "Channel.recvEvt(3.1.2)"
	       in
		  msg
	       end
	    fun blockFn {transId, cleanUp, next} = 
	       let
		  val () = Assert.assertAtomic' ("Channel.recvEvt.blockFn", NONE)
		  val () = debug' "recvEvt(3.2.1)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.1)", SOME 1)
		  val msg =
		     S.atomicSwitch
		     (fn rt =>
		      (enqueAndClean (inQ, (transId, rt))
		       ; (next (), ())))
		  val () = debug' "recvEvt(3.2.2)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Channel.recvEvt(3.2.2)", SOME 1)
		  val () = cleanUp ()
		  val () = S.atomicEnd ()
		  val () = debug' "recvEvt(3.2.3)" (* NonAtomic *)
		  val () = Assert.assertNonAtomic' "Channel.recvEvt(3.2.3)"
	       in
		  msg
	       end
	    fun pollFn () = 
	       let
		  val () = Assert.assertAtomic' ("Channel.recvEvt.pollFn", NONE)
		  val () = debug' "recvEvt(2)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Channel.recvEvt(2)", SOME 1)
	       in
		  case cleanAndChk (prio, outQ) of 
		     0 => E.blocked blockFn
		   | prio => E.enabled {prio = prio, doitFn = doitFn}
	       end
	 in
	    E.bevt pollFn
	 end
      
      fun recvPoll (CHAN {prio, outQ, ...}) = 
	 let
	    val () = Assert.assertNonAtomic' "Channel.recvPoll"
	    val () = debug' "recvPoll(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Channel.recvPoll(1)"
	    val () = S.atomicBegin ()
	    val () = debug' "recvPoll(2)" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Channel.recvPoll(2)", SOME 1)
	    val msg =
	       case cleanAndDeque outQ of 
		  SOME (stxid, st) => 
		     let
			val () = debug' "recvPoll(3.1.1)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.recvPoll(3.1.1)", SOME 1)
			val msg =
			   S.switch
			   (fn rt =>
			    (prio := 1
			     ; TransID.force stxid
			     ; (st, rt)))
			val msg = SOME msg
			val () = debug' "recvPoll(3.1.2)" (* NonAtomic *)
			val () = Assert.assertNonAtomic' "Channel.recvPoll(3.1.2)"
		     in
			msg
		     end
		| NONE => 
		     let
			val () = debug' "recvPoll(3.2.1)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.recvPoll(3.2.1)", SOME 1)
			val msg = NONE
			val () = debug' "recvPoll(3.2.2)" (* Atomic 1 *)
			val () = Assert.assertAtomic' ("Channel.recvPoll(3.2.2)", SOME 1)
			val () = S.atomicEnd ()
			val () = debug' "recvPoll(3.2.3)" (* NonAtomic *)
			val () = Assert.assertNonAtomic' "Channel.recvPoll(3.2.3)"
		     in
			msg
		     end
	    val () = debug' "recvPoll(4)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Channel.recvPoll(4)"
	 in
	    msg
	 end
   end



1.1                  mlton/lib/cml/core-cml/cml.sig

Index: cml.sig
===================================================================
(* cml.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* cml-sig.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * The interface to the core CML features.
 *)

signature CML =
  sig
     include VERSION
     
     include THREAD
     include CHANNEL
     include EVENT
     include TIME_OUT

     val print : string -> unit
  end



1.1                  mlton/lib/cml/core-cml/cml.sml

Index: cml.sml
===================================================================
(* cml.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* cml.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

structure CML : CML =
   struct
      open Version
      open Thread
      open Channel
      open Event
      open TimeOut

      fun print s =
	 (Scheduler.atomicBegin ();
	  TextIO.print s;
	  Scheduler.atomicEnd ())
   end




1.1                  mlton/lib/cml/core-cml/core-cml.cm

Index: core-cml.cm
===================================================================
Group is
  ../util/util.cm
  rep-types.sml
  running.sml
  trans-id.sig
  trans-id.sml
  cvar.sig
  cvar.sml
  thread-id.sig
  thread-id.sml
  scheduler-hooks.sig
  scheduler-hooks.sml
  scheduler.sig
  scheduler.sml
  event.sig
  event.sml
  thread.sig
  thread.sml
  channel.sig
  channel.sml
  timeout.sig
  timeout.sml
  version.sig
  version.sml
  cml.sig
  cml.sml

  mailbox.sig
  mailbox.sml
  sync-var.sig
  sync-var.sml

  run-cml.sig
  run-cml.sml



1.1                  mlton/lib/cml/core-cml/cvar.sig

Index: cvar.sig
===================================================================
(* cvar.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* ???
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

signature CVAR =
   sig
      datatype cvar = datatype RepTypes.cvar
      datatype cvar_state = datatype RepTypes.cvar_state

      val new : unit -> cvar
   end



1.1                  mlton/lib/cml/core-cml/cvar.sml

Index: cvar.sml
===================================================================
(* cvar.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* ???
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

structure CVar : CVAR =
   struct
      structure R = RepTypes


      (* Condition variables are essentially unit valued ivars, and 
       * are used for various internal synchronization conditions 
       * (e.g., nack events, I/O synchronization, and thread termination).
       *)
      datatype cvar = datatype R.cvar
      datatype cvar_state = datatype R.cvar_state

      fun new () = CVAR (ref (CVAR_unset []))
   end



1.1                  mlton/lib/cml/core-cml/event.sig

Index: event.sig
===================================================================
(* event.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* events-sig.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * The representation of event values and the event combinators.
 *)

signature EVENT =
   sig
      type 'a event

      val never     : 'a event
      val alwaysEvt : 'a -> 'a event

      val wrap        : ('a event * ('a -> 'b)) -> 'b event
      val wrapHandler : ('a event * (exn -> 'a)) -> 'a event

      val guard    : (unit -> 'a event) -> 'a event
      val withNack : (unit event -> 'a event) -> 'a event

      val choose : 'a event list -> 'a event
      val sync : 'a event -> 'a
      val select : 'a event list -> 'a
   end

signature EVENT_EXTRA =
   sig
      include EVENT

      type 'a status
      val enabled : {prio : int, doitFn : unit -> 'a} -> 'a status
      val blocked : ({transId : TransID.trans_id,
		      cleanUp : unit -> unit,
		      next : unit -> Scheduler.rdy_thread} -> 'a) -> 'a status
      val bevt : (unit -> 'a status) -> 'a event

      val atomicCVarSet : CVar.cvar -> unit
      val cvarGetEvt    : CVar.cvar -> unit event
   end
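
(* Usage sketch (illustrative only, not part of the ported sources).
 * select commits to exactly one event from the list, and wrap
 * post-processes the result of whichever event is chosen.  The function
 * name and the channel names below are hypothetical.
 *
 *   fun nextAsString (intCh : int CML.chan, strCh : string CML.chan) : string =
 *      CML.select
 *      [CML.wrap (CML.recvEvt intCh, Int.toString),
 *       CML.wrap (CML.recvEvt strCh, fn s => s)]
 *
 * select evts behaves like sync (choose evts), so the same choice can be
 * built first with choose and synchronized on later.
 *)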



1.1                  mlton/lib/cml/core-cml/event.sml

Index: event.sml
===================================================================
(* event.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* event.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * The representation of event values and the event combinators.
 *
 * Some important requirements on the implementation of base event values:
 *
 *  1)	The pollFn, doitFn, and blockFn are always called from inside
 *	atomic regions.
 *
 *  2)	The pollFn returns an integer priority: this is 0 when not enabled,
 *	~1 for fixed priority, and a positive value for dynamic priority.
 *	The standard scheme is to associate a counter with the underlying
 *	synchronization object, and to increase it by one for each
 *	synchronization attempt.
 *
 *  3)  The blockFn is responsible for exiting the atomic region; the doitFns
 *	should NOT leave the atomic region.
 *
 *  4)  The blockFn is responsible for executing the "cleanUp" action
 *	prior to leaving the atomic region.
 *)

structure Event : EVENT_EXTRA =
   struct
      structure Assert = LocalAssert(val assert = true)
      structure Debug = LocalDebug(val debug = false)

      structure S = Scheduler
      fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
      fun debug' msg = debug (fn () => msg)

      datatype trans_id = datatype TransID.trans_id
      datatype trans_id_state = datatype TransID.trans_id_state
      datatype cvar = datatype CVar.cvar
      datatype cvar_state = datatype CVar.cvar_state


      datatype status = datatype RepTypes.status
      val enabled = ENABLED
      val blocked = BLOCKED

      type 'a base = 'a RepTypes.base

      datatype event = datatype RepTypes.event
      val bevt = fn pollFn => BEVT [pollFn]

      datatype 'a group =
	 BASE of 'a base list
       | GRP of 'a group list
       | NACK of cvar * 'a group


      (** Condition variables.  Because these variables are set inside atomic
       ** regions, we have to use different conventions for clean-up, etc.
       ** Instead of requiring the blockFn continuation to call the cleanUp
       ** action and to leave the atomic region, we call the cleanUp function
       ** when setting the condition variable (in atomicCVarSet), and have the
       ** invariant that the blockFn continuation is dispatched outside the
       ** atomic region.
       **)

      (* set a condition variable; 
       * we assume that this function is always executed in an atomic region.
       *)
      fun atomicCVarSet (CVAR state) : unit =
	 let
	    val () = Assert.assertAtomic' ("Event.atomicCVarSet", NONE)
	    val () = debug' "atomicCVarSet" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Event.atomicCVarSet", SOME 1)
	 in
	    case !state of
	       CVAR_unset waiting =>
		  let
		     fun add waiting : unit =
			case waiting of 
			   [] => ()
			 | ({transId = TXID txst, cleanUp, thread}::waiting) =>
			      (case !txst of 
				  CANCEL => ()
				| TRANS =>
				     (txst := CANCEL
				      ; cleanUp ()
				      ; S.ready thread)
			       ; add waiting)
		  in
		     state := CVAR_set 1
		     ; add waiting
		  end
	     | _ => raise Fail "atomicCVarSet"
	 end

      (* the event constructor for waiting on a cvar.
       *)
      fun cvarGetEvt (CVAR state) : unit event =
	 let
	    fun doitFn () =
	       let
		  val () = Assert.assertAtomic' ("Event.cvarGetEvt.doitFn", NONE)
		  val () = debug' "cvarGetEvt(3.1.1)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Event.cvarGetEvt(3.1.1)", SOME 1)
		  val () = state := CVAR_set 1
		  val () = S.atomicEnd ()
		  val () = debug' "cvarGetEvt(3.1.2)" (* NonAtomic *)
		  val () = Assert.assertNonAtomic' "Event.cvarGetEvt(3.1.2)"
	       in
		  ()
	       end
	    fun blockFn {transId, cleanUp, next} =
	       let
		  val () = Assert.assertAtomic' ("Event.cvarGetEvt.blockFn", NONE)
		  val () = debug' "cvarGetEvt(3.2.1)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Event.cvarGetEvt(3.2.1)", SOME 1)
		  val () =
		     S.atomicSwitch
		     (fn t =>
		      let
			 val item = {transId = transId,
				     cleanUp = cleanUp,
				     thread = t}
			 val waiting =
			    case !state of
			       CVAR_unset waiting => waiting
			     | _ => raise Fail "cvarGetEvt:blockFn"
		      in
			 state := CVAR_unset (item::waiting)
			 ; (next (), ())
		      end)
		  val () = debug' "cvarGetEvt(3.2.2)" (* NonAtomic *)
		  val () = Assert.assertNonAtomic' "Event.cvarGetEvt(3.2.2)"
	       in
		  ()
	       end
	    fun pollFn () =
	       let
		  val () = Assert.assertAtomic' ("Event.cvarGetEvt.pollFn", NONE)
		  val () = debug' "cvarGetEvt(2)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Event.cvarGetEvt(2)", SOME 1)
	       in
		  case !state of
		     CVAR_set n =>
			(state := CVAR_set (n + 1)
			 ; enabled {prio = n, doitFn = doitFn})
		   | _ => blocked blockFn
	       end
	 in
	    bevt pollFn
	 end


      (* event combinators *)
      val never : 'a event = 
	 BEVT []
      fun alwaysEvt (v : 'a) : 'a event =
	 let
	    fun doitFn () =
	       let
		  val () = Assert.assertAtomic' ("Event.alwaysEvt.doitFn", NONE)
		  val () = debug' "alwaysEvt(3.1)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Event.alwaysEvt(3.1)", SOME 1)
		  val () = S.atomicEnd ()
		  val () = debug' "alwaysEvt(3.2)" (* NonAtomic *)
		  val () = Assert.assertNonAtomic' "Event.alwaysEvt(3.2)"
	       in
		  v
	       end
	    fun pollFn () =
	       let
		  val () = Assert.assertAtomic' ("Event.alwaysEvt.pollFn", NONE)
		  val () = debug' "alwaysEvt(2)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Event.alwaysEvt(2)", SOME 1)
	       in
		  enabled {prio = ~1, doitFn = doitFn}
	       end
	 in
	    bevt pollFn
	 end

      fun wrap (evt : 'a event, wfn : 'a -> 'b) : 'b event =
	 let
	    fun wrapF f x = wfn (f x)
	    fun wrapBaseEvt pollFn () =
	       case pollFn () of
		  ENABLED {prio, doitFn} => 
		     ENABLED {prio = prio, doitFn = wrapF doitFn}
		| BLOCKED blockFn => 
		     BLOCKED (wrapF blockFn)
	    fun wrap' evt =
	       case evt of
		  BEVT bevts => 
		     BEVT(List.map wrapBaseEvt bevts)
		| CHOOSE evts =>
		     CHOOSE(List.map wrap' evts)
		| GUARD g =>
		     GUARD(fn () => wrap (g (), wfn))
		| WNACK f =>
		     WNACK(fn evt => wrap (f evt, wfn))
	 in
	    wrap' evt
	 end
      fun wrapHandler (evt : 'a event, hfn : exn -> 'a) : 'a event =
	 let
	    fun wrapF f x = (f x) handle exn => hfn exn
	    fun wrapBaseEvt pollFn () =
	       case pollFn () of
		  ENABLED {prio, doitFn} => 
		     ENABLED {prio = prio, doitFn = wrapF doitFn}
		| BLOCKED blockFn => 
		     BLOCKED (wrapF blockFn)
	    fun wrap' evt =
	       case evt of
		  BEVT bevts => 
		     BEVT(List.map wrapBaseEvt bevts)
		| CHOOSE evts =>
		     CHOOSE(List.map wrap' evts)
		| GUARD g =>
		     GUARD(fn () => wrapHandler (g (), hfn))
		| WNACK f =>
		     WNACK(fn evt => wrapHandler (f evt, hfn))
	 in
	    wrap' evt
	 end

      val guard = GUARD
      val withNack = WNACK

      fun choose (evts : 'a event list) : 'a event =
	 let
	    val () = Assert.assertNonAtomic' "Event.choose"
	    val () = debug' "choose(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Event.choose(1)"
	    fun gatherBEvts (evts, bevts') =
	       case (evts, bevts') of
		  ([], bevts') => BEVT bevts'
	        | ((BEVT bevts)::evts, bevts') => gatherBEvts (evts, bevts @ bevts')
		| (evts, []) => gather (evts, [])
		| (evts, bevts') => gather (evts, [BEVT bevts'])
	    and gather (evts, evts') =
	       case (evts, evts') of
		  ([], [evt']) => evt'
		| ([], evts') => CHOOSE evts'
		| ((CHOOSE cevts)::evts, evts') => 
		     gather (evts, cevts @ evts')
		| ((BEVT [])::evts, evts') =>
		     gather (evts, evts')
		| ((BEVT bevts)::evts, (BEVT bevts')::evts') =>
		     gather (evts, BEVT (bevts @ bevts')::evts')
		| (evt::evts, evts') => 
		     gather (evts, evt::evts')
	    val evt = gatherBEvts (List.rev evts, [])
	 in
	    evt
	 end
      

      local
	 val cnt = ref 0
	 fun random i =
	    let val j = !cnt
	    in
	       if j = 1000000 then cnt := 0 else cnt := j + 1
	       ; Int.rem (j, i)
	    end
      in
      fun selectDoitFn (doitFns : {prio : int, doitFn : 'a} list) : 'a =
	 let
	    val () = Assert.assertAtomic' ("Event.selectDoitFn", NONE)
	    val () = debug' "selectDoitFn(2)" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Event.selectDoitFn(2)", SOME 1)
	 in
	    case doitFns of
	       [{doitFn, ...}] => doitFn
	     | doitFns =>
		  let
		     fun select (doitFns, maxP,
				 doitFnsMaxP, numMaxP,
				 doitFnsFixed, numFixed) =
			case doitFns of
			   [] => (case (doitFnsMaxP, doitFnsFixed) of
				     ([doitFn], []) => doitFn
				   | ([], [doitFn]) => doitFn
				   | (doitFnsMaxP, doitFnsFixed) =>
					let
					   val bias = 2
					   val num = numFixed + bias * numMaxP
					   val k = random num
					in
					   if k < numFixed
					      then List.nth (doitFnsFixed, k)
					      else List.nth (doitFnsMaxP, 
							     Int.mod(k - numFixed, numMaxP))
					end)
			 | {prio, doitFn}::doitFns =>
			      if prio = ~1
				 then select(doitFns, maxP, 
					     doitFnsMaxP, numMaxP,
					     doitFn::doitFnsFixed, numFixed + 1)
				 else if prio > maxP 
					 then select(doitFns, prio, 
						     [doitFn], 1,
						     doitFnsFixed, numFixed)
					 else if prio = maxP
						 then select(doitFns, maxP, 
							     doitFn::doitFnsMaxP, numMaxP + 1,
							     doitFnsFixed, numFixed)
						 else select(doitFns, maxP, 
							     doitFnsMaxP, numMaxP,
							     doitFnsFixed, numFixed)
		  in
		     select (doitFns, 0, [], 0, [], 0)
		  end
	 end
      end
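
      (* Worked example (illustrative only, not part of the ported sources).
       * The final choice in selectDoitFn can be modelled by the hypothetical
       * helper below, which returns the list that is indexed and the index:
       *
       *   fun pick (numFixed, numMaxP, k) =
       *      if k < numFixed
       *         then ("fixed", k)
       *         else ("maxP", Int.mod (k - numFixed, numMaxP))
       *
       * With one fixed-priority doitFn (numFixed = 1), two doitFns at the
       * maximum dynamic priority (numMaxP = 2) and bias = 2, we have
       * num = 1 + 2 * 2 = 5, so k ranges over 0..4:
       *
       *   k = 0        picks index 0 of doitFnsFixed
       *   k = 1 or 3   picks index 0 of doitFnsMaxP
       *   k = 2 or 4   picks index 1 of doitFnsMaxP
       *
       * Assuming k is spread evenly over 0..4, each maximum-priority doitFn
       * is chosen twice as often as the fixed-priority one.  Note that
       * "random" above is a wrapping counter rather than a true random
       * source, so successive choices cycle deterministically.
       *)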

      fun syncOnBEvt (pollFn : 'a base) : 'a =
	 let
	    val () = Assert.assertNonAtomic' "Event.syncOnBEvt"	
	    val () = debug' "syncOnBEvt(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Event.syncOnBEvt(1)"
	    val () = S.atomicBegin ()	
	    val () = debug' "syncOnBEvt(2)" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Event.syncOnBEvt(2)", SOME 1)
	    val x = 
	       case pollFn () of
		  ENABLED {doitFn, ...} => doitFn ()
		| BLOCKED blockFn => 
		     let val (transId, cleanUp) = TransID.mkFlg ()
		     in blockFn {transId = transId,
				 cleanUp = cleanUp,
				 next = S.next}
		     end
	    val () = debug' "syncOnBEvt(4)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Event.syncOnBEvt(4)"
	 in
	    x
	 end
		  
      (* this function handles the case of synchronizing on a list of 
       * base events (w/o any negative acknowledgements).   It also handles 
       * the case of synchronizing on NEVER.
       *) 
      fun syncOnBEvts (bevts : 'a base list) : 'a =
	 let
	    val () = Assert.assertNonAtomic' "Event.syncOnBEvts"
	    val () = debug' "syncOnBEvts(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Event.syncOnBEvts(1)"
	    fun ext (bevts, blockFns) : 'a =
	       let
		  val () = debug' "syncOnBEvts(2).ext" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).ext", SOME 1)
	       in
		  case bevts of
		     [] =>
			let
			   val () = debug' "syncOnBEvts(2).ext([])" (* Atomic 1 *)	
			   val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).ext([])", SOME 1)
			in
			   S.atomicSwitch
			   (fn (t : 'a S.thread) =>
			    let
			       val (transId, cleanUp) = TransID.mkFlg ()
			       fun log blockFns : S.rdy_thread =
				  let
				     val () = debug' "syncOnBEvts(2).ext([]).log" (* Atomic 1 *)	
				     val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).ext([]).log", SOME 1)
				  in
				     case blockFns of
					[] => S.next ()
				      | blockFn::blockFns =>
					   S.new
					   (fn _ => fn () =>
					    let 
					       val () = S.atomicBegin ()
					       val x = blockFn {transId = transId,
								cleanUp = cleanUp,
								next = fn () => log blockFns}
					    in S.switch(fn _ => (t, x))
					    end)
				  end
			    in
			       (log blockFns, ())
			    end)
			end
		   | pollFn::bevts =>
			(case pollFn () of
			    ENABLED doitFn => extRdy (bevts, [doitFn])
			  | BLOCKED blockFn => ext (bevts, blockFn::blockFns))
	       end
	    and extRdy (bevts, doitFns) : 'a =
	       let
		  val () = debug' "syncOnBEvts(2).extRdy" (* Atomic 1*)
		  val () = Assert.assertAtomic' ("Event.syncOnBEvts(2).extRdy", SOME 1)
	       in
		  case bevts of
		     [] => 
			let val doitFn = selectDoitFn doitFns
			in doitFn ()
			end
		   | pollFn::bevts =>
			(case pollFn () of
			    ENABLED doitFn => extRdy (bevts, doitFn::doitFns)
			  | _ => extRdy (bevts, doitFns))
	       end
	    val x =
	       case bevts of
		  [] => S.switchToNext (fn _ => ())
		| [bevt] => syncOnBEvt bevt
		| bevts => (S.atomicBegin (); ext (bevts, []))
	    val () = debug' "syncOnBEvts(4)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Event.syncOnBEvts(4)"
	 in
	    x
	 end

      (* walk the event group tree, collecting the base events (with associated
       * ack flags), and a list of flag sets.  A flag set is a (cvar * ack flag list)
       * pair, where the flags are those associated with the events covered by the
       * nack cvar.
       *)
      type ack_flg = bool ref
      type ack_flgs = ack_flg list
      type 'a back = 'a base * ack_flg
      type 'a backs = 'a back list
      type flg_set = cvar * ack_flg list
      type flg_sets = flg_set list
      fun collect (gevt : 'a group) : 'a backs * flg_sets = 
	 let
	    fun gatherWrapped (gevt : 'a group, backs : 'a backs, flgSets : flg_sets) :
                              'a backs * flg_sets = 
	       let
		  fun gather (gevt : 'a group, backs : 'a backs, 
			      ackFlgs : ack_flgs, flgSets : flg_sets) :
		             'a backs * ack_flgs * flg_sets =
		     case gevt of
			BASE bevts =>
			   let
			      fun append (bevts, backs, ackFlgs) =
				 case bevts of
				    [] => (backs, ackFlgs)
				  | bevt::bevts =>
				       let val ackFlg = ref false
				       in append (bevts, (bevt, ackFlg)::backs, ackFlg::ackFlgs)
				       end
			      val (backs', ackFlgs') = append (bevts, backs, ackFlgs)
			   in
			      (backs', ackFlgs', flgSets)
			   end
		      | GRP gevt =>
			   let
			      fun f (gevt', (backs', ackFlgs', flgSets')) =
				 gather (gevt', backs', ackFlgs', flgSets')
			   in List.foldl f (backs, ackFlgs, flgSets) gevt
			   end
		      | NACK (cvar, gevt) => 
			   let
			      val (backs', ackFlgs', flgSets') =
				 gather (gevt, backs, [], flgSets)
			   in
			      (backs', ackFlgs' @ ackFlgs, (cvar, ackFlgs')::flgSets')
			   end
		  val (backs, _, flgSets) = gather (gevt, backs, [], flgSets)
	       in
		  (backs, flgSets)
	       end
	 in
	    case gevt of
	       GRP _ => 
		  let
		     val ackFlg = ref false
		     fun gather (gevt : 'a group, backs : 'a backs, flgSets : flg_sets) :
			        'a backs * flg_sets =
			case gevt of
			   BASE bevts => 
			      let
				 fun append (bevts, backs) =
				    case bevts of
				       [] => backs
				     | bevt::bevts => append (bevts, (bevt, ackFlg)::backs)
			      in 
				 (append (bevts, backs), flgSets)
			      end
			 | GRP gevt =>
			      let
				 fun f (gevt', (backs', flgSets')) =
				    gather(gevt', backs', flgSets')
			      in List.foldl f (backs, flgSets) gevt
			      end
			 | NACK _ =>
			      gatherWrapped (gevt, backs, flgSets)
		  in
		     gather (gevt, [], [])
		  end
	     | gevt => gatherWrapped (gevt, [], [])
	 end

      (* this function handles the more complicated case of synchronization
       * on groups of events where negative acknowledgements are involved.
       *)
      fun syncOnGrp (gevt : 'a group) : 'a =
	 let
	    val () = Assert.assertNonAtomic' "Event.syncOnGrp"
	    val () = debug' "syncOnGrp(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Event.syncOnGrp(1)"
	    val (backs, flgSets) = collect gevt
	    fun chkCVars () =
	       let
		  val () = debug' "syncOnGrp.chkCVars" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Event.syncOnGrp.chkCVars", SOME 1)
		  (* chkCVar checks the flags of a flag set.
		   * If they are all false, then the corresponding cvar
		   * is set to signal the negative ack.
		   *)
		  fun chkCVar (cvar, flgs) =
		     if List.exists ! flgs
			then ()
			else atomicCVarSet cvar
	       in
		  List.app chkCVar flgSets
	       end
	    fun ext (backs, blockFns) : 'a =
	       let
		  val () = debug' "syncOnGrp(2).ext" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Event.syncOnGrp(2).ext", SOME 1)
	       in
		  case backs of
		     [] =>
			let
			   val () = debug' "syncOnGrp(2).ext([])" (* Atomic 1 *)	
			   val () = Assert.assertAtomic' ("Event.syncOnGrp(2).ext([])", SOME 1)
			in
			   S.atomicSwitch
			   (fn (t : 'a S.thread) =>
			    let
			       val (transId, cleanUp) = TransID.mkFlg ()
			       val cleanUp = fn flg => fn () =>
				  (cleanUp ()
				   ; flg := true
				   ; chkCVars ())
			       fun log blockFns : S.rdy_thread =
				  let
				     val () = debug' "syncOnGrp(2).ext([]).log" (* Atomic 1 *)	
				     val () = Assert.assertAtomic' ("Event.syncOnGrp(2).ext([]).log", SOME 1)
				  in
				     case blockFns of
					[] => S.next ()
				      | (blockFn,ackFlg)::blockFns =>
					   S.new
					   (fn _ => fn () =>
					    let 
					       val () = S.atomicBegin ()
					       val x = blockFn {transId = transId,
								cleanUp = cleanUp ackFlg,
								next = fn () => log blockFns}
					    in S.switch(fn _ => (t, x))
					    end)
				  end
			    in
			       (log blockFns, ())
			    end)
			end
		   | (pollFn,ackFlg)::backs =>
			(case pollFn () of
			    ENABLED {prio, doitFn} => 
			       extRdy (backs, [{prio = prio,doitFn = (doitFn, ackFlg)}])
			  | BLOCKED blockFn => ext (backs, (blockFn,ackFlg)::blockFns))
	       end
	    and extRdy (backs, doitFns) : 'a =
	       let
		  val () = debug' "syncOnGrp(2).extRdy" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("Event.syncOnGrp(2).extRdy", SOME 1)
	       in
		  case backs of
		     [] => let
			      val (doitFn, flg) = selectDoitFn doitFns
			   in
			      flg := true
			      ; chkCVars ()
			      ; doitFn ()
			   end
		   | (pollFn,ackFlg)::backs =>
			   (case pollFn () of
			       ENABLED {prio, doitFn} => 
				  extRdy (backs, {prio = prio, doitFn = (doitFn, ackFlg)}::doitFns)
			     | _ => extRdy (backs, doitFns))
	       end
	    val x =
	       case backs of
		  [(bevt, _)] => syncOnBEvt bevt
		| _ => (S.atomicBegin (); ext (backs, []))
	    val () = debug' "syncOnGrp(4)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Event.syncOnGrp(4)"
	 in
	    x
	 end

      local
	 (* force the evaluation of any guards in an event collection,
	  * returning an event group.
	  *)
	 fun forceBL (evts : 'a event list, bevts : 'a base list) : 'a group =
	    case evts of
	       [] => BASE bevts
	     | evt::evts =>
		  (case force evt of
		      BASE bevts' => forceBL (evts, bevts' @ bevts)
		    | GRP gevts => forceGL (evts, if List.null bevts then gevts else gevts @ [BASE bevts])
		    | gevt => forceGL (evts, if List.null bevts then [gevt] else [gevt, BASE bevts]))
	 and forceGL (evts : 'a event list, gevts : 'a group list) : 'a group =
	    case (evts, gevts) of
	       ([], [gevt]) => gevt
	     | ([], gevts) => GRP gevts
	     | (evt::evts, gevts) =>
		  (case (force evt, gevts) of
		      (BASE [], gevts) =>
			 forceGL (evts, gevts)
		    | (BASE bevts', (BASE bevts)::gevts) =>
			 forceGL (evts, BASE (bevts' @ bevts)::gevts)
		    | (GRP gevts', gevts) =>
			 forceGL (evts, gevts' @ gevts)
		    | (gevt, gevts) => 
			 forceGL (evts, gevt::gevts))
	 and force (evt : 'a event) : 'a group =
	    let
	       val gevt =
		  case evt of
		     BEVT bevts => BASE bevts 	
		   | CHOOSE evts => forceBL (evts, [])
		   | GUARD g => force (g ())
		   | WNACK f =>
			let val cvar = CVar.new ()
			in NACK(cvar, force (f (cvarGetEvt cvar)))
			end
	    in
	       gevt
	    end
      in
	 fun sync evt =
	    let
	       val () = Assert.assertNonAtomic' "Event.sync"
	       val () = debug' "sync(1)" (* NonAtomic *)
	       val () = Assert.assertNonAtomic' "Event.sync(1)"
	       val x = 
		  case force evt of
		     BASE bevts => syncOnBEvts bevts
		   | gevt => syncOnGrp gevt
	       val () = debug' "sync(4)" (* NonAtomic *)
	       val () = Assert.assertNonAtomic' "Event.sync(4)"
	    in
	       x
	    end
	 fun select (evts : 'a event list) : 'a =
	    let
	       val () = Assert.assertNonAtomic' "Event.select"
	       val () = debug' "select(1)" (* NonAtomic *)
	       val () = Assert.assertNonAtomic' "Event.select(1)"
	       val x = 
		  case forceBL (evts, []) of
		     BASE bevts => syncOnBEvts bevts
		   | gevt => syncOnGrp gevt
	       val () = debug' "select(4)" (* NonAtomic *)
	       val () = Assert.assertNonAtomic' "Event.select(4)"
	    in
	       x
	    end
      end
   end



1.1                  mlton/lib/cml/core-cml/mailbox.sig

Index: mailbox.sig
===================================================================
(* mailbox.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* mailbox-sig.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * Asynchronous channels (called mailboxes).
 *)

signature MAILBOX =
   sig
      type 'a mbox
	 
      val mailbox     : unit -> 'a mbox
      val sameMailbox : ('a mbox * 'a mbox) -> bool
	 
      val send     : ('a mbox * 'a) -> unit
      val recv     : 'a mbox -> 'a
      val recvEvt  : 'a mbox -> 'a CML.event
      val recvPoll : 'a mbox -> 'a option
   end

signature MAILBOX_EXTRA =
   sig
      include MAILBOX
      val resetMbox : 'a mbox -> unit
   end



1.1                  mlton/lib/cml/core-cml/mailbox.sml

Index: mailbox.sml
===================================================================
(* mailbox.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* mailbox.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * Asynchronous channels (called mailboxes).
 *)

structure Mailbox : MAILBOX_EXTRA =
   struct
      structure Assert = LocalAssert(val assert = true)
      structure Debug = LocalDebug(val debug = false)

      structure Q = FunQueue
      structure S = Scheduler
      structure E = Event
      fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
      fun debug' msg = debug (fn () => msg)

      datatype trans_id = datatype TransID.trans_id
      datatype trans_id_state = datatype TransID.trans_id_state


      (* the state of a mailbox.  The queue of the NONEMPTY constructor should
       * never be empty (use EMPTY instead).
       *)
      datatype 'a state =
	 EMPTY of (TransID.trans_id * 'a S.thread) Q.t
       | NONEMPTY of (int * 'a Q.t)

      datatype 'a mbox = MB of 'a state ref

      fun resetMbox (MB state) = state := EMPTY (Q.new ())

      fun mailbox () = MB (ref (EMPTY (Q.new ())))

      fun sameMailbox (MB s1, MB s2) = (s1 = s2)

      local
	 fun cleaner (TXID txst, _) = 
	    case !txst of CANCEL => true | _ => false
      in
	 fun cleanAndDeque q =
	    Q.cleanAndDeque (q, cleaner)
      end

      fun send (MB state, x) = 
	 let
	    val () = Assert.assertNonAtomic' "Mailbox.send"
	    val () = debug' "send(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Mailbox.send(1)"
	    val () = S.atomicBegin ()
	    val () = debug' "send(2)" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Mailbox.send(2)", SOME 1)
	    val () = 
	       case !state of 
		  EMPTY q => (case (cleanAndDeque q) of
				 (NONE, _) => 
				    (let val q = Q.new ()
				     in state := NONEMPTY (1, Q.enque (q, x))
				     end
				     ; S.atomicEnd())
			       | (SOME (transId', t'), q') =>
				    S.atomicReadyAndSwitch
				    (fn () =>
				     (state := EMPTY q'
				      ; TransID.force transId'
				      ; (t', x))))
		| NONEMPTY (p, q) => 
		     (* we force a context switch here to prevent 
		      * a producer from outrunning a consumer.
		      *)
		     S.atomicReadyAndSwitchToNext
		     (fn () => state := NONEMPTY (p, Q.enque (q, x)))
	    val () = debug' "send(4)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Mailbox.send(4)"
	 in
	    ()
	 end

      fun getMsg (state, q) = 
	 let
	    val (msg, q') = 
	       case Q.deque q of
		  SOME (msg, q') => (msg, q')
		| NONE => raise Fail "Mailbox:getMsg"
	    val () = if Q.empty q'
			then state := EMPTY (Q.new ())
			else state := NONEMPTY (1, q')
	    val () = S.atomicEnd ()
	 in
	    msg
	 end

      fun recv (MB state) = 
	 let
	    val () = Assert.assertNonAtomic' "Mailbox.recv"
	    val () = debug' "recv(1)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Mailbox.recv(1)"
	    val () = S.atomicBegin ()
	    val () = debug' "recv(2)" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Mailbox.recv(2)", SOME 1)
	    val msg =
	       case !state of 
		  EMPTY q => 
		     let
			val msg = 
			   S.atomicSwitchToNext
			   (fn t => state := EMPTY (Q.enque (q, (TransID.mkTxId (), t))))
		     in
			S.atomicEnd()
			; msg
		     end
		| NONEMPTY (_, q) => getMsg (state, q)
	    val () = debug' "recv(4)" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Mailbox.recv(4)"
	 in
	    msg
	 end

      fun recvEvt (MB state) = 
	 let
	    fun blockFn {transId, cleanUp, next} = 
	       let
		  val q = 
		     case !state of
			EMPTY q => q
		      | _ => raise Fail "Mailbox:recvEvt:blockFn"
		  val msg = 
		     S.atomicSwitch
		     (fn t => (state := EMPTY (Q.enque (q, (transId, t)))
			       ; (next (), ())))
	       in
		  cleanUp()
		  ; S.atomicEnd()
		  ; msg
	       end
	    fun pollFn () = 
	       case !state of
		  EMPTY _ => E.blocked blockFn
		| NONEMPTY (prio, q) => 
		     (state := NONEMPTY (prio + 1, q)
		      ; E.enabled {prio = prio, 
				   doitFn = fn () => getMsg (state, q)})
	 in
	    E.bevt pollFn
	 end

      fun recvPoll (MB state) = 
	 (S.atomicBegin()
	  ; case !state of
	       EMPTY _ => (S.atomicEnd(); NONE)
	     | NONEMPTY (_, q) => SOME (getMsg (state, q)))
   end



1.1                  mlton/lib/cml/core-cml/rep-types.sml

Index: rep-types.sml
===================================================================
(* rep-types.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* rep-types.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * These are the concrete representations of the various CML types.
 * These types are abstract (or not even visible) outside this library.
 *)

structure RepTypes =
   struct
      (** transaction IDs -- see trans-id.sml *)
      datatype trans_id = TXID of trans_id_state ref
      and trans_id_state =
	 CANCEL
       | TRANS

      (** condition variables --- see cvar.sml and event.sml *)
      datatype cvar = CVAR of cvar_state ref
      and cvar_state =
	 CVAR_unset of {transId : trans_id,
			cleanUp : unit -> unit,
			thread : rdy_thread} list
       | CVAR_set of int

      (** thread IDs --- see thread-id.sml and thread.sml **)
      and thread_id = 
	 TID of {
		 (* a unique ID *)
		 id : int,
		 (* true, if there is a pending alert on this thread *)
		 alert : bool ref,
		 (* set this whenever this thread does some concurrency operation. *)
		 done_comm : bool ref,
		 (* root-level exception handler hook *)
		 exnHandler : (exn -> unit) ref, 
		 (* holds thread-local properties *)
		 props : exn list ref,
		 (* the cvar that becomes set when the thread dies *)
		 dead : cvar
		 }

      (** threads --- see scheduler.sml and thread.sml **)
      and 'a thread = THRD of thread_id * 'a MLton.Thread.t
      withtype rdy_thread = unit thread

      (** events --- see event.sml **)
      datatype 'a status =
	 ENABLED of {prio : int, doitFn : unit -> 'a}
       | BLOCKED of {transId : trans_id,
		     cleanUp : unit -> unit,
		     next : unit -> rdy_thread} -> 'a
      type 'a base = unit -> 'a status 
      datatype 'a event =
	 BEVT of 'a base list
       | CHOOSE of 'a event list
       | GUARD of unit -> 'a event
       | WNACK of unit event -> 'a event
   end



1.1                  mlton/lib/cml/core-cml/run-cml.sig

Index: run-cml.sig
===================================================================
(* run-cml.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* ???
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

signature RUN_CML =
   sig
      val isRunning : unit -> bool
      val doit : (unit -> unit) * Time.time option -> OS.Process.status
      val shutdown : OS.Process.status -> 'a
   end


1.1                  mlton/lib/cml/core-cml/run-cml.sml

Index: run-cml.sml
===================================================================
(* run-cml.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* run-cml-fn.sml
 *
 * COPYRIGHT (c) 1996 AT&T Research.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

structure RunCML : RUN_CML =
   struct
      structure Assert = LocalAssert(val assert = true)
      structure Debug = LocalDebug(val debug = false)

      structure R = Running
      structure S = Scheduler
      structure SH = SchedulerHooks
      structure TID = ThreadID
      structure TO = TimeOut
      fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
      fun debug' msg = debug (fn () => msg)


      local
	 structure Signal = MLton.Signal
	 structure Itimer = MLton.Itimer
	    
	 fun getAlrmHandler () =
	    Signal.getHandler Posix.Signal.alrm
	 fun setAlrmHandler h =
	    Signal.setHandler (Posix.Signal.alrm, h)

	 fun setItimer t =
	    Itimer.set (Itimer.Real, {value = t, interval = t})
      in
	 fun prepareAlrmHandler tq =
	    let
	       val origAlrmHandler = getAlrmHandler ()
	       val tq = 
		  case tq of 
		     SOME tq => tq
		   | NONE => Time.fromMilliseconds 20
	    in
	       (fn alrmHandler =>
		(setAlrmHandler (Signal.Handler.handler (S.unwrap alrmHandler))
		 ; setItimer tq),
		fn () =>
		(setItimer Time.zeroTime
		 ; setAlrmHandler origAlrmHandler))
	    end
      end

      fun isRunning () = !R.isRunning

      fun reset running =
	 (S.reset running
	  ; SH.reset ()
	  ; TID.reset ()
	  ; TO.reset ())

      fun alrmHandler thrd =
	 let 
	    val () = Assert.assertAtomic' ("RunCML.alrmHandler", NONE)
	    val () = debug' "alrmHandler" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("RunCML.alrmHandler", SOME 1)
	    val () = S.preempt thrd
	    val () = ignore (TO.preempt ())
	 in 
	    S.next ()
	 end

      (* Note that SH.pauseHook is only invoked by S.next
       * when there are no threads on the ready queue;
       * furthermore, note that alrmHandler always
       * enqueues the preempted thread (via S.preempt).
       * Hence, the ready queue is never empty
       * at the S.next in alrmHandler.  Therefore,
       * pauseHook is never run within alrmHandler.
       *)
      fun pauseHook () =
	 let
	    val () = Assert.assertAtomic' ("RunCML.pauseHook", NONE)
	    val () = debug' "pauseHook" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("RunCML.pauseHook", SOME 1)
	    val to = TO.preempt ()
	 in
	    case to of
	       NONE =>
		  (* no waiting threads *) 
		  S.prepend(!SH.shutdownHook, fn () => (true, OS.Process.failure))
	     | SOME NONE =>
		  (* enqueued a waiting thread *)
		  S.next ()
	     | SOME (SOME t) => 
		  (* a waiting thread will be ready in t time *)
		  (if Time.toSeconds t <= 0
		      then ()
		      else S.doMasked (fn () => OS.Process.sleep t)
		   ; pauseHook ())
	 end

      fun doit (initialProc: unit -> unit,
		tq: Time.time option) =
	 let
	    val () =
	       if isRunning ()
		  then raise Fail "CML is running"
		  else ()
	    val (installAlrmHandler, restoreAlrmHandler) = prepareAlrmHandler tq
	    val (cleanUp, status) =
	       S.switchToNext
	       (fn thrd => 
		let
		   val () = R.isRunning := true	
		   val () = reset true
		   val () = SH.shutdownHook := S.prepend (thrd, fn arg => (S.atomicBegin (); arg))
		   val () = SH.pauseHook := pauseHook
		   val () = installAlrmHandler alrmHandler
		   val () = ignore (Thread.spawn initialProc)
		in
		   ()
		end)
	    val () = restoreAlrmHandler ()
	    val () = reset false
	    val () = R.isRunning := false
	    val () = S.atomicEnd ()
	 in
	    status
	 end

      fun shutdown status =
	 if isRunning ()
	    then S.switch (fn _ => (!SH.shutdownHook, (true, status)))
	    else raise Fail "CML is not running"
   end



1.1                  mlton/lib/cml/core-cml/running.sml

Index: running.sml
===================================================================
(* running.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* running.sml
 *
 * COPYRIGHT (c) 1997 Bell Labs, Lucent Technologies.
 *
 * A flag to tell us if CML is running.  This gets set and cleared in the
 * RunCML structure (run-cml.sml), but other modules need to test it.
 *)

structure Running =
  struct
    val isRunning = ref false
  end



1.1                  mlton/lib/cml/core-cml/scheduler-hooks.sig

Index: scheduler-hooks.sig
===================================================================
(* scheduler-hooks.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* scheduler.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

signature SCHEDULER_HOOKS =
   sig
      type 'a thread = 'a RepTypes.thread
      type rdy_thread = RepTypes.rdy_thread

      (* this hook gets invoked when the scheduler has nothing else to do;
       * it is invoked in an atomic region
       *)
      val pauseHook : (unit -> rdy_thread) ref

      (* this hook points to a thread that gets invoked when
       * the system is otherwise deadlocked.  It takes two arguments:
       * the first is a boolean flag that says whether to do clean-up,
       * and the second is the exit status.
       *)
      val shutdownHook : (bool * OS.Process.status) thread ref

      val reset : unit -> unit
   end



1.1                  mlton/lib/cml/core-cml/scheduler-hooks.sml

Index: scheduler-hooks.sml
===================================================================
(* scheduler-hooks.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* scheduler.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

structure SchedulerHooks: SCHEDULER_HOOKS =
   struct
      datatype thread = datatype RepTypes.thread
      type rdy_thread = RepTypes.rdy_thread

      val pauseHookDefault : unit -> rdy_thread =
	 fn _ => raise Fail "SchedulerHooks.pauseHook"
      val pauseHook = ref pauseHookDefault

      val shutdownHookDefault : (bool * OS.Process.status) thread =
	 THRD (ThreadID.bogus "shutdownHook", MLton.Thread.new (fn _ =>
	       raise Fail "SchedulerHooks.shutdownHook"))
      val shutdownHook = ref shutdownHookDefault

      fun reset () =
	 (pauseHook := pauseHookDefault
	  ; shutdownHook := shutdownHookDefault)
   end



1.1                  mlton/lib/cml/core-cml/scheduler.sig

Index: scheduler.sig
===================================================================
(* scheduler.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* scheduler.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

signature SCHEDULER =
   sig
      include CRITICAL

      type thread_id = ThreadID.thread_id
      type 'a thread = 'a RepTypes.thread
      type rdy_thread = RepTypes.rdy_thread

      val getThreadId : 'a thread -> thread_id
      val getCurThreadId : unit -> thread_id
      val tidMsg : unit -> string

      val ready : rdy_thread -> unit
      val next : unit -> rdy_thread
      val switch : ('a thread -> 'b thread * 'b) -> 'a
      val atomicSwitch : ('a thread -> 'b thread * 'b) -> 'a
      val switchToNext : ('a thread -> unit) -> 'a
      val atomicSwitchToNext : ('a thread -> unit) -> 'a
      val readyAndSwitch : (unit -> 'b thread * 'b) -> unit
      val atomicReadyAndSwitch : (unit -> 'b thread * 'b) -> unit
      val readyAndSwitchToNext : (unit -> unit) -> unit
      val atomicReadyAndSwitchToNext : (unit -> unit) -> unit

      val new : (thread_id -> 'a -> unit) -> 'a thread
      val prepend :'a thread * ('b -> 'a) -> 'b thread
      val unwrap : (rdy_thread -> rdy_thread) -> (unit MLton.Thread.t -> unit MLton.Thread.t)

      val reset : bool -> unit
      val preempt : rdy_thread -> unit
   end



1.1                  mlton/lib/cml/core-cml/scheduler.sml

Index: scheduler.sml
===================================================================
(* scheduler.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* scheduler.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * This module implements the scheduling queues and preemption
 * mechanisms.
 *)

structure Scheduler : SCHEDULER =
   struct
      structure Assert = LocalAssert(val assert = true)
      structure GlobalDebug = Debug
      structure Debug = LocalDebug(val debug = false)

      open Critical

      structure Q = ImpQueue 
      structure T = MLton.Thread
      structure TID = ThreadID
      structure SH = SchedulerHooks

      type thread_id = ThreadID.thread_id
      datatype thread = datatype RepTypes.thread
      type rdy_thread = unit thread


      (* the dummy thread Id; this is used when an ID is needed to get
       * the types right
       *)
      val dummyTid = TID.bogus "dummy"
      (* the error thread.  This thread is used to trap attempts to run CML
       * without proper initialization (i.e., via RunCML).  This thread is
       * enqueued by reset.
       *)
      val errorTid = TID.bogus "error"
      val errorThrd : rdy_thread =
	 THRD (errorTid, T.new (fn () =>
	       (GlobalDebug.sayDebug 
		([fn () => "CML"], fn () => "**** Use RunCML.doit to run CML ****")
		; raise Fail "CML not initialized")))

      local
	 val curTid : thread_id ref = ref dummyTid
      in
	 fun getThreadId (THRD (tid, _)) = tid
	 fun getCurThreadId () = 
	    let
	       val tid = !curTid
	    in
	       tid
	    end
	 fun setCurThreadId tid = 
	    let
	       val () = Assert.assertAtomic' ("Scheduler.setCurThreadId", NONE)
	    in 
	       curTid := tid
	    end
      end
      fun tidMsg () = TID.tidToString (getCurThreadId ())
      fun debug msg = Debug.sayDebug ([atomicMsg, tidMsg], msg)
      fun debug' msg = debug (fn () => msg)

      local
	 val time = ref Time.zeroTime
      in
	 val atomicBegin = fn () =>
	    let
	       val () = atomicBegin ()
(*
	       val () = 
		  case MLton.Thread.atomicState () of
		     MLton.Thread.AtomicState.Atomic 1 => time := Time.now ()
		   | _ => ()
*)
	    in
	       ()
	    end
	 val atomicEnd = fn () =>
	    let
(*
	       val () = 
		  case MLton.Thread.atomicState () of
		     MLton.Thread.AtomicState.Atomic 1 => 
			let
			   val diff = Time.-(Time.now(), !time)
			in 
			   GlobalDebug.sayDebug
			   ([], fn () =>
			    concat [LargeInt.toString (Time.toMilliseconds diff), "ms"])
			end
		   | _ => ()
*)
	       val () = atomicEnd ()
	    in 
	       ()
	    end 
      end

      (* The thread ready queues:
       * rdyQ1 is the primary queue and rdyQ2 is the secondary queue.
       *)
      val rdyQ1 : rdy_thread Q.t = Q.new ()
      and rdyQ2 : rdy_thread Q.t = Q.new ()

      (* enqueue a thread in the primary queue *)
      fun enque1 thrd =
	 (Assert.assertAtomic' ("Scheduler.enque1", NONE)
	  ; Q.enque (rdyQ1, thrd))
      (* enqueue a thread in the secondary queue *)
      fun enque2 thrd =
	 (Assert.assertAtomic' ("Scheduler.enque2", NONE)
	  ; Q.enque (rdyQ2, thrd))
      (* dequeue a thread from the primary queue, falling back to the
       * secondary queue when the primary queue is empty
       *)
      fun deque1 () =
	 (Assert.assertAtomic' ("Scheduler.deque1", NONE)
	  ; case Q.deque rdyQ1 of
	       NONE => deque2 ()
	     | SOME thrd => SOME thrd)
      (* dequeue a thread from the secondary queue *)
      and deque2 () =
	 (Assert.assertAtomic' ("Scheduler.deque2", NONE)
	  ; case Q.deque rdyQ2 of
	       NONE => NONE
	     | SOME thrd => SOME thrd)
      (* promote a thread from the secondary queue to the primary queue *)
      fun promote () =
	 (Assert.assertAtomic' ("Scheduler.promote", NONE)
	  ; case deque2 () of
	       NONE => ()
	     | SOME thrd => enque1 thrd)

      fun next () =
	 let
	    val () = Assert.assertAtomic' ("Scheduler.next", NONE)
	    val thrd =
	       case deque1 () of
		  NONE => !SH.pauseHook ()
		| SOME thrd => thrd
	 in
	    thrd
	 end
      fun ready thrd = 
	 let
	    val () = Assert.assertAtomic' ("Scheduler.ready", NONE)
	    val () = enque1 thrd
	 in
	    ()
	 end
      local
	 fun atomicSwitchAux msg f = 
	    (Assert.assertAtomic (fn () => "Scheduler." ^ msg, NONE)
	     ; T.atomicSwitch (fn t => 
			       let
				  val tid = getCurThreadId ()
				  val () = TID.mark tid
				  val (THRD (tid',t'), x') = f (THRD (tid, t))
				  val () = setCurThreadId tid'
			       in 
				  (t', x')
			       end))
      in
	 fun atomicSwitch f =
	    atomicSwitchAux "atomicSwitch" f
	 fun switch f =
	    (atomicBegin (); atomicSwitch f)
	 fun atomicSwitchToNext f =
	    atomicSwitchAux "atomicSwitchToNext" (fn thrd => (f thrd; (next (), ())))
	 fun switchToNext f =
	    (atomicBegin (); atomicSwitchToNext f)
	 fun atomicReadyAndSwitch f =
	    atomicSwitchAux "atomicReadyAndSwitch" (fn thrd => (ready thrd; f ()))
	 fun readyAndSwitch f =
	    (atomicBegin (); atomicReadyAndSwitch f)
	 fun atomicReadyAndSwitchToNext f =
	    atomicSwitchAux "atomicReadyAndSwitchToNext" (fn thrd => (ready thrd; f (); (next (), ())))
	 fun readyAndSwitchToNext f =
	    (atomicBegin (); atomicReadyAndSwitchToNext f)
      end

      fun new (f : thread_id -> 'a -> unit) : 'a thread =
	 let
	    val () = Assert.assertAtomic' ("Scheduler.new", NONE)
	    val tid = TID.new ()
	    val t = T.new (f tid)
	 in
	    THRD (tid, t)
	 end

      fun prepend (thrd : 'a thread, f : 'b -> 'a) : 'b thread =
	 let
	    val () = Assert.assertAtomic' ("Scheduler.prepend", NONE)
	    val THRD (tid, t) = thrd
	    val t = T.prepend (t, f)
	 in
	    THRD (tid, t)
	 end

      fun unwrap (f : rdy_thread -> rdy_thread) (t: unit T.t) : unit T.t =
	 let
	    val () = Assert.assertAtomic' ("Scheduler.unwrap", NONE)
	    val tid = getCurThreadId ()
	    val THRD (tid', t') = f (THRD (tid, t))
	    val () = setCurThreadId tid'
	 in
	    t'
	 end


      (* reset various pieces of state *)
      fun reset running = 
	 (atomicBegin ()
	  ; setCurThreadId dummyTid
	  ; Q.reset rdyQ1; Q.reset rdyQ2
	  ; if not running then ready errorThrd else ()
	  ; atomicEnd ())
      (* what to do at a preemption (with the current thread) *)
      fun preempt (thrd as THRD (tid, _)) =
	 let
	    val () = Assert.assertAtomic' ("Scheduler.preempt", NONE)
	    val () = debug' "Scheduler.preempt" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("Scheduler.preempt", SOME 1)
	    val () =
	       if TID.isMarked tid
		  then (TID.unmark tid
			; promote ()
			; enque1 thrd)
		  else enque2 thrd
	 in
	    ()
	 end

      val _ = reset false
   end


1.1                  mlton/lib/cml/core-cml/sync-var.sig

Index: sync-var.sig
===================================================================
(* sync-var.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* sync-var-sig.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * The implementation of Id-style synchronizing memory cells (I-structures
 * and M-structures).
 *)

signature SYNC_VAR =
   sig
      
      type 'a ivar  (* I-structure variable *)
      type 'a mvar  (* M-structure variable *)
	 
      exception Put (* raised on put operations to full cells *)
      
      val iVar     : unit -> 'a ivar
      val iPut     : ('a ivar * 'a) -> unit
      val iGet     : 'a ivar -> 'a
      val iGetEvt  : 'a ivar -> 'a CML.event
      val iGetPoll : 'a ivar -> 'a option
      val sameIVar : ('a ivar * 'a ivar) -> bool
	 
      val mVar      : unit -> 'a mvar
      val mVarInit  : 'a -> 'a mvar
      val mPut      : ('a mvar * 'a) -> unit
      val mTake     : 'a mvar -> 'a
      val mTakeEvt  : 'a mvar -> 'a CML.event
      val mTakePoll : 'a mvar -> 'a option
      val mGet      : 'a mvar -> 'a
      val mGetEvt   : 'a mvar -> 'a CML.event
      val mGetPoll  : 'a mvar -> 'a option
      val mSwap     : ('a mvar * 'a) -> 'a
      val mSwapEvt  : ('a mvar * 'a) -> 'a CML.event
      val sameMVar  : ('a mvar * 'a mvar) -> bool
   end
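
(* Usage sketch (not part of the original port).  A minimal illustration of
 * the interface above, assuming the code runs inside a CML thread (e.g. one
 * started by RunCML.doit) and that CML.spawn is available; the names iv and
 * mv and the values used are hypothetical.
 *
 *   val iv : int SyncVar.ivar = SyncVar.iVar ()
 *   val _ = CML.spawn (fn () => SyncVar.iPut (iv, 42))
 *   val x = SyncVar.iGet iv          -- blocks until the put, then yields 42
 *   val y = SyncVar.iGet iv          -- an ivar stays full, so this is 42 too
 *
 *   val mv = SyncVar.mVarInit "a"
 *   val s = SyncVar.mTake mv         -- yields "a" and empties the cell
 *   val () = SyncVar.mPut (mv, "b")  -- legal again, since the cell is empty
 *   val t = SyncVar.mSwap (mv, "c")  -- yields "b" and leaves "c" behind
 *
 * A second iPut on iv, or an mPut on a full mvar, raises Put.
 *)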



1.1                  mlton/lib/cml/core-cml/sync-var.sml

Index: sync-var.sml
===================================================================
(* sync-var.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* sync-var.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * The implementation of Id-style synchronizing memory cells.
 *)

structure SyncVar : SYNC_VAR =
   struct
      structure Assert = LocalAssert(val assert = true)
      structure Debug = LocalDebug(val debug = false)
	 
      structure Q = ImpQueue
      structure S = Scheduler
      structure E = Event
      fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
	 
      datatype trans_id = datatype TransID.trans_id
      datatype trans_id_state = datatype TransID.trans_id_state


      (* the underlying representation of both ivars and mvars is the same. *)
      datatype 'a cell = 
	 CELL of {prio  : int ref,
		  readQ : (trans_id * 'a S.thread) Q.t,
		  value : 'a option ref}

      type 'a ivar = 'a cell
      type 'a mvar = 'a cell

      exception Put

      fun newCell () = CELL {prio = ref 0, readQ = Q.new(), value = ref NONE}

      (* sameCell : ('a cell * 'a cell) -> bool *)
      fun sameCell (CELL {prio = prio1, ...}, CELL {prio = prio2, ...}) =
	 prio1 = prio2

      (* bump a priority value by one, returning the old value *)
      fun bumpPriority (p as ref n) = (p := n+1; n)

      (* functions to clean cancelled transactions from a cell's readQ *)
      local
	 fun cleaner (TXID txst, _) = 
	    case !txst of CANCEL => true | _ => false
      in
	 fun cleanAndDeque q =
	    Q.cleanAndDeque (q, cleaner)
	 fun enqueAndClean (q, item) =
	    Q.enqueAndClean (q, item, cleaner)
      end

      (* When a thread is resumed after being blocked on an iGet or mGet operation,
       * there may be other threads also blocked on the variable.  This function
       * is used to propagate the message to all of the threads that are blocked
       * on the variable (or until one of them takes the value in the mvar case).
       * It must be called from an atomic region; when the readQ is finally empty,
       * we leave the atomic region.  We must use "cleanAndDeque" to get items
       * from the readQ in the unlikely event that a single thread executes a
       * choice of multiple gets on the same variable.
       *)
      fun relayMsg (readQ, msg) = 
	 case (cleanAndDeque readQ) of
	    NONE => S.atomicEnd()
	  | SOME (txid, t) => 
	       S.readyAndSwitch
	       (fn () =>
		(TransID.force txid
		 ; (t, msg)))

      (** G-variables **)
      (* Generalized synchronized variables,
       * to factor out the common operations. 
       *)

      fun gPut (name, CELL {prio, readQ, value}, x) = 
	 let
	    val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name])
	    val () = debug (fn () => concat [name, "(1)"]) (* NonAtomic *)
	    val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(1)"])
	    val () = S.atomicBegin()
	    val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *)
	    val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1)
	    val () = 
	       case !value of
		  NONE => 
		     let
			val () = debug (fn () => concat [name, "(3.1.1)"]) (* Atomic 1 *)
			val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.1.1)"], SOME 1)
			val () = value := SOME x
			val () = 
			   case cleanAndDeque readQ of
			      NONE => S.atomicEnd ()
			    | SOME (rtxid, rt) =>
				 S.readyAndSwitch
				 (fn () =>
				  (prio := 1
				   ; TransID.force rtxid
				   ; (rt, x)))
			val () = debug (fn () => concat [name, "(3.1.2)"]) (* NonAtomic *)
			val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.1.2)"])
		     in
			()
		     end
		| SOME _ => 
		     let
			val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
			val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
			val () = S.atomicEnd ()
			val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *)
			val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"])
		     in 
			raise Put
		     end
	    val () = debug (fn () => concat [name, "(4)"]) (* NonAtomic *)
	    val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(4)"])
	 in
	    ()
	 end

      (* Swap the current contents of the cell with a new value;
       * it is guaranteed to be atomic.
       *)
      fun gSwap (name, doSwap, CELL {prio, readQ, value}) = 
	 let
	    val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, ""])
	    val () = debug (fn () => concat [name, "(1)"]) (* NonAtomic *)
	    val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(1)"])
	    val () = S.atomicBegin()
	    val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *)
	    val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1)
	    val msg =
	       case !value of
		  NONE => 
		     let
			val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
			val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
			val msg = 
			   S.atomicSwitchToNext
			   (fn rt => enqueAndClean (readQ, (TransID.mkTxId (), rt)))
			val () = debug (fn () => concat [name, "(3.2.2)"]) (* Atomic 1 *)
			val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"], SOME 1)
			val () = doSwap value
			val () = relayMsg (readQ, msg)
			val () = debug (fn () => concat [name, "(3.2.3)"]) (* NonAtomic *)
			val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.3)"])
		     in
			msg
		     end
		| SOME x => 
		     let
			val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
			val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
			val () = prio := 1
			val () = doSwap value
			val () = S.atomicEnd ()
			val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *)
			val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"])
		     in
			x
		     end
	    val () = debug (fn () => concat [name, "(4)"]) (* NonAtomic *)
	    val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(4)"])
	 in
	    msg
	 end

      fun gSwapEvt (name, doSwap, CELL{prio, readQ, value}) = 
	 let
	    fun doitFn () =
	       let
		  val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, ".doitFn"], NONE)
		  val x = valOf (!value)
		  val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
		  val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
		  val () = prio := 1
		  val () = doSwap value
		  val () = S.atomicEnd ()
		  val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *)
		  val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"])
	       in
		  x
	       end
	    fun blockFn {transId, cleanUp, next} = 
	       let
		  val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, ".blockFn"], NONE)
		  val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
		  val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
		  val msg = 
		     S.atomicSwitch
		     (fn rt =>
		      (enqueAndClean (readQ, (transId, rt))
		       ; (next (), ())))
		  val () = debug (fn () => concat [name, "(3.2.2)"]) (* Atomic 1 *)
		  val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"], SOME 1)
		  val () = cleanUp ()
		  val () = doSwap value
		  val () = relayMsg (readQ, msg)
		  val () = debug (fn () => concat [name, "(3.2.3)"]) (* NonAtomic *)
		  val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.3)"])
	       in
		  msg
	       end
	    fun pollFn () =
	       let
		  val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, ".pollFn"], NONE)
		  val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *)
		  val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1)
	       in
		  case !value of
		     NONE => E.blocked blockFn
		   | SOME _ => E.enabled {prio = bumpPriority prio,
					  doitFn = doitFn}
	       end
	 in
	    E.bevt pollFn
	 end

      fun gSwapPoll (name, doSwap, CELL{prio, value, ...}) = 
	 let
	    val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, ""])
	    val () = debug (fn () => concat [name, "(1)"]) (* NonAtomic *)
	    val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(1)"])
	    val () = S.atomicBegin()
	    val () = debug (fn () => concat [name, "(2)"]) (* Atomic 1 *)
	    val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(2)"], SOME 1)
	    val msg =
	       case !value of
		  NONE => 
		     let
			val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
			val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
			val msg = NONE
			val () = debug (fn () => concat [name, "(3.2.2)"]) (* Atomic 1 *)
			val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"], SOME 1)
			val () = S.atomicEnd ()
			val () = debug (fn () => concat [name, "(3.2.3)"]) (* NonAtomic *)
			val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.3)"])
		     in
			msg
		     end
		| SOME x => 
		     let
			val () = debug (fn () => concat [name, "(3.2.1)"]) (* Atomic 1 *)
			val () = Assert.assertAtomic (fn () => concat ["SyncVar.", name, "(3.2.1)"], SOME 1)
			val () = prio := 1
			val () = doSwap value
			val () = S.atomicEnd ()
			val () = debug (fn () => concat [name, "(3.2.2)"]) (* NonAtomic *)
			val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(3.2.2)"])
		     in
			SOME x
		     end
	    val () = debug (fn () => concat [name, "(4)"]) (* NonAtomic *)
	    val () = Assert.assertNonAtomic (fn () => concat ["SyncVar.", name, "(4)"])
	 in
	    msg
	 end


      (** I-variables **)

      val iVar = newCell
      val sameIVar = sameCell
	 
      fun iPut (cell, x) = gPut ("iPut", cell, x)
      local fun doGetSwap _ = ()
      in
	 fun iGet cell = gSwap ("iGet", doGetSwap, cell)
	 fun iGetEvt cell = gSwapEvt ("iGetEvt", doGetSwap, cell)
	 fun iGetPoll cell = gSwapPoll ("iGetPoll", doGetSwap, cell)
      end

      (** M-variables **)

      val mVar = newCell
      fun mVarInit x = CELL {prio = ref 0, readQ = Q.new(), value = ref (SOME x)}
      val sameMVar = sameCell

      fun mPut (cell, x) = gPut ("mPut", cell, x)
      local fun doTakeSwap value = value := NONE
      in
	 fun mTake cell = gSwap ("mTake", doTakeSwap, cell)
	 fun mTakeEvt cell = gSwapEvt ("mTakeEvt", doTakeSwap, cell)
	 fun mTakePoll cell = gSwapPoll ("mTakePoll", doTakeSwap, cell)
      end
      local fun doGetSwap _ = ()
      in
	 fun mGet cell = gSwap ("mGet", doGetSwap, cell)
	 fun mGetEvt cell = gSwapEvt ("mGetEvt", doGetSwap, cell)
	 fun mGetPoll cell = gSwapPoll ("mGetPoll", doGetSwap, cell)
      end
      local fun doSwapSwap x value = value := SOME x
      in
	 fun mSwap (cell, x) = gSwap ("mSwap", doSwapSwap x, cell)
	 fun mSwapEvt (cell, x) = gSwapEvt ("mSwapEvt", doSwapSwap x, cell)
      end
   end



1.1                  mlton/lib/cml/core-cml/thread-id.sig

Index: thread-id.sig
===================================================================
(* thread-id.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* threads-sig.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

type word = Word.word
signature THREAD_ID =
   sig
      datatype thread_id = datatype RepTypes.thread_id

      val sameTid    : (thread_id * thread_id) -> bool
      val compareTid : (thread_id * thread_id) -> order
      val hashTid    : thread_id -> word

      val tidToString : thread_id -> string
   end

signature THREAD_ID_EXTRA =
   sig
      include THREAD_ID
      val new : unit -> thread_id
      val bogus : string -> thread_id

      val mark     : thread_id -> unit
      val unmark   : thread_id -> unit
      val isMarked : thread_id -> bool

      val reset : unit -> unit
   end



1.1                  mlton/lib/cml/core-cml/thread-id.sml

Index: thread-id.sml
===================================================================
(* thread-id.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* thread.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

structure ThreadID : THREAD_ID_EXTRA =
   struct
      structure Assert = LocalAssert(val assert = true)

      structure R = RepTypes


      datatype thread_id = datatype R.thread_id

      fun sameTid (TID{id=a, ...}, TID{id=b, ...}) = a = b
      fun compareTid (TID{id=a, ...}, TID{id=b, ...}) = Int.compare (a, b)
      fun hashTid (TID{id, ...}) = Word.fromInt id

      fun tidToString (TID{id, ...}) =
	 concat["[", StringCvt.padLeft #"0" 6 (Int.toString id), "]"]

      fun exnHandler (_ : exn) = ()
      val defaultExnHandler = ref exnHandler

      fun new' n =
	 TID {id = n,
	      alert = ref false,
	      done_comm = ref false,
	      exnHandler = ref (!defaultExnHandler),
	      props = ref [],
	      dead = CVar.new ()}

      local
	 val tidCounter = ref 0
      in
	 fun new () =
	    let 
	       val _ = Assert.assertAtomic' ("ThreadID.new", NONE)
	       val n = !tidCounter
	       val _ = tidCounter := n + 1
	    in
	       new' n
	    end

	 fun reset () = tidCounter := 0
      end

      fun bogus s =
	 let val n = CharVector.foldr (fn (c, n) => 2 * n - Char.ord c) 0 s
	 in new' n
	 end

      fun mark (TID{done_comm, ...}) = 
	 (Assert.assertAtomic' ("ThreadID.mark", NONE)
	  ; done_comm := true)
      fun unmark (TID{done_comm, ...}) = 
	 (Assert.assertAtomic' ("ThreadID.unmark", NONE)
	  ; done_comm := false)
      fun isMarked (TID{done_comm, ...}) = !done_comm
   end



1.1                  mlton/lib/cml/core-cml/thread.sig

Index: thread.sig
===================================================================
(* thread.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* threads-sig.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

signature THREAD =
  sig
     include THREAD_ID
     val getTid : unit -> thread_id

     val spawnc : ('a -> unit) -> 'a -> thread_id
     val spawn  : (unit -> unit) -> thread_id
     val exit   : unit -> 'a
     val yield  : unit -> unit	(* mostly for benchmarking *)

     val joinEvt : thread_id -> unit Event.event

     (* thread-local data *)
     val newThreadProp : (unit -> 'a) -> 
	{
	 clrFn : unit -> unit,	        (* clears the current thread's property *)
	 getFn : unit -> 'a,		(* get current thread's property; if *)
	                                (* the property is not defined, then *)
	                                (* it sets it using the initialization *)
					(* function. *)
	 peekFn : unit -> 'a option,	(* return the property's value, if any *)
	 setFn : 'a -> unit		(* set the property's value for the *)
					(* current thread. *)
	 }
     val newThreadFlag : unit -> {getFn : unit -> bool, setFn : bool -> unit}
  end
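
(* Usage sketch (not part of the original port).  One way the thread-local
 * data interface above might be used, assuming a running CML thread; the
 * name counter and the initial value 0 are hypothetical.
 *
 *   val counter = Thread.newThreadProp (fn () => 0)
 *   val a = #peekFn counter ()    -- NONE: nothing set in this thread yet
 *   val b = #getFn counter ()     -- 0: initialized on demand by the function
 *   val () = #setFn counter 5
 *   val c = #getFn counter ()     -- 5
 *   val () = #clrFn counter ()
 *   val d = #peekFn counter ()    -- NONE again
 *
 * Each thread sees its own value, since the property list lives in the
 * thread_id of the current thread.
 *)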




1.1                  mlton/lib/cml/core-cml/thread.sml

Index: thread.sml
===================================================================
(* thread.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* thread.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

structure Thread : THREAD =
   struct
      structure Assert = LocalAssert(val assert = true)
      structure Debug = LocalDebug(val debug = false)

      structure S = Scheduler
      fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
      fun debug' msg = debug (fn () => msg)

      open ThreadID

      fun generalExit (tid', clr') =
	 let
	    val () = Assert.assertNonAtomic' "Thread.generalExit"
	    val () = debug' "generalExit" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Thread.generalExit"
	 in
	    S.switchToNext
	    (fn t =>
	     let
		val tid as TID {dead, props, ...} = S.getThreadId t
		val () = Assert.assert ([], fn () => 
					concat ["Thread.generalExit ",
						Option.getOpt (Option.map tidToString tid', "NONE"), 
						" <> ",
						tidToString tid], fn () =>
					 case tid' of NONE => true
				          | SOME tid' => sameTid (tid', tid))
		val () = if clr' then props := [] else ()
		val () = Event.atomicCVarSet dead
	     in
		()
	     end)
	 end

      fun doHandler (TID {exnHandler, ...}, exn) =
	 (debug (fn () => concat ["Exception: ", exnName exn, " : ", exnMessage exn])
	  ; ((!exnHandler) exn) handle _ => ())

      fun spawnc f x = 
	 let
	    val () = S.atomicBegin ()
	    fun thread tid () = 
	       ((f x) handle ex => doHandler (tid, ex)
		; generalExit (SOME tid, false))
	    val t = S.new thread
	    val tid = S.getThreadId t
	    val _ = S.ready t
	    val () = S.atomicEnd ()
	    val () = debug (fn () => concat ["spawnc ", tidToString tid])  (* NonAtomic *)
	 in
	    tid
	 end
      fun spawn f = spawnc f ()

      fun joinEvt (TID{dead, ...}) = Event.cvarGetEvt dead

      val getTid = S.getCurThreadId

      fun exit () = 
	 let
	    val () = Assert.assertNonAtomic' "Thread.exit"
	    val () = debug' "exit" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Thread.exit"
	 in
	    generalExit (NONE, true)
	 end

      fun yield () = 
	 let
	    val () = Assert.assertNonAtomic' "Thread.yield"
	    val () = debug' "yield" (* NonAtomic *)
	    val () = Assert.assertNonAtomic' "Thread.yield"
	 in
	    S.readyAndSwitchToNext (fn () => ())
	 end

      (* thread-local data *)
      local
	 fun mkProp () = 
	    let
	       exception E of 'a 
	       fun cons (a, l) = E a :: l 
	       fun peek [] = NONE
		 | peek (E a :: _) = SOME a
		 | peek (_ :: l) = peek l
	       fun delete [] = []
		 | delete (E _ :: r) = r
		 | delete (x :: r) = x :: delete r
	    in
	       {cons = cons, 
		peek = peek, 
		delete = delete}
	    end
	 fun mkFlag () = 
	    let
	       exception E
	       fun peek [] = false
		 | peek (E :: _) = true
		 | peek (_ :: l) = peek l
	       fun set (l, flg) = 
		  let
		     fun set ([], _) = if flg then E::l else l
		       | set (E::r, xs) = if flg then l else List.revAppend(xs, r)
		       | set (x::r, xs) = set (r, x::xs)
		  in
		     set (l, [])
		  end
	    in
	       {set = set, 
		peek = peek}
	    end
	 fun getProps () = 
	    let val TID {props, ...} = getTid () 
	    in props 
	    end
      in
	 fun newThreadProp (init : unit -> 'b) = 
	    let
	       val {peek, cons, delete} = mkProp() 
	       fun peekFn () = peek(!(getProps()))
	       fun getF () = 
		  let val h = getProps()
		  in
		     case peek(!h) of 
			NONE => let val b = init() 
				in h := cons(b, !h); b 
				end
		      | (SOME b) => b
		  end
	       fun clrF () = 
		  let val h = getProps()
		  in h := delete(!h)
		  end
	       fun setFn x = 
		  let val h = getProps()
		  in h := cons(x, delete(!h))
		  end
	    in
	       {peekFn = peekFn, 
		getFn = getF, 
		clrFn = clrF, 
		setFn = setFn}
	    end

	 fun newThreadFlag () = 
	    let
	       val {peek, set} = mkFlag() 
	       fun getF ()= peek(!(getProps()))
	       fun setF flg = 
		  let val h = getProps()
		  in h := set(!h, flg)
		  end
	    in
	       {getFn = getF, 
		setFn = setF}
	    end
      end
   end



1.1                  mlton/lib/cml/core-cml/timeout.sig

Index: timeout.sig
===================================================================
(* timeout.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* timeout-sig.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * Exported interface for timeout synchronization.
 *)

signature TIME_OUT =
   sig
      val timeOutEvt : Time.time -> unit Event.event
      val atTimeEvt  : Time.time -> unit Event.event
   end

signature TIME_OUT_EXTRA =
   sig
      include TIME_OUT

      val reset : unit -> unit
      (* preempt () == NONE  ==>  no waiting threads 
       * preempt () == SOME NONE  ==>  enqueued a waiting thread
       * preempt () == SOME (SOME t)  ==>  a waiting thread will be ready in t time
       *)
      val preempt : unit -> Time.time option option
   end
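
(* Reading sketch (not part of the original port).  A caller of preempt,
 * such as a scheduler hook, might interpret the result as follows; the
 * function describe is hypothetical and only illustrates the three cases.
 *
 *   fun describe (r : Time.time option option) : string =
 *      case r of
 *         NONE => "no waiting threads"
 *       | SOME NONE => "a waiting thread was made ready"
 *       | SOME (SOME t) => "next timeout due in " ^ Time.toString t ^ "s"
 *)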



1.1                  mlton/lib/cml/core-cml/timeout.sml

Index: timeout.sml
===================================================================
(* timeout.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* timeout.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * Events for synchronizing on timeouts.
 *)

structure TimeOut : TIME_OUT_EXTRA =
   struct
      structure Assert = LocalAssert(val assert = true)
      structure Debug = LocalDebug(val debug = false)

      structure S = Scheduler
      structure E = Event
      fun debug msg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], msg)
      fun debug' msg = debug (fn () => msg)

      datatype trans_id = datatype TransID.trans_id
      datatype trans_id_state = datatype TransID.trans_id_state


      (* this holds an approximation of the current time of day.  It is
       * cleared at each pre-emption, and initialized on demand (by getTime).
       *)
      val clock = ref NONE

      (* returns an approximation of the current time of day
       * (this is at least as accurate as the time quantum).
       *)
      fun getTime () = 
	 case !clock of
	    NONE => let val t = Time.now()
		    in clock := SOME t;  t
		    end
	  | SOME t => t
      fun preemptTime () = clock := NONE

      (* The queue of threads waiting for timeouts.
       * It is sorted in increasing order of time value.
       *)
      structure TQ = FunPriorityQueue(structure Key = struct open Time type t = time end)
      type item = trans_id * (unit -> unit) * S.rdy_thread
      val timeQ : item TQ.t ref = ref (TQ.new ())

      fun cleaner readied elt =
	 let 
	    val now = getTime ()
	    val (TXID txst, cleanUp, t) = TQ.Elt.value elt
	 in 
	    case !txst of 
	       CANCEL => true 
	     | _ => if Time.<=(TQ.Elt.key elt, now)
		       then (readied ()
			     ; S.ready t
			     ; cleanUp ()
			     ; true)
		       else false
	 end
 
      fun timeWait (time, txid, cleanUp, t) = 
	 (Assert.assertAtomic' ("TimeOut.timeWait", NONE)
	  ; timeQ := TQ.enqueAndClean(!timeQ, time, (txid, cleanUp, t), cleaner (fn () => ())))

      (** NOTE: unlike for most base events, the block functions of time-out
       ** events do not have to exit the atomic region or execute the clean-up
       ** operation.  This is done when they are removed from the waiting queue.
       **)
      fun timeOutEvt time = 
	 let
	    fun blockFn {transId, cleanUp, next} = 
	       let
		  val () = Assert.assertAtomic' ("TimeOut.timeOutEvt.blockFn", NONE)
		  val () = debug' "timeOutEvt(3.2.1)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("TimeOut.timeOutEvt(3.2.1)", SOME 1)
		  val () =
		     S.atomicSwitch
		     (fn t =>
		      (timeWait (Time.+(time, getTime ()), transId, cleanUp, t)
		       ; (next (), ())))
		  val () = debug' "timeOutEvt(3.2.3)" (* NonAtomic *)
		  val () = Assert.assertNonAtomic' "TimeOut.timeOutEvt(3.2.3)"
	       in
		  ()
	       end
	    fun pollFn () = 
	       let
		  val () = Assert.assertAtomic' ("TimeOut.timeOutEvt.pollFn", NONE)
		  val () = debug' "timeOutEvt(2)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("TimeOut.timeOutEvt(2)", SOME 1)
	       in
		  if Time.<=(time, Time.zeroTime)
		     then E.enabled {prio = ~1, doitFn = S.atomicEnd}
		     else E.blocked blockFn
	       end
	 in
	    E.bevt pollFn
	 end
       
      fun atTimeEvt time = 
	 let
	    fun blockFn {transId, cleanUp, next} = 
	       let
		  val () = Assert.assertAtomic' ("TimeOut.atTimeEvt.blockFn", NONE)
		  val () = debug' "atTimeEvt(3.2.1)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("TimeOut.atTimeEvt(3.2.1)", SOME 1)
		  val () =
		     S.atomicSwitch
		     (fn t =>
		      (timeWait (time, transId, cleanUp, t)
		       ; (next (), ())))
		  val () = debug' "atTimeEvt(3.2.3)" (* NonAtomic *)
		  val () = Assert.assertNonAtomic' "TimeOut.atTimeEvt(3.2.3)"
	       in
		  ()
	       end
	    fun pollFn () = 
	       let
		  val () = Assert.assertAtomic' ("TimeOut.atTimeEvt.pollFn", NONE)
		  val () = debug' "atTimeEvt(2)" (* Atomic 1 *)
		  val () = Assert.assertAtomic' ("TimeOut.atTimeEvt(2)", SOME 1)
	       in
		  if Time.<=(time, getTime())
		     then E.enabled {prio = ~1, doitFn = S.atomicEnd}
		     else E.blocked blockFn
	       end
	 in
	    E.bevt pollFn
	 end

      (* reset various pieces of state *)
      fun reset () = timeQ := TQ.new ()
      (* what to do at a preemption *)
      fun preempt () : Time.time option option = 
	 let 
	    val () = Assert.assertAtomic' ("TimeOut.preempt", NONE)
	    val () = debug' "TimeOut.preempt" (* Atomic 1 *)
	    val () = Assert.assertAtomic' ("TimeOut.preempt", SOME 1)
	    val () = preemptTime ()
	    val timeQ' = !timeQ
	 in
	    if TQ.empty timeQ'
	       then NONE
	       else let
		       val readied = ref false
		       val timeQ' = TQ.clean (timeQ', cleaner (fn () => readied := true))
		       val () = timeQ := timeQ'
		    in
		       if !readied
			  then SOME NONE
			  else case TQ.peek timeQ' of
			          NONE => NONE
				| SOME elt => SOME(SOME(Time.-(TQ.Elt.key elt, getTime ())))
		    end
	 end
   end



1.1                  mlton/lib/cml/core-cml/trans-id.sig

Index: trans-id.sig
===================================================================
(* trans-id.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* ???
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

signature TRANS_ID =
   sig
      datatype trans_id = datatype RepTypes.trans_id
      datatype trans_id_state = datatype RepTypes.trans_id_state

      (* create a new transaction ID. *)
      val mkTxId : unit -> trans_id
      (* create a transaction flag (ID and cleanUp). *)
      val mkFlg  : unit -> (trans_id * (unit -> unit))
      (* given a transaction ID, mark it cancelled. *)
      val force : trans_id -> unit

      val toString : trans_id -> string
   end
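
(* Lifecycle sketch (not part of the original port).  Assuming the calls are
 * made inside an atomic region (force asserts this), a transaction ID moves
 * from TRANS to CANCEL exactly once; the name txid is hypothetical.
 *
 *   val txid = TransID.mkTxId ()
 *   val s1 = TransID.toString txid   -- "TRANS"
 *   val () = TransID.force txid
 *   val s2 = TransID.toString txid   -- "CANCEL"
 *
 * Forcing an already cancelled ID raises Fail "TransID.force".
 *)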



1.1                  mlton/lib/cml/core-cml/trans-id.sml

Index: trans-id.sml
===================================================================
(* trans-id.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* ???
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

structure TransID : TRANS_ID =
   struct 
      structure Assert = LocalAssert(val assert = true)

      structure R = RepTypes
 

      (* Transaction IDs are used to mark blocked threads in the various waiting
       * queues.  They are "cancelled" when some other event is selected.
       *)
      datatype trans_id = datatype R.trans_id
      datatype trans_id_state = datatype R.trans_id_state

      (* create a new transaction ID. *)
      fun mkTxId () = TXID(ref TRANS)

      (* create a transaction flag (ID and cleanUp). *)
      fun mkFlg () =
	 let 
	    val txid as TXID txst = mkTxId ()
	    val cleanUp = fn () =>
	       (Assert.assertAtomic' ("TransID.mkFlg.cleanUp", NONE)
		; txst := CANCEL)
	 in 
	    (txid, cleanUp)
	 end

      (* given a transaction ID, mark it cancelled. *)
      fun force (TXID txst) =
	 (Assert.assertAtomic' ("TransID.force", NONE)
	  ; case !txst of
	       TRANS => txst := CANCEL
	     | CANCEL => raise Fail "TransID.force")

      fun toString (TXID txst) =
	 case !txst of
	    TRANS => "TRANS"
	  | CANCEL => "CANCEL"
   end



1.1                  mlton/lib/cml/core-cml/version.sig

Index: version.sig
===================================================================
(* version.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* ???
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *)

signature VERSION =
   sig
      val version : {system : string, version_id : int list, date : string}
      val banner : string
   end


1.1                  mlton/lib/cml/core-cml/version.sml

Index: version.sml
===================================================================
(* version.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* version.sml
 *
 * COPYRIGHT (c) 1996 AT&T Research.
 *)

structure Version : VERSION = 
   struct
      val version = {
		     system = "Concurrent ML (MLton)",
		     version_id = [1, 0, 10],
		     date = "March, 2004"
		     }

      fun f ([], l) = l
	| f ([x], l) = (Int.toString x)::l
	| f (x::r, l) = (Int.toString x) :: "." :: f(r, l)

      val banner = 
	 concat (
		 #system version :: 
		 ", Version " ::
		 f (#version_id version, [", ", #date version])
		 )
   end




1.1                  mlton/lib/cml/tests/exit.cm

Index: exit.cm
===================================================================
Group is
  ../cml.cm
  exit.sml 
  run-main.sml


1.1                  mlton/lib/cml/tests/exit.sml

Index: exit.sml
===================================================================

structure Main =
struct
   open CML

   fun doit _ =
      CML.exit ()
end



1.1                  mlton/lib/cml/tests/primes-multicast.cm

Index: primes-multicast.cm
===================================================================
Group is
  ../cml.cm
  primes-multicast.sml 
  run-main.sml


1.1                  mlton/lib/cml/tests/primes-multicast.sml

Index: primes-multicast.sml
===================================================================

structure Main =
struct
   open CML
   structure MC = Multicast

   fun makeNatStream c =
      let
	 val mch = MC.mChannel ()
	 fun count i = (MC.multicast(mch, i)
			; count(i+1))
	 val _ = spawn (fn () => 
			(print (concat ["makeNatStream: ", 
					tidToString (getTid ()), 
					"\n"])
			 ; count c))
      in
	 mch
      end
   
   fun makeFilter (p, inMCh) =
      let
	 val inP = MC.port inMCh
	 val outMCh = MC.mChannel ()
	 fun loop () =
	    let
	       val i = sync (MC.recvEvt inP)
	    in
	       if ((i mod p) <> 0) 
		  then MC.multicast(outMCh, i)
		  else ()
	       ; loop ()
	    end
	 val _ = spawn loop
      in
	 outMCh
      end
   
   fun makePrimes () =
      let
	 val primes = MC.mChannel ()
	 fun head mch =
	    let
	       val p = MC.recv (MC.port mch)
	    in
	       MC.multicast(primes, p)
	       ; head (makeFilter (p, mch))
	    end
	 val _ = spawn (fn () => 
			(print (concat ["makePrimes: ", 
					tidToString (getTid ()), 
					"\n"])
			 ; head (makeNatStream 2)))
      in
	 primes
      end
   
   fun makeNatPrinter mch n =
      let
	 val p = MC.port mch
	 fun loop i =
	    if i > n then RunCML.shutdown OS.Process.success
	       else let
		       val m = MC.recv p
		       val m' = Int.toString m
		       fun loop' j =
			  if j > m then ()
			  else (print (m' ^ "\n")
				; loop' (j + 1))
		    in
		       loop' m
		       ; loop (i + 1)
		    end
	 val _ = spawn (fn () => 
			(print (concat ["makeNatPrinter: ", 
					tidToString (getTid ()), 
					"\n"])
			 ; loop 0))
      in
	 ()
      end

   fun doit' n =
      RunCML.doit
      (fn () =>
       let
	  val mch = makePrimes ()
	  val _ = makeNatPrinter mch n
       in
	  ()
       end,
       SOME (Time.fromMilliseconds 10))

   fun doit n =
      let
	 val x = doit' n
      in
	 x
      end
end



1.1                  mlton/lib/cml/tests/primes.cm

Index: primes.cm
===================================================================
Group is
  ../cml.cm
  primes.sml
  run-main.sml


1.1                  mlton/lib/cml/tests/primes.sml

Index: primes.sml
===================================================================

structure Main =
struct
   open CML

   fun makeNatStream c =
      let
	 val ch = channel ()
	 fun count i = (send(ch, i)
			; count(i+1))
	 val _ = spawn (fn () => 
			(print (concat ["makeNatStream: ", 
					tidToString (getTid ()), 
					"\n"])
			 ; count c))
      in
	 ch
      end
   
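   (* Spawn a thread that forwards every value received on inCh that is
    * not divisible by p to a new channel, which is returned.
    *)
   fun makeFilter (p, inCh) =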
      let
	 val outCh = channel ()
	 fun loop () =
	    let
	       val i = sync (recvEvt inCh)
	    in
	       if ((i mod p) <> 0) 
		  then sync (sendEvt (outCh, i))
		  else ()
	       ; loop ()
	    end
	 val _ = spawn loop
      in
	 outCh
      end
   
   fun makePrimes () =
      let
	 val primes = channel ()
	 fun head ch =
	    let val p = recv ch
	    in
	       send(primes, p)
	       ; head (makeFilter (p, ch))
	    end
	 val _ = spawn (fn () => 
			(print (concat ["makePrimes: ", 
					tidToString (getTid ()), 
					"\n"])
			 ; head (makeNatStream 2)))
      in
	 primes
      end
   
   fun makeNatPrinter ch n =
      let
	 fun loop i =
	    if i > n then RunCML.shutdown OS.Process.success
	       else let
		       val m = recv ch
		       val m' = Int.toString m
		       fun loop' j =
			  if j > m then ()
			  else (print (m' ^ "\n")
				; loop' (j + 1))
		    in
		       loop' m
		       ; loop (i + 1)
		    end
	 val _ = spawn (fn () => 
			(print (concat ["makeNatPrinter: ", 
					tidToString (getTid ()), 
					"\n"])
			 ; loop 0))
      in
	 ()
      end

   fun doit' n =
      RunCML.doit
      (fn () =>
       let
	  val ch = makePrimes ()
	  val _ = makeNatPrinter ch n
       in
	  ()
       end,
       SOME (Time.fromMilliseconds 10))

   fun doit n =
      let
	 val x = doit' n
      in
	 x
      end
end



1.1                  mlton/lib/cml/tests/run-main.sml

Index: run-main.sml
===================================================================
val n =
   case CommandLine.arguments () of
      [] => 100
    | s::_ => (case Int.fromString s of
		  NONE => 100
		| SOME n => n)

val ts = Time.now ()
val _ = Main.doit n
val te = Time.now ()
val d = Time.-(te, ts)
val _ = TextIO.print (concat ["Time start: ", Time.toString ts, "\n"])
val _ = TextIO.print (concat ["Time end:   ", Time.toString te, "\n"])
val _ = TextIO.print (concat ["Time diff:  ",
                              LargeInt.toString (Time.toMilliseconds d),
                              "ms\n"])



1.1                  mlton/lib/cml/tests/timeout.cm

Index: timeout.cm
===================================================================
Group is
  ../cml.cm
  timeout.sml
  run-main.sml


1.1                  mlton/lib/cml/tests/timeout.sml

Index: timeout.sml
===================================================================

structure Main =
struct
   open CML

   fun doit' n =
      RunCML.doit
      (fn () =>
       let
	  fun make m () =
	     (print (concat ["make: ", Int.toString m, " ",
			     tidToString (getTid ()), "\n"])
	      ; sync (timeOutEvt (Time.fromSeconds (Int.toLarge m)))
	      ; print (concat ["finish: ", Int.toString m, " ",
			       tidToString (getTid ()), "\n"]))
	  fun loop m =
	     if m <= 0
		then ()
		else let
			val _ = spawn (make m)
		     in
			loop (m - 10)
		     end
       in
	  loop n
       end,
       SOME (Time.fromMilliseconds 10))

   fun doit n =
      let
	 val x = doit' n
      in
	 x
      end
end



1.1                  mlton/lib/cml/util/assert.sig

Index: assert.sig
===================================================================
(* assert.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

signature ASSERT =
   sig
      val assert: (unit -> string) list * (unit -> string) * (unit -> bool) -> unit
      val assert': string * (unit -> bool) -> unit
      val assertAtomic: (unit -> string) * int option -> unit
      val assertNonAtomic: (unit -> string) -> unit
      val assertAtomic': string * int option -> unit
      val assertNonAtomic': string -> unit
   end


1.1                  mlton/lib/cml/util/assert.sml

Index: assert.sml
===================================================================
(* assert.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

structure Assert: ASSERT =
   struct
      structure C = Critical
      val assertFlg = true
      
      fun fail msg =
	 (C.atomicBegin ();
	  TextIO.print (concat ["ASSERT: ", msg, "\n"]);
	  OS.Process.exit OS.Process.failure)

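      (* Fail (print the messages and exit) when assertFlg is set and
       * f () returns false or raises an exception.
       *)
      fun assert (msgs: (unit -> string) list,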
		  msg: unit -> string, 
		  f: unit -> bool): unit =
	 if assertFlg andalso not (f () handle _ => false)
	    then let
		    val msgs = List.map (fn f => f ()) msgs
		    val msg = concat [String.concatWith " " msgs, " :: ", msg ()]
		 in
		    fail msg
		 end
	    else ()
      fun assert' (msg: string, f: unit -> bool): unit =
	 assert ([], fn () => msg, f)

      datatype z = datatype MLton.Thread.AtomicState.t
      fun assertAtomic (msg: unit -> string, n: int option): unit =
	 assert ([C.atomicMsg], msg, fn () =>
		 case MLton.Thread.atomicState () of
		    Atomic m => (case n of NONE => true | SOME n => n = m)
		  | NonAtomic => false)
      fun assertNonAtomic (msg: unit -> string): unit =
	 assert ([C.atomicMsg], msg, fn () =>
		 case MLton.Thread.atomicState () of
		    Atomic _ => false
		  | NonAtomic => true)
      fun assertAtomic' (msg, n) = assertAtomic (fn () => msg, n)
      fun assertNonAtomic' msg = assertNonAtomic (fn () => msg)
   end



1.1                  mlton/lib/cml/util/critical.sig

Index: critical.sig
===================================================================
(* critical.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

signature CRITICAL =
   sig
      val atomicBegin : unit -> unit
      val atomicEnd : unit -> unit
      val atomicMsg : unit -> string
      val doAtomic : (unit -> unit) -> unit
      
      val maskBegin : unit -> unit
      val maskEnd : unit -> unit
      val doMasked : (unit -> unit) -> unit
   end


1.1                  mlton/lib/cml/util/critical.sml

Index: critical.sml
===================================================================
(* critical.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

structure Critical : CRITICAL =
   struct
      structure Thread = MLton.Thread
      structure AtomicState = MLton.Thread.AtomicState
      structure Signal = MLton.Signal
      structure Itimer = MLton.Itimer

      val atomicBegin = Thread.atomicBegin
      val atomicEnd = Thread.atomicEnd
      local datatype z = datatype Thread.AtomicState.t
      in
	 fun atomicMsg () =
	    case Thread.atomicState () of
	       AtomicState.NonAtomic => "[NonAtomic]"
	     | AtomicState.Atomic n => concat ["[Atomic ", Int.toString n, "]"]
      end
      fun doAtomic f = (atomicBegin (); f (); atomicEnd ())

      val mask = Signal.Mask.some [Itimer.signal Itimer.Real]
      fun maskBegin () = Signal.Mask.block mask
      fun maskEnd () = Signal.Mask.unblock mask
      fun doMasked f = (maskBegin (); f (); maskEnd ())
   end



1.1                  mlton/lib/cml/util/debug.sig

Index: debug.sig
===================================================================
(* debug.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* debug.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * Debugging support for the CML core.
 *)

signature DEBUG =
   sig
      val sayDebug  : (unit -> string) list * (unit -> string) -> unit
      val sayDebug' : string -> unit
   end



1.1                  mlton/lib/cml/util/debug.sml

Index: debug.sml
===================================================================
(* debug.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* debug.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * Debugging support for the CML core.
 *)

structure Debug : DEBUG =
   struct
      structure C = Critical
      val debugFlg = true

      fun sayDebug (msgs: (unit -> string) list, 
		    msg: unit -> string) =
	 if debugFlg
	    then let
		    val msgs = List.map (fn f => f ()) msgs
		    val msg = concat [String.concatWith " " msgs, " :: ", msg ()]
		 in
		    C.atomicBegin ();
		    TextIO.print (concat [msg, "\n"]);
		    C.atomicEnd ()
		 end
	    else ()
      fun sayDebug' (msg: string) = sayDebug ([], fn () => msg)
   end



1.1                  mlton/lib/cml/util/fun-priority-queue.fun

Index: fun-priority-queue.fun
===================================================================
(* fun-priority-queue.fun
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

functor FunPriorityQueue(S: FUN_PRIORITY_QUEUE_ARG) : 
   FUN_PRIORITY_QUEUE where type Key.t = S.Key.t =
   struct
      open S

      structure Elt =
	 struct
	    datatype 'a t = T of Key.t * 'a
	    fun key (T (k, _)) = k
	    fun value (T (_, v)) = v
	 end

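      (* The list is kept sorted by nondecreasing key; enque places a new
       * element after any existing elements with an equal key.
       *)
      datatype 'a t = T of 'a Elt.t list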

      local
	 fun filterPrefix (xs, p) =
	    case xs of
	       [] => []
	     | y::ys => if p y
			  then filterPrefix (ys, p)
			  else xs
	 fun filter (xs, p) = List.filter (not o p) xs
      in
	 fun cleanPrefix (T xs, p) = T (filterPrefix (xs, p))
	 fun clean (T xs, p) = T (filter (xs, p))
      end

      fun deque (T xs) =
	 (case xs of
	     [] => NONE
	   | x::xs => SOME (x, T xs))

      fun cleanAndDeque (q, p) =
	 let
	    val q = clean (q, p)
	 in
	    case deque q of
	       NONE => (NONE, q)
	     | SOME (x, q) => (SOME x, q)
	 end

      fun empty (T xs) = 
	 (case xs of 
	     [] => true
	   | _ => false)

      fun enque (T xs, k', v') =
	 let
	    val x' = Elt.T (k', v')
	    fun loop (xs, ys) =
	       case xs of
		  [] => List.revAppend(ys, [x'])
		| (z as Elt.T (k, _))::zs => 
		     (case Key.compare (k, k') of
			 GREATER => List.revAppend(ys, x'::xs)
		       | _ => loop(zs, z::ys))
	 in
	    T (loop (xs, []))
	 end

      fun enqueAndClean (q, k, v, p) =
	 clean (enque (q, k, v), p)

      fun new () = T []

      fun peek (T xs) =
	 (case xs of
	     [] => NONE
	   | elt::_ => SOME elt)
   end



1.1                  mlton/lib/cml/util/fun-priority-queue.sig

Index: fun-priority-queue.sig
===================================================================
(* fun-priority-queue.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

signature FUN_PRIORITY_QUEUE_ARG =
   sig
      structure Key :
	 sig
	    type t
	    val compare : t * t -> order
	 end
   end

signature FUN_PRIORITY_QUEUE =
   sig
      include FUN_PRIORITY_QUEUE_ARG

      structure Elt:
	 sig
	    type 'a t
	    val key: 'a t -> Key.t
	    val value: 'a t -> 'a
	 end

      type 'a t

      val clean: 'a t * ('a Elt.t -> bool) -> 'a t
      val cleanAndDeque: 'a t * ('a Elt.t -> bool) -> 'a Elt.t option * 'a t
      val cleanPrefix: 'a t * ('a Elt.t -> bool) -> 'a t
      val deque: 'a t -> ('a Elt.t * 'a t) option
      val empty: 'a t -> bool
      val enque: 'a t * Key.t * 'a -> 'a t
      val enqueAndClean: 'a t * Key.t * 'a * ('a Elt.t -> bool) -> 'a t
      val new: unit -> 'a t
      val peek: 'a t -> 'a Elt.t option
   end



1.1                  mlton/lib/cml/util/fun-queue.sig

Index: fun-queue.sig
===================================================================
(* fun-queue.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

signature FUN_QUEUE =
   sig
      type 'a t

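      (* In the clean operations the predicate marks the elements to
       * remove: clean drops every element that satisfies it, and
       * cleanPrefix drops only the longest prefix of such elements.
       *)
      val clean: 'a t * ('a -> bool) -> 'a t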
      val cleanAndDeque: 'a t * ('a -> bool) -> 'a option * 'a t
      val cleanPrefix: 'a t * ('a -> bool) -> 'a t
      val deque: 'a t -> ('a * 'a t) option
      val empty: 'a t -> bool
      val enque: 'a t * 'a -> 'a t
      val enqueAndClean: 'a t * 'a * ('a -> bool) -> 'a t
      val new: unit -> 'a t
      val peek: 'a t -> 'a option
   end



1.1                  mlton/lib/cml/util/fun-queue.sml

Index: fun-queue.sml
===================================================================
(* fun-queue.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

structure FunQueue : FUN_QUEUE =
   struct
      (* Elements are dequeued from front; new elements are consed onto
       * back, which is reversed into front when front runs out.
       *)
      datatype 'a t = T of {front: 'a list, back: 'a list}

      local
	 fun filterPrefix (xs, p) =
	    case xs of
	       [] => []
	     | y::ys => if p y
			  then filterPrefix (ys, p)
			  else xs
	 fun filter (xs, p) = List.filter (not o p) xs
	 fun filterRevAcc ((xs, zs), p) =
	    case xs of
	       [] => zs
	     | y::ys => if p y
			  then filterRevAcc ((ys, zs), p)
			  else filterRevAcc ((ys, y::zs), p)
	 fun filterRev (xs, p) = filterRevAcc ((xs, []), p)
      in
	 fun cleanPrefix (T {front, back}, p) =
	    (case filterPrefix (front, p) of
		[] => T {front = filterPrefix (List.rev(back), p),
			 back = []}
	      | front' =>  T {front = front',
			      back = back})
	 fun clean (T {front, back}, p) =
	    (case filter (front, p) of
		[] => T {front = filterRev (back, p),
			 back = []}
	      | front' =>  T {front = front',
			      back = filter (back, p)})
	 fun cleanAndDeque (T {front, back}, p) =
	    (case filter (front, p) of
		[] => (case filterRev(back, p) of
			  [] => (NONE,
				 T {front = [],
				    back = []})
			| x::front' => (SOME x,
					T {front = front',
					   back = []}))
	      | [x] => (SOME x,
			T {front = filterRev (back, p),
			   back = []})
	      | x::front' => (SOME x,
			      T {front = front',
				 back = filter (back, p)}))
      end

      fun deque (T {front, back}) =
	 (case front of
	     [] => (case back of
		       [] => NONE
		     | l => let val l = List.rev l
			    in 
			       case l of
				  [] => raise Fail "FunQueue.deque:impossible"
				| x::front' => 
				     SOME (x,
					   T {front = front',
					      back = []})
			    end)
	   | x::front' => SOME (x, T {front = front', back = back}))
			  
      fun empty (T {front, back}) =
	 (case front of
	     [] => (case back of
		       [] => true
		     | _ => false)
	   | _ => false)
	     
      fun enque (T {front, back, ...}, x) = 
	 T {front = front, back = x::back}

      fun enqueAndClean (q, y, p) =
	 clean (enque (q, y), p)

      fun new () = T {front = [], back = []}

      fun peek (T {front, back}) =
	 (case front of
	     [] => (case back of
		       [] => NONE
		     | l => let val l = List.rev l
			    in 
			       case l of
				  [] => raise Fail "FunQueue.peek:impossible"
				| x::_ => SOME x
			    end)
	   | x::_ => SOME x)
   end



1.1                  mlton/lib/cml/util/imp-queue.sig

Index: imp-queue.sig
===================================================================
(* imp-queue.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

signature IMP_QUEUE =
   sig
      type 'a t

      val clean: 'a t * ('a -> bool) -> unit
      val cleanAndDeque: 'a t * ('a -> bool) -> 'a option
      val cleanPrefix: 'a t * ('a -> bool) -> unit
      val deque: 'a t -> 'a option
      val empty: 'a t -> bool
      val enque: 'a t * 'a -> unit
      val enqueAndClean: 'a t * 'a * ('a -> bool) -> unit
      val new: unit -> 'a t
      val peek: 'a t -> 'a option
      val reset: 'a t -> unit
   end



1.1                  mlton/lib/cml/util/imp-queue.sml

Index: imp-queue.sml
===================================================================
(* imp-queue.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

structure ImpQueue : IMP_QUEUE =
   struct
      (* A mutable two-list queue: elements are dequeued from front and
       * consed onto back. Every operation other than new asserts that it
       * runs inside an atomic region.
       *)
      datatype 'a t = T of {front: 'a list ref, back: 'a list ref}

      local
	 fun filterPrefix (xs, p) =
	    case xs of
	       [] => []
	     | y::ys => if p y
			  then filterPrefix (ys, p)
			  else xs
	 fun filter (xs, p) = List.filter (not o p) xs
	 fun filterRevAcc ((xs, zs), p) =
	    case xs of
	       [] => zs
	     | y::ys => if p y
			  then filterRevAcc ((ys, zs), p)
			  else filterRevAcc ((ys, y::zs), p)
	 fun filterRev (xs, p) = filterRevAcc ((xs, []), p)
      in
	 fun cleanPrefix (T {front, back}, p) =
	    (Assert.assertAtomic' ("ImpQueue.cleanPrefix", NONE)
	     ; case filterPrefix (!front, p) of
	          [] => (front := filterPrefix (List.rev(!back), p)
			 ; back := [])
		| front' =>  front := front')
	 fun clean (T {front, back}, p) =
	    (Assert.assertAtomic' ("ImpQueue.clean", NONE)
	     ; case filter (!front, p) of
	          [] => (front := filterRev (!back, p)
			 ; back := [])
		| front' =>  (front := front'
			      ; back := filter (!back, p)))
	 fun cleanAndDeque (T {front, back}, p) =
	    (Assert.assertAtomic' ("ImpQueue.cleanAndDeque", NONE)
	     ; case filter (!front, p) of
	          [] => (case filterRev(!back, p) of
			    [] => (front := []
				   ; back := []
				   ; NONE)
			  | x::front' => (front := front'
					  ; back := []
					  ; SOME x))
		| [x] => (front := filterRev (!back, p)
			  ; back := []
			  ; SOME x)
		| x::front' => (front := front'
				; back := filter (!back, p)
				; SOME x))
      end

      fun deque (T {front, back}) =
	 (Assert.assertAtomic' ("ImpQueue.deque", NONE)
	  ; case !front of
	       [] => (case !back of
			 [] => NONE
		       | l => let val l = List.rev l
			      in case l of
				    [] => raise Fail "ImpQueue.deque:impossible"
				  | x :: front' => 
				       (front := front'
					; back := []
					; SOME x)
			      end)
	     | x::front' => (front := front'; SOME x))

      fun empty (T {front, back}) =
	 (Assert.assertAtomic' ("ImpQueue.empty", NONE)
	  ; case !front of
	       [] => (case !back of
			 [] => true
		       | _ => false)
	     | _ => false)
	     
      fun enque (T {back, ...}, x) = 
	 (Assert.assertAtomic' ("ImpQueue.enque", NONE)
	  ; back := x::(!back))

      fun enqueAndClean (q, y, p) =
	 (enque (q, y); clean (q, p))

      fun new () = T {front = ref [], back = ref []}

      fun peek (T {front, back}) =
	 (Assert.assertAtomic' ("ImpQueue.peek", NONE)
	  ; case !front of
	       [] => (case !back of
			 [] => NONE
		       | l => let val l = List.rev l
			      in case l of
				    [] => raise Fail "ImpQueue.peek:impossible"
				  | x::front' => 
				       (front := x::front'
					; back := []
					; SOME x)
			      end)
	     | x::_ => SOME x)

      fun reset (T {front, back}) =
	 (Assert.assertAtomic' ("ImpQueue.reset", NONE)
	  ; front := []
	  ; back := [])

(*
      val clean = fn arg => TimeIt.timeit "ImpQueue.clean" clean arg
      val cleanAndDeque = fn arg => TimeIt.timeit "ImpQueue.cleanAndDeque" cleanAndDeque arg
      val cleanPrefix = fn arg => TimeIt.timeit "ImpQueue.cleanPrefix" cleanPrefix arg
      val deque = fn arg => TimeIt.timeit "ImpQueue.deque" deque arg
      val empty = fn arg => TimeIt.timeit "ImpQueue.empty" empty arg
      val enque = fn arg => TimeIt.timeit "ImpQueue.enque" enque arg
      val enqueAndClean = fn arg => TimeIt.timeit "ImpQueue.enqueAndClean" enqueAndClean arg
      val new = fn arg => TimeIt.timeit "ImpQueue.new" new arg
      val peek = fn arg => TimeIt.timeit "ImpQueue.peek" peek arg
      val reset = fn arg => TimeIt.timeit "ImpQueue.reset" reset arg
*)
   end



1.1                  mlton/lib/cml/util/local-assert.fun

Index: local-assert.fun
===================================================================
(* local-assert.fun
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

functor LocalAssert(val assert: bool): ASSERT =
   struct
      fun make f =
	 if assert then f else fn _ => ()
      val assert = make Assert.assert
      val assert' = make Assert.assert'
      val assertAtomic = make Assert.assertAtomic
      val assertNonAtomic = make Assert.assertNonAtomic
      val assertAtomic' = make Assert.assertAtomic'
      val assertNonAtomic' = make Assert.assertNonAtomic'
   end


1.1                  mlton/lib/cml/util/local-debug.fun

Index: local-debug.fun
===================================================================
(* local-debug.fun
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

functor LocalDebug(val debug: bool): DEBUG =
   struct
      fun make f =
	 if debug then f else fn _ => ()
      val sayDebug' = make Debug.sayDebug'
      val sayDebug = make Debug.sayDebug
   end


1.1                  mlton/lib/cml/util/timeit.sig

Index: timeit.sig
===================================================================
(* timeit.sig
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

signature TIMEIT =
   sig
      val timeit : string -> ('a -> 'b) -> 'a -> 'b
   end



1.1                  mlton/lib/cml/util/timeit.sml

Index: timeit.sml
===================================================================
(* timeit.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

structure TimeIt : TIMEIT =
   struct
      val timeitFlg = true

      fun timeit (name: string) (f: 'a -> 'b) (a: 'a) : 'b =
	 if timeitFlg 
	    then let
		    val start = Time.now ()
		    fun done () =
		       let
			  val finish = Time.now ()
			  val diff = Time.-(finish, start)
		       in
			  Debug.sayDebug 
			  ([], fn () =>
			   concat [name, ": ",
				   LargeInt.toString (Time.toMilliseconds diff),
				   " ms"])
		       end
		 in
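		    (* Report once whether f returns or raises; the handler
		     * wraps only f, so a failure in done is not retried.
		     *)
		    ((f a handle e => (done (); raise e))
		     before done ())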
		 end
	    else f a
   end



1.1                  mlton/lib/cml/util/util.cm

Index: util.cm
===================================================================
Group is
  critical.sig
  critical.sml
  assert.sig
  assert.sml
  local-assert.fun
  debug.sig
  debug.sml
  local-debug.fun
  timeit.sig
  timeit.sml
  fun-queue.sig
  fun-queue.sml
  imp-queue.sig
  imp-queue.sml
  fun-priority-queue.sig
  fun-priority-queue.fun