[MLton-commit] r6454

spoons at mlton.org spoons at mlton.org
Mon Mar 3 07:41:23 PST 2008


SML portion of the parallel library.

Provide support for richer parallel primitives, as well as multiple
scheduling policies.

----------------------------------------------------------------------

U   mlton/branches/shared-heap-multicore/basis-library/build/sources.mlb
U   mlton/branches/shared-heap-multicore/basis-library/libs/basis-extra/top-level/basis-sigs.sml
U   mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sig
U   mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sml
U   mlton/branches/shared-heap-multicore/basis-library/mlton.mlb
A   mlton/branches/shared-heap-multicore/basis-library/parallel/
A   mlton/branches/shared-heap-multicore/basis-library/parallel/array.sig
A   mlton/branches/shared-heap-multicore/basis-library/parallel/array.sml
A   mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sig
A   mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sml
A   mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sig
A   mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sml
A   mlton/branches/shared-heap-multicore/basis-library/parallel/future.sig
A   mlton/branches/shared-heap-multicore/basis-library/parallel/future.sml
A   mlton/branches/shared-heap-multicore/basis-library/parallel/internal.sml
A   mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sig
A   mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sml
A   mlton/branches/shared-heap-multicore/basis-library/parallel/pbf-map
A   mlton/branches/shared-heap-multicore/basis-library/parallel/pbfworkqueue.sml
A   mlton/branches/shared-heap-multicore/basis-library/parallel/pdf-map
A   mlton/branches/shared-heap-multicore/basis-library/parallel/pdfworkqueue.sml
A   mlton/branches/shared-heap-multicore/basis-library/parallel/sim-map
A   mlton/branches/shared-heap-multicore/basis-library/parallel/simpleworkqueue.sml
A   mlton/branches/shared-heap-multicore/basis-library/parallel/workqueue.sig
A   mlton/branches/shared-heap-multicore/basis-library/parallel/ws-map
A   mlton/branches/shared-heap-multicore/basis-library/parallel/wsworkqueue.sml

----------------------------------------------------------------------

Modified: mlton/branches/shared-heap-multicore/basis-library/build/sources.mlb
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/build/sources.mlb	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/build/sources.mlb	2008-03-03 15:41:17 UTC (rev 6454)
@@ -281,6 +281,10 @@
    ../mlton/exit.sml
    ../mlton/exn.sig
    ../mlton/exn.sml
+
+   ann "allowFFI true" in
+      ../parallel/internal.sml
+   end
    ../mlton/thread.sig
    ../mlton/thread.sml
    ../mlton/signal.sig
@@ -363,6 +367,22 @@
    ../mlton/world.sml
    ../mlton/mono-array.sig
    ../mlton/mono-vector.sig
+
+   ../parallel/workqueue.sig
+   ../parallel/basic.sig
+   ../parallel/future.sig
+   ../parallel/forkjoin.sig
+   ../parallel/array.sig
+   ../parallel/parallel.sig
+   ann "allowFFI true" in
+      ../parallel/$(WORK_QUEUE).sml
+      ../parallel/basic.sml
+      ../parallel/future.sml
+      ../parallel/forkjoin.sml
+   end
+   ../parallel/array.sml
+   ../parallel/parallel.sml
+
    ../mlton/mlton.sig
    ../mlton/mlton.sml
 

Modified: mlton/branches/shared-heap-multicore/basis-library/libs/basis-extra/top-level/basis-sigs.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/libs/basis-extra/top-level/basis-sigs.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/libs/basis-extra/top-level/basis-sigs.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -94,6 +94,7 @@
 signature MLTON_ITIMER = MLTON_ITIMER
 signature MLTON_MONO_ARRAY = MLTON_MONO_ARRAY
 signature MLTON_MONO_VECTOR = MLTON_MONO_VECTOR
+signature MLTON_PARALLEL = MLTON_PARALLEL
 signature MLTON_PLATFORM = MLTON_PLATFORM
 signature MLTON_POINTER = MLTON_POINTER
 signature MLTON_PROC_ENV = MLTON_PROC_ENV

Modified: mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sig	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sig	2008-03-03 15:41:17 UTC (rev 6454)
@@ -34,6 +34,7 @@
       structure Itimer: MLTON_ITIMER
       structure LargeReal: MLTON_REAL
       structure LargeWord: MLTON_WORD
