I want to discuss two limitations in standard Haskell libraries around
concurrency, and discuss methods of improving the status quo. Overall,
Haskell's [concurrency](https://haskell-lang.org/library/async)
[story](https://haskell-lang.org/library/stm) is - in my opinion - the
best in class versus any other language I'm aware of, at least for the
single-machine use case. The following are two issues that I run into
fairly regularly and that are surprising warts:

* `putStrLn` is not thread-safe
* Channels cannot be closed

Let me back up these claims, and then ask for some feedback on how to
solve them.

## `putStrLn` is not thread-safe

The example below is, in my opinion, a prime example of beautiful
concurrency in Haskell:

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async
import Control.Concurrent.Async
import Control.Monad (replicateM_)

worker :: Int -> IO ()
worker num = replicateM_ 5 $ putStrLn $ "Hi, I'm worker #" ++ show num

main :: IO ()
main = do
    mapConcurrently worker [1..5]
    return ()
```

Well, it's beautiful until you see the (abridged) output:

```
Hi, HIiH'HH,imii , ,,I w 'IoIIm'r'' mkmmw e owrwwro ookr#rrek2kkre
ee rrr# H 3#i##
4,51
```

Your mileage may vary of course. The issue here is that
`Prelude.putStrLn` works on `String`, which is a lazy list of `Char`s,
and in fact sends one character at a time to `stdout`. This is clearly
_not_ what we want. However, at the same time, many Haskellers -
myself included - consider `String`-based I/O a bad choice anyway. So
let's replace this with `Text`-based I/O:

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T

worker :: Int -> IO ()
worker num = replicateM_ 5
           $ T.putStrLn
           $ T.pack
           $ "Hi, I'm worker #" ++ show num

main :: IO ()
main = do
    mapConcurrently worker [1..5]
    return ()
```

Unfortunately, if you run this (at least via `runghc`), the results
are the same. If you
[look at the implementation of `Data.Text.IO.hPutStr`](https://www.stackage.org/haddock/lts-7.9/text-1.2.2.1/src/Data.Text.IO.html#hPutStr),
you'll see that there are different implementations of that function
depending on the buffering strategy of the `Handle` we're writing
to. In the case of `NoBuffering` (which is the default with GHCi and
`runghc`), this will output one character at a time (just like
`String`), whereas `LineBuffering` and `BlockBuffering` have batch
behavior. You can see this with:

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T
import System.IO

worker :: Int -> IO ()
worker num = replicateM_ 5
           $ T.putStrLn
           $ T.pack
           $ "Hi, I'm worker #" ++ show num

main :: IO ()
main = do
    hSetBuffering stdout LineBuffering
    mapConcurrently worker [1..5]
    return ()
```

While better, this still isn't perfect:

```
Hi, I'm worker #4Hi, I'm worker #5Hi, I'm worker #1


Hi, I'm worker #4Hi, I'm worker #5Hi, I'm worker #1


Hi, I'm worker #4Hi, I'm worker #5
```

Unfortunately, because
[newlines are written to stdout separately from the message](https://www.stackage.org/haddock/lts-7.9/text-1.2.2.1/src/Data.Text.IO.html#hPutStrLn),
these kinds of issues happen too frequently. This too can be worked
around, by using `putStr` instead and manually appending a newline
character:

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T
import System.IO

worker :: Int -> IO ()
worker num = replicateM_ 5
           $ T.putStr
           $ T.pack
           $ "Hi, I'm worker #" ++ show num ++ "\n"

main :: IO ()
main = do
    hSetBuffering stdout LineBuffering
    mapConcurrently worker [1..5]
    return ()
```

Finally, we can avoid the buffering-dependent code in the text package
and use `ByteString` output, which has the advantage of automatically
using this append-a-newline logic for small-ish `ByteString`s:

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.ByteString.Char8 as S8

worker :: Int -> IO ()
worker num = replicateM_ 100
           $ S8.putStrLn
           $ S8.pack
           $ "Hi, I'm worker #" ++ show num

main :: IO ()
main = do
    mapConcurrently worker [1..100]
    return ()
```

However, this has the downside of assuming a certain character
encoding, which may be different from the encoding of the `Handle`.

__What I'd like__ I would like a function `Text -> IO ()` which -
regardless of buffering strategy - appends a newline to the `Text`
value and sends the entire chunk of data to a `Handle` in a
thread-safe manner. Ideally it would account for character encoding
(though assuming UTF-8 may be an acceptable compromise for most use
cases), and it would be OK if very large values are occasionally
compromised during output (due to the `write` system call not
accepting the entire chunk at once).

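One way to approximate this wish today, offered as a rough sketch rather than a settled design, is to guard each write with an `MVar` lock. The name `sayLocked` and the decision to pass the lock around explicitly are my own assumptions for illustration. The lock only protects writers that go through it, so a stray `putStrLn` elsewhere in the program can still interleave with this output.

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T
import System.IO (Handle, stdout)

-- Hypothetical helper: append a newline and hold the lock for the whole
-- write, so two callers sharing the same lock cannot interleave, whatever
-- the buffering mode of the Handle is.
sayLocked :: MVar () -> Handle -> T.Text -> IO ()
sayLocked lock h t = withMVar lock $ \_ -> T.hPutStr h (T.snoc t '\n')

worker :: MVar () -> Int -> IO ()
worker lock num = replicateM_ 5
                $ sayLocked lock stdout
                $ T.pack
                $ "Hi, I'm worker #" ++ show num

main :: IO ()
main = do
    lock <- newMVar ()
    _ <- mapConcurrently (worker lock) [1..5]
    return ()
```

The trade-off is that every caller needs access to the same lock, which is exactly the kind of plumbing a library-provided function could hide.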
__What I'd recommend today__ In a number of my smaller
applications/scripts, I've become accustomed to defining a `say =
BS.hPutStrLn stdout . encodeUtf8`. I'm tempted to add this to a
library - possibly even `classy-prelude` - along with either
reimplementing `print` as `print = say . T.pack . show` or providing
an alternative to `print`. I've also considered replacing the `putStrLn` in
`classy-prelude` with this implementation of `say`.

However, I'm hoping others have some better thoughts on this, because
I don't really find these solutions very appealing.

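For reference, here is the `say` helper described above written out as a complete script. The `say` and `print`-style definitions come straight from the text; the name `sayShow` for the `print` alternative is only a placeholder I picked for this sketch.

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package text --package bytestring
import qualified Data.ByteString.Char8 as S8
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import System.IO (stdout)

-- Encode as UTF-8 and hand the bytes to ByteString's hPutStrLn
say :: Text -> IO ()
say = S8.hPutStrLn stdout . encodeUtf8

-- Hypothetical name for the `print` alternative: show, pack, then say
sayShow :: Show a => a -> IO ()
sayShow = say . T.pack . show

main :: IO ()
main = do
    say (T.pack "Hello from say")
    sayShow (42 :: Int)
    sayShow [True, False]
```

Note that this always emits UTF-8, regardless of the encoding configured on the `Handle`, which is the compromise mentioned earlier.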
## Non-closable channels

Let's implement a very simple multi-worker application with
communication over a `Chan`:

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad (forever)
import Data.Text (Text, pack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.ByteString.Char8 as S8

say :: Text -> IO ()
say = S8.putStrLn . encodeUtf8

worker :: Chan Int -> Int -> IO ()
worker chan num = forever $ do
    i <- readChan chan
    say $ pack $ concat
        [ "Worker #"
        , show num
        , " received value "
        , show i
        ]

main :: IO ()
main = do
    chan <- newChan
    mapConcurrently (worker chan) [1..5] `concurrently`
        mapM_ (writeChan chan) [1..10]
    return ()
```

(Yes, I used the aforementioned `say` function.)

This looks all well and good, but check out the end of the output:

```
Worker #5 received value 8
Worker #3 received value 9
Worker #1 received value 10
Main: thread blocked indefinitely in an MVar operation
```

You see, the worker threads have no way of knowing that there are no more `writeChan` calls incoming, so they continue to block. The runtime system notices this and sends them an async exception to kill them. Relying on this is [a really bad idea for program structure](https://www.fpcomplete.com/blog/2016/06/async-exceptions-stm-deadlocks), as it can easily lead to deadlocks. Said more simply:

![If you rely on exceptions for non-exceptional cases, you're gonna have a bad time](https://i.sli.mg/eX0QY1.jpg)

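To see the runtime behavior described above in isolation, here is a minimal sketch using only `base`. It assumes GHC's runtime notices that nothing else can ever fill the `MVar`, which is what normally happens, though the timing depends on garbage collection and is not something to build program logic on.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (BlockedIndefinitelyOnMVar (..), try)

main :: IO ()
main = do
    -- Nobody else holds a reference to this MVar, so no one can ever fill it
    var <- newEmptyMVar :: IO (MVar Int)
    -- The blocked takeMVar is expected to be interrupted by the runtime
    -- with BlockedIndefinitelyOnMVar, which `try` turns into a Left
    result <- try (takeMVar var)
    case result of
        Left BlockedIndefinitelyOnMVar ->
            putStrLn "Runtime reported: blocked indefinitely on an MVar"
        Right i ->
            putStrLn ("Unexpectedly received " ++ show i)
```

This is the same exception that surfaces in the `Chan` example, where it arrives in the blocked workers and is then rethrown to the main thread.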
Instead, the workers should have some way of knowing that the channel
is closed. This is a common pattern in other languages, and one I
think we should borrow. Implementing this with STM isn't too bad
actually, and can easily have an `IO`-based API if desired:

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Applicative ((<|>))
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.Text (Text, pack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.ByteString.Char8 as S8

say :: Text -> IO ()
say = S8.putStrLn . encodeUtf8

data TCChan a = TCChan (TChan a) (TVar Bool)

newTCChan :: IO (TCChan a)
newTCChan = atomically $ TCChan <$> newTChan <*> newTVar False

closeTCChan :: TCChan a -> IO ()
closeTCChan (TCChan _ var) = atomically $ writeTVar var True

writeTCChan :: TCChan a -> a -> IO ()
writeTCChan (TCChan chan var) val = atomically $ do
    closed <- readTVar var
    if closed
        -- Could use nicer exception types, or return a Bool to
        -- indicate if writing failed
        then error "Wrote to a closed TCChan"
        else writeTChan chan val

readTCChan :: TCChan a -> IO (Maybe a)
readTCChan (TCChan chan var) = atomically $
    (Just <$> readTChan chan) <|> (do
        closed <- readTVar var
        check closed
        return Nothing)

worker :: TCChan Int -> Int -> IO ()
worker chan num =
    loop
  where
    loop = do
        mi <- readTCChan chan
        case mi of
            Nothing -> return ()
            Just i -> do
                say $ pack $ concat
                    [ "Worker #"
                    , show num
                    , " received value "
                    , show i
                    ]
                loop

main :: IO ()
main = do
    chan <- newTCChan
    mapConcurrently (worker chan) [1..5] `concurrently` do
        mapM_ (writeTCChan chan) [1..10]
        closeTCChan chan
    return ()
```

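The comment in `writeTCChan` mentions returning a `Bool` instead of calling `error`. As a sketch of that alternative, here is a stripped-down `TCChan` that stays in `STM`; the name `tryWriteTCChan` and the `STM`-based signatures are assumptions made for this illustration, not part of the code above.

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package stm
import Control.Concurrent.STM

data TCChan a = TCChan (TChan a) (TVar Bool)

newTCChan :: STM (TCChan a)
newTCChan = TCChan <$> newTChan <*> newTVar False

closeTCChan :: TCChan a -> STM ()
closeTCChan (TCChan _ var) = writeTVar var True

-- Hypothetical variant: True if the value was enqueued,
-- False if the channel had already been closed
tryWriteTCChan :: TCChan a -> a -> STM Bool
tryWriteTCChan (TCChan chan var) val = do
    closed <- readTVar var
    if closed
        then return False
        else writeTChan chan val >> return True

main :: IO ()
main = do
    chan <- atomically newTCChan
    before <- atomically $ tryWriteTCChan chan (1 :: Int)
    atomically $ closeTCChan chan
    after <- atomically $ tryWriteTCChan chan 2
    print (before, after) -- prints (True,False)
```

Keeping the operations in `STM` lets a caller combine the closed check with other transactional work, at the cost of an explicit `atomically` at each call site.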
Fortunately, this problem has a preexisting solution: the
[stm-chans package](https://www.stackage.org/package/stm-chans), which
provides closable and bounded channels and queues. Our problem above
can be more easily implemented with:

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text --package stm-chans
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TMQueue
import Data.Text (Text, pack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.ByteString.Char8 as S8

say :: Text -> IO ()
say = S8.putStrLn . encodeUtf8

worker :: TMQueue Int -> Int -> IO ()
worker q num =
    loop
  where
    loop = do
        mi <- atomically $ readTMQueue q
        case mi of
            Nothing -> return ()
            Just i -> do
                say $ pack $ concat
                    [ "Worker #"
                    , show num
                    , " received value "
                    , show i
                    ]
                loop

main :: IO ()
main = do
    q <- newTMQueueIO
    mapConcurrently (worker q) [1..5] `concurrently` do
        mapM_ (atomically . writeTMQueue q) [1..10]
        atomically $ closeTMQueue q
    return ()
```

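Since `stm-chans` also covers the bounded case, here is a small sketch of a closable, bounded queue using `Control.Concurrent.STM.TBMQueue`. The capacity of 2 is an arbitrary choice for illustration, and I use the non-blocking `tryWriteTBMQueue` so the script cannot hang when the queue fills up.

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package stm --package stm-chans
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue

main :: IO ()
main = do
    -- Capacity of 2 is an arbitrary value chosen for this sketch
    q <- newTBMQueueIO 2
    -- tryWriteTBMQueue never blocks; it reports whether the write
    -- happened, was refused because the queue is full, or was
    -- discarded because the queue is closed
    results <- mapM (atomically . tryWriteTBMQueue q) [1, 2, 3 :: Int]
    print results
    atomically $ closeTBMQueue q
    -- Values written before the close can still be read afterwards
    first <- atomically $ readTBMQueue q
    print first
```

In a real producer you would more likely use the blocking `writeTBMQueue`, which gives you back-pressure for free when consumers fall behind.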
__What I'd like__ The biggest change needed here is just to spread
knowledge of this very awesome `stm-chans` package more widely. That
could be with blog posts, or even better with links from
the `stm` package itself. A step up from there could be to include
this functionality in the `stm` package itself. Another possible
nicety would be to add a non-STM API for these - whether based on STM
or MVars internally - for more ease of use. I may take a first step
here by simply depending on and reexporting `stm-chans` from
`classy-prelude`.

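As a rough idea of what a non-STM API could look like, here is a sketch of thin `IO` wrappers over `TMQueue`. The wrapper names (`readQueueIO`, `writeQueueIO`, `closeQueueIO`, `drainQueueIO`) are hypothetical and do not exist in `stm-chans`; only the `TMQueue` functions they call do.

```haskell
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package stm --package stm-chans
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMQueue

-- Hypothetical IO wrappers: each one runs a single STM action atomically
readQueueIO :: TMQueue a -> IO (Maybe a)
readQueueIO = atomically . readTMQueue

writeQueueIO :: TMQueue a -> a -> IO ()
writeQueueIO q = atomically . writeTMQueue q

closeQueueIO :: TMQueue a -> IO ()
closeQueueIO = atomically . closeTMQueue

-- Read until the queue reports that it is closed and empty
drainQueueIO :: TMQueue a -> IO [a]
drainQueueIO q = do
    mx <- readQueueIO q
    case mx of
        Nothing -> return []
        Just x -> (x :) <$> drainQueueIO q

main :: IO ()
main = do
    q <- newTMQueueIO
    mapM_ (writeQueueIO q) [1 .. 3 :: Int]
    closeQueueIO q
    xs <- drainQueueIO q
    print xs -- prints [1,2,3]
```

The obvious cost of such wrappers is losing the ability to compose queue operations with other transactions, so they would complement the STM API rather than replace it.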
__What I'd recommend__ Probably pretty obvious: use `stm-chans`!

Like the previous point though, I'm interested to see how other people
have approached this problem, since I haven't heard it discussed much
in the past. Either others haven't run into this issue as frequently
as I have, everyone already knows about `stm-chans`, or there's some
other solution people prefer.