CMSC 330, Fall 2009
Organization of Programming Languages
Lecture Notes on Concurrent ML (CML)
In class so far, we have seen how to build concurrent programs using multiple threads, which are lightweight tasks that run in a common address space.
We spent considerable time talking about how to make threaded programming safe. The key issue is that in the model we have seen so far, threads communicate via shared memory: one thread writes a value to memory that another thread reads. Because threads might access the same data "simultaneously," and some of those accesses may be writes, we saw that we could have data races. To prevent data races, we need to add synchronization to our programs by acquiring and releasing locks.
As you either know already or will see in Project 6, using locks correctly is tricky. We need to be sure to always use locks when we access shared data; we need to make sure the regions in which locks are held are large enough to achieve atomicity where we need it; and we need to watch out for deadlock if we use more than one lock. We also talked about how locking is not very modular/compositional.
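As a reminder of the shared-memory style, here is a small sketch using OCaml's Mutex module from the threads library. The counter example is our own illustration, not code from these notes: without the lock, the read-modify-write of counter could race.

```ocaml
open Thread

let counter = ref 0
let m = Mutex.create ()

let incr_many n () =
  for _i = 1 to n do
    Mutex.lock m;              (* protect the read-modify-write of counter *)
    counter := !counter + 1;
    Mutex.unlock m
  done

let t1 = create (incr_many 1000) ()
let t2 = create (incr_many 1000) ()
let () = join t1; join t2     (* counter is now 2000 *)
```

Notice how easy it would be to forget the lock in just one place and silently reintroduce the race.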
In this lecture, we're going to talk about a different style of concurrent programming that uses message passing to communicate among threads. To learn about message passing concurrency, we're going to study Concurrent ML (CML), which has some really cool abstractions for building concurrent programs with message passing. There also happens to be a library implementing CML-style programming in OCaml.
Threads in OCaml
There are several possible ways to get access to the threads library in OCaml. One of the easier ways is to create a new OCaml top level that has the thread library baked in. You can do that with the command
ocamlmktop -thread unix.cma threads.cma -o mytop

This creates an executable mytop that you can run as
./mytop -I +threads

Note that you will still need to refer to the thread modules using either the dot notation (e.g., Thread.create) or open.
In CML, threads communicate on synchronous channels. By "synchronous," I mean that when one thread sends a message on a channel, it blocks until another thread reads the sent value from the channel. It's kind of like a one-place producer-consumer buffer where the producer waits for the consumer before continuing. Also, in CML, channels are typed, so that each channel carries a certain kind of value.
Let's begin by showing how to create two threads that communicate with each other. First, we need to know how to create threads. Here is the relevant function:
Thread.create : ('a -> 'b) -> 'a -> Thread.t
Thread.create takes an arbitrary function and an argument for that function, and starts a new thread that begins with the execution of that function on that argument. This is just like thread creation in Ruby. For example:
# let foo () = print_string "Hello, world!\n";;
val foo : unit -> unit = <fun>
# Thread.create foo ();;
- : Thread.t = <abstr>
Hello, world!

Here the printing of Hello, world! happens in another thread. That thread finishes when the function that it calls (in this case, foo) returns. Notice that when we create the thread, we pass to Thread.create both the function foo and its parameter ().
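The argument does not have to be unit, and we can wait for a thread to finish with Thread.join (part of the same threads library). A small sketch, with names of our own choosing:

```ocaml
(* The thread's function can take a real argument, and Thread.join
   blocks until the given thread has finished running. *)
let result = ref 0
let t = Thread.create (fun n -> result := n * 2) 21
let () = Thread.join t
(* after the join, result holds 42 *)
```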
In order for threads to communicate in CML, we need to create a channel for them to talk on. We'll also define a couple of utility functions for sending and receiving; we'll explain these later. Here are the types and definitions:
open Event

new_channel : unit -> 'a channel

let post c x = sync (send c x)    (* send x on channel c *)
let accept c = sync (receive c)   (* receive on channel c *)
To create a new channel, we call Event.new_channel, which returns a fresh channel. Then to send a message across a channel, we use post, and to receive a message, we use accept. These functions are defined in terms of Event.sync, Event.receive, and Event.send. Let's see an example in action.
open Thread
open Event

let post c x = sync (send c x)    (* send x on channel c *)
let accept c = sync (receive c)   (* receive on channel c *)

let consumer c () =
  let s = accept c in
  print_string s;
  flush stdout

let producer c () =
  post c "Hello, world!\n"

let c = new_channel ();;
ignore (create (consumer c) ());;
ignore (create (producer c) ());;
The consumer function takes a channel and a unit argument, receives a message on that channel, and then prints it. The producer takes a channel and a unit argument, and sends the string Hello, world! on that channel. The program then creates a new channel, launches the closure consumer c in a new thread, passing it the argument (), and then launches producer c in a new thread. The result is that the message sent from one thread to the other is printed. Notice that partial application (e.g., consumer c) builds a closure that already knows which channel to use, and that the post in the producer blocks until the consumer performs the matching accept.
First class channels and servers
Just like in the producer-consumer pattern, there can be more than one thread sending or receiving on a channel at a time. In this case, it's unpredictable which thread will "win" the race and be the first one to write/read. To avoid problems with this, most CML programs use many different channels, each for a very specific, narrow purpose. Fortunately, channels in CML are first-class values, i.e., you can pass them around the program, or even send them over other channels. You can use this to write "server" threads that respond to requests from other threads. Here's an example.
Let's consider the problem of writing a multi-threaded bank simulation. One thread will represent the bank, which will accept requests to take various actions on an account from other threads. To keep things simple, we will only have one account, and we'll just keep track of the balance. Here is a simple version of such a system:
open Event
open Thread

let accept c = sync (receive c)
let post c x = sync (send c x)

let balance = ref 0

let rec bank c =
  let (amt, c') = accept c in
  balance := amt + !balance;
  post c' (!balance);
  bank c

let c = new_channel ();;
create bank c;;   (* start up the bank thread *)

let d = new_channel ();;
post c (10, d);;
accept d;;
post c (10, d);;
accept d;;
The account balance is stored in the global variable balance, which we will ensure is only ever accessed by the bank thread. The behavior of that thread is defined by the bank function, which is an infinite (tail-recursive) loop that does the following: First, it listens on the channel c for a message containing the amount to deposit and the response channel to communicate on. When it receives such a message, it increments the balance and then sends the new balance back to the other thread over the supplied channel. We exercise this program by starting up a bank thread, and then sending two deposit requests to it, listening for the response.
In anticipation of the next section, also notice that instead of making balance a ref, we could also carry its value around the loop:
let rec bank c balance =
  let (amt, c') = accept c in
  let new_balance = balance + amt in
  post c' new_balance;
  bank c new_balance
This is kind of nice, because this way we can be sure that balance cannot be directly affected by any other threads. (Of course, we could also have made the ref local to the definition of bank, but that would be slightly messier.)
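For instance, one could extend this loop-carried version so the bank handles withdrawals as well as deposits. The request type below is a hypothetical extension of our own, not part of the notes' code:

```ocaml
open Event
open Thread

let accept c = sync (receive c)
let post c x = sync (send c x)

(* Hypothetical request type: our own extension, not from the notes *)
type request = Deposit of int | Withdraw of int

let rec bank c balance =
  let (req, c') = accept c in
  let new_balance =
    match req with
    | Deposit amt  -> balance + amt
    | Withdraw amt -> balance - amt
  in
  post c' new_balance;     (* reply with the updated balance *)
  bank c new_balance

let c = new_channel ()
let _ = create (fun () -> bank c 0) ()

let d = new_channel ()
let () = post c (Deposit 10, d)
let b1 = accept d          (* 10 *)
let () = post c (Withdraw 3, d)
let b2 = accept d          (* 7 *)
```

Because the channel communication is synchronous, the two requests are guaranteed to be processed in order.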
Emulating Shared Memory with Channels
In essence, the above code gives us a way to have a piece of updatable memory shared between threads: One thread (in this case, the bank thread) is the only one that can actually touch the shared memory. When other threads want to get to that memory, they communicate with the bank thread.
We can generalize this pattern to build an updatable cell type that can hold an arbitrary value, and that can respond to different requests to set and get the cell's value. Here's the code:
open Event
open Thread

let accept c = sync (receive c)
let post c x = sync (send c x)

type 'a cmd = Get of 'a channel | Set of 'a

let rec cell c v =
  match accept c with
  | Get c' -> post c' v; cell c v
  | Set v' -> cell c v'

let c = new_channel ();;
create (cell c) 0;;

let d = new_channel ();;
post c (Get d);;
accept d;;
post c (Set 42);;
post c (Get d);;
accept d;;
The "server" code is defined by the cell function. Since there are multiple requests we want this data type to respond to, we create a type 'a cmd that describes the two possible requests: either getting a value, in which case you have to supply a channel for the value to come back on; or setting a value, in which case you just supply the value. The cell function just listens on the given channel, responding to Get and Set requests appropriately.
This is kind of neat. Notice that we've actually done another reduction here: We've shown how to implement a shared ref using message passing; this is a hint that message passing and ordinary shared memory concurrency are equally expressive. You should think about what else it would take to fully show that the two are basically equivalent.
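As a hint in the other direction, here is a sketch of a lock built entirely from message passing. The server thread alternates between accepting an acquire and accepting a release, so at most one client is inside the critical section at a time. The names new_lock, acquire, and release are our own, not CML's:

```ocaml
open Event
open Thread

let accept c = sync (receive c)
let post c x = sync (send c x)

let new_lock () =
  let acq : unit channel = new_channel ()
  and rel : unit channel = new_channel () in
  let rec loop () =
    accept acq;                 (* someone acquires the lock *)
    accept rel;                 (* other acquirers block until release *)
    loop ()
  in
  ignore (create loop ());
  (acq, rel)

let acquire (acq, _) = post acq ()
let release (_, rel) = post rel ()

(* demo: two threads share a counter, protected by the lock *)
let l = new_lock ()
let counter = ref 0
let worker () =
  for _i = 1 to 100 do
    acquire l;
    counter := !counter + 1;
    release l
  done
let t1 = create worker ()
let t2 = create worker ()
let () = join t1; join t2     (* counter is now 200 *)
```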
In all the examples we've done so far, communication between threads is pretty simple, and is always done with accept and post. However, there is one important limitation: we cannot listen on more than one channel at a time. We might want to do this if we have different private communication channels with different threads, and we can't predict which one we will hear from next.
CML solves this problem by introducing a new abstraction, that of events, defined by the type
open Event

type 'a event
A 'a event represents a "suspended" communication that is waiting to happen. Here are three core functions for working with events:
send : 'a channel -> 'a -> unit event
receive : 'a channel -> 'a event
sync : 'a event -> 'a
The send function takes a channel and a value to send over the channel, and produces an event e of type unit event. Even after we call send, the communication represented by e has not yet occurred. That doesn't happen until we call sync e, which actually fires off the event. Similarly, receive takes a channel and returns an event representing receiving on the channel. When we call receive, it returns immediately, because it doesn't carry out the communication yet. It is not until we sync the return value of receive that we block, waiting for a communication on the channel.
The definitions of accept and post we gave above call receive and send and then immediately sync the result to carry out the communication. However, what is interesting about events is that we don't have to do that. We can create events and then compose them, waiting to sync until later.
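To see the separation between creating an event and synchronizing on it, consider this small sketch (our own example, not from the notes):

```ocaml
open Event
open Thread

let c = new_channel ()

(* receive returns immediately; no communication has happened yet *)
let e = receive c

(* start a sender in another thread *)
let _ = create (fun () -> sync (send c "hi")) ()

(* only when we sync does the communication actually take place *)
let s = sync e
```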
The key reason we want to do this is to allow us to listen on multiple channels at once. CML includes a function choose:
choose : 'a event list -> 'a event
This function takes a list of events, and returns a new event that represents the alternative of the events in the list. When you sync on the result of a choose, CML tries all of the events, and whichever one completes first is actually carried out, and the rest are aborted. For example, consider the following code:
open Thread
open Event

let c = new_channel ();;
let d = new_channel ();;
let e1 = receive c;;
let e2 = receive d;;
let e = choose [e1; e2];;
sync e
This code creates two channels, makes events representing receives on those channels, and then syncs on an event representing receiving either on c or d. The call sync e will block until either something is received on c or on d. (If something is received on both channels, only one will "win", and the other communication will not take place.)
There are also several other utility functions for working with events. The one we will need below is wrap:

wrap : 'a event -> ('a -> 'b) -> 'b event

Given an event e and a function f, wrap e f is a new event that, when synchronized on, carries out the communication of e and then applies f to the result.
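Here is a small sketch of wrap in action (our own example): the wrapped event carries out the receive and then transforms the received value.

```ocaml
open Event
open Thread

let c = new_channel ()
let _ = create (fun () -> sync (send c 20)) ()

(* wrap e f: when e completes, apply f to its result *)
let doubled = wrap (receive c) (fun n -> n * 2)
let v = sync doubled   (* v = 40 *)
```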
An Example: Cells with Multiple Channels
As an example use of events, let's revisit the cell example from above. Instead of having a single channel that accepts multiple commands, let's recode it to have two channels, each used for one command.
open Event
open Thread

let rec cell get_c set_c v =
  let e1 = receive get_c in
  let handle_get ch = sync (send ch v); cell get_c set_c v in
  let e1' = wrap e1 handle_get in
  let e2 = receive set_c in
  let handle_set v' = cell get_c set_c v' in
  let e2' = wrap e2 handle_set in
  sync (choose [e1'; e2'])

let g = new_channel ();;
let s = new_channel ();;
create (cell g s) 0;;

let d = new_channel ();;
sync (send g d);;
sync (receive d);;
sync (send s 42);;
sync (send g d);;
sync (receive d);;
Now the cell function takes two channels as input, one for get requests and one for set requests. It creates two events, e1' and e2'. The event e1' first receives on the get channel and then calls handle_get on the result, which sends back the value and recursively calls cell. The event e2' first receives on the set channel and similarly makes a recursive call to cell with the new value. Then the body of cell chooses between the two events.
The Flexibility of Events
It turns out that most message passing systems have a capability similar to choose. For example, Unix has a select function to see which file descriptors (typically, sockets) are available for communication:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *timeout);
When you call select, you pass in sets of file descriptors for reading (readfds), for writing (writefds), and those that may have exceptional conditions (errorfds). The call to select blocks until at least one of those file descriptors is ready. When it returns, it tells you how many descriptors are ready. You can also specify an optional timeout value.
One nice thing about using events is that we don't need to have yucky function signatures like above, where we have to specialize select to take exactly three lists of file descriptors, one for each kind of communication we might do on a descriptor. With events, we just have a single choose function that takes an arbitrary list of events, which is cleaner.
Another benefit that events have over the Unix select call is that you can dynamically compose events. For example, you could do something like
let e = choose [e1; e2; e3];;
let e' = choose [e4; e5; e6];;
let e'' = choose [e; e'];;
Here when we call choose, we create a new event rather than blocking. This means that we can use that event in other choose calls (or other wrap, guard, etc calls) as we like.
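Combining choose with wrap is a common idiom: by tagging each alternative, the value we get back from sync tells us which channel actually communicated. A sketch (our own example):

```ocaml
open Event
open Thread

let c = new_channel () and d = new_channel ()

(* tag each alternative so we can tell which one fired *)
let e =
  choose [ wrap (receive c) (fun s -> "from c: " ^ s);
           wrap (receive d) (fun s -> "from d: " ^ s) ]

let _ = create (fun () -> sync (send d "hello")) ()
let msg = sync e   (* only the d alternative can fire here *)
```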
In this lecture, we've seen a new way of building up multi-threaded programs: by dividing them into separate small tasks that communicate via message passing. It turns out that CML is not the only library for message passing. A much more popular system is MPI, the Message Passing Interface, which is commonly used in scientific computing. MPI doesn't quite look like CML, but the idea of coordinating threads via message passing is the same in both.
One of the nice things about using message passing is that we're forced, much more deliberately, to structure our program around inter-thread communication events. In "ordinary" concurrency with locks, we have separate threads of computation running, and every once in a while we access something shared, and then we have to remember to hold locks. The concurrent structure of the computation is often somewhat hard to see in such programs. In message passing programs, the concurrent structure can often be a little more explicit, because you build your threads as event listeners: threads typically listen on a channel, do some computation, send something on a channel when they finish, and then either loop back to the start or terminate when their computation is complete.
Despite all of this work being well-established, the jury is still out on which approach is "better." The answer is really that both have pluses and minuses, and the choice of which to use really depends on the application. And, of course, as you saw above we can essentially implement each style within the other (creating abstractions like cells to do standard shared memory concurrency in message passing, or using wait/notify to do message passing in shared memory concurrency). When you write concurrent programs in the future, you should think about which approach is most natural for the program at hand, and use the style of concurrent programming that will make it easiest to solve your problem.