-- A Concurrent ML Library in Concurrent Haskell
-- (extended with guarded communication primitives)
-- Avik Chaudhuri
-- avik@cs.ucsc.edu
--------------------------------------------------------------------------------

import Control.Concurrent (forkIO, threadDelay, ThreadId)
import Control.Concurrent.MVar
import System.IO
import Data.Maybe
import Data.List
import Control.Monad (foldM)
import Control.Monad.Fix

--------------------------------------------------------------------------------

-- Final verdict delivered to a synchronizer: the channel thread writes True
-- when the partner point was also willing to commit, False otherwise.
type Commit = MVar Bool

-- Answer of a synchronizer to a channel thread: 'Just c' offers to commit
-- (the verdict is awaited on c), 'Nothing' declines.
type Decision = MVar (Maybe Commit)

-- Answer of a channel thread to a point: 'Just s' means the pattern matched
-- and s is the cell on which the point's synchronizer must answer;
-- 'Nothing' means the pattern did not match and the point must re-register.
type Candidate = MVar (Maybe Decision)

-- Registration slot for receivers: candidate cell plus acceptance pattern.
type In a = MVar (Candidate, a -> Bool)

-- Registration slot for senders: candidate cell plus the offered value.
type Out a = MVar (Candidate, a)

-- A channel: receiver slot, sender slot, and the cell through which the
-- value is handed over once both sides have committed.
type Channel a = (In a, Out a, MVar a)

-- Signalled (with '()') by the synchronizer when this point is the one chosen.
type Point = MVar ()

-- Holds the list of points belonging to an event.
type Name = MVar [Point]

-- Abort registrations: an action together with the points that protect it.
-- The action runs when the committed point is NOT among those points.
type Abort = MVar ([Point], IO ())

-- Mailbox on which points announce (point, decision cell) to their synchronizer.
type Synchronizer = MVar (Point, Decision)

-- An event is run against the synchronizer, abort and name cells of the
-- enclosing 'sync' (or 'choose' branch).
type Event a = Synchronizer -> Abort -> Name -> IO a

--------------------------------------------------------------------------------

-- One matching round of a channel: take one registered receiver and one
-- registered sender and, if the receiver's pattern accepts the sender's
-- value, ask both synchronizers (receiver first) whether they commit, then
-- tell each willing side whether the other side was willing too.
atchan :: In a -> Out a -> IO ()
atchan i o = do
  (ei, patt) <- takeMVar i
  (eo, y) <- takeMVar o
  if patt y
    then do
      si <- newEmptyMVar
      putMVar ei (Just si)
      ki <- takeMVar si
      so <- newEmptyMVar
      putMVar eo (Just so)
      ko <- takeMVar so
      -- A side that declined (Nothing) has no commit cell to notify.
      maybe (return ()) (\ci -> putMVar ci (isJust ko)) ki
      maybe (return ()) (\co -> putMVar co (isJust ki)) ko
    else do
      -- Pattern rejected the value: release BOTH parties so that each one
      -- re-registers. (The original wrote to ei twice, leaving the sender
      -- blocked forever on its candidate cell.)
      putMVar ei Nothing
      putMVar eo Nothing
      atchan i o

