[MLton-commit] r4982

Wesley Terpstra wesley at mlton.org
Mon Dec 18 18:52:57 PST 2006


my collection of SML libs, only half finished mostly
----------------------------------------------------------------------

A   mltonlib/trunk/ca/terpstra/st/
A   mltonlib/trunk/ca/terpstra/st/Makefile
A   mltonlib/trunk/ca/terpstra/st/README
A   mltonlib/trunk/ca/terpstra/st/data.sig
A   mltonlib/trunk/ca/terpstra/st/data.sml
A   mltonlib/trunk/ca/terpstra/st/edge.fun
A   mltonlib/trunk/ca/terpstra/st/epoll.h
A   mltonlib/trunk/ca/terpstra/st/epoll.sig
A   mltonlib/trunk/ca/terpstra/st/epoll.sml
A   mltonlib/trunk/ca/terpstra/st/ioevent.sig
A   mltonlib/trunk/ca/terpstra/st/ioevent.sml
A   mltonlib/trunk/ca/terpstra/st/kevent.h
A   mltonlib/trunk/ca/terpstra/st/kqueue.sml
A   mltonlib/trunk/ca/terpstra/st/level.fun
A   mltonlib/trunk/ca/terpstra/st/lpoll.sig
A   mltonlib/trunk/ca/terpstra/st/open.sml
A   mltonlib/trunk/ca/terpstra/st/scheduler.sig
A   mltonlib/trunk/ca/terpstra/st/socket.sml
A   mltonlib/trunk/ca/terpstra/st/st.mlb
A   mltonlib/trunk/ca/terpstra/st/state.sig
A   mltonlib/trunk/ca/terpstra/st/state.sml
A   mltonlib/trunk/ca/terpstra/st/test.mlb
A   mltonlib/trunk/ca/terpstra/st/test.sml
A   mltonlib/trunk/ca/terpstra/st/thread.sig
A   mltonlib/trunk/ca/terpstra/st/thread.sml
A   mltonlib/trunk/ca/terpstra/st/timeout.sig
A   mltonlib/trunk/ca/terpstra/st/timeout.sml

----------------------------------------------------------------------

Added: mltonlib/trunk/ca/terpstra/st/Makefile
===================================================================
--- mltonlib/trunk/ca/terpstra/st/Makefile	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/Makefile	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,9 @@
+all:	st
+	
+epoll/epoll.mlb:	epoll.h /usr/include/x86_64-linux/i386-linux/sys/epoll.h 
+kevent/kevent.mlb:	kevent.h /usr/include/sys/event.h
+
+%.mlb:
+	mlnlffigen -allSU true -linkage static -dir $(@D) -mlbfile $(@F) $^
+
+-include $(patsubst %.mlb,%.dep,$(wildcard *.mlb))

Added: mltonlib/trunk/ca/terpstra/st/README
===================================================================
--- mltonlib/trunk/ca/terpstra/st/README	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/README	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,16 @@
+This is a simple work-alike of state-threads.sf.net for Standard ML.
+It helps in building event driven state machines with non-concurrent threads.
+
+For an example, see test.sml
+
+To use on osx:
+	make kevent/kevent.mlb
+	mlton test.mlb
+
+To use on linux:
+	edit st.mlb to use epoll.mlb instead of kevent.mlb
+	make epoll/epoll.mlb
+	mlton test.mlb
+
+The test program downloads two webpages from google concurrently, while
+answering TCP connections on port 12467 and printing a heart beat.

Added: mltonlib/trunk/ca/terpstra/st/data.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/data.sig	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/data.sig	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,46 @@
+signature SPARSE_ARRAY =
+  sig
+    type 'a sparse_array
+    
+    val new: unit -> 'a sparse_array
+    
+    val sub: 'a sparse_array * int -> 'a option
+    val update: 'a sparse_array * int * 'a -> unit
+    val erase: 'a sparse_array * int -> unit
+  end
+
+signature DYNAMIC_ARRAY =
+  sig
+    type 'a dynamic_array
+    
+    val new: unit -> 'a dynamic_array
+    val size: 'a dynamic_array -> int
+    
+    val sub: 'a dynamic_array * int -> 'a
+    val update: 'a dynamic_array * int * 'a -> unit
+    val swap: 'a dynamic_array * int * int -> unit
+    
+    val push: 'a dynamic_array * 'a -> unit
+    val pop: 'a dynamic_array -> unit
+  end
+
+signature HEAP =
+  sig
+    type 'a heap
+    val new: ('a * 'a -> bool) -> 'a heap
+    val push: 'a heap * 'a -> unit
+    val pop: 'a heap -> unit
+    val peek: 'a heap -> 'a option
+  end
+
+signature QUEUE =
+  sig
+    type 'a queue
+    val new: unit -> 'a queue
+    
+    val empty: 'a queue -> bool
+    val enque: 'a queue * 'a -> unit
+    val deque: 'a queue -> 'a option
+    
+(*    val enqueList: 'a queue * 'a list -> unit *)
+ end

