-- A Concurrent ML Library in Concurrent Haskell
-- (including benchmarks)
-- Avik Chaudhuri
-- avik@cs.ucsc.edu

--------------------------------------------------------------------------------

import Control.Concurrent (forkIO, threadDelay, ThreadId)
import Control.Concurrent.MVar
import System.IO
import Data.Maybe
import Data.List
import Control.Monad (foldM)
import Control.Monad.Fix
import System.CPUTime

--------------------------------------------------------------------------------
-- Protocol types

-- | Written by a channel to tell a synchronizer whether the partner of its
-- communication committed as well.
type Commit = MVar Bool

-- | A synchronizer's answer to a channel: 'Just' a commit cell, or 'Nothing'.
type Decision = MVar (Maybe Commit)

-- | Handed to a channel by a point; the channel fills it with a fresh
-- 'Decision' cell.
type Candidate = MVar Decision

-- | Receiving side of a channel.
type In = MVar Candidate

-- | Transmitting side of a channel.
type Out = MVar Candidate

-- | Both channel sides plus the cell used to hand over the message.
type Channel a = (In, Out, MVar a)

-- | One communication point; filled with @()@ once it has been selected.
type Point = MVar ()

-- | Cell through which an event publishes the list of its points.
type Name = MVar [Point]

-- | Abort registrations: a list of points together with the action to run
-- when the selected point is not in that list.
type Abort = MVar ([Point], IO ())

-- | Cell through which points report to their synchronizer.
type Synchronizer = MVar (Point, Decision)

-- | An event is run against a synchronizer, an abort cell and a name cell.
type Event a = Synchronizer -> Abort -> Name -> IO a

--------------------------------------------------------------------------------
-- Protocol code

-- | One round of a channel: take one candidate from each side, obtain the
-- decision of each side's synchronizer, and tell every side that offered a
-- commit cell whether the other side offered one too.
atchan :: In -> Out -> IO ()
atchan i o = do
  ei <- takeMVar i
  eo <- takeMVar o
  si <- newEmptyMVar
  putMVar ei si
  ki <- takeMVar si
  so <- newEmptyMVar
  putMVar eo so
  ko <- takeMVar so
  maybe (return ()) (\ci -> putMVar ci (isJust ko)) ki
  maybe (return ()) (\co -> putMVar co (isJust ki)) ko

