2011-11-14 22:10:43 +00:00
|
|
|
-- create a similar concept than a unix pipe.
|
2012-12-05 07:57:13 +00:00
|
|
|
module PipeChan
|
2013-07-21 06:00:35 +00:00
|
|
|
( PipeChan(..)
|
|
|
|
, newPipe
|
|
|
|
, runPipe
|
|
|
|
, readPipeA
|
|
|
|
, readPipeB
|
|
|
|
, writePipeA
|
|
|
|
, writePipeB
|
|
|
|
) where
|
2011-11-14 22:10:43 +00:00
|
|
|
|
|
|
|
import Control.Applicative
|
|
|
|
import Control.Concurrent.Chan
|
|
|
|
import Control.Concurrent
|
|
|
|
import Control.Monad (forever)
|
|
|
|
import Data.ByteString (ByteString)
|
|
|
|
import Data.IORef
|
|
|
|
import qualified Data.ByteString as B
|
|
|
|
|
|
|
|
-- | represent a unidirectional pipe with a buffered read channel and a write channel
|
|
|
|
data UniPipeChan = UniPipeChan (Chan ByteString) (Chan ByteString)
|
|
|
|
|
|
|
|
newUniPipeChan = UniPipeChan <$> newChan <*> newChan
|
|
|
|
|
|
|
|
runUniPipe (UniPipeChan r w) = forkIO $ forever $ readChan r >>= writeChan w
|
|
|
|
|
|
|
|
getReadUniPipe (UniPipeChan r _) = r
|
|
|
|
getWriteUniPipe (UniPipeChan _ w) = w
|
|
|
|
|
|
|
|
-- | Represent a bidirectional pipe with 2 nodes A and B
|
|
|
|
data PipeChan = PipeChan (IORef ByteString) (IORef ByteString) UniPipeChan UniPipeChan
|
|
|
|
|
|
|
|
newPipe = PipeChan <$> newIORef B.empty <*> newIORef B.empty <*> newUniPipeChan <*> newUniPipeChan
|
|
|
|
|
|
|
|
runPipe (PipeChan _ _ cToS sToC) = runUniPipe cToS >> runUniPipe sToC
|
|
|
|
|
|
|
|
readPipeA (PipeChan _ b _ s) sz = readBuffered b (getWriteUniPipe s) sz
|
|
|
|
writePipeA (PipeChan _ _ c _) = writeChan $ getWriteUniPipe c
|
|
|
|
|
|
|
|
readPipeB (PipeChan b _ c _) sz = readBuffered b (getWriteUniPipe c) sz
|
|
|
|
writePipeB (PipeChan _ _ _ s) = writeChan $ getReadUniPipe s
|
|
|
|
|
|
|
|
-- helper to read buffered data.
|
|
|
|
readBuffered buf chan sz = do
|
2013-07-21 06:00:35 +00:00
|
|
|
left <- readIORef buf
|
|
|
|
if B.length left >= sz
|
|
|
|
then do
|
|
|
|
let (ret, nleft) = B.splitAt sz left
|
|
|
|
writeIORef buf nleft
|
|
|
|
return ret
|
|
|
|
else do
|
|
|
|
let newSize = (sz - B.length left)
|
|
|
|
newData <- readChan chan
|
|
|
|
writeIORef buf newData
|
|
|
|
remain <- readBuffered buf chan newSize
|
|
|
|
return (left `B.append` remain)
|