Added: mltonlib/trunk/ca/terpstra/st/data.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/data.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/data.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,136 @@
+structure SparseArray :> SPARSE_ARRAY =
+  struct
+    type 'a sparse_array = 'a option array ref
+    
+    fun new () = ref (Array.array (8, NONE))
+    
+    fun sub (ref array, i) =
+      if i >= (Array.length array) then NONE else
+      Array.sub (array, i)
+    
+    fun update (array, i, x) = (
+      if i < Array.length (!array) then () else
+      let val a = Array.array (i*2 + 1, NONE)
+      in
+        Array.copy { src = !array, dst = a, di = 0 };
+        array := a
+      end;
+      Array.update (!array, i, SOME x))
+    
+    fun erase (ref array, i) =
+      if i >= (Array.length array) then () else
+      Array.update (array, i, NONE)
+  end
+
+structure DynamicArray :> DYNAMIC_ARRAY =
+  struct
+    type 'a dynamic_array = 'a option array ref * int ref
+    
+    fun new () = (ref (Array.array (8, NONE)), ref 0)
+    fun size (_, ref length) = length
+    
+    fun sub ((ref array, _), i) = valOf (Array.sub (array, i))
+    fun update ((ref array, _), i, x) = Array.update (array, i, SOME x)
+    
+    fun swap ((ref array, _), i, j) = 
+      let
+        val iv = Array.sub (array, i)
+        val jv = Array.sub (array, j)
+      in
+        Array.update (array, i, jv);
+        Array.update (array, j, iv)
+      end
+    
+    fun push ((array, length), x) = (
+      if Array.length (!array) > !length then () else
+      let val a = Array.array (!length * 2, NONE)
+      in
+        Array.copy { src = !array, dst = a, di = 0 };
+        array := a
+      end;
+      update ((array, length), !length, x);
+      length := !length + 1)
+    
+    fun pop (ref array, length) = (
+      length := !length - 1;
+      Array.update (array, !length, NONE))
+  end
+
+structure Heap :> HEAP =
+  struct
+    open DynamicArray
+    type 'a heap = 'a dynamic_array * ('a * 'a -> bool)
+    
+    fun left i = 2*i + 1
+    fun right i = 2*i + 2
+    fun parent i = (i - 1) div 2
+    
+    fun new cmp = (DynamicArray.new (), cmp)
+    
+    fun push ((a, cmp), x) =
+      let
+        fun fixtail 0 = () | fixtail i = 
+          let
+            val parent = parent i
+          in
+            if cmp (sub (a, parent), sub (a, i)) then () else
+            (swap (a, parent, i); fixtail parent)
+          end
+      in
+        DynamicArray.push (a, x);
+        fixtail (size a - 1)
+      end
+    
+    fun pop (a, cmp) =
+      let
+        val newsize = size a - 1
+        
+        fun fixhead i =
+          let
+            val left = left i
+            val right = right i
+          in
+            if left >= newsize then () else
+            if right >= newsize then
+              if cmp (sub (a, i), sub (a, left)) then () else
+              swap (a, i, left)
+            else
+              if cmp (sub (a, left), sub (a, right)) then
+                if cmp (sub (a, i), sub (a, left)) then () else
+                (swap (a, i, left); fixhead left)
+              else
+                if cmp (sub (a, i), sub (a, right)) then () else
+                (swap (a, i, right); fixhead right)
+          end
+      in
+        update (a, 0, sub (a, newsize));
+        DynamicArray.pop a;
+        fixhead 0
+      end
+    
+    fun peek (a, cmp) =
+      if size a = 0 then NONE else SOME (sub (a, 0))
+  end
+
+structure Queue :> QUEUE =
+   struct
+      datatype 'a queue = T of {front: 'a list ref, back: 'a list ref}
+
+      fun new() = T{front = ref [], back = ref []}
+      
+      fun empty (T {front=ref [], back=ref []}) = true
+        | empty _ = false
+        
+      fun enque(T{back, ...}, x) = back := x :: !back
+
+      fun deque(T{front, back}) =
+         case !front of
+            [] => (case !back of
+                      [] => NONE
+                    | l => let val l = rev l
+                           in case l of
+                              [] => raise Fail "deque"
+                            | x :: l => (back := []; front := l; SOME x)
+                           end)
+          | x :: l => (front := l; SOME x) 
+   end

Added: mltonlib/trunk/ca/terpstra/st/edge.fun
===================================================================
--- mltonlib/trunk/ca/terpstra/st/edge.fun	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/edge.fun	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,58 @@
+functor Edge(Poll : EPOLL) :> SCHEDULER_EXTRA =
+  struct
+    open State
+    open Thread_Extra
+    open Timeout_Extra
+    open Poll
+    
+    val poll = create 1000 (* ready for 1000 file descriptors *)
+    
+    structure IoEvent : IOEVENT =
+      struct
+        open IoEvent
+        fun monitor fd status = (
+          add (poll, fd);
+          IoEvent.monitor fd status)
+        fun unmonitor fd = (
+          remove (poll, fd);
+          IoEvent.unmonitor fd)
+      end
+    open IoEvent
+    
+    fun sigPulse thread = thread before stop ()
+    
+    fun loop block = 
+      let
+        fun relativeTime time =
+          let
+            val delta = Time.- (time, Time.now ())
+          in
+            if Time.< (delta, Time.zeroTime) 
+            then Time.zeroTime
+            else delta
+          end
+            
+        val delay = 
+          case block of
+              PENDING => SOME Time.zeroTime
+            | COMPLETE => Option.map relativeTime (getNext ())
+      in
+        wait (poll, delay);
+        trigger (Time.now ());
+        loop (run ())
+      end
+    
+    fun main () = 
+      let
+        open MLton
+        open Signal
+        val real = Itimer.signal Itimer.Real
+        val freq = Time.fromMilliseconds 50
+      in
+        (* prevent high throughput connections from causing starvation *)
+        Mask.unblock (Mask.some [real]);
+        setHandler (real, Handler.handler sigPulse);
+        (* Itimer.set (Itimer.Real, { interval = freq, value = freq }); *)
+        loop (run ())
+      end
+  end

Added: mltonlib/trunk/ca/terpstra/st/epoll.h
===================================================================
--- mltonlib/trunk/ca/terpstra/st/epoll.h	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/epoll.h	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,9 @@
+#include <sys/epoll.h>
+
+enum EPOLL_CTL {
+  CTL_ADD = EPOLL_CTL_ADD,
+  CTL_DEL = EPOLL_CTL_DEL,
+  CTL_MOD = EPOLL_CTL_MOD
+};
+
+int close(int);