-- | Synchronizer for one attempt of 'sync'.
--
-- The first point reporting on @r@ is offered a commit cell; every later
-- report is answered with 'Nothing'.  If the channel confirms the commit, the
-- point is released and the registrations arriving on @a@ are processed:
-- an abort action runs exactly when the selected point is not in its list.
-- Otherwise @x@ is run (as called from 'sync', that starts a new attempt).
atsync :: Synchronizer -> Abort -> IO () -> IO ()
atsync r a x = do
  (t, s) <- takeMVar r
  -- Refuse every other point that reports to this synchronizer.
  _ <- forkIO $ fix $ \z -> do
    (_, s') <- takeMVar r
    _ <- forkIO z
    putMVar s' Nothing
  c <- newEmptyMVar
  putMVar s (Just c)
  b <- takeMVar c
  if b
    then do
      putMVar t ()
      fix $ \z -> do
        (tL, f) <- takeMVar a
        _ <- forkIO z
        if t `elem` tL then return () else f
    else x

-- | A point: announce a candidate on channel side @l@, forward the decision
-- cell received from the channel to synchronizer @r@ together with the point
-- @t@, wait until @t@ is filled, then run @x@.
atpoint :: Synchronizer -> Point -> MVar Candidate -> IO a -> IO a
atpoint r t l x = do
  e <- newEmptyMVar
  putMVar l e
  s <- takeMVar e
  putMVar r (t, s)
  takeMVar t
  x

--------------------------------------------------------------------------------
-- Library interface

-- | Run an action in a new thread.
spawn :: IO () -> IO ThreadId
spawn = forkIO

-- | Create a channel and fork the thread that runs 'atchan' on it forever.
new :: IO (Channel a)
new = do
  i <- newEmptyMVar
  o <- newEmptyMVar
  _ <- forkIO $ fix $ \z -> do
    atchan i o
    z
  m <- newEmptyMVar
  return (i, o, m)

-- | Event that receives a message on the channel.
receive :: Channel a -> Event a
receive (i, _, m) r _ n = do
  t <- newEmptyMVar
  _ <- forkIO (putMVar n [t])
  atpoint r t i (takeMVar m)

-- | Event that transmits @y@ on the channel.
transmit :: Channel a -> a -> Event ()
transmit (_, o, m) y r _ n = do
  t <- newEmptyMVar
  _ <- forkIO (putMVar n [t])
  atpoint r t o (putMVar m y)

-- | Post-process the result of an event with @f@.
wrap :: Event a -> (a -> IO b) -> Event b
wrap v f r a n = v r a n >>= f

-- | Choice among events: every alternative runs in its own thread against
-- the same synchronizer, and the first result delivered is returned.  The
-- points of all alternatives are collected and published on @n@.
choose :: [Event a] -> Event a
choose vL r a n = do
  j <- newEmptyMVar
  tL <- foldM (collect j) [] vL
  _ <- forkIO (putMVar n tL)
  takeMVar j
  where
    -- Start one alternative and prepend its points to the accumulator.
    collect j acc v = do
      n' <- newEmptyMVar
      _ <- forkIO (v r a n' >>= putMVar j)
      tL' <- takeMVar n'
      -- Put the list back so other readers of n' still see it.
      putMVar n' tL'
      return (tL' ++ acc)

-- | Run @vs@ to build an event at synchronization time, then run that event.
guard :: IO (Event a) -> Event a
guard vs r a n = do
  v <- vs
  v r a n

-- | Register @f@ as the abort action for the points of @v@, then run @v@.
wrapabort :: IO () -> Event a -> Event a
wrapabort f v r a n = do
  _ <- forkIO $ do
    tL <- takeMVar n
    putMVar n tL
    putMVar a (tL, f)
  v r a n

-- | Synchronize on an event and return its result.  Each attempt uses fresh
-- synchronizer, abort and name cells; 'atsync' is given the attempt itself as
-- the action to run when a commit is not confirmed.
sync :: Event a -> IO a
sync v = do
  j <- newEmptyMVar
  _ <- forkIO $ fix $ \z -> do
    r <- newEmptyMVar
    a <- newEmptyMVar
    n <- newEmptyMVar
    _ <- forkIO (atsync r a z)
    x <- v r a n
    putMVar j x
  takeMVar j

--------------------------------------------------------------------------------
-- Example in the paper

-- | Four synchronizations over channels x, y and z, with tracing output from
-- every guard, wrapper and abort action.
egpaper :: IO ()
egpaper = do
  x <- new
  y <- new
  z <- new
  let abortTx = putStrLn "Aborted transmit on x"
      abortRx = putStrLn "Aborted receive on x"
      abortTy = putStrLn "Aborted transmit on y"
      abortRy = putStrLn "Aborted receive on y"
      abortTz = putStrLn "Aborted transmit on z"
      abortRz = putStrLn "Aborted receive on z"
      guardTx = putStrLn "Trying transmit on x"
      guardRx = putStrLn "Trying receive on x"
      guardTy = putStrLn "Trying transmit on y"
      guardRy = putStrLn "Trying receive on y"
      guardTz = putStrLn "Trying transmit on z"
      guardRz = putStrLn "Trying receive on z"
      wrapTx _ = putStrLn "Done transmit on x"
      wrapRx _ = putStrLn "Done receive on x"
      wrapTy _ = putStrLn "Done transmit on y"
      wrapRy _ = putStrLn "Done receive on y"
      wrapTz _ = putStrLn "Done transmit on z"
      wrapRz _ = putStrLn "Done receive on z"
  _ <- spawn (sync (choose
         [ guard (guardTx >> return (wrapabort abortTx (wrap (transmit x "Mx") wrapTx)))
         , guard (guardTy >> return (wrapabort abortTy (wrap (transmit y "My") wrapTy)))
         ]))
  _ <- spawn (sync (choose
         [ guard (guardRy >> return (wrapabort abortRy (wrap (receive y) wrapRy)))
         , guard (guardRz >> return (wrapabort abortRz (wrap (receive z) wrapRz)))
         ]))
  _ <- spawn (sync
         (guard (guardRx >> return (wrapabort abortRx (wrap (receive x) wrapRx)))))
  sync (guard (guardTz >> return (wrapabort abortTz (wrap (transmit z "Mz") wrapTz))))

-- | Same shape as 'egpaper', but every guard, wrapper and abort action is a
-- no-op and the messages are @()@.
egpaper2 :: IO ()
egpaper2 = do
  x <- new
  y <- new
  z <- new
  let silent = return () :: IO ()
      ignore _ = return () :: IO ()
  _ <- spawn (sync (choose
         [ guard (silent >> return (wrapabort silent (wrap (transmit x ()) ignore)))
         , guard (silent >> return (wrapabort silent (wrap (transmit y ()) ignore)))
         ]))
  _ <- spawn (sync (choose
         [ guard (silent >> return (wrapabort silent (wrap (receive y) ignore)))
         , guard (silent >> return (wrapabort silent (wrap (receive z) ignore)))
         ]))
  _ <- spawn (sync
         (guard (silent >> return (wrapabort silent (wrap (receive x) ignore)))))
  sync (guard (silent >> return (wrapabort silent (wrap (transmit z ()) ignore))))

--------------------------------------------------------------------------------
-- Sieve of Eratosthenes

-- | One channel per number 2..12.  A receiver per channel prints its number
-- when it receives 'True'.  'False' is first sent synchronously on some
-- channels, then 'True' is sent on every channel.
sieve :: IO ()
sieve = do
  c2 <- new
  c3 <- new
  c4 <- new
  c5 <- new
  c6 <- new
  c7 <- new
  c8 <- new
  c9 <- new
  c10 <- new
  c11 <- new
  c12 <- new
  let report c label =
        spawn (sync (wrap (receive c) (\b -> if b then putStrLn label else return ())))
  mapM_ (uncurry report)
    [ (c2, "2\n"), (c3, "3\n"), (c4, "4\n"), (c5, "5\n"), (c6, "6\n"), (c7, "7\n")
    , (c8, "8\n"), (c9, "9\n"), (c10, "10\n"), (c11, "11\n"), (c12, "12\n")
    ]
  -- These run in the calling thread, one after the other.
  mapM_ (\c -> sync (transmit c False)) [c4, c6, c8, c10, c12, c9]
  mapM_ (\c -> spawn (sync (transmit c True)))
    [c3, c4, c5, c6, c7, c8, c9, c10, c11, c12]
  sync (transmit c2 True)

-- | Variant of 'sieve' without output: receivers discard the message and the
-- 'False' transmissions are spawned instead of run in the calling thread.
sieve2 :: IO ()
sieve2 = do
  c2 <- new
  c3 <- new
  c4 <- new
  c5 <- new
  c6 <- new
  c7 <- new
  c8 <- new
  c9 <- new
  c10 <- new
  c11 <- new
  c12 <- new
  mapM_ (\c -> spawn (sync (wrap (receive c) (\_ -> return ()))))
    [c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12]
  -- NOTE(review): c6 appears twice here, exactly as in the original code.
  mapM_ (\c -> spawn (sync (transmit c False))) [c4, c6, c8, c10, c12, c6, c9]
  mapM_ (\c -> spawn (sync (transmit c True)))
    [c3, c4, c5, c6, c7, c8, c9, c10, c11, c12]
  sync (transmit c2 True)

--------------------------------------------------------------------------------
-- Swap channel abstraction

-- | A swap channel carries a message together with a reply channel.
type SwapChannel a = Channel (a, Channel a)

-- | Create a swap channel.
swapchannel :: IO (SwapChannel a)
swapchannel = new

-- | Event that exchanges @msgOut@ for the partner's message.  Either receive
-- the partner's message and reply channel and answer on that channel, or
-- transmit our message with a fresh reply channel and wait for the answer.
swap :: SwapChannel a -> a -> Event a
swap ch msgOut = guard $ do
  putStrLn "Trying"
  inCh <- new
  return (choose
    [ wrap (receive ch)
        (\(msgIn, outCh) -> sync (transmit outCh msgOut) >> return msgIn)
    , wrap (transmit ch (msgOut, inCh))
        (\_ -> sync (receive inCh))
    ])

-- | Two threads swapping @()@ over one swap channel.
swapfun :: IO ()
swapfun = do
  x <- swapchannel
  _ <- spawn (sync (swap x ()))
  sync (swap x ())

--------------------------------------------------------------------------------
-- Buffered channel abstraction

-- | A buffer is the channel writers transmit on and the channel readers
-- receive from.
type Buffer a = (Channel a, Channel a)

-- | Buffer server; never returns.  The queue is kept as a front list and a
-- rear list (newest element first).  With an empty queue it only accepts
-- input; otherwise it either accepts input or hands out the oldest element.
loop :: Channel a -> Channel a -> [a] -> [a] -> IO b
loop inCh outCh [] [] =
  sync (receive inCh) >>= \x -> loop inCh outCh [x] []
loop inCh outCh (x : front') rear =
  sync (choose
    [ wrap (receive inCh) (\y -> loop inCh outCh (x : front') (y : rear))
    , wrap (transmit outCh x) (\_ -> loop inCh outCh front' rear)
    ])
loop inCh outCh [] rear =
  loop inCh outCh (reverse rear) []

-- | Create a buffer and spawn its server thread.
buffer :: IO (Buffer a)
buffer = do
  inCh <- new
  outCh <- new
  _ <- spawn (loop inCh outCh [] [])
  return (inCh, outCh)

-- | Event that puts @x@ into the buffer.
buffertransmit :: Buffer a -> a -> Event ()
buffertransmit (inCh, _) x = transmit inCh x

-- | Event that takes an element out of the buffer.
bufferreceive :: Buffer a -> Event a
bufferreceive (_, outCh) = receive outCh

-- | Two spawned writers, one spawned reader and one reader in the calling
-- thread, all on a fresh buffer.
bufferfun :: IO ()
bufferfun = do
  buf <- buffer
  _ <- spawn (sync (buffertransmit buf ()))
  _ <- spawn (sync (buffertransmit buf ()))
  _ <- spawn (sync (bufferreceive buf))
  sync (bufferreceive buf)

--------------------------------------------------------------------------------
-- Main

-- | Run 'bufferfun' 1000 times and print the difference between the
-- 'getCPUTime' readings taken before and after (picoseconds, per the
-- "System.CPUTime" documentation).
main :: IO ()
main = do
  t1 <- getCPUTime
  foldM (\_ _ -> bufferfun) () [1 .. 1000 :: Int]
  putStrLn "\n\t"
  t2 <- getCPUTime
  putStrLn (show (t2 - t1))