-- A Concurrent ML Library in Concurrent Haskell -- Avik Chaudhuri -- avik@cs.ucsc.edu -------------------------------------------------------------------------------- import Control.Concurrent (forkIO, threadDelay, ThreadId) import Control.Concurrent.MVar import IO import Maybe import List import Control.Monad (foldM) import Control.Monad.Fix -------------------------------------------------------------------------------- type Commit = MVar Bool type Decision = MVar (Maybe Commit) type Candidate = MVar Decision type In = MVar Candidate type Out = MVar Candidate type Channel a = (In, Out, MVar a) type Point = MVar () type Name = MVar [Point] type Abort = MVar ([Point], IO ()) type Synchronizer = MVar (Point, Decision) type Event a = Synchronizer -> Abort -> Name -> IO a -------------------------------------------------------------------------------- atchan :: In -> Out -> IO () atchan i o = do { ei <- takeMVar i; eo <- takeMVar o; si <- newEmptyMVar; putMVar ei si; ki <- takeMVar si; so <- newEmptyMVar; putMVar eo so; ko <- takeMVar so; maybe (return ()) (\ci -> putMVar ci (isJust ko)) ki; maybe (return ()) (\co -> putMVar co (isJust ki)) ko } atsync :: Synchronizer -> Abort -> IO () -> IO () atsync r a x = do { (t,s) <- takeMVar r; forkIO (fix (\z -> do { (t',s') <- takeMVar r; forkIO z; putMVar s' Nothing }) ); c <- newEmptyMVar; putMVar s (Just c); b <- takeMVar c; if b then do { putMVar t (); fix (\z -> do { (tL,f) <- takeMVar a; forkIO z; if (elem t tL) then (return ()) else f }) } else x } atpoint :: Synchronizer -> Point -> MVar Candidate -> IO a -> IO a atpoint r t l x = do { e <- newEmptyMVar; putMVar l e; s <- takeMVar e; putMVar r (t,s); takeMVar t; x } -------------------------------------------------------------------------------- spawn :: IO () -> IO ThreadId spawn = forkIO new :: IO (Channel a) new = do { i <- newEmptyMVar; o <- newEmptyMVar; forkIO (fix (\z -> do { atchan i o; z }) ); m <- newEmptyMVar; return (i,o,m) } receive :: Channel a -> Event a receive (i,o,m) = \r -> \a -> \n -> do { t <- newEmptyMVar; forkIO (putMVar n [t]); atpoint r t i (takeMVar m) } transmit :: Channel a -> a -> Event () transmit (i,o,m) y = \r -> \a -> \n -> do { t <- newEmptyMVar; forkIO (putMVar n [t]); atpoint r t o (putMVar m y) } wrap :: Event a -> (a -> IO b) -> Event b wrap v f = \r -> \a -> \n -> do { x <- v r a n; f x } choose :: [Event a] -> Event a choose vL = \r -> \a -> \n -> do { j <- newEmptyMVar; tL <- foldM (\tL -> \v -> do { n' <- newEmptyMVar; forkIO (do { x <- v r a n'; putMVar j x }); tL' <- takeMVar n'; putMVar n' tL'; return (tL' ++ tL) }) [] vL; forkIO (putMVar n tL); takeMVar j } guard :: IO (Event a) -> Event a guard vs = \r -> \a -> \n -> do { v <- vs; v r a n } wrapabort :: IO () -> Event a -> Event a wrapabort f v = \r -> \a -> \n -> do { forkIO (do { tL <- takeMVar n; putMVar n tL; putMVar a (tL, f) }); v r a n } sync :: Event a -> IO a sync v = do { j <- newEmptyMVar; forkIO (fix (\z -> do { r <- newEmptyMVar; a <- newEmptyMVar; n <- newEmptyMVar; forkIO (atsync r a z); x <- v r a n; putMVar j x })); takeMVar j } ----------- main = new >>= \x -> new >>= \y -> new >>= \z -> let abortTx = putStrLn "Aborted transmit on x" in let abortRx = putStrLn "Aborted receive on x" in let abortTy = putStrLn "Aborted transmit on y" in let abortRy = putStrLn "Aborted receive on y" in let abortTz = putStrLn "Aborted transmit on z" in let abortRz = putStrLn "Aborted receive on z" in let guardTx = putStrLn "Trying transmit on x" in let guardRx = putStrLn "Trying receive on x" in let guardTy = putStrLn "Trying transmit on y" in let guardRy = putStrLn "Trying receive on y" in let guardTz = putStrLn "Trying transmit on z" in let guardRz = putStrLn "Trying receive on z" in let wrapTx = (\_ -> putStrLn "Done transmit on x") in let wrapRx = (\_ -> putStrLn "Done receive on x") in let wrapTy = (\_ -> putStrLn "Done transmit on y") in let wrapRy = (\_ -> putStrLn "Done receive on y") in let wrapTz = (\_ -> putStrLn "Done transmit on z") in let wrapRz = (\_ -> putStrLn "Done receive on z") in spawn (sync (choose [guard (guardTx >> return (wrapabort abortTx (wrap (transmit x "Mx") wrapTx))), guard (guardTy >> return (wrapabort abortTy (wrap (transmit y "My") wrapTy))) ] ) ) >> spawn (sync (choose [guard (guardRy >> return (wrapabort abortRy (wrap (receive y) wrapRy))), guard (guardRz >> return (wrapabort abortRz (wrap (receive z) wrapRz))) ] ) ) >> spawn (sync (guard (guardRx >> return (wrapabort abortRx (wrap (receive x) wrapRx))) ) ) >> sync (guard (guardTz >> return (wrapabort abortTz (wrap (transmit z "Mz") wrapTz))) )