-- Synchronizer of one 'sync' attempt. Handles the first point that
-- announces itself on r: offers to commit and waits for the verdict.
--   * verdict True : signal the point, then run every abort action
--     registered on a whose point list does not contain the chosen point;
--   * verdict False: run x (the caller passes the retry action here).
-- Every later announcement on r is answered with Nothing by a background loop.
atsync :: Synchronizer -> Abort -> IO () -> IO ()
atsync r a x = do
  (t, s) <- takeMVar r
  -- Decline all further candidates of this synchronization.
  _ <- forkIO $ fix $ \z -> do
    (_, s') <- takeMVar r
    _ <- forkIO z
    putMVar s' Nothing
  c <- newEmptyMVar
  putMVar s (Just c)
  b <- takeMVar c
  if b
    then do
      putMVar t ()
      -- Drain abort registrations; each iteration forks the next one so a
      -- slow abort action does not hold up the others.
      fix $ \z -> do
        (tL, f) <- takeMVar a
        _ <- forkIO z
        if t `elem` tL then return () else f
    else x

-- Receiving point: register (candidate, pattern) on the channel's input
-- slot. On a pattern mismatch (Nothing) register again; on a match announce
-- (t, s) to the synchronizer r, wait until t is signalled, then run x.
atpointI :: Synchronizer -> Point -> In a -> (a -> Bool) -> IO a -> IO a
atpointI r t i patt x = do
  e <- newEmptyMVar
  putMVar i (e, patt)
  ms <- takeMVar e
  case ms of
    Nothing -> atpointI r t i patt x
    Just s -> do
      putMVar r (t, s)
      takeMVar t
      x

-- Sending point: same protocol as 'atpointI', registering (candidate, value)
-- on the channel's output slot.
atpointO :: Synchronizer -> Point -> Out a -> a -> IO () -> IO ()
atpointO r t o y x = do
  e <- newEmptyMVar
  putMVar o (e, y)
  ms <- takeMVar e
  case ms of
    Nothing -> atpointO r t o y x
    Just s -> do
      putMVar r (t, s)
      takeMVar t
      x

--------------------------------------------------------------------------------

-- Run an action in a new thread.
spawn :: IO () -> IO ThreadId
spawn = forkIO

-- Create a channel and start its matching thread, which runs 'atchan' forever.
new :: IO (Channel a)
new = do
  i <- newEmptyMVar
  o <- newEmptyMVar
  _ <- forkIO (fix (\z -> atchan i o >> z))
  m <- newEmptyMVar
  return (i, o, m)

-- Event that receives a value satisfying patt from the channel. Publishes
-- its single point on the name cell, then takes the value from the
-- channel's hand-over cell once the point has been chosen.
receive :: Channel a -> (a -> Bool) -> Event a
receive (i, _, m) patt = \r _ n -> do
  t <- newEmptyMVar
  _ <- forkIO (putMVar n [t])
  atpointI r t i patt (takeMVar m)

-- Event that sends y on the channel. Publishes its single point on the name
-- cell, then puts y into the channel's hand-over cell once the point has
-- been chosen.
transmit :: Channel a -> a -> Event ()
transmit (_, o, m) y = \r _ n -> do
  t <- newEmptyMVar
  _ <- forkIO (putMVar n [t])
  atpointO r t o y (putMVar m y)

-- Post-process the result of an event with f.
wrap :: Event a -> (a -> IO b) -> Event b
wrap v f = \r a n -> v r a n >>= f

-- Non-deterministic choice: every alternative runs in its own thread against
-- the shared synchronizer and abort cells but with a private name cell. The
-- point lists of all alternatives are collected and published on n; the
-- result is whatever the first finishing alternative delivers.
choose :: [Event a] -> Event a
choose vL = \r a n -> do
  j <- newEmptyMVar
  tL <- foldM (branch r a j) [] vL
  _ <- forkIO (putMVar n tL)
  takeMVar j
  where
    -- Start one alternative and prepend its points to the accumulator.
    branch r a j acc v = do
      n' <- newEmptyMVar
      _ <- forkIO (v r a n' >>= putMVar j)
      -- Read the alternative's points, leaving them in place for others.
      tL' <- takeMVar n'
      putMVar n' tL'
      return (tL' ++ acc)

-- Event whose body is computed by an IO action each time it is synchronized.
guard :: IO (Event a) -> Event a
guard vs = \r a n -> do
  v <- vs
  v r a n

-- Attach an abort action: f is registered together with the points of v, and
-- the synchronizer runs it when it commits to a point outside that list.
wrapabort :: IO () -> Event a -> Event a
wrapabort f v = \r a n -> do
  _ <- forkIO $ do
    -- Read v's points (leaving them in place) and register the action.
    tL <- takeMVar n
    putMVar n tL
    putMVar a (tL, f)
  v r a n

-- Synchronize on an event and return its result. Each attempt gets fresh
-- synchronizer, abort and name cells; the attempt itself is handed to
-- 'atsync' as the action to run when the commit verdict is False.
sync :: Event a -> IO a
sync v = do
  j <- newEmptyMVar
  _ <- forkIO $ fix $ \z -> do
    r <- newEmptyMVar
    a <- newEmptyMVar
    n <- newEmptyMVar
    _ <- forkIO (atsync r a z)
    x <- v r a n
    putMVar j x
  takeMVar j

-----------

-- Demo: two threads each choose between communicating on x or on y, while a
-- third thread and the main thread exchange a message on z. Each alternative
-- logs when it is tried, completed or aborted. The program ends when the
-- main thread's transmit on z has completed.
main :: IO ()
main = do
  x <- new
  y <- new
  z <- new
  _ <- spawn (sync (choose [tracedTransmit "x" x "Mx", tracedTransmit "y" y "My"]))
  _ <- spawn (sync (choose [tracedReceive "y" y, tracedReceive "x" x]))
  _ <- spawn (sync (tracedReceive "z" z))
  sync (tracedTransmit "z" z "Mz")
  where
    -- Guarded, abort-wrapped transmit of msg on channel c, logging under label.
    tracedTransmit :: String -> Channel String -> String -> Event ()
    tracedTransmit label c msg = guard $ do
      putStrLn ("Trying transmit on " ++ label)
      return
        (wrapabort
           (putStrLn ("Aborted transmit on " ++ label))
           (wrap (transmit c msg) (\_ -> putStrLn ("Done transmit on " ++ label))))

    -- Guarded, abort-wrapped receive of any value on channel c, logging under label.
    tracedReceive :: String -> Channel String -> Event ()
    tracedReceive label c = guard $ do
      putStrLn ("Trying receive on " ++ label)
      return
        (wrapabort
           (putStrLn ("Aborted receive on " ++ label))
           (wrap (receive c (\_ -> True)) (\_ -> putStrLn ("Done receive on " ++ label))))