+      structure Parallel: MLTON_PARALLEL
       structure Platform: MLTON_PLATFORM
       structure Pointer: MLTON_POINTER
       structure ProcEnv: MLTON_PROC_ENV

Modified: mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/mlton/mlton.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -63,6 +63,7 @@
       open LargeWord
       type t = word
    end
+structure Parallel = MLtonParallel
 structure Platform = MLtonPlatform
 structure Pointer = MLtonPointer
 structure ProcEnv = MLtonProcEnv

Modified: mlton/branches/shared-heap-multicore/basis-library/mlton.mlb
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/mlton.mlb	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/mlton.mlb	2008-03-03 15:41:17 UTC (rev 6454)
@@ -26,6 +26,7 @@
       signature MLTON_ITIMER
       signature MLTON_MONO_ARRAY
       signature MLTON_MONO_VECTOR
+      signature MLTON_PARALLEL
       signature MLTON_PLATFORM
       signature MLTON_POINTER
       signature MLTON_PROC_ENV

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/array.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/array.sig	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/array.sig	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,14 @@
+signature MLTON_PARALLEL_ARRAY =
+sig
+  (* parallel versions of ordinary array operations.  maxSeq indicates the
+   largest number of elements that will be computed sequentially on a single
+   processor. *)
+             (* maxSeq      f           n *)
+  val tabulate : int -> (int -> 'a) -> int -> 'a Array.array
+           (* maxSeq      f                      a *)
+  val modify : int -> (int * 'a -> 'a) -> 'a Array.array -> unit
+
+  (* see the comment for MLTON_PARALLEL_FORKJOIN.reduce *)
+            (* maxSeq       "*"            "inj"      "unit"      a *)
+  val reduce : int -> ('b * 'b -> 'b) -> ('a -> 'b) -> 'b -> 'a Array.array -> 'b
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/array.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/array.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/array.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,43 @@
+structure MLtonParallelArray =
+struct
+
+  structure B = MLtonParallelBasic
+  structure F = MLtonParallelForkJoin
+
+  fun tabulate maxSeq f size =
+    let
+      (* XXX check that maxSeq is large enough to ensure atomic writes *)
+      val () = if maxSeq < 1 then raise B.Parallel "maxSeq must be at least 1" else ()
+      val a = Array.arrayUninit size
+      val () = F.reduce maxSeq
+                        (fn ((), ()) => ()) 
+                        (fn i => Array.update (a, i, f i)) 
+                        ()
+                        size
+    in
+      a
+    end
+
+  fun modify maxSeq f a =
+    let
+      (* XXX check that maxSeq is large enough to ensure atomic writes *)
+      val () = if maxSeq < 1 then raise B.Parallel "maxSeq must be at least 1" else ()
+      val () = F.reduce maxSeq
+                        (fn ((), ()) => ()) 
+                        (fn i => Array.update (a, i, 
+                                               f (i, Array.sub (a, i))))
+                        ()
+                        (Array.length a)
+    in
+      ()
+    end
+
+  (* XXX check that maxSeq is large enough to ensure atomic writes *)
+  fun arrayReduce maxSeq f g u a = F.reduce maxSeq 
+                                            f
+                                            (fn i => g (Array.sub (a, i)))
+                                            u
+                                            (Array.length a)
+  val reduce = arrayReduce
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sig	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sig	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,45 @@
+signature MLTON_PARALLEL_BASIC =
+sig
+
+  (* the empty type *)
+  type void
+  (* the type of a parallel task -- work never returns normally *)
+  type work = unit -> void
+  (* a suspended computation waiting for a value of type 'a *)
+  type 'a t
+
+  (* reify the current point of execution and then add the given work to the
+   queue; the list is assumed to be given in order of decreasing priority.  NB
+   no processor should resume this suspension until *AFTER* it has returned a
+   new set of parallel tasks. *)
+  val suspend : ('a t -> work list) -> 'a
+
+  (* end the current task and add the given computation to the queue *)
+  val resume : 'a t * 'a -> void
+
+  (* end the current task and return control to the scheduler *)
+  val return : unit -> void
+
+  (* add the given work to the queue; the list is assumed to be given in order
+   of decreasing priority.  may suspend under some scheduling policies -- the
+   implementation performs no worse than
+       addWork ws = suspend (fn k => (fn () => resume (k, ()))::ws)
+   *)
+  val addWork : work list -> unit
+
+  (* temporarily yield, but continue as scheduling policy permits.  
+     the implementation performs no worse than
+       continue f = suspend (fn k => [fn () => resume (k, f ())])
+   *)
+  val continue : (unit -> 'a) -> 'a
+
+  (* general errors related to parallelism *)
+  exception Parallel of string
+
+  (* informational *)
+  val policyName : string
+  val numberOfProcessors : int
+  val maxBytesLive : unit -> Word64.word
+  val resetBytesLive : unit -> unit
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/basic.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,107 @@
+structure MLtonParallelBasic :> MLTON_PARALLEL_BASIC =
+struct
+
+  type void = unit
+  type work = unit -> void
+
+  val numberOfProcessors = MLtonParallelInternal.numberOfProcessors
+
+  structure Q = WorkQueue (struct 
+                             type work = work 
+                             val numberOfProcessors = fn () => numberOfProcessors
+                           end) 
+    :> PARALLEL_WORKQUEUE where type work = work
+
+  val processorNumber = MLtonParallelInternal.processorNumber
+  val profileDisable = _import "GC_profileDisable": unit -> unit;
+  val profileEnable = _import "GC_profileEnable": unit -> unit;
+
+  exception Parallel of string
+
+  structure T = MLtonThread
+  type 'a t = 'a T.t
+
+  val enabled = ref true
+
+  fun schedule ws () = 
+    let 
+      fun loop p =
+          let in
+            case Q.getWork p
+             of NONE => 
+                let in
+                  (* if !enabled then (enabled := false; profileDisable ()) else (); *)
+                  ()
+                end
+              | SOME w => 
+                let in
+                  (* if not (!enabled) then (enabled := true; profileEnable ()) else (); *)
+                  w ()
+                  handle e => TextIO.output (TextIO.stdErr, 
+                                             ("WARNING: Caught exception \""
+                                              ^ (Primitive.Exn.name e) 
+                                              ^ "\" in parallel scheduler!\n"))
+                end;
+            (* NB we call processorNumber again here in case that this job has
+             been split across two processors *)
+            loop (processorNumber ())
+          end
+
+      val p = processorNumber ()
+    in
+      Q.addWork p ws;
+      Q.finishWork p;
+      loop p
+    end
+
+  fun suspend f = 
+      T.switch 
+          (fn k => 
+              let 
+                val ws = f k
+              in
+                (* Note that we cannot call addWork on this thread!  One of
+                 the ws may contain a reference to the current thread, and if
+                 that work is scheduled it would be running on the same stack
+                 as us! *)
+                (* Also, we can't just call schedule here because we need to
+                 preserve the current thread/stack.  Instead we create a new
+                 thread that will continue by calling schedule. *)
+                T.prepare (T.new (schedule ws), ())
+              end)
+
+  fun resume (k, v) = 
+      if Q.shouldYield (processorNumber ()) then
+        let in
+          schedule [fn () => T.switch (fn _ => T.prepare (k, v))] ()
+        end
+      else
+        T.switch (fn _ => T.prepare (k, v))
+
+  val return = schedule []
+
+  fun addWork ws = 
+      let 
+        val p = processorNumber ()
+      in
+        if Q.shouldYield p then
+          suspend (fn k => (fn () => resume (k, ()))::ws)
+        else
+          Q.addWork p ws
+      end
+
+  fun continue f = 
+      if Q.shouldYield (processorNumber ()) then
+        suspend (fn k => [fn () => resume (k, f ())])
+      else
+        f ()
+
+  val () = (_export "Parallel_run": (unit -> void) -> unit;) return
+  (* init MUST come after schedulerLoop has been exported *)
+  val () = (_import "Parallel_init": unit -> unit;) ()
+
+  val policyName = Q.policyName
+  val maxBytesLive = _import "Parallel_maxBytesLive": unit -> Word64.word;
+  val resetBytesLive = _import "Parallel_resetBytesLive": unit -> unit;
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sig	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sig	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,25 @@
+signature MLTON_PARALLEL_FORKJOIN =
+sig
+
+  (* run two functions, possibly in parallel, and return their results as the
+   components of a pair *)
+  val fork : (unit -> 'a) * (unit -> 'b) -> 'a * 'b
+
+  (* aggregate some intermediate results.  you can think of the arguments like
+   the multiplication and unit operations of a group, along with an injection
+   function from the integers.  reduce tabulates the integers from zero
+   (inclusive) to "length" (exclusion), injects them into the group and then
+   multiplies them up.
+     
+   "*" must be associative and the unit must really be the identity element of
+   the group.  assuming these are true, reduce behaves according to the
+   following equivalence:
+     reduce _ m j u l = List.fold m u (List.tabulate (l, fn i => j i))
+
+   maxSeq is the largest number of injections/multiplications that are run
+   sequentially on a single processor.
+*)
+           (* maxSeq       "*"            "inj"        unit  length *)
+  val reduce : int -> ('b * 'b -> 'b) -> (int -> 'b) -> 'b -> int -> 'b
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/forkjoin.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,63 @@
+structure MLtonParallelForkJoin :> MLTON_PARALLEL_FORKJOIN = 
+struct
+
+  structure B = MLtonParallelBasic
+
+  val fetchAndAdd = _import "Parallel_fetchAndAdd": Int32.int ref * Int32.int -> Int32.int;
+
+  datatype 'a result = 
+      NotYet
+    | Finished of 'a
+    | Raised of exn
+
+  fun fork (f, g) =
+      let
+        val c = ref 0
+        val l = ref NotYet
+        val r = ref NotYet
+                
+        fun wrap k h res () = 
+            let
+              val v = Finished (h ())
+                      handle e => Raised e
+              val () = res := v
+              val t = fetchAndAdd (c, 1)
+            in
+              if t = 1 then 
+                B.resume (k, (!l, !r))
+              else 
+                B.return ()
+            end
+      in
+        case B.suspend (fn k => [wrap k f l, wrap k g r])
+         of (Finished a, Finished b) => (a, b)
+          | (Raised e, _) => raise e
+          | (_, Raised e) => raise e
+          | _ => raise B.Parallel "impossible"
+      end
+
+  fun reduce maxSeq f g u n =
+    let
+      val () = if maxSeq < 1 then raise B.Parallel "maxSeq must be at least 1" else ()
+
+      fun wrap i l () =
+        if l <= maxSeq then
+           let
+             val stop = i + l
+             fun loop j v = if j = stop then v
+                            else loop (j + 1) (f (v, g j))
+           in
+             loop i u
+           end
+        else
+          let
+            val l' = l div 2
+          in
+            f (fork (wrap i l',
+                     wrap (i + l') (l - l')))
+          end
+    in
+      wrap 0 n ()
+    end
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/future.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/future.sig	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/future.sig	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,15 @@
+signature MLTON_PARALLEL_FUTURE =
+sig
+
+  (* a future yielding a value of type 'a *)
+  type 'a t
+
+  (* depending on the scheduling policy BOTH future and force may suspend the
+    current task. *)
+  (* create a new parallel future.  futures may be executed speculatively. *)
+  val future : (unit -> 'a) -> 'a t
+
+  (* force the execution of a future if that has not yet occured. *)
+  val force : 'a t -> 'a
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/future.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/future.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/future.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,69 @@
+structure MLtonParallelFuture :> MLTON_PARALLEL_FUTURE =
+struct
+
+  structure B = MLtonParallelBasic
+
+  val fetchAndAdd = _import "Parallel_fetchAndAdd": Int32.int ref * Int32.int -> Int32.int;
+
+  datatype 'a result =
+      NotYet 
+    | Finished of 'a
+    | Raised of exn
+
+  type 'a t = 'a result ref * 'a result B.t option ref * int ref
+
+  fun future f =
+    let 
+      val r = ref NotYet
+      val n = ref NONE
+      val c = ref 0
+      val () = B.addWork 
+        [fn () => 
+         let
+           val res = Finished (f ())
+                       handle e => Raised e
+           val () = r := res
+           val t = fetchAndAdd (c, 1)
+         in
+           if t = 0 then B.return ()
+           else B.resume (valOf (!n), res)
+         end]
+    in
+      (r, n, c)
+    end
+
+  fun force (r, n, c) =
+    let 
+      val res = if !c = 1 then
+                  B.continue (fn () => !r)
+                else
+(*
+                  B.suspend (fn k =>
+                                let 
+                                  val () = n := SOME k
+                                  val t = fetchAndAdd (c, 1)
+                                in
+                                  if t = 0 then []
+                                  else [fn () => B.resume (k, !r)]
+                                end)
+*)
+                  B.suspend (fn k =>
+                                let 
+                                  val () = n := SOME k
+                                in
+                                  [fn () => 
+                                      let 
+                                        val t = fetchAndAdd (c, 1)
+                                      in
+                                        if t = 0 then B.return () else B.resume (k, !r)
+                                      end]
+                                end)
+
+    in
+      case res of 
+        Finished v => v
+      | Raised e => raise e
+      | NotYet => raise B.Parallel "got NotYet in force!"
+    end
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/internal.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/internal.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/internal.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,8 @@
+structure MLtonParallelInternal =
+struct
+
+  val numberOfProcessors = Int32.toInt ((_import "Parallel_numberOfProcessors": unit -> Int32.int;) ())
+
+  val processorNumber = _import "Parallel_processorNumber": unit -> Int32.int;
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sig	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sig	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,8 @@
+signature MLTON_PARALLEL =
+sig
+
+  structure Basic : MLTON_PARALLEL_BASIC
+  structure ForkJoin : MLTON_PARALLEL_FORKJOIN
+  structure Array : MLTON_PARALLEL_ARRAY
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/parallel.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,8 @@
+structure MLtonParallel :> MLTON_PARALLEL =
+struct
+
+  structure Basic : MLTON_PARALLEL_BASIC = MLtonParallelBasic
+  structure ForkJoin : MLTON_PARALLEL_FORKJOIN = MLtonParallelForkJoin
+  structure Array : MLTON_PARALLEL_ARRAY = MLtonParallelArray
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/pbf-map
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/pbf-map	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/pbf-map	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1 @@
+WORK_QUEUE pbfworkqueue

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/pbfworkqueue.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/pbfworkqueue.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/pbfworkqueue.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,65 @@
+functor WorkQueue (W : sig type work val numberOfProcessors : unit -> int end) : PARALLEL_WORKQUEUE =
+struct
+
+  type proc = int
+  type work = W.work
+
+  local
+    val lock_ = _import "Parallel_lock": int -> unit;
+    val unlock_ = _import "Parallel_unlock": int -> unit;
+  in
+  fun lock () = lock_ 0
+  fun unlock () = unlock_ 0
+  end
+
+  datatype 'a mlist = 
+     Cons of 'a * 'a mlist ref 
+   | Nil
+
+  (* initialize state *)
+  val (head, tail) = (ref Nil, ref Nil)
+
+  fun addWork _ ws = 
+    let
+      fun add w = 
+          tail := (case !tail of
+                     Cons (_, r) => 
+                     let 
+                       val n = Cons (w, ref (!r))
+                       val () = r := n
+                     in
+                       n
+                     end
+                   | Nil => 
+                     let 
+                       val n = Cons (w, ref Nil)
+                       val () = head := n
+                     in
+                       n
+                     end)
+    in
+      lock ();
+      app add ws;
+      unlock ()
+    end
+
+  fun getWork _ = 
+    let in
+      lock ();
+      case !head
+       of Nil => (unlock ();
+                   NONE)
+        | Cons (w, t) => (head := !t;
+                           case !t of Nil => tail := !t
+                                    | _ => ();
+                           unlock ();
+                           SOME w)
+    end
+
+  fun finishWork _ = ()
+
+  fun shouldYield _ = true
+
+  val policyName = "pbf"
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/pdf-map
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/pdf-map	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/pdf-map	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1 @@
+WORK_QUEUE pdfworkqueue

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/pdfworkqueue.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/pdfworkqueue.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/pdfworkqueue.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,165 @@
+functor WorkQueue (W : sig type work val numberOfProcessors : unit -> int end) : PARALLEL_WORKQUEUE =
+struct
+
+  type proc = int
+  type work = W.work
+
+  local
+    val lock_ = _import "Parallel_lock": int -> unit;
+    val unlock_ = _import "Parallel_unlock": int -> unit;
+  in
+  fun lock () = lock_ 0
+  fun unlock () = unlock_ 0
+  end
+
+  local 
+    exception Impossible
+    open TextIO
+  in
+  fun die n = (output (stdErr, 
+                       "PDFWorkQueue: die at " ^ (Int.toString n) ^ "\n"); 
+               flushOut stdErr;
+               unlock ();
+               raise Impossible)
+  end
+
+  datatype 'a mdlist = 
+     Cons of 'a mdlist ref * int ref * 'a option ref * 'a mdlist ref 
+   | Nil
+
+  (* initialize global state *)
+  val head = Cons (ref Nil, ref 0, ref NONE, ref Nil)
+
+  (* private state *)
+  structure A = Array
+  val state = A.tabulate (W.numberOfProcessors (), fn _ => head)
+
+(*
+  fun ntos MNil = "_"
+    | ntos (MCons (l, s, c, wr, r)) =
+      let 
+        val star = case !wr of SOME _ => "*" | _ => ""
+        val ls = case !l of MNil => "_"
+                          | MCons (_, ls, _, _, _) => ls
+        val rs = case !r of MNil => "_"
+                          | MCons (_, rs, _, _, _) => rs
+      in
+        (s ^ star ^ "[" ^ Int.toString (!c) ^ "](" ^ ls ^ "," ^ rs ^ ")")
+      end
+
+  fun pr nr =
+      let
+        fun loop MNil = ()
+          | loop (n as MCons (_, _, _, _, r)) =
+            let 
+            in
+              print (ntos n ^ ", ");
+              loop (!r)
+            end
+
+        val s = case !nr of (MCons (_, s, _, _, _)) => s
+                          | MNil => "nil"
+      in
+        print "hd: ";
+        loop head;
+        print ("\ncr: " ^ s ^ "\n")
+      end
+
+  local
+    val count = ref 0
+  in
+    fun next () = (Int.toString (!count))
+                  before count := !count + 1
+  end
+*)
+
+  fun addWork p ws = 
+    let 
+      fun add w =
+          case A.unsafeSub (state, p) of
+            (l as Cons (_, c, wr, rl)) =>
+            let in
+              (* Can't read wr unless we have the lock! *)
+              case !wr of 
+                (* Easy if we can replace the current node *)
+                  NONE => wr := SOME w
+                (* Need to insert to the right of the current node *)
+                | SOME _ => 
+                  let
+                    val r = !rl
+                    val n = Cons (ref l, ref 1, ref (SOME w), ref r)
+                    val () = c := !c - 1
+                    val () = A.unsafeUpdate (state, p, n)
+                    val () = rl := n
+                  in
+                    case r of
+                      Cons (lr, _, _, _) => lr := n
+                    | Nil => ()
+                  end
+          end
+          | Nil => die 1
+    in
+      lock ();
+      app add ws;
+      unlock ()
+    end 
+
+  fun getWork p = 
+    let
+      (* get assumes that its argument doesn't need its ref count 
+        decremented *)
+      fun get Nil = NONE
+        | get (n as Cons (l, c, wr, r)) = 
+          let in
+            case !wr of 
+              SOME w => (wr := NONE;
+                         A.unsafeUpdate (state, p, n);
+                         c := !c + 1;
+                         SOME w)
+            (* Otherwise, continue down the list *)
+            | _ => get (!r)
+          end
+    in
+      lock ();
+      (* leftmost => always start from the head *)
+      get head
+      before
+      unlock ()
+    end
+
+  fun finishWork p =
+      let in
+        case A.unsafeSub (state, p)
+             before A.unsafeUpdate (state, p, head) of
+          (Cons (l, c, wr, r)) =>
+          let 
+            val () = lock ()
+            (* whatever happens, our state will no longer point at this node *)
+            val c' = !c - 1
+          in
+            (* decrement the ref count *)
+            c := c';
+            case (!wr, !l, !r, c') of
+              (SOME _, _, _, _) => () (* still work to do! *)
+            | (NONE, Nil, _, _) => () (* do nothing if first node in list *)
+            | (NONE, l as Cons (_, _, _, rl), r, 0) => 
+              (* if not first and new ref count is 0 then remove *)
+              let in
+                rl := r;
+                case r of Cons (lr, _, _, _) =>
+                          lr := l
+                        | Nil => ()
+              end
+            | (NONE, _, _, _) => (); (* do nothing if ref count > 0 *)
+            unlock ()
+          end
+        | Nil => die 2
+      end
+
+  (* PERF could check to see if there are any waiting jobs earlier in the
+    queue *)
+  fun shouldYield _ = true
+
+  val policyName = "pdf"
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/sim-map
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/sim-map	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/sim-map	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1 @@
+WORK_QUEUE simpleworkqueue

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/simpleworkqueue.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/simpleworkqueue.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/simpleworkqueue.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,42 @@
+functor WorkQueue (W : sig type work val numberOfProcessors : unit -> int end) : PARALLEL_WORKQUEUE =
+struct
+
+  type proc = int
+  type work = W.work
+
+  local
+    val lock_ = _import "Parallel_lock": int -> unit;
+    val unlock_ = _import "Parallel_unlock": int -> unit;
+  in
+  fun lock () = lock_ 0
+  fun unlock () = unlock_ 0
+  end
+    
+  (* initialize state *)
+  val queue = ref nil : work list ref
+
+  fun addWork _ ws = 
+    let in
+      lock ();
+      queue := ws @ (!queue);
+      unlock ()
+    end
+
+  fun getWork _ = 
+    let in
+      lock ();
+      case !queue
+        of nil => (unlock ();
+                   NONE)
+         | w::ws => (queue := ws;
+                     unlock ();
+                     SOME w)
+    end
+
+  fun finishWork _ = ()
+
+  fun shouldYield _ = true
+
+  val policyName = "sim"
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/workqueue.sig
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/workqueue.sig	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/workqueue.sig	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,23 @@
+signature PARALLEL_WORKQUEUE =
+sig
+
+  (* processor identifier *)
+  type proc = int
+  (* abstract type of work *)
+  type work
+
+  (* these take the identifier of the current processor as their first
+   argument *)
+  (* add new work to the queue *)
+  val addWork : proc -> work list -> unit
+  (* remove the next, highest priority work *)
+  val getWork : proc -> work option
+  (* mark the most recent unit of work as done *)
+  val finishWork : proc -> unit
+  (* is there higher priority work for the given processor? *)
+  val shouldYield : proc -> bool 
+
+  (* name of the current policy *)
+  val policyName : string
+
+end

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/ws-map
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/ws-map	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/ws-map	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1 @@
+WORK_QUEUE wsworkqueue

Added: mlton/branches/shared-heap-multicore/basis-library/parallel/wsworkqueue.sml
===================================================================
--- mlton/branches/shared-heap-multicore/basis-library/parallel/wsworkqueue.sml	2008-03-03 15:38:54 UTC (rev 6453)
+++ mlton/branches/shared-heap-multicore/basis-library/parallel/wsworkqueue.sml	2008-03-03 15:41:17 UTC (rev 6454)
@@ -0,0 +1,83 @@
+functor WorkQueue (W : sig type work val numberOfProcessors : unit -> int end) : PARALLEL_WORKQUEUE =
+struct
+
+  type proc = int
+  type work = W.work
+
+  val lock = _import "Parallel_lock": int -> unit;
+  val unlock = _import "Parallel_unlock": int -> unit;
+
+  local 
+    exception Impossible
+    open TextIO
+  in
+  fun die n = (output (stdErr, 
+                       "WSWorkQueue: die at " ^ (Int.toString n) ^ "\n"); 
+               flushOut stdErr;
+               (* XX unlock (); *)
+               raise Impossible)
+  end
+
+  datatype 'a mdlist = 
+     Cons of 'a mdlist ref * int ref * 'a option ref * 'a mdlist ref 
+   | Nil
+
+  structure A = Array
+  val numberOfProcessors = W.numberOfProcessors ()
+
+  (* private state *)
+  val state = A.tabulate (numberOfProcessors, fn _ => nil)
+
+  fun addWork p ws = 
+    let 
+      val () = lock p
+      val lq = A.unsafeSub (state, p)
+    in
+      A.unsafeUpdate (state, p, ws @ lq);
+      unlock p
+    end 
+
+  local
+    fun victim () = Word.toIntX (MLtonRandom.rand ()) mod numberOfProcessors
+  in
+  fun getWork p = 
+    let
+      fun steal () = 
+          let 
+            val p' = victim ()
+            val () = lock p'
+            val lq' = A.unsafeSub (state, p')
+            val l = length lq'
+          in
+            (if l > 0 then
+               let 
+                 val (ws, ws') = (List.take (lq', l - 1), List.drop (lq', l - 1))
+                 val w = case ws' of w::nil => w | _ => die 1
+               in
+                 A.unsafeUpdate (state, p', ws);
+                 SOME w
+               end
+             else NONE)
+            before unlock p'
+          end
+
+      val () = lock p
+      val lq = A.unsafeSub (state, p)
+      val res = case lq of 
+                  w::ws => (A.unsafeUpdate (state, p, ws); 
+                            SOME w 
+                            before unlock p)
+                | nil => (unlock p; 
+                          steal ())
+    in
+      res
+    end
+  end
+
+  fun finishWork p = ()
+
+  fun shouldYield _ = false
+
+  val policyName = "ws"
+
+end




More information about the MLton-commit mailing list