Added: mltonlib/trunk/ca/terpstra/st/epoll.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/epoll.sig	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/epoll.sig	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,15 @@
+signature EPOLL =
+  sig
+    type poll
+    type ioh = IoEvent.ioh
+    
+    val create: int -> poll
+    val close: poll -> unit
+    
+    (* Track changes to state of the io handle *)
+    val add:    poll * ioh -> unit
+    val remove: poll * ioh -> unit
+    
+    (* will automatically change IoEvent's status *)
+    val wait: poll * Time.time option -> unit
+  end

Added: mltonlib/trunk/ca/terpstra/st/epoll.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/epoll.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/epoll.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,71 @@
+(* Edge-triggered *)
+structure EPoll :> EPOLL =
+  struct
+    type poll = MLRep.Int.Signed.int
+    type ioh = IoEvent.ioh
+    
+    fun create events = F_epoll_create.f (MLRep.Int.Signed.fromInt events)
+    fun close epoll = ignore (F_close.f epoll)
+    
+    fun ctl cmd (epoll, fd) = 
+      let
+        open E_EPOLL_EVENTS
+        val makeUnsigned = MLRep.Int.Unsigned.fromInt o MLRep.Int.Signed.toInt
+        val flags = makeUnsigned (e_EPOLLIN + e_EPOLLOUT + e_EPOLLERR + 
+                                  e_EPOLLHUP + e_EPOLLET)
+        val epoll_event = C.new S_epoll_event.typ
+      in
+        C.Set.uint (S_epoll_event.f_events epoll_event, flags);
+        C.Set.sint (U_epoll_data.f_fd (S_epoll_event.f_data epoll_event),
+                    MLRep.Int.Signed.fromInt fd);
+        F_epoll_ctl.f (epoll, cmd, MLRep.Int.Signed.fromInt fd, 
+                       C.Ptr.|&| epoll_event);
+        C.discard epoll_event
+      end
+    
+    val add    = ctl E_EPOLL_CTL.e_CTL_ADD
+    val remove = ctl E_EPOLL_CTL.e_CTL_DEL
+    
+    val nevents = 500
+    val events = C.alloc S_epoll_event.typ (Word.fromInt nevents)
+    
+    fun wait (epoll, time) = 
+      let
+        val roundup = Time.fromMicroseconds 999
+        val delay = case time of
+            NONE => ~1
+          | SOME x => LargeInt.toInt (Time.toMilliseconds (Time.+ (x, roundup)))
+        
+        val nevents = F_epoll_wait.f (epoll, events, nevents, delay)
+        
+        fun event ees =
+          let
+            open E_EPOLL_EVENTS
+            val makeUnsigned = MLRep.Int.Unsigned.fromInt o MLRep.Int.Signed.toInt
+            val EPOLLIN  = makeUnsigned e_EPOLLIN
+            val EPOLLOUT = makeUnsigned e_EPOLLOUT
+            val EPOLLERR = makeUnsigned e_EPOLLERR
+            val EPOLLHUP = makeUnsigned e_EPOLLHUP
+            
+            val fdf = U_epoll_data.f_fd (S_epoll_event.f_data ees)
+            val fd = MLRep.Int.Signed.toInt (C.Get.sint fdf)
+            val flags = C.Get.uint (S_epoll_event.f_events ees)
+            
+            fun value bit = MLRep.Int.Unsigned.andb (flags, bit) = bit
+            val broken = value EPOLLERR orelse value EPOLLHUP
+          in
+            IoEvent.notifyHASINPUT  fd (value EPOLLIN  orelse broken);
+            IoEvent.notifyCANOUTPUT fd (value EPOLLOUT orelse broken)
+          end
+        
+        fun process i =
+          if i = nevents then () else
+	  (event (C.Ptr.sub (events, i)); process (i + 1))
+      in
+        process 0
+      end
+  end
+
+structure Scheduler = Edge(EPoll)
+structure IoEvent = Scheduler.IoEvent
+structure Scheduler :> SCHEDULER = Scheduler

Added: mltonlib/trunk/ca/terpstra/st/ioevent.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/ioevent.sig	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/ioevent.sig	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,20 @@
+signature IOEVENT =
+  sig
+    exception Unmonitored
+    
+    type status = { hasinput: bool, canoutput: bool}
+    type ioh
+    
+    val socket:  ('af, 'sock_type) Socket.sock -> (ioh -> 'a) -> 'a
+    val sockdes: Socket.sock_desc -> (ioh -> 'a) -> 'a
+    val file:    Posix.IO.file_desc -> (ioh -> 'a) -> 'a
+    
+    val HASINPUT:  ioh -> (bool, bool) State.state
+    val CANOUTPUT: ioh -> (bool, bool) State.state
+    
+    val notifyHASINPUT:  ioh -> bool State.signal
+    val notifyCANOUTPUT: ioh -> bool State.signal
+    
+    val monitor: ioh -> status -> unit
+    val unmonitor: ioh -> unit
+  end

Added: mltonlib/trunk/ca/terpstra/st/ioevent.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/ioevent.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/ioevent.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,53 @@
+structure IoEvent : IOEVENT = 
+  struct
+    open State
+    open SparseArray
+    
+    type ioh = int
+    exception Unmonitored
+    
+    type status = { 
+      hasinput:  bool, 
+      canoutput: bool }
+    type filedes = {
+      fhasinput:  (bool, bool) state * bool signal,
+      fcanoutput: (bool, bool) state * bool signal }
+    val filedes : filedes sparse_array = new ()
+
+    type 'a t = (unit -> 'a) * ('a -> unit)
+    val (geti, _) = _symbol "side_channel_hack" alloc: int t;
+    val (_, sets) = _symbol "side_channel_hack": ('a, 'b) Socket.sock t;
+    val (_, setd) = _symbol "side_channel_hack": Socket.sock_desc t;
+    val (_, setf) = _symbol "side_channel_hack": Posix.IO.file_desc t;
+    
+    fun socket  sock f = f (sets sock; geti ())
+    fun sockdes des  f = f (setd des;  geti ())
+    fun file    file f = f (setf file; geti ())
+    
+    fun test select fd = case sub (filedes, fd) of
+        NONE => raise Unmonitored
+      | SOME x => case select x of (state, _) => state
+    
+    val HASINPUT  = test #fhasinput
+    val CANOUTPUT = test #fcanoutput
+    
+    fun notify select fd = case sub (filedes, fd) of
+        NONE => raise Unmonitored
+      | SOME x => case select x of (_, signal) => signal
+    
+    val notifyHASINPUT  = notify #fhasinput
+    val notifyCANOUTPUT = notify #fcanoutput
+    
+    fun monitor fd (status:status) =
+      let
+        val entry = {
+          fhasinput  = state (#hasinput  status),
+          fcanoutput = state (#canoutput status) }
+      in
+        update (filedes, fd, entry)
+      end
+    
+    fun unmonitor fd = case sub (filedes, fd) of
+        NONE => raise Unmonitored
+      | SOME _ => erase (filedes, fd)
+  end

Added: mltonlib/trunk/ca/terpstra/st/kevent.h
===================================================================
--- mltonlib/trunk/ca/terpstra/st/kevent.h	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/kevent.h	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,57 @@
+#include <sys/event.h>
+#include <sys/time.h>
+
+enum filter { 
+	read = EVFILT_READ,
+	write = EVFILT_WRITE,
+	aio = EVFILT_AIO,
+	vnode = EVFILT_VNODE,
+	proc = EVFILT_PROC,
+	signal = EVFILT_SIGNAL,
+	timer = EVFILT_TIMER,
+	machport = EVFILT_MACHPORT,
+	fs = EVFILT_FS
+};
+
+enum action {
+	add = EV_ADD,
+	delete = EV_DELETE,
+	enable = EV_ENABLE,
+	disable = EV_DISABLE,
+	oneshot = EV_ONESHOT,
+	clear = EV_CLEAR,
+	sysflags = EV_SYSFLAGS,
+	flag0 = EV_FLAG0,
+	flag1 = EV_FLAG1,
+	eof = EV_EOF,
+	error = EV_ERROR,
+	poll = EV_POLL,
+	ooband = EV_OOBAND
+};
+
+/*
+enum note {
+	lowat = NOTE_LOWAT,
+	delete = NOTE_DELETE,
+	write = NOTE_WRITE,
+	extend = NOTE_EXTEND,
+	attrib = NOTE_ATTRIB,
+	link = NOTE_LINK,
+	rename = NOTE_RENAME,
+	revoke = NOTE_REVOKE,
+	exit = NOTE_EXIT,
+	fork = NOTE_FORK,
+	exec = NOTE_EXEC,
+	pctrlmask = NOTE_PCTRLMASK,
+	pdatamask = NOTE_PDATAMASK,
+	seconds = NOTE_SECONDS,
+	useconds = NOTE_USECONDS,
+	nseconds = NOTE_NSECONDS,
+	absolute = NOTE_ABSOLUTE,
+	track = NOTE_TRACK,
+	trackerr = NOTE_TRACKERR,
+	child = NOTE_CHILD
+};
+*/
+
+int close(int fd);

Added: mltonlib/trunk/ca/terpstra/st/kqueue.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/kqueue.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/kqueue.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,102 @@
+structure KQueue :> EPOLL =
+  struct
+    type poll = MLRep.Int.Signed.int
+    type ioh = IoEvent.ioh
+    
+    fun create _ = F_kqueue.f ()
+    fun close epoll = ignore (F_close.f epoll)
+    
+(*
+	    val () = print ("change: " ^ Int.toString fd ^ ": ")
+	    val () = print (Int.toString filter ^ " ")
+	    val () = print (Int.toString flags)
+	    val () = print "\n"
+*)
+    fun kevent (ke, fd, filter, flags) =
+      (C.Set.ulong  (S_kevent.f_ident  ke, 
+		     MLRep.Long.Unsigned.fromInt fd);
+       C.Set.sshort (S_kevent.f_filter ke, 
+		     MLRep.Short.Signed.fromInt 
+			(MLRep.Int.Signed.toInt filter));
+       C.Set.ushort (S_kevent.f_flags  ke, 
+		     MLRep.Short.Unsigned.fromInt 
+			(MLRep.Int.Signed.toInt flags)))
+    
+    fun control flags (epoll, fd) = 
+      let
+        val changes = C.alloc S_kevent.typ (Word.fromInt 2)
+        val zero = C.new S_timespec.typ
+      in
+        kevent (C.Ptr.sub (changes, 0), fd, E_filter.e_read,  flags);
+        kevent (C.Ptr.sub (changes, 1), fd, E_filter.e_write, flags);
+        C.Set.slong (S_timespec.f_tv_sec zero, 0);
+        C.Set.slong (S_timespec.f_tv_nsec zero, 0);
+        F_kevent.f (epoll, 
+                    C.Ptr.ro changes, 2, 
+                    C.Ptr.null (C.T.pointer S_kevent.typ), 0, 
+                    C.Ptr.ro (C.Ptr.|&| zero));
+        C.discard zero;
+        C.free changes
+      end
+    
+    val add    = control (E_action.e_add + E_action.e_clear)
+    val remove = control E_action.e_delete
+    
+    val nevents = 500
+    val events = C.alloc S_kevent.typ (Word.fromInt nevents)
+    
+    fun event ke =
+	let
+	    val fd = C.Get.ulong  (S_kevent.f_ident ke)
+	    val io = C.Get.sshort (S_kevent.f_filter ke)
+	    
+	    val fd = MLRep.Long.Unsigned.toInt fd
+	    
+	    val cvt = MLRep.Short.Signed.fromInt o MLRep.Int.Signed.toInt
+	    val read  = cvt E_filter.e_read
+	    val write = cvt E_filter.e_write
+(*
+	    val () = print ("event: " ^ Int.toString fd ^ ":")
+	    val () = if io = read  then print " read"  else ()
+	    val () = if io = write then print " write" else ()
+	    val () = print "\n"
+*)
+	in
+	    if io = read  then IoEvent.notifyHASINPUT  fd true else ();
+	    if io = write then IoEvent.notifyCANOUTPUT fd true else ()
+	end
+    
+    fun wait (epoll, time) = 
+	let
+            fun timespec NONE = C.Ptr.null (C.T.pointer S_timespec.typ)
+              | timespec (SOME t) = 
+	          let
+	             val ts = C.alloc S_timespec.typ (Word.fromInt 1)
+                     val (seconds, nano) = 
+                         IntInf.quotRem (Time.toNanoseconds t, 1000000000)
+	          in
+	             C.Set.slong (S_timespec.f_tv_sec  (C.Ptr.|*| ts), 
+		                  MLRep.Long.Signed.fromLarge seconds);
+	             C.Set.slong (S_timespec.f_tv_nsec (C.Ptr.|*| ts), 
+			          MLRep.Long.Signed.fromLarge nano);
+		     ts
+		  end
+	    val ts = timespec time
+	    
+	    val changes = C.Ptr.ro (C.Ptr.null (C.T.pointer S_kevent.typ))
+	    val nevents = F_kevent.f (MLRep.Int.Signed.fromInt epoll, 
+				      changes, 0,
+				      events, nevents, 
+				      C.Ptr.ro ts)
+	    fun process i =
+	        if i = nevents then () else
+	        (event (C.Ptr.sub (events, i)); process (i + 1))
+	in
+	    process 0;
+	    if C.Ptr.isNull ts then () else C.free ts
+	end
+  end
+  
+structure Scheduler = Edge(KQueue)
+structure IoEvent   :> IOEVENT   = Scheduler.IoEvent
+structure Scheduler :> SCHEDULER = Scheduler

Added: mltonlib/trunk/ca/terpstra/st/level.fun
===================================================================
--- mltonlib/trunk/ca/terpstra/st/level.fun	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/level.fun	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,73 @@
+functor Level(Poll : LPOLL) :> SCHEDULER_EXTRA =
+  struct
+    open State
+    open Thread_Extra
+    open Timeout_Extra
+    open Poll
+    
+    val poll = create 1000 (* ready for 1000 file descriptors *)
+    
+    structure IoEvent : IOEVENT =
+      struct
+        open IoEvent
+        
+        fun monitor fd { hasinput, canoutput } = (
+          if hasinput  then () else watch (poll, fd, Poll.HASINPUT);
+          if canoutput then () else watch (poll, fd, Poll.CANOUTPUT); 
+          IoEvent.monitor fd {hasinput = hasinput, canoutput = canoutput})
+        
+        fun unmonitor fd = (
+          unwatchall (poll, fd);
+          IoEvent.unmonitor fd)
+        
+        fun notifyHASINPUT fd true = (
+              IoEvent.notifyHASINPUT fd true)
+          | notifyHASINPUT fd false = (
+              Poll.watch (poll, fd, Poll.HASINPUT);
+              IoEvent.notifyHASINPUT fd false)
+        
+        fun notifyCANOUTPUT fd true = (
+              IoEvent.notifyCANOUTPUT fd true)
+          | notifyCANOUTPUT fd false = (
+              Poll.watch (poll, fd, Poll.CANOUTPUT);
+              IoEvent.notifyCANOUTPUT fd false)
+      end
+    open IoEvent
+    
+    fun sigPulse thread = thread before stop ()
+    
+    fun loop block = 
+      let
+        fun relativeTime time =
+          let
+            val delta = Time.- (time, Time.now ())
+          in
+            if Time.< (delta, Time.zeroTime) 
+            then Time.zeroTime
+            else delta
+          end
+            
+        val delay = 
+          case block of
+              PENDING => SOME Time.zeroTime
+            | COMPLETE => Option.map relativeTime (getNext ())
+      in
+        wait (poll, delay);
+        trigger (Time.now ());
+        loop (run ())
+      end
+    
+    fun main () = 
+      let
+        open MLton
+        open Signal
+        val real = Itimer.signal Itimer.Real
+        val freq = Time.fromMilliseconds 50
+      in
+        (* prevent high throughput connections from causing starvation *)
+        Mask.unblock (Mask.some [real]);
+        setHandler (real, Handler.handler sigPulse);
+        (* Itimer.set (Itimer.Real, { interval = freq, value = freq }); *)
+        loop (run ())
+      end
+  end

Added: mltonlib/trunk/ca/terpstra/st/lpoll.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/lpoll.sig	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/lpoll.sig	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,21 @@
+(* Signature for level-triggered poll *)
+signature LPOLL =
+  sig
+    type poll
+    type ioh = IoEvent.ioh
+    datatype level = HASINPUT | CANOUTPUT
+    
+    val create: int -> poll
+    val close: poll -> unit
+    
+    (* add a watch to the list *)
+    val watch: poll * ioh * level -> unit
+    
+    (* called prior to closing the io handle *)
+    val unwatchall: poll * ioh -> unit
+    
+    (* automatically change IoEvent's status
+     * triggered watches are automatically removed from the poll (ie: oneshot)
+     *)
+    val wait: poll * Time.time option -> unit
+  end

Added: mltonlib/trunk/ca/terpstra/st/open.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/open.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/open.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,5 @@
+open State
+open Thread
+open Timeout
+open IoEvent
+open Scheduler

Added: mltonlib/trunk/ca/terpstra/st/scheduler.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/scheduler.sig	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/scheduler.sig	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,10 @@
+signature SCHEDULER =
+  sig
+    val main: unit -> unit
+  end
+
+signature SCHEDULER_EXTRA =
+  sig
+    include SCHEDULER    
+    structure IoEvent: IOEVENT
+  end

Added: mltonlib/trunk/ca/terpstra/st/socket.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/socket.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/socket.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,202 @@
+structure Socket : SOCKET =
+  struct
+    open Socket
+    open State
+    open IoEvent
+    open Timeout
+    open Thread
+    
+    fun wrapInNB f s x =
+      case f x of
+          NONE => NONE before socket (s x) notifyHASINPUT false
+        | SOME v => SOME v
+    
+    val recvVecNB  = fn x => wrapInNB recvVecNB  #1 x
+    val recvVecNB' = fn x => wrapInNB recvVecNB' #1 x
+    val recvArrNB  = fn x => wrapInNB recvArrNB  #1 x
+    val recvArrNB' = fn x => wrapInNB recvArrNB' #1 x
+    
+    val recvVecFromNB  = fn x => wrapInNB recvVecFromNB  #1 x
+    val recvVecFromNB' = fn x => wrapInNB recvVecFromNB' #1 x
+    val recvArrFromNB  = fn x => wrapInNB recvArrFromNB  #1 x
+    val recvArrFromNB' = fn x => wrapInNB recvArrFromNB' #1 x
+    
+    fun wrapIn f s x = (
+      stopTill (socket (s x) HASINPUT);
+      case f x of
+          NONE => wrapIn f s x
+        | SOME x => x)
+    
+    fun recvVec  x = wrapIn recvVecNB  #1 x
+    fun recvVec' x = wrapIn recvVecNB' #1 x
+    fun recvArr  x = wrapIn recvArrNB  #1 x
+    fun recvArr' x = wrapIn recvArrNB' #1 x
+    
+    fun recvVecFrom  x = wrapIn recvVecFromNB  #1 x
+    fun recvVecFrom' x = wrapIn recvVecFromNB' #1 x
+    fun recvArrFrom  x = wrapIn recvArrFromNB  #1 x
+    fun recvArrFrom' x = wrapIn recvArrFromNB' #1 x
+    
+    fun wrapOutNB f s x =
+      case f x of
+          NONE => NONE before socket (s x) notifyCANOUTPUT false
+        | SOME v => SOME v
+    
+    val sendVecNB  = fn x => wrapOutNB sendVecNB  #1 x
+    val sendVecNB' = fn x => wrapOutNB sendVecNB' #1 x
+    val sendArrNB  = fn x => wrapOutNB sendArrNB  #1 x
+    val sendArrNB' = fn x => wrapOutNB sendArrNB' #1 x
+    
+    fun wrapOutNBbool f s x =
+      case f x of
+          false => false before socket (s x) notifyCANOUTPUT false
+        | true => true
+    
+    val sendVecToNB  = fn x => wrapOutNBbool sendVecToNB  #1 x
+    val sendVecToNB' = fn x => wrapOutNBbool sendVecToNB' #1 x
+    val sendArrToNB  = fn x => wrapOutNBbool sendArrToNB  #1 x
+    val sendArrToNB' = fn x => wrapOutNBbool sendArrToNB' #1 x
+    
+    fun wrapOut f s x = (
+      stopTill (socket (s x) CANOUTPUT);
+      case f x of
+          NONE => wrapOut f s x
+        | SOME x => x)
+    
+    fun sendVec  x = wrapOut sendVecNB  #1 x
+    fun sendVec' x = wrapOut sendVecNB' #1 x
+    fun sendArr  x = wrapOut sendArrNB  #1 x
+    fun sendArr' x = wrapOut sendArrNB' #1 x
+    
+    fun wrapOutbool f s x = (
+      stopTill (socket (s x) CANOUTPUT);
+      case f x of
+          false => wrapOutbool f s x
+        | true => ())
+    
+    fun sendVecTo  x = wrapOutbool sendVecToNB  #1 x
+    fun sendVecTo' x = wrapOutbool sendVecToNB' #1 x
+    fun sendArrTo  x = wrapOutbool sendArrToNB  #1 x
+    fun sendArrTo' x = wrapOutbool sendArrToNB' #1 x
+    
+    val acceptNB = fn s => 
+      case acceptNB s of
+          NONE => NONE before socket s notifyHASINPUT false
+        | SOME (s, a) => 
+            (* It is safe to say no input, b/c edge triggered APIs always 
+             * give at least one initial status report. It is also safe
+             * for level triggered, since this gets it added to the poll.
+             * Thus, no really fast sends are lost.
+             *
+             * This is the smart thing to do, because SYN+ACK takes a while
+             * to reach the client. So, there's no point wasting a recv()
+             * when it's almost surely not going to have data yet anyways.
+             *)
+            SOME (s, a) before socket s monitor { hasinput  = false, 
+                                                  canoutput = true }
+    fun accept x = wrapIn acceptNB (fn s => s) x
+    
+    val close = fn s => (socket s unmonitor; close s)
+    
+    val listen = fn (s, i) =>
+      (* due to a bug in BSD's kqueue API, we must re-monitor *)
+      (socket s unmonitor;
+       listen (s, i);
+       socket s monitor { hasinput = false, canoutput = true })
+    
+    val connect = fn (s, a) =>
+      case connectNB (s, a) of
+        true => ()
+      | false => (
+          stopTill (socket s CANOUTPUT);
+          (* Get the error status, if getERROR doesn't raise, we raise
+           * something generic since we only know that it failed.
+           *)
+          if Socket.Ctl.getERROR s
+          then raise OS.SysErr ("Connection failed", NONE)
+          else ())
+    
+    fun select {rds, wrs, exs, timeout} = 
+      let
+        datatype which = 
+          RDS of sock_desc | WRS of sock_desc | TIMER
+        
+        val rds = List.map (fn rd => (sockdes rd HASINPUT,  RDS rd)) rds
+        val wrs = List.map (fn wr => (sockdes wr CANOUTPUT, WRS wr)) wrs
+        val tmr = case timeout of SOME x => [(TIMEOUT x, TIMER)] | NONE => []
+        val events = List.concat [rds, wrs, tmr]
+        
+        val ords = ref []
+        val owrs = ref []
+        
+        fun split (RDS rd) = ords := rd :: !ords
+          | split (WRS wr) = owrs := wr :: !owrs
+          | split TIME = ()
+      in
+        List.app split (Thread.select events);
+        {rds = !ords, wrs = !owrs, exs = []}
+      end
+  end
+
+structure Wrap =
+  struct
+    local
+      open IoEvent
+    in
+      val monitor = fn s =>
+        s before socket s monitor { hasinput = false, canoutput = false }
+      
+      fun monitorPair (s, t) = (monitor s, monitor t)
+    end
+  end
+
+structure GenericSock : GENERIC_SOCK =
+  struct
+    open GenericSock
+    open Wrap
+    
+    val socket  = fn x => monitor (socket  x)
+    val socket' = fn x => monitor (socket' x)
+    val socketPair  = fn x => monitorPair (socketPair  x)
+    val socketPair' = fn x => monitorPair (socketPair' x)
+  end
+
+structure INetSock : INET_SOCK =
+  struct
+    open INetSock
+    open Wrap
+    
+    structure UDP =
+      struct
+        open UDP
+        val socket  = fn x => monitor (socket  x)
+        val socket' = fn x => monitor (socket' x)
+      end
+    
+    structure TCP =
+      struct
+        open TCP
+        val socket  = fn x => monitor (socket  x)
+        val socket' = fn x => monitor (socket' x)
+      end
+  end
+
+structure UnixSock : UNIX_SOCK =
+  struct
+    open UnixSock
+    open Wrap
+    
+    structure Strm =
+      struct
+        open Strm
+        val socket = fn x => monitor (socket x)
+        val socketPair = fn x => monitorPair (socketPair x)
+      end
+      
+    structure DGrm =
+      struct
+        open DGrm
+        val socket = fn x => monitor (socket x)
+        val socketPair = fn x => monitorPair (socketPair x)
+      end
+  end

Added: mltonlib/trunk/ca/terpstra/st/st.mlb
===================================================================
--- mltonlib/trunk/ca/terpstra/st/st.mlb	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/st.mlb	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,56 @@
+local
+  $(SML_LIB)/basis/basis.mlb
+  $(SML_LIB)/basis/mlton.mlb
+  $(SML_LIB)/mlnlffi-lib/mlnlffi-lib.mlb
+  
+  ann
+    "allowFFI true"
+  in
+    data.sig
+    data.sml
+    
+    state.sig
+    state.sml
+    thread.sig
+    thread.sml
+    
+    timeout.sig
+    timeout.sml
+    ioevent.sig
+    ioevent.sml
+    
+    scheduler.sig
+    epoll.sig
+    edge.fun
+    lpoll.sig
+    level.fun
+
+    kevent/kevent.mlb
+    kqueue.sml
+
+(*  epoll/epoll.mlb
+    epoll.sml
+*)
+    socket.sml
+  end
+in
+  signature STATE
+  signature THREAD
+  signature TIMEOUT
+  signature IOEVENT
+  signature SCHEDULER
+  
+  structure State
+  structure Thread
+  structure Timeout
+  structure IoEvent
+  structure Scheduler
+  
+  (* override basis definitions with ours -- we have hooks *)
+  structure Socket
+  structure GenericSock
+  structure INetSock
+  structure UnixSock
+  
+  open.sml
+end

Added: mltonlib/trunk/ca/terpstra/st/state.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/state.sig	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/state.sig	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,47 @@
+(* Attempts to classify states as 'level-triggered' or 'edge-triggered'
+ * will fail, as these terms make sense only at the intersection of states
+ * and blocking primitives. Both styles (and others) can be realized using
+ * the watch and value methods.
+ * 
+ * A given state may only be watched once (1 time in 1 thread).
+ * If a second watch is attempted, the RaceCondition exception is raised.
+ *)
+signature STATE =
+  sig
+    type ('val, 'diff) state
+    type 'diff signal = 'diff -> unit
+    
+    (* create a new state *)
+    val state: ''val -> (''val, ''val) state * ''val signal
+    val delta: ('val * 'diff -> 'val option) -> 'val -> 
+               ('val, 'diff) state * 'diff signal
+    
+    (* get the current value of a state *)
+    val value: ('val, 'diff) state -> 'val
+    
+    (* hook a callback invoked when the state changes *)
+    exception RaceCondition
+    exception UnWatched
+    val dwatch: ('val * 'diff -> unit) -> ('val, 'diff) state -> unit
+    val swatch: ('val -> unit) -> ('val, 'diff) state -> unit
+    val release: ('val, 'diff) state -> unit
+    
+    (* map this state into a new derived state *)
+    val smap: ('val1 -> 'val2) -> 
+              ('val1, 'val1) state -> ('val2, 'val2) state
+    val dmap: ('val1 -> 'val2) * 
+              ('val1 * 'diff1 * 'val2 -> ('val2 * 'diff2) option) ->
+              ('val1, 'diff1) state -> ('val2, 'diff2) state
+    
+    (* compose two states into their product *)
+    datatype ('diff1, 'diff2) alt = DIFF1 of 'diff1 | DIFF2 of 'diff2
+    val scompose: ('val1, 'val1) state * ('val2, 'val2) state ->
+                  ('val1 * 'val2, 'val1 * 'val2) state
+    val dcompose: ('val1, 'diff1) state * ('val2, 'diff2) state ->
+                  ('val1 * 'val2, ('diff1, 'diff2) alt) state
+    
+    (* If you want multiple watchers on the same state *)
+    type ('val, 'diff) broadcast
+    val broadcast: ('val, 'diff) state -> ('val, 'diff) broadcast
+    val clone: ('val, 'diff) broadcast -> ('val, 'diff) state
+  end

Added: mltonlib/trunk/ca/terpstra/st/state.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/state.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/state.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,133 @@
+structure State :> STATE =
+  struct
+    type ('val, 'diff) state = {
+      value: unit -> 'val,
+      release: unit -> unit,
+      watch: ('val * 'diff -> unit) -> unit
+      }
+    type 'diff signal = 'diff -> unit
+    exception RaceCondition
+    exception UnWatched
+    
+    fun delta update init =
+      let
+        val state = ref init
+        val block = ref NONE
+        
+        fun value () = !state
+        fun release () = 
+          case !block of
+              NONE => raise UnWatched
+            | SOME _ => block := NONE
+        fun watch f =
+          case !block of
+              NONE => block := SOME f
+            | SOME _ => raise RaceCondition
+        fun signal diff =
+          case update (!state, diff) of
+              NONE => ()
+            | SOME newval =>
+                case !block of
+                    NONE => state := newval
+                  | SOME f => (state := newval; f (newval, diff))
+      in
+        ({ value = value, release = release, watch = watch }, signal)
+      end
+    fun state init = delta (fn (s, d) => if s = d then NONE else SOME d) init
+    
+    fun value    { value, release=_, watch=_ } = value ()
+    fun release  { value=_, release, watch=_ } = release ()
+    fun dwatch f { value=_, release=_, watch } = watch f
+    fun swatch f = dwatch (fn (x, _) => f x)
+    
+    fun dmap (valmap, diffmap) state =
+      let
+        val valproxy = ref NONE
+        val block = ref NONE
+        
+        fun proxy (val1, diff1) =
+          case diffmap (val1, diff1, valOf (!valproxy)) of
+              NONE => ()
+            | SOME (newval2, diff2) =>
+                case !block of
+                    NONE => valproxy := SOME newval2
+                  | SOME f => (valproxy := SOME newval2; f (newval2, diff2))
+        
+        val watch = fn f =>
+          case !block of
+              NONE => (dwatch proxy state; (* first b/c it might raise *)
+                       block := SOME f; 
+                       valproxy := SOME (valmap (value state)))
+            | SOME _ => raise RaceCondition
+        val value = fn () =>
+          case !valproxy of
+              NONE => valmap (value state)
+            | SOME x => x
+        val release = fn () =>
+          case !block of
+              NONE => raise UnWatched
+            | SOME _ => (release state; block := NONE; valproxy := NONE)
+      in
+        { value = value, release = release, watch = watch }
+      end
+    fun smap valmap = 
+      dmap (valmap, fn (v, _, _) => let val v2 = valmap v in SOME (v2, v2) end)
+    
+    datatype ('diff1, 'diff2) alt = DIFF1 of 'diff1 | DIFF2 of 'diff2
+    fun dcompose (state1, state2) =
+      let
+        val block = ref NONE
+        fun proxy1 (val1, diff1) =
+          (valOf (!block)) ((val1, value state2), DIFF1 diff1)
+        fun proxy2 (val2, diff2) =
+          (valOf (!block)) ((value state1, val2), DIFF2 diff2)
+        
+        val watch = fn f =>
+          case !block of
+              NONE => (
+                dwatch proxy1 state1;
+                (dwatch proxy2 state2 handle ex => (release state1; raise ex));
+                block := SOME f)
+            | SOME _ => raise RaceCondition
+        val value = fn () =>
+          (value state1, value state2)
+        val release = fn () =>
+          case !block of
+              NONE => raise UnWatched
+            | SOME _ => (release state1; release state2; block := NONE)
+      in
+        { value = value, release = release, watch = watch }
+      end
+      
+    fun scompose (state1, state2) =
+      let
+        val block = ref NONE
+        fun proxy1 (val1, diff1) =
+          let val val2 = value state2 in
+          (valOf (!block)) ((val1, val2), (val1, val2)) end
+        fun proxy2 (val2, diff2) =
+          let val val1 = value state1 in
+          (valOf (!block)) ((val1, val2), (val1, val2)) end
+        
+        val watch = fn f =>
+          case !block of
+              NONE => (
+                dwatch proxy1 state1;
+                (dwatch proxy2 state2 handle ex => (release state1; raise ex));
+                block := SOME f)
+            | SOME _ => raise RaceCondition
+        val value = fn () =>
+          (value state1, value state2)
+        val release = fn () =>
+          case !block of
+              NONE => raise UnWatched
+            | SOME _ => (release state1; release state2; block := NONE)
+      in
+        { value = value, release = release, watch = watch }
+      end
+      
+      (* !!! fixme *)
+      type ('val, 'diff) broadcast = ('val, 'diff) state
+      fun broadcast state = state
+      fun clone broadcaster = broadcaster
+  end

Added: mltonlib/trunk/ca/terpstra/st/test.mlb
===================================================================
--- mltonlib/trunk/ca/terpstra/st/test.mlb	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/test.mlb	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,6 @@
+local
+  $(SML_LIB)/basis/basis.mlb
+  st.mlb
+in
+  test.sml
+end

Added: mltonlib/trunk/ca/terpstra/st/test.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/test.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/test.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,59 @@
+type port = (INetSock.inet, Socket.passive Socket.stream) Socket.sock
+
+(* There must be a better (faster!) way to convert a string to unsigned char *)
+fun msg s = Word8VectorSlice.full (Word8Vector.tabulate 
+  (String.size s, Word8.fromInt o Char.ord o (fn i => String.sub (s, i))))
+fun str v = CharVector.tabulate (Word8Vector.length v,
+  Char.chr o Word8.toInt o (fn i => Word8Vector.sub (v, i)))
+
+val delay = Time.fromSeconds 5
+val port : port = INetSock.TCP.socket ()
+val () = Socket.Ctl.setREUSEADDR (port, true)
+val () = Socket.bind (port, INetSock.any 12467)
+val () = Socket.listen (port, 100)
+
+val google = valOf (NetHostDB.getByName "www.google.de")
+val ghttp = INetSock.toAddr (NetHostDB.addr google, 80)
+
+fun http () =
+  let
+    val s = INetSock.TCP.socket ()
+    val () = print "connecting...\n"
+    val () = Socket.connect (s, ghttp)
+    val () = print "sending...\n"
+    val _  = Socket.sendVec (s, msg "GET / HTTP/1.1\nHost: www.google.de\n\n")
+    val () = print "reading...\n"
+    val got = Socket.recvVec (s, 4096)
+    val () = print "done!\n"
+  in
+    print ("response: " ^ str got ^ "\n")
+  end
+
+fun worker s () = 
+  let
+    val _ = Socket.sendVec (s, msg "hello and welcome!\n");
+    val got = Word8VectorSlice.full (Socket.recvVec (s, 400))
+  in
+    if Word8VectorSlice.length got = 0 then Socket.close s else
+    (Socket.sendVec (s, got); worker s ())
+  end
+
+fun welcome () = 
+  let
+    val (s, _) = Socket.accept port
+  in
+    spawn (worker s);
+    welcome ()
+  end
+
+fun beat () = (
+  stopTill (TIMEOUT delay);
+  print "hello world\n";
+  beat ())
+  
+val () = spawn welcome
+val () = spawn beat
+val () = spawn http
+val () = spawn http
+
+val () = main ()

Added: mltonlib/trunk/ca/terpstra/st/thread.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/thread.sig	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/thread.sig	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,18 @@
+signature THREAD =
+  sig
+    (* start a new thread, which will be run later *)
+    val spawn: (unit -> unit) -> unit
+    val yield: 'a -> 'a (* release control for a tick *)
+    
+    val stopTill: (bool, 'a) State.state -> unit
+    val select: ((bool, 'b) State.state * 'a) list -> 'a list
+  end
+
+signature THREAD_EXTRA =
+  sig
+    include THREAD
+    
+    datatype loop = COMPLETE | PENDING
+    val run:  unit -> loop (* process queue till completed or stopped *)
+    val stop: unit -> unit (* stop processing queue and return soon *)
+  end

Added: mltonlib/trunk/ca/terpstra/st/thread.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/thread.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/thread.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,65 @@
+structure Thread_Extra :> THREAD_EXTRA =
+  struct
+    open MLton.Thread
+    open State
+    type thread = Runnable.t
+    
+    val ready : thread Queue.queue = Queue.new ()
+    val loop  : thread option ref = ref NONE
+    val quit  : bool ref = ref false
+    
+    fun next () = 
+      if Queue.empty ready orelse !quit then valOf (!loop) else
+      valOf (Queue.deque ready)
+    
+    fun spawn main = 
+      Queue.enque (ready, prepare (new 
+        (fn () => (main (); switch (fn _ => next ()))), ()))
+    
+    fun yield result = switch (fn thread => (
+      Queue.enque (ready, prepare (thread, result)); 
+      next ()))
+    
+    datatype loop = COMPLETE | PENDING
+    fun run () = (
+      quit := false;
+      switch (fn thread => (loop := SOME (prepare (thread, ())); next ()));
+      case Queue.empty ready of
+        true => COMPLETE | false => PENDING)
+    
+    fun stop () = quit := true
+    
+    (* the while loop deals with the case that a state may have only
+     * temporarily become true (before switch), but is not true any longer.
+     *)
+    fun stopTill state =
+      while not (value state) do switch (fn thread => 
+        let
+          fun resume _ = (
+            release state;
+            Queue.enque (ready, prepare (thread, ())))
+        in
+          swatch resume state;
+          next ()
+        end)
+    
+    fun select events =
+      let
+        fun map (state, res) = if value state then SOME res else NONE
+        fun block thread =
+          let
+            fun resume _ = (
+              List.app (fn (state, _) => release state) events;
+              Queue.enque (ready, prepare (thread, ())))
+          in
+            List.app (fn (state, _) => swatch resume state) events;
+            next ()
+          end
+      in
+        case List.mapPartial map events of
+          x :: r => x :: r
+        | [] => (switch block; select events)
+      end
+  end
+
+structure Thread :> THREAD = Thread_Extra

Added: mltonlib/trunk/ca/terpstra/st/timeout.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/timeout.sig	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/timeout.sig	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,21 @@
+signature TIMEOUT =
+  sig
+    (* TIMEOUT is measured since the last IO poll, not the instant called *)
+    val TIMEOUT:   Time.time -> (bool, bool) State.state
+    
+    (* LATERTHAN is an absolute time value *)
+    val LATERTHAN: Time.time -> (bool, bool) State.state
+    
+    (* What is the cached time as of last tick (fast) *)
+    val lastTick: unit -> Time.time
+  end
+
+signature TIMEOUT_EXTRA =
+  sig
+    include TIMEOUT
+    
+    (* The earliest pending timer (if any) *)
+    val getNext: unit -> Time.time option
+    (* Toggle all states to true prior to the given *)
+    val trigger: Time.time -> unit
+  end

Added: mltonlib/trunk/ca/terpstra/st/timeout.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/timeout.sml	2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/timeout.sml	2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,46 @@
+(* !!! fixme: timers persist in the heap even if unreferenced.
+ * once MLton bug is fixed, use MLton.Weak and MLton.Finalizable
+ *)
+structure Timeout_Extra :> TIMEOUT_EXTRA =
+  struct
+    open State
+    open Time
+    open Heap
+    
+    type sleeper = time * bool signal
+    fun nextSleeper ((t1, _), (t2, _)) = t1 < t2
+    val sleeper = new nextSleeper
+    val rLastTick = ref (Time.now ())
+    
+    fun lastTick () = !rLastTick
+    
+    fun LATERTHAN time =
+      let
+        val (state, signal) = state false
+      in
+        push (sleeper, (time, signal));
+        state
+      end
+      
+    fun TIMEOUT time = LATERTHAN (time + lastTick ())
+    
+    fun getNext () =
+      case peek sleeper of
+          NONE => NONE
+        | SOME (t, _) => SOME t
+    
+    fun trigger time =
+      let
+        fun loop () =
+          case peek sleeper of
+              NONE => ()
+            | SOME (t, s) => 
+                if time < t then () else
+                (pop sleeper; s true; loop ())
+      in
+        rLastTick := time;
+        loop ()
+      end
+  end
+
+structure Timeout :> TIMEOUT = Timeout_Extra




More information about the MLton-commit mailing list