Structured Concurrent Programming
Concurrency with Free Monads
In Chapter 4 we saw an example of Asynchronous Programming with Free Monads. The principles underlying asynchronous programming are quite similar to those of concurrent programming. We can actually express a concurrent system using a free monad, and then interpret the system using either true IO-based concurrency or simulation. This opens up the possibility of using the simulation to systematically (or probabilistically) explore the execution paths possible in the system, in order to discover race conditions.
In the following we will design a concurrency abstraction that is
rather similar to the one described in Chapter 5, based on threads,
channels, and messages. One concession we will make for simplicity is
that our channels will be monomorphic, and capable of sending only a
single type of messages, for which we pick String
:
type Msg = String
This restriction can be lifted, but requires a significant amount of Haskell type-level trickery, which is not the point of this chapter.
An Initial Attempt
Let us now define a data type CCOp
for representing concurrency
effects. We allow four effects: forking a thread, creating a channel,
sending a message on a channel, and receiving a message from a
channel. We also instantiate the Free
monad with the CCOp
effect
to produce CC
, a monadic representation of concurrency:
data CCOp a
= CCFork (CC ()) a
| CCNewChan (Chan Msg -> a)
| CCSend (Chan Msg) Msg a
| CCReceive (Chan Msg) (Msg -> a)
type CC a = Free CCOp a
Note that we are using the Control.Concurrent.Chan
type as our
channel representation - we will change that later. We must of course
also define the usual Functor
instance for CCOp:
instance Functor CCOp where
fmap f (CCFork m c) = CCFork m (f c)
fmap f (CCNewChan c) = CCNewChan $ f . c
fmap f (CCSend chan msg c) = CCSend chan msg $ f c
fmap f (CCReceive chan c) = CCReceive chan $ f . c
And finally we define accessor functions for constructing monadic operations with these effects:
ccNewChan :: CC (Chan Msg)
ccNewChan = Free $ CCNewChan pure
ccFork :: CC () -> CC ()
ccFork m = Free $ CCFork m $ pure ()
ccSend :: Chan Msg -> Msg -> CC ()
ccSend chan msg = Free $ CCSend chan msg $ pure ()
ccReceive :: Chan Msg -> CC Msg
ccReceive chan = Free $ CCReceive chan pure
Interpreting CC
computations in IO
is quite straightforward, due
to how closely our effects match the interface provided by
Control.Concurrent:
interpCCIO :: CC a -> IO a
interpCCIO (Pure x) =
pure x
interpCCIO (Free (CCFork m c)) = do
_ <- forkIO $ interpCCIO m
interpCCIO c
interpCCIO (Free (CCNewChan c)) = do
chan <- newChan
interpCCIO $ c chan
interpCCIO (Free (CCSend chan msg c)) = do
writeChan chan msg
interpCCIO c
interpCCIO (Free (CCReceive chan c)) = do
msg <- readChan chan
interpCCIO $ c msg
And now we can write a contrived little program that passes a message through a chain of threads, each adding a token to the message and passing it to the next thread:
pipeline :: CC String
pipeline = do
chan_0 <- ccNewChan
chan_1 <- ccNewChan
chan_2 <- ccNewChan
chan_3 <- ccNewChan
chan_4 <- ccNewChan
let passOn tok from to = do
x <- ccReceive from
ccSend to $ x ++ tok
ccFork $ passOn "a" chan_0 chan_1
ccFork $ passOn "b" chan_1 chan_2
ccFork $ passOn "c" chan_2 chan_3
ccFork $ passOn "d" chan_3 chan_4
ccSend chan_0 ""
ccReceive chan_4
Running it yields the expected results:
> interpCCIO pipeline
"abcd"
Abstract Channels
Unfortunately, the definition of concurrent effects given above is not
suitable for simulation. The reason is that we specified that a
channel is always of type Chan Msg, meaning it is intrinsically tied
to the interface provided by Control.Concurrent. In order to allow
multiple interpretations of concurrency, we need to make CCOp
polymorphic in its representation of channels. To this end, we add a
type parameter chan, which we use instead of Chan Msg:
data CCOp chan a
= CCFork (CC chan ()) a
| CCNewChan (chan -> a)
| CCSend chan Msg a
| CCReceive chan (Msg -> a)
instance Functor (CCOp chan) where
fmap f (CCFork m c) = CCFork m (f c)
fmap f (CCNewChan c) = CCNewChan $ f . c
fmap f (CCSend chan msg c) = CCSend chan msg $ f c
fmap f (CCReceive chan c) = CCReceive chan $ f . c
In our definition of the CC
type alias, we also add chan
as a type
parameter:
type CC chan a = Free (CCOp chan) a
Now a value of type CC chan a represents a concurrent execution that
produces a value of type a, and uses a channel representation chan.
When we actually write computations in CC, we will leave chan
polymorphic - only the interpretation functions will impose
constraints on it. This sounds a bit abstract (because it is), but
will become clear later. First, however, we have to redefine the
accessor functions, which will also have to use a chan type
parameter:
ccNewChan :: CC chan chan
ccNewChan = Free $ CCNewChan pure
ccFork :: CC chan () -> CC chan ()
ccFork m = Free $ CCFork m $ pure ()
ccSend :: chan -> Msg -> CC chan ()
ccSend chan msg = Free $ CCSend chan msg $ pure ()
ccReceive :: chan -> CC chan Msg
ccReceive chan = Free $ CCReceive chan pure
Now we can make the pipeline
example work with our new definition.
This is quite straightforward - in fact, we only have to change the
type, and the definition itself can remain unchanged:
pipeline :: CC chan String
pipeline = ...
A similar situation arises for the interpretation function
interpCCIO. Here we simply require that the channel representation
is Chan Msg, but otherwise the implementation is the same:
interpCCIO :: CC (Chan Msg) a -> IO a
interpCCIO = ...
Now consider what happens when we run the example:
> interpCCIO pipeline
"abcd"
This type checks because pipeline has the polymorphic type
CC chan String, where chan can be instantiated with any type, and in
particular it can be instantiated with Chan Msg - which is what
interpCCIO requires. In this way we can write generic code that
delays the concrete choice of channel representation. Let us now
exploit this to actually write a pure interpreter for CC.
A Pure Interpreter
The pure interpreter will be more complicated than interpCCIO,
because we cannot piggyback on the existing Haskell runtime system for
concurrency. Our approach will essentially be that of a state monad,
where we maintain the following main bits of state:
- A collection of all channels and the messages they currently contain.
- A collection of all threads that can be executed further.
Since channels have a notion of identity, we need a way to uniquely
identify them, which we will do with the ChanId
type:
type ChanId = Int
Each channel will be associated with a unique integer. This means we also need to have a source of fresh integers, which we will accomplish by maintaining a counter in our state.
Now we are ready to define a Haskell type encapsulating our concurrency simulator state:
data CCState = CCState
{ ccCounter :: ChanId,
ccChans :: [(ChanId, [Msg])],
ccThreads :: [CC ChanId ()]
}
The ccChans field stores all existing channels, keyed by a ChanId,
with each channel storing a list of messages with the oldest first.
The ccThreads field stores suspended threads, which are represented
as monadic computations of type CC ChanId ().
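For concreteness, here is one possible value of this type; the name
exampleState, the channel contents, and the suspended thread are all
made up for illustration:
exampleState :: CCState
exampleState =
  CCState
    { ccCounter = 2,
      ccChans = [(0, ["hello"]), (1, [])],
      ccThreads = [ccReceive 1 >>= ccSend 0]
    }
This describes a system with two channels, where channel 0 holds a
single pending message and one suspended thread is waiting to forward
a message from channel 1 to channel 0.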
The actual monad we will use is the State monad from Chapter 2 with
CCState as the state. When programming with state monads it is
usually a good idea to define higher-level utility functions rather
than using get/put directly. First we define a function getChan that
retrieves the messages associated with a specific channel:
getChan :: ChanId -> State CCState [Msg]
getChan chan_id = do
state <- get
pure $
fromMaybe (error "unknown channel") $
lookup chan_id $
ccChans state
And its counterpart, setChan, which sets the messages associated
with a channel:
setChan :: ChanId -> [Msg] -> State CCState ()
setChan chan_id msgs = do
state <- get
put $
state
{ ccChans =
(chan_id, msgs)
: filter ((/= chan_id) . fst) (ccChans state)
}
It is not difficult to imagine how we will implement reading from a
channel: use getChan
to fetch the inbox, remove the first message,
and use setChan
to put back the remainder. The tricky part is how to
handle the situation when no messages are available, but we will come
back to that.
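To make this concrete, the non-blocking part of that logic could be
sketched as follows; the helper name popMsg is ours and is not used in
the rest of the chapter:
-- Take the oldest message from a channel, if any, and put the rest back.
popMsg :: ChanId -> State CCState (Maybe Msg)
popMsg chan_id = do
  msgs <- getChan chan_id
  case msgs of
    [] -> pure Nothing
    msg : msgs' -> do
      setChan chan_id msgs'
      pure $ Just msg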
We also need to be able to add threads to the state, which is done
by addThread.
addThread :: CC ChanId () -> State CCState ()
addThread m = do
state <- get
put $ state {ccThreads = m : ccThreads state}
And finally, incCounter increments the counter in the state and
returns the old value. It serves as our mechanism for obtaining
fresh ChanIds.
incCounter :: State CCState ChanId
incCounter = do
state <- get
put $ state {ccCounter = ccCounter state + 1}
pure $ ccCounter state
We can now define a function step that evaluates a CC ChanId a
computation as far as possible, meaning until it blocks or
terminates. Remember that the only way a thread can block in our
system is by trying to read from an empty channel. The step
function
is not a full interpretation function, but we will use it to build
one.
The step
function has the following type:
step :: CC ChanId a -> State CCState (CC ChanId a)
Note that it returns a CC ChanId a because there is no guarantee
that it is able to run the computation to completion (which would
produce an a). The simplest case is the one for Pure, which
represents a finished computation for which there is nothing further
to do:
step (Pure x) = pure $ Pure x
Now we need to handle the various effects. Creating a channel is
done by retrieving an unused ChanId, then adding a channel with an
initially empty message queue:
step (Free (CCNewChan c)) = do
chan_id <- incCounter
setChan chan_id []
step $ c chan_id
Forking a thread simply adds the computation to the state with
addThread:
step (Free (CCFork m c)) = do
addThread m
step c
The CCSend
effect is executed by appending the given message to the
specified channel, then executing the continuation:
step (Free (CCSend chan_id msg c)) = do
msgs <- getChan chan_id
setChan chan_id $ msgs ++ [msg]
step c
Finally, the most interesting effect is CCReceive, because it can
block when the channel is empty. This is represented by simply
returning the monadic computation unchanged:
step (Free (CCReceive chan_id c)) = do
msgs <- getChan chan_id
case msgs of
[] -> pure $ Free $ CCReceive chan_id c
msg : msgs' -> do
setChan chan_id msgs'
step $ c msg
We now have a step function for running as much of a single
computation (i.e., thread!) as possible. But if step is stuck on a
CCReceive, then no amount of re-running step is going to make
progress - rather, some other thread must be given a chance to run,
which may end up putting a message in the queue of some channel that
the original thread was stuck on. To accomplish this, the function
stepThreads invokes step on every thread in the system. First we
write an incorrect implementation:
-- BEWARE: WRONG!
stepThreads :: State CCState ()
stepThreads = do
state <- get
threads <- mapM step $ ccThreads state
put $ state {ccThreads = threads}
At first glance, this may look right: fetch all the threads, advance
them a step, then put them back in the state. But that final put is
a problem, because it effectively reverts any state modifications
done inside step. We can attempt a fix:
-- BEWARE: STILL WRONG!
stepThreads :: State CCState ()
stepThreads = do
state <- get
threads <- mapM step $ ccThreads state
new_state <- get
put $ new_state {ccThreads = threads}
This is still wrong. While we now maintain some state modifications
done in step, we completely overwrite the list of threads. This
means that if step creates any new threads, they are thrown away.
Let us try again:
-- BEWARE: STILL WRONG!
stepThreads :: State CCState ()
stepThreads = do
state <- get
threads <- mapM step $ ccThreads state
new_state <- get
put $ new_state {ccThreads = threads ++ ccThreads new_state}
Instead of overwriting the list of threads, we now simply prepend to
it. But this means we end up duplicating the threads, since the
original threads from state (the ones we pass to step) are still
present in new_state. A correct solution is to remove the threads
from the state before we step them:
stepThreads :: State CCState ()
stepThreads = do
state <- get
put $ state {ccThreads = []}
threads <- mapM step $ ccThreads state
new_state <- get
put $ new_state {ccThreads = threads ++ ccThreads new_state}
The point of this progression through incorrect implementations of
stepThreads was not to demonstrate how to write incorrect code, but
to illustrate the subtleties of working with mutable state.
Now we can write a function that evaluates a "main thread" (the one
producing the ultimate execution result), but first uses stepThreads
to advance all "background threads".
interp :: CC ChanId a -> State CCState a
interp (Pure x) = pure x
interp (Free op) = do
stepThreads
op' <- step $ Free op
interp op'
As soon as the "main thread" reaches the Pure
constructor, the
system is considered terminated. This is different from
Control.Concurrent, which has no notion of a "main thread", and
where forked threads can continue to do arbitrary side effects
forever.
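For example, in the following made-up program (the name mainFirst is
ours), the forked send is merely added to ccThreads and never
stepped, because the main thread immediately reaches Pure:
mainFirst :: CC chan String
mainFirst = do
  chan <- ccNewChan
  ccFork $ ccSend chan "ignored"
  pure "done"
Interpreting this with interp simply produces "done".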
The final bit of machinery we need is some boilerplate for running our state monad with an initial state and projecting out the result we care about:
interpCCPure :: CC ChanId a -> a
interpCCPure orig =
fst $ runState initial_state $ interp orig
where
initial_state =
CCState
{ ccCounter = 0,
ccChans = [],
ccThreads = []
}
And we can see that things work:
> interpCCPure pipeline
"abcd"
One limitation of this approach, which is not present in interpCCIO,
is that we cannot handle infinite loops in pure code. We are only
able to switch between threads when step encounters an effect.
Remaining Issues
Although the interpreter developed above works in some cases, it has undesirable behaviour in others. Consider a program that creates a channel and a thread that runs an infinite loop, continuously writing to the channel. The main thread reads twice from the channel, concatenates the messages, and returns the result:
infiniteWrite :: CC chan String
infiniteWrite = do
chan <- ccNewChan
ccFork $ forever $ ccSend chan "x"
a <- ccReceive chan
b <- ccReceive chan
pure $ a ++ b
Using the IO-based interpreter, we observe the following result, as expected:
> interpCCIO infiniteWrite
"xx"
Since the main thread terminates in finite time, it doesn't matter
that some secondary thread is still running (although it's not great
that it continues to run and causes the channel to grow infinitely -
it would be better if interpCCIO
kept track of launched threads and
killed them at the end).
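As a sketch of what such bookkeeping could look like, the variant
below collects the ThreadIds of forked threads in an IORef and kills
them once the main thread is done. The name interpCCIO' is ours, the
code assumes Data.IORef and Control.Concurrent are imported, and this
is just one possible design rather than the chapter's implementation:
interpCCIO' :: CC (Chan Msg) a -> IO a
interpCCIO' prog = do
  children <- newIORef []
  let go :: CC (Chan Msg) a -> IO a
      go (Pure x) = pure x
      go (Free (CCFork m c)) = do
        -- Remember the ThreadId so we can kill the thread later.
        tid <- forkIO $ go m
        modifyIORef children (tid :)
        go c
      go (Free (CCNewChan c)) = go . c =<< newChan
      go (Free (CCSend chan msg c)) = do
        writeChan chan msg
        go c
      go (Free (CCReceive chan c)) = go . c =<< readChan chan
  x <- go prog
  -- The main thread is done; clean up any still-running children.
  mapM_ killThread =<< readIORef children
  pure x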
Now let us try running the program using the pure interpreter:
> interpCCPure infiniteWrite
You will find that this goes into an infinite loop. This occurs due
to stepThreads, which will evaluate every thread with step until the
next time it blocks on a channel. But since the thread we have forked
will never read from a channel, step will continue recursively
evaluating it forever - preventing any other thread from being
executed. To fix this, we modify the CCSend case in step such that
we do not evaluate the continuation, but merely return it:
step (Free (CCSend chan_id msg c)) = do
msgs <- getChan chan_id
setChan chan_id $ msgs ++ [msg]
  pure c -- This line was previously 'step c'.
This does not prevent the computation from progressing, since we still
do some work whenever step
is invoked, and step
is anyway
invoked repeatedly in a loop.
Now computation terminates as expected:
> interpCCPure infiniteWrite
"xx"
But consider now this program:
infiniteLoop :: CC chan String
infiniteLoop = do
chan <- ccNewChan
ccFork $ forever $ pure ()
ccFork $ ccSend chan "x"
ccReceive chan
Here we fork two threads: one goes into an infinite loop that does
nothing, while the other sends a message on a channel. The main
thread receives a message from the channel and returns it. Execution
with interpCCIO
works fine, but interpCCPure
goes into an infinite
loop. In this case, however, the loop is in pure code, rather than
being an infinite list of effects. This means that we have no way of
interrupting it. Operationally, step
is stuck on evaluating the
(never terminating) thread to figure out which kind of effect it
evaluates to.
There is a deep lesson here: when interpreting a free monad, the only
time we can "interrupt" computation and get back control is when an
effect occurs, so there is no way we can avoid this problem in
interpCCPure. The IO-based concurrency from Control.Concurrent
uses machine-level interrupts to suspend even pure computation, and
therefore does not have this problem. If we could somehow force
computations to issue an effect from time to time, this would not be a
problem. In fact, one could easily imagine that Haskell itself could
have implemented concurrency by injecting "suspension effects" into
the generated code. However, Haskell does not allow us to inject
effects into otherwise pure code. If we have control over how the
computations using the free monad are constructed, we can of course
ensure that effects occur regularly - we could even imagine a "step"
effect that serves no purpose except to interrupt computation.
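To sketch how such a "step" effect might look, we could extend CCOp
with an extra constructor, here called CCYield (a name we have made
up), whose only purpose is to hand control back to the scheduler. The
pieces that would have to change are sketched below; this is not
something we develop further:
data CCOp chan a
  = CCFork (CC chan ()) a
  | CCNewChan (chan -> a)
  | CCSend chan Msg a
  | CCReceive chan (Msg -> a)
  | CCYield a -- New: voluntarily give up control.

instance Functor (CCOp chan) where
  fmap f (CCFork m c) = CCFork m (f c)
  fmap f (CCNewChan c) = CCNewChan $ f . c
  fmap f (CCSend chan msg c) = CCSend chan msg $ f c
  fmap f (CCReceive chan c) = CCReceive chan $ f . c
  fmap f (CCYield c) = CCYield $ f c

ccYield :: CC chan ()
ccYield = Free $ CCYield $ pure ()
In step, a yield would simply return its continuation without running
it, just like the fixed CCSend case (step (Free (CCYield c)) = pure c),
and interpCCIO could interpret it as Control.Concurrent.yield. A
cooperative loop would then be written as ccFork $ forever $ ccYield,
giving the pure interpreter a chance to run other threads.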