(* A Concurrent ML Library in Concurrent Haskell (benchmarks) * * Avik Chaudhuri * avik@cs.ucsc.edu *) open Thread open Event let spawn (f: unit -> unit) = ignore (create f ()) (* Example in the paper *) let egpaper (_: unit) = let (x: unit channel) = new_channel () in let (y: unit channel) = new_channel () in let (z: unit channel) = new_channel () in let abortTx = fun (_: unit) -> print_string "Aborted transmit on x\n" in let abortRx = fun (_: unit) -> print_string "Aborted receive on x\n" in let abortTy = fun (_: unit) -> print_string "Aborted transmit on y\n" in let abortRy = fun (_: unit) -> print_string "Aborted receive on y\n" in let abortTz = fun (_: unit) -> print_string "Aborted transmit on z\n" in let abortRz = fun (_: unit) -> print_string "Aborted receive on z\n" in let guardTx = fun (_: unit) -> print_string "Trying transmit on x\n" in let guardRx = fun (_: unit) -> print_string "Trying receive on x\n" in let guardTy = fun (_: unit) -> print_string "Trying transmit on y\n" in let guardRy = fun (_: unit) -> print_string "Trying receive on y\n" in let guardTz = fun (_: unit) -> print_string "Trying transmit on z\n" in let guardRz = fun (_: unit) -> print_string "Trying receive on z\n" in let wrapTx = fun (_: unit) -> print_string "Done transmit on x\n" in let wrapRx = fun (_: unit) -> print_string "Done receive on x\n" in let wrapTy = fun (_: unit) -> print_string "Done transmit on y\n" in let wrapRy = fun (_: unit) -> print_string "Done receive on y\n" in let wrapTz = fun (_: unit) -> print_string "Done transmit on z\n" in let wrapRz = fun (_: unit) -> print_string "Done receive on z\n" in let _ = spawn (fun (_: unit) -> sync (choose [guard (fun (_: unit) -> guardTx (); wrap_abort (wrap (send x ()) wrapTx) abortTx ); guard (fun (_: unit) -> guardTy (); wrap_abort (wrap (send y ()) wrapTy) abortTy ) ])) in let _ = spawn (fun (_: unit) -> sync (choose [guard (fun (_: unit) -> guardRy (); wrap_abort (wrap (receive y) wrapRy) abortRy ); guard (fun (_: unit) -> guardRz (); wrap_abort (wrap (receive z) wrapRz) abortRz ) ])) in let _ = spawn (fun (_: unit) -> sync (guard (fun (_: unit) -> guardRx (); wrap_abort (wrap (receive x) wrapRx) abortRx ) )) in sync (guard (fun (_: unit) -> guardTz (); wrap_abort (wrap (send z ()) wrapTz) abortTz )) let egpaper2 (_: unit) = let (x: unit channel) = new_channel () in let (y: unit channel) = new_channel () in let (z: unit channel) = new_channel () in let abortTx = fun (_: unit) -> () in let abortRx = fun (_: unit) -> () in let abortTy = fun (_: unit) -> () in let abortRy = fun (_: unit) -> () in let abortTz = fun (_: unit) -> () in let abortRz = fun (_: unit) -> () in let guardTx = fun (_: unit) -> () in let guardRx = fun (_: unit) -> () in let guardTy = fun (_: unit) -> () in let guardRy = fun (_: unit) -> () in let guardTz = fun (_: unit) -> () in let guardRz = fun (_: unit) -> () in let wrapTx = fun (_: unit) -> () in let wrapRx = fun (_: unit) -> () in let wrapTy = fun (_: unit) -> () in let wrapRy = fun (_: unit) -> () in let wrapTz = fun (_: unit) -> () in let wrapRz = fun (_: unit) -> () in let _ = spawn (fun (_: unit) -> sync (choose [guard (fun (_: unit) -> guardTx (); wrap_abort (wrap (send x ()) wrapTx) abortTx ); guard (fun (_: unit) -> guardTy (); wrap_abort (wrap (send y ()) wrapTy) abortTy ) ])) in let _ = spawn (fun (_: unit) -> sync (choose [guard (fun (_: unit) -> guardRy (); wrap_abort (wrap (receive y) wrapRy) abortRy ); guard (fun (_: unit) -> guardRz (); wrap_abort (wrap (receive z) wrapRz) abortRz ) ])) in let _ = spawn (fun (_: unit) -> sync (guard (fun (_: unit) -> guardRx (); wrap_abort (wrap (receive x) wrapRx) abortRx ) )) in sync (guard (fun (_: unit) -> guardTz (); wrap_abort (wrap (send z ()) wrapTz) abortTz )) (* Sieve of Erastothenes *) let sieve (_: unit) = let (c2 : bool channel) = new_channel () in let (c3 : bool channel) = new_channel () in let (c4 : bool channel) = new_channel () in let (c5 : bool channel) = new_channel () in let (c6 : bool channel) = new_channel () in let (c7 : bool channel) = new_channel () in let (c8 : bool channel) = new_channel () in let (c9 : bool channel) = new_channel () in let (c10 : bool channel) = new_channel () in let (c11 : bool channel) = new_channel () in let (c12 : bool channel) = new_channel () in spawn (fun (_: unit) -> sync (wrap (receive c2) (fun b -> if b then (print_int 2; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c3) (fun b -> if b then (print_int 3; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c4) (fun b -> if b then (print_int 4; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c5) (fun b -> if b then (print_int 5; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c6) (fun b -> if b then (print_int 6; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c7) (fun b -> if b then (print_int 7; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c8) (fun b -> if b then (print_int 8; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c9) (fun b -> if b then (print_int 9; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c10) (fun b -> if b then (print_int 10; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c11) (fun b -> if b then (print_int 11; print_newline ()) else ()) )); spawn (fun (_: unit) -> sync (wrap (receive c12) (fun b -> if b then (print_int 12; print_newline ()) else ()) )); spawn (fun _ -> sync (send c4 false)); spawn (fun _ -> sync (send c6 false)); spawn (fun _ -> sync (send c8 false)); spawn (fun _ -> sync (send c10 false)); spawn (fun _ -> sync (send c12 false)); spawn (fun _ -> sync (send c9 false)); spawn (fun (_: unit) -> sync (send c3 true) ); spawn (fun (_: unit) -> sync (send c4 true) ); spawn (fun (_: unit) -> sync (send c5 true) ); spawn (fun (_: unit) -> sync (send c6 true) ); spawn (fun (_: unit) -> sync (send c7 true) ); spawn (fun (_: unit) -> sync (send c8 true) ); spawn (fun (_: unit) -> sync (send c9 true) ); spawn (fun (_: unit) -> sync (send c10 true) ); spawn (fun (_: unit) -> sync (send c11 true) ); spawn (fun (_: unit) -> sync (send c12 true) ); sync (send c2 true) let sieve2 (_: unit) = let (c2 : bool channel) = new_channel () in let (c3 : bool channel) = new_channel () in let (c4 : bool channel) = new_channel () in let (c5 : bool channel) = new_channel () in let (c6 : bool channel) = new_channel () in let (c7 : bool channel) = new_channel () in let (c8 : bool channel) = new_channel () in let (c9 : bool channel) = new_channel () in let (c10 : bool channel) = new_channel () in let (c11 : bool channel) = new_channel () in let (c12 : bool channel) = new_channel () in spawn (fun (_: unit) -> sync (wrap (receive c2) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c3) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c4) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c5) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c6) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c7) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c8) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c9) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c10) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c11) (fun b -> ()) )); spawn (fun (_: unit) -> sync (wrap (receive c12) (fun b -> ()) )); spawn (fun _ -> sync (send c4 false)); spawn (fun _ -> sync (send c6 false)); spawn (fun _ -> sync (send c8 false)); spawn (fun _ -> sync (send c10 false)); spawn (fun _ -> sync (send c12 false)); spawn (fun _ -> sync (send c9 false)); spawn (fun (_: unit) -> sync (send c3 true) ); spawn (fun (_: unit) -> sync (send c4 true) ); spawn (fun (_: unit) -> sync (send c5 true) ); spawn (fun (_: unit) -> sync (send c6 true) ); spawn (fun (_: unit) -> sync (send c7 true) ); spawn (fun (_: unit) -> sync (send c8 true) ); spawn (fun (_: unit) -> sync (send c9 true) ); spawn (fun (_: unit) -> sync (send c10 true) ); spawn (fun (_: unit) -> sync (send c11 true) ); spawn (fun (_: unit) -> sync (send c12 true) ); sync (send c2 true) (* Swap channel abstraction *) type 'a swap = ('a * 'a channel) channel let swapchannel () : 'a swap = new_channel () let swap ch msgOut = guard (fun _ -> let _ = print_string "Trying\n" in let inCh = new_channel () in choose [ wrap (receive ch) (fun x -> let (msgIn, outCh) = x in sync (send outCh msgOut); msgIn ); wrap (send ch (msgOut, inCh)) (fun _ -> sync (receive inCh)) ]) let swapfun () = let x = swapchannel () in spawn (fun _ -> sync (swap x ())); sync (swap x ()) (* Buffered channel abstraction *) type 'a buffer = ('a channel * 'a channel) let rec loop inCh outCh front rear = match (front, rear) with ([], []) -> loop inCh outCh [sync (receive inCh)] [] | (x :: front', rear) -> sync (choose [wrap (receive inCh) (fun y -> loop inCh outCh (x :: front') (y :: rear)); wrap (send outCh x) (fun _ -> loop inCh outCh front' rear)]) | ([], rear) -> loop inCh outCh (List.rev rear) [] let buffer () = let inCh = new_channel () in let outCh = new_channel () in spawn (fun _ -> loop inCh outCh [] []); (inCh, outCh) let buffersend buf x = let (inCh, _) = buf in send inCh x let bufferreceive buf = let (_, outCh) = buf in receive outCh let bufferfun () = let buf = buffer () in spawn (fun _ -> sync (buffersend buf ())); spawn (fun _ -> sync (buffersend buf ())); spawn (fun _ -> sync (bufferreceive buf)); sync (bufferreceive buf) (* Main *) let main = let t1 = Sys.time () in for i = 1 to 100 do bufferfun () done; let t2 = Sys.time () in print_float (t2 -. t1)