Concurrent Programming
Haskell offers a wide range of facilities supporting different styles of concurrent and parallel programming. In this chapter we focus on a limited set of primitives for concurrent programming, which can often be found in similar form in other modern programming languages. We use:
- Lightweight independent threads of control. Lightweight means that we need not worry much about keeping the number of threads low (up to a couple of hundred thousand is usually fine).
- Channels for communicating between threads.
The approach we take to concurrent programming in AP is based on the paradigm of message passing, and is heavily inspired by languages such as Erlang. The idea is to structure concurrent programs as independent servers (a.k.a. concurrent objects or actors in other languages and frameworks) that communicate with each other by sending asynchronous messages. Each server maintains its own private state, and servers cannot directly modify the state of other servers, except by sending messages.
Although the concurrent systems we will construct in AP will run only on a single machine (and within a single Haskell process), the approach is perfectly suitable for distributed systems, as seen, for example, in Cloud Haskell, or in languages such as Erlang and Elixir.
Concurrency is closely related to (but not the same as) parallelism. While multi-threading in Haskell is indeed one way to take advantage of parallel multi-core computers, this is not an explicit aspect of our study of concurrency. Instead, we focus on concurrency as a programming model that happens to be convenient for expressing certain forms of event-driven systems.
Concurrent programming in Haskell is done in the IO monad because threads (can) have effects. Effects from multiple threads are interleaved nondeterministically at runtime.
Concurrent programming allows programs that interact with multiple external agents to be modular:
- The interaction with each agent is programmed separately.
- The program can be structured as a collection of interacting agents, sometimes called actors or (mini) servers.
This chapter is about a principled and systematic way of constructing concurrent programs as a collection of interacting servers.
Basic Primitives
We use the Haskell modules Control.Concurrent and Control.Concurrent.Chan. The latter is re-exported by the former, so importing Control.Concurrent is enough. As always, we will use only a fairly small subset of the facilities provided by the Haskell standard library. You are welcome (and encouraged) to peruse the documentation to enlighten yourself, but in this particular case you should be careful not to be tempted by functions that subvert the notion of message passing.
We will not use these primitives directly. Instead, we wrap these primitives in the module Genserver, and in the rest of this note we use the Genserver module to write our servers.
Implementation of the Genserver module
Assume that we have the following import and type alias:
import qualified Control.Concurrent as CC
type Chan a = CC.Chan a
Servers
The basic requirement of concurrency is to be able to fork a new thread of control. In Haskell we do this with the forkIO operation:
forkIO :: IO () -> IO ThreadId
The function forkIO takes a computation of type IO () as its argument; that is, a computation in the IO monad that eventually returns a value of type (). The computation passed to forkIO is executed in a new thread that runs concurrently with the other threads in the system.
The ThreadId that is returned is an opaque handle to the thread. We can use it to perform a few low-level operations, such as killing the thread (killThread) or throwing an exception to it (throwTo), but this will not be the main way we interact with threads.
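To see forkIO in isolation, here is a small sketch (sumInBackground is a hypothetical name, not part of Genserver) that forks a worker thread and gets its result back through a channel, a primitive we return to in the Channels section. Note the strict application $!: without it, the worker would write an unevaluated thunk, and the actual summing would happen in the reading thread.

```haskell
import qualified Control.Concurrent as CC

-- Hypothetical helper: compute sum [1 .. n] in a separate thread.
sumInBackground :: Int -> IO Int
sumInBackground n = do
  result <- CC.newChan
  -- We do not need the ThreadId here, so we just ignore it.
  -- $! forces the sum in the worker before it is written.
  _tid <- CC.forkIO $ CC.writeChan result $! sum [1 .. n]
  -- Block until the worker has written its answer.
  CC.readChan result
```

For example, sumInBackground 100 yields 5050.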
However, we want a canonical way to communicate with our servers. Thus we introduce the notion of a server, which we represent as a pair of a ThreadId and an input channel:
type Server message = (CC.ThreadId, Chan message)
Here we use the type variable message to denote the type of messages that a server can receive, which can be different for each kind of server.
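A new server is started with spawn, which creates the server's input channel and forks a thread that runs the given server function on that channel and an initial state:
spawn :: (Chan a -> t -> IO ()) -> t -> IO (Server a)
spawn server initial = do
  input <- CC.newChan
  tid <- CC.forkIO $ server input initial
  return (tid, input)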
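As a minimal sketch of spawn in use, assume a hypothetical accumulator server (the names AccMsg, Add, Total and accumulator are illustrative only). The server function receives its input channel and its current state, handles one message, and calls itself with the new state. A Total request carries the channel on which the client wants the answer.

```haskell
import qualified Control.Concurrent as CC

type Chan a = CC.Chan a
type Server message = (CC.ThreadId, Chan message)

spawn :: (Chan a -> t -> IO ()) -> t -> IO (Server a)
spawn server initial = do
  input <- CC.newChan
  tid <- CC.forkIO $ server input initial
  return (tid, input)

-- Hypothetical message type for an accumulator server.
data AccMsg
  = Add Int          -- add a number to the running total
  | Total (Chan Int) -- ask for the total; the answer goes to this channel

-- Server loop: read one message, act on it, and recurse with the new state.
accumulator :: Chan AccMsg -> Int -> IO ()
accumulator input total = do
  msg <- CC.readChan input
  case msg of
    Add n -> accumulator input $! total + n -- force the sum to avoid a thunk chain
    Total reply -> do
      CC.writeChan reply total
      accumulator input total
```

Because messages from a single writer are read in the order they were sent, a Total request sent after Add 1 through Add 10 is answered only after all of them have been processed, so the reply is 55.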
Channels
Conceptually, a channel is an unbounded queue of messages. Writing to a channel is an asynchronous operation: it always succeeds immediately. Reading from a channel retrieves (and removes) the oldest message in the channel. If the channel is empty, reading blocks until a message is available.
send :: Chan a -> a -> IO ()
send chan msg =
  CC.writeChan chan msg
sendTo :: Server a -> a -> IO ()
sendTo (_tid, input) msg =
  send input msg
receive :: Chan a -> IO a
receive = CC.readChan
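The following sketch (fifoDemo is a hypothetical name) restates send and receive and shows the queue behaviour: the writes succeed even though no reader is waiting yet, and the reads return the messages oldest first.

```haskell
import qualified Control.Concurrent as CC
import Control.Monad (replicateM)

type Chan a = CC.Chan a

send :: Chan a -> a -> IO ()
send chan msg = CC.writeChan chan msg

receive :: Chan a -> IO a
receive = CC.readChan

-- Hypothetical demo: three writes, then three reads on the same channel.
fifoDemo :: IO [String]
fifoDemo = do
  chan <- CC.newChan
  -- No reader is waiting yet, but each send returns immediately.
  mapM_ (send chan) ["first", "second", "third"]
  -- Reads return the oldest message first.
  replicateM 3 (receive chan)
```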
Single-reader principle: we adopt the rule that a channel may have only a single reader, meaning only a single thread is allowed to call readChan (or receive) on any given channel. This is typically the thread that we created the channel for. This rule is not enforced by the Haskell type system, and there are indeed forms of concurrent programming that are more flexible, but they are outside the scope of this note.
It is perfectly acceptable (and often necessary) for a channel to have multiple writers.
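Here is a sketch of several writers sharing one channel (multipleWriters is a hypothetical name): ten forked threads each send one message, and the calling thread, as the single reader, collects them. The arrival order depends on scheduling, so the sketch sorts the results before returning them.

```haskell
import qualified Control.Concurrent as CC
import Control.Monad (forM_, replicateM)
import Data.List (sort)

-- Hypothetical demo: ten writer threads, one reader (the calling thread).
multipleWriters :: IO [Int]
multipleWriters = do
  chan <- CC.newChan
  forM_ [1 .. 10] $ \i ->
    CC.forkIO $ CC.writeChan chan (i * i) -- each writer sends one message
  -- The single reader collects exactly ten messages, in whatever order
  -- the scheduler produced them.
  results <- replicateM 10 (CC.readChan chan)
  return (sort results)
```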
If we call readChan on a channel where we hold the only reference (meaning that we would in principle wait forever), the Haskell runtime system detects this and raises an exception in the blocked thread (BlockedIndefinitelyOnMVar, because channels are implemented with MVars), which terminates the thread unless it is caught. This is a natural and safe way to shut down a thread that is no longer necessary, assuming the thread does not hold resources (e.g., open files) that must be manually closed. Handling such cases is outside the scope of these notes.
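The following sketch, which assumes GHC's runtime system, makes this behaviour visible by catching the exception instead of letting it terminate the thread. Since Chan is implemented on top of MVars, the exception raised is BlockedIndefinitelyOnMVar from Control.Exception. readOrphanChannel is a hypothetical name.

```haskell
import qualified Control.Concurrent as CC
import Control.Exception (BlockedIndefinitelyOnMVar (..), try)

-- Hypothetical demo: read from a channel that no other thread can reach.
readOrphanChannel :: IO String
readOrphanChannel = do
  chan <- CC.newChan :: IO (CC.Chan Int)
  -- Nobody else holds a reference to chan, so this read can never succeed.
  -- The runtime detects this and throws BlockedIndefinitelyOnMVar, which
  -- try turns into a Left value instead of terminating the thread.
  r <- try (CC.readChan chan)
  return $ case r of
    Left BlockedIndefinitelyOnMVar -> "blocked indefinitely"
    Right n -> "received " ++ show n
```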
Request-Reply Pattern
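A common pattern is a synchronous request: the client creates a fresh reply channel, builds a message containing it, sends the message to the server, and waits for the answer on the reply channel:
requestReply :: Server a -> (Chan b -> a) -> IO b
requestReply serv con = do
  reply_chan <- CC.newChan
  sendTo serv $ con reply_chan
  receive reply_chan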
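Here is a sketch of requestReply in use, assuming a hypothetical id-supply server (IdMsg, NextId, idSupply and nextId are illustrative names). The message constructor NextId is passed unapplied, so requestReply can fill in the fresh reply channel. spawn and sendTo are restated so that the block is self-contained.

```haskell
import qualified Control.Concurrent as CC

type Chan a = CC.Chan a
type Server message = (CC.ThreadId, Chan message)

spawn :: (Chan a -> t -> IO ()) -> t -> IO (Server a)
spawn server initial = do
  input <- CC.newChan
  tid <- CC.forkIO $ server input initial
  return (tid, input)

sendTo :: Server a -> a -> IO ()
sendTo (_tid, input) msg = CC.writeChan input msg

requestReply :: Server a -> (Chan b -> a) -> IO b
requestReply serv con = do
  reply_chan <- CC.newChan
  sendTo serv $ con reply_chan
  CC.readChan reply_chan

-- Hypothetical id-supply server: every request gets the next unused Int.
newtype IdMsg = NextId (Chan Int)

idSupply :: Chan IdMsg -> Int -> IO ()
idSupply input next = do
  NextId reply <- CC.readChan input
  CC.writeChan reply next
  idSupply input $! next + 1

-- Client API: NextId is left unapplied; requestReply supplies the reply channel.
nextId :: Server IdMsg -> IO Int
nextId serv = requestReply serv NextId
```

Spawning the server with initial state 0 and calling nextId twice yields 0 and then 1.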
Method
- Determine what data (the state) the server should keep track of, and declare a type for it.
- Determine the interface for the server, that is, a set of functions:
  - the type of each function;
  - whether the function is blocking or non-blocking.
- Declare a message type.
- Implement a server-loop function.
- Implement the API functions.
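To illustrate the method, here is a sketch of a small key-value store. All names (KVMsg, kvLoop, startKV, put, get, and so on) are hypothetical. The state is an association list to keep the sketch free of extra dependencies; Data.Map would be the more usual choice. put is non-blocking and uses sendTo, while get is blocking and uses requestReply. The Genserver functions are restated so the block stands alone.

```haskell
import qualified Control.Concurrent as CC

type Chan a = CC.Chan a
type Server message = (CC.ThreadId, Chan message)

spawn :: (Chan a -> t -> IO ()) -> t -> IO (Server a)
spawn server initial = do
  input <- CC.newChan
  tid <- CC.forkIO $ server input initial
  return (tid, input)

sendTo :: Server a -> a -> IO ()
sendTo (_tid, input) msg = CC.writeChan input msg

requestReply :: Server a -> (Chan b -> a) -> IO b
requestReply serv con = do
  reply_chan <- CC.newChan
  sendTo serv $ con reply_chan
  CC.readChan reply_chan

-- Step 1: the state, a list of key-value pairs.
type KVState = [(String, Int)]

-- Step 2: the interface (defined in step 5):
--   put is non-blocking: the caller does not wait for the server;
--   get is blocking: the caller waits for the answer.

-- Step 3: the message type; the blocking request carries a reply channel.
data KVMsg
  = Put String Int
  | Get String (Chan (Maybe Int))

type KVServer = Server KVMsg

-- Step 4: the server loop.
kvLoop :: Chan KVMsg -> KVState -> IO ()
kvLoop input state = do
  msg <- CC.readChan input
  case msg of
    Put key value ->
      -- Replace any previous binding for the key.
      kvLoop input ((key, value) : filter ((/= key) . fst) state)
    Get key reply -> do
      CC.writeChan reply (lookup key state)
      kvLoop input state

-- Step 5: the API functions.
startKV :: IO KVServer
startKV = spawn kvLoop []

put :: KVServer -> String -> Int -> IO ()
put serv key value = sendTo serv (Put key value)

get :: KVServer -> String -> IO (Maybe Int)
get serv key = requestReply serv (Get key)
```

After put s "a" 1 followed by put s "a" 3, get s "a" returns Just 3, and get on a key that was never stored returns Nothing.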
Timeouts
The channel abstraction does not directly support timeouts for request-reply calls (RPCs). However, we can build our own support for timeouts. The technique we employ is to allow the reply to be either the intended value or a special timeout value. When we perform an RPC, we also launch a new thread that sleeps for some period of time and then writes the timeout value to the reply channel. If the non-timeout response is the first to arrive, the timeout value is ignored and harmless.
First we must import the threadDelay function:
import Control.Concurrent (threadDelay)
Then we define a type Timeout with a single value Timeout:
data Timeout = Timeout
Then we define a message type (in this case polymorphic in a) where the reply channel accepts messages of type Either Timeout a:
data Msg a = MsgDoIt (Chan (Either Timeout a)) (IO a)
A Msg a denotes a request to perform some impure operation IO a (perhaps a network request) and then reply with the resulting value of type a.
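The text does not show a server that consumes Msg a, so the following is only a sketch of how one might look under our own assumptions: the hypothetical doItServer runs each requested action and replies with Right, while the hypothetical client function doItWithTimeout sends the request and forks a timeout thread that writes Left Timeout to the same reply channel. The delay is given in microseconds here, and Timeout derives Eq and Show so that results can be compared.

```haskell
import qualified Control.Concurrent as CC
import Control.Concurrent (threadDelay)

type Chan a = CC.Chan a

-- As in the text, but deriving Eq and Show so results can be compared.
data Timeout = Timeout
  deriving (Eq, Show)

data Msg a = MsgDoIt (Chan (Either Timeout a)) (IO a)

-- Hypothetical server loop: run each requested action in turn and reply
-- with Right. The server itself knows nothing about timeouts.
doItServer :: Chan (Msg a) -> IO ()
doItServer input = do
  MsgDoIt reply action <- CC.readChan input
  x <- action
  CC.writeChan reply (Right x)
  doItServer input

-- Hypothetical client-side RPC with a timeout given in microseconds.
-- Whichever arrives first on the reply channel, the server's Right or
-- the timeout thread's Left Timeout, is the result.
doItWithTimeout :: Chan (Msg a) -> Int -> IO a -> IO (Either Timeout a)
doItWithTimeout input micros action = do
  reply <- CC.newChan
  CC.writeChan input (MsgDoIt reply action)
  _ <- CC.forkIO $ do
    threadDelay micros
    CC.writeChan reply (Left Timeout)
  CC.readChan reply
```

Since this server handles one request at a time, a slow action also delays later requests. A reply that arrives after the timeout is written to a channel that nobody reads any more and is simply lost.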
We can use this to build a facility for performing an action with a timeout:
actionWithTimeout :: Int -> IO a -> IO (Either Timeout a)
actionWithTimeout seconds action = do
  reply_chan <- CC.newChan
  _ <- CC.forkIO $ do -- worker thread
    x <- action
    CC.writeChan reply_chan $ Right x
  _ <- CC.forkIO $ do -- timeout thread
    threadDelay (seconds * 1000000) -- threadDelay takes microseconds
    CC.writeChan reply_chan $ Left Timeout
  CC.readChan reply_chan -- whichever reply arrives first wins
You will note that this is not a server in the usual sense, as it does not loop: it simply launches two threads.
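Here is a small usage sketch of actionWithTimeout (the definitions quick and slow are hypothetical). With a one-second limit, an action that returns immediately yields Right, whereas an action that sleeps for three seconds yields Left Timeout after about one second. The block repeats the definitions above, with Timeout deriving Eq and Show so that results can be compared.

```haskell
import qualified Control.Concurrent as CC
import Control.Concurrent (threadDelay)

-- As in the text, but deriving Eq and Show so results can be compared.
data Timeout = Timeout
  deriving (Eq, Show)

actionWithTimeout :: Int -> IO a -> IO (Either Timeout a)
actionWithTimeout seconds action = do
  reply_chan <- CC.newChan
  _ <- CC.forkIO $ do -- worker thread
    x <- action
    CC.writeChan reply_chan $ Right x
  _ <- CC.forkIO $ do -- timeout thread
    threadDelay (seconds * 1000000)
    CC.writeChan reply_chan $ Left Timeout
  CC.readChan reply_chan

-- Hypothetical examples with a one-second limit.
quick :: IO (Either Timeout Int)
quick = actionWithTimeout 1 (return 42) -- finishes at once: Right 42

slow :: IO (Either Timeout Int)
slow = actionWithTimeout 1 (threadDelay 3000000 >> return 0) -- Left Timeout
```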
One downside of this function is that the worker thread (the one that runs action, and might take too long) is not terminated after the timeout. This is a problem if it is, for example, stuck in an infinite loop that consumes ever more memory. To fix this, we can have the timeout thread explicitly kill the worker thread. First we have to import the killThread function:
import Control.Concurrent (killThread)
Then we can use it as follows.
actionWithTimeout2 :: Int -> IO a -> IO (Either Timeout a)
actionWithTimeout2 seconds action = do
  reply_chan <- CC.newChan
  worker_tid <- CC.forkIO $ do
    -- worker thread
    x <- action
    CC.writeChan reply_chan $ Right x
  _ <- CC.forkIO $ do
    -- timeout thread: stop the worker before reporting the timeout
    threadDelay (seconds * 1000000)
    killThread worker_tid
    CC.writeChan reply_chan $ Left Timeout
  CC.readChan reply_chan
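To observe that the worker really is stopped, the following sketch uses a hypothetical probe action, slowProbe, that sleeps for ten seconds but reports on a side channel if it is interrupted. It relies on onException from Control.Exception, which runs a handler when an exception (here the ThreadKilled exception thrown by killThread) escapes the action, and then rethrows it.

```haskell
import qualified Control.Concurrent as CC
import Control.Concurrent (killThread, threadDelay)
import Control.Exception (onException)

-- As in the text, but deriving Eq and Show so results can be compared.
data Timeout = Timeout
  deriving (Eq, Show)

actionWithTimeout2 :: Int -> IO a -> IO (Either Timeout a)
actionWithTimeout2 seconds action = do
  reply_chan <- CC.newChan
  worker_tid <- CC.forkIO $ do
    -- worker thread
    x <- action
    CC.writeChan reply_chan $ Right x
  _ <- CC.forkIO $ do
    -- timeout thread
    threadDelay (seconds * 1000000)
    killThread worker_tid
    CC.writeChan reply_chan $ Left Timeout
  CC.readChan reply_chan

-- Hypothetical probe: sleeps for ten seconds, but if an exception (such as
-- the ThreadKilled sent by killThread) interrupts it, it first reports on
-- the side channel; onException then rethrows the exception.
slowProbe :: CC.Chan String -> IO Int
slowProbe side =
  (threadDelay 10000000 >> return 0)
    `onException` CC.writeChan side "worker was killed"
```

Calling actionWithTimeout2 1 (slowProbe side) returns Left Timeout after about one second. Reading from side afterwards blocks until the worker's handler has run, so the "worker was killed" message is reliably observed.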
Note that killing a thread is a dangerous operation in general. If the worker thread is merely stuck in some loop or waiting for a network request, killing it is harmless, but killing it in the middle of an update may leave shared state inconsistent. We will (hopefully) not encounter such cases in AP, but it is something to be aware of in the future.