Skip to content

Commit

Permalink
Add parDemuxScan
Browse files Browse the repository at this point in the history
  • Loading branch information
harendra-kumar committed Aug 27, 2024
1 parent c96ece8 commit 81f4790
Showing 1 changed file with 143 additions and 7 deletions.
150 changes: 143 additions & 7 deletions src/Streamly/Internal/Data/Fold/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ module Streamly.Internal.Data.Fold.Concurrent
, parPartition
, parUnzipWithM
, parDistributeScan
, parDemuxScan
)
where

Expand All @@ -72,6 +73,7 @@ import Streamly.Internal.Data.Stream (Stream(..), Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)

import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Fold as Fold

import Streamly.Internal.Data.Fold.Channel.Type
Expand Down Expand Up @@ -289,6 +291,14 @@ parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (parEval cfg c1) (parEval cfg c2)
-- 2. A monolithic implementation of concurrent Stream->Stream scan, using a
-- custom implementation of the scan and the driver.

-- | Post a 'ChildStopChannel' event to a fold channel.
--
-- The event is handed to 'sendEvent' together with the channel's
-- 'inputQueue' and its 'inputItemDoorBell'. Whatever 'sendEvent' returns is
-- discarded.
finalize :: MonadIO m => Channel m a b -> m ()
finalize chan = liftIO (void stopRequest)

    where

    -- The queue and doorbell both belong to the channel being stopped.
    inQ = inputQueue chan
    bell = inputItemDoorBell chan
    stopRequest = sendEvent inQ bell ChildStopChannel

{-# ANN type ScanState Fuse #-}
data ScanState s q db f =
ScanInit
Expand Down Expand Up @@ -343,13 +353,6 @@ parDistributeScan cfg getFolds (Stream sstep state) =
processOutputs chans r []
else return (chans, [])

finalize chan = do
liftIO $ void
$ sendEvent
(inputQueue chan)
(inputItemDoorBell chan)
ChildStopChannel

step _ ScanInit = do
q <- liftIO $ newIORef ([], 0)
db <- liftIO $ newEmptyMVar
Expand Down Expand Up @@ -403,3 +406,136 @@ parDistributeScan cfg getFolds (Stream sstep state) =
return $ Skip (ScanDrain q db running)
else return $ Yield outputs (ScanDrain q db running)
step _ ScanStop = return Stop

{-# ANN type DemuxState Fuse #-}
-- | State of the stream state machine that drives 'parDemuxScan'.
--
-- The type parameters are, in order: the state of the source stream, the
-- shared output queue reference, the doorbell used to block while waiting for
-- outputs, and the collection of running folds (a map from key to channel in
-- 'parDemuxScan').
data DemuxState src outQ bell folds
    -- | Nothing has been allocated yet.
    = DemuxInit
    -- | The source stream is still being consumed.
    | DemuxGo src outQ bell folds
    -- | The source stream has stopped; only the remaining fold outputs are
    -- being collected.
    | DemuxDrain outQ bell folds
    -- | Terminal state.
    | DemuxStop

-- XXX We need to either (1) remember a key once its fold is done so that we do
-- not add the fold again, because some inputs would be lost in between, or (2)
-- have a FoldYield constructor to yield repeatedly so that we can restart the
-- existing fold itself when it is done. But in that case we cannot change the
-- fold once it is started. Whatever we do, we should keep the non-concurrent
-- fold consistent with it as well.

-- | Evaluate a stream and send each input to a fold selected by its key. The
-- folds run concurrently, each one in its own channel.
--
-- The key of an input is computed by @getKey@. If no fold is currently
-- running for that key, @getFold@ is called with the key and the fold it
-- returns is started and added to the set of running folds; otherwise the
-- input is sent to the fold already running for that key. @getFold@ is
-- called for every key that has no running fold, therefore it must be able to
-- produce a fold for every key that @getKey@ can return.
--
-- When a fold completes, its result, tagged with its key, is emitted in the
-- output of the scan and the key is forgotten, so a later input with the same
-- key starts a new fold. Each element of the output stream is the non-empty
-- batch of results that were found completed in one step of the scan; empty
-- batches are never emitted.
--
-- When the input stream ends, all the running folds are asked to stop and the
-- scan keeps emitting results until every fold is done.
--
-- If any fold reports an exception, a 'ThreadAbort' is thrown to the threads
-- of all the folds still registered, their channels are cleaned up and the
-- exception is rethrown in the thread driving the scan.
--
-- >>> import qualified Data.Map.Strict as Map
-- >>> import Data.Maybe (fromJust)
-- >>> f1 = ("even", Fold.take 2 Fold.sum)
-- >>> f2 = ("odd", Fold.take 2 Fold.sum)
-- >>> kv = Map.fromList [f1, f2]
-- >>> getFold k = return (fromJust $ Map.lookup k kv)
-- >>> getKey x = if even x then "even" else "odd"
-- >>> input = Stream.enumerateFromTo 1 10
-- >>> Stream.toList $ Fold.parDemuxScan id getKey getFold input
-- ...
--
{-# INLINE parDemuxScan #-}
parDemuxScan :: (MonadAsync m, Ord k) =>
       (Config -> Config)
    -> (a -> k)
    -> (k -> m (Fold m a b))
    -> Stream m a
    -> Stream m [(k, b)]
parDemuxScan cfg getKey getFold (Stream sstep state) =
    Stream step DemuxInit

    where

    -- Walk the list of events read from the output queue. Returns the map
    -- with the finished keys removed, and the results accumulated so far.
    --
    -- XXX can be written as a fold
    processOutputs keyToChan events done = do
        case events of
            [] -> return (keyToChan, done)
            (x:xs) ->
                case x of
                    FoldException _tid ex -> do
                        -- XXX report the fold that threw the exception
                        -- Abort and clean up every fold still in the map
                        -- before propagating the exception.
                        let chans = Map.elems keyToChan
                        liftIO $ mapM_ (`throwTo` ThreadAbort) (fmap snd chans)
                        mapM_ cleanup (fmap fst chans)
                        liftIO $ throwM ex
                    FoldDone _tid o@(k, _) ->
                        -- The fold for this key is finished, forget it.
                        let ch = Map.delete k keyToChan
                         in processOutputs ch xs (o:done)

    -- Read the output queue only if its counter says it is not empty.
    collectOutputs qref keyToChan = do
        (_, n) <- liftIO $ readIORef qref
        if n > 0
        then do
            r <- fmap fst $ liftIO $ readOutputQBasic qref
            processOutputs keyToChan r []
        else return (keyToChan, [])

    -- Allocate the output queue and the doorbell shared by all the channels.
    step _ DemuxInit = do
        q <- liftIO $ newIORef ([], 0)
        db <- liftIO $ newEmptyMVar
        return $ Skip (DemuxGo state q db Map.empty)

    step gst (DemuxGo st q db keyToChan) = do
        -- Collect outputs from running channels
        (keyToChan1, outputs) <- collectOutputs q keyToChan

        -- Send input to the selected fold
        res <- sstep (adaptState gst) st

        next <- case res of
            Yield x s -> do
                -- XXX If the fold for a particular key is done and we see that
                -- key again. If we have not yet collected the done event we
                -- cannot restart the fold because the previous key is already
                -- installed. Therefore, restarting the fold for the same key
                -- is fraught with races.
                let k = getKey x
                (keyToChan2, ch) <-
                    case Map.lookup k keyToChan1 of
                        Nothing -> do
                            -- First input for this key: start a new channel
                            -- whose result is tagged with the key.
                            fld <- getFold k
                            r@(chan, _) <-
                                newChannelWith q db cfg (fmap (k,) fld)
                            return (Map.insert k r keyToChan1, chan)
                        Just (chan, _) -> return (keyToChan1, chan)
                -- XXX We might block forever if some folds are already
                -- done but we have not read the output queue yet. To
                -- avoid that we have to either (1) precheck if space
                -- is available in the input queues of all folds so
                -- that this does not block, or (2) we have to use a
                -- non-blocking read and track progress so that we can
                -- restart from where we left.
                --
                -- If there is no space available then we should block
                -- on doorbell db or inputSpaceDoorBell of the relevant
                -- channel. To avoid deadlock the output space can be
                -- kept unlimited. However, the blocking will delay the
                -- processing of outputs. We should yield the outputs
                -- before blocking.
                sendToWorker_ ch x
                return $ DemuxGo s q db keyToChan2
            Skip s ->
                return $ DemuxGo s q db keyToChan1
            Stop -> do
                -- The input is exhausted: ask every running fold to stop and
                -- switch to collecting their results.
                let chans = fmap fst $ Map.elems keyToChan1
                Prelude.mapM_ finalize chans
                return $ DemuxDrain q db keyToChan1
        if null outputs
        then return $ Skip next
        else return $ Yield outputs next
    step _ (DemuxDrain q db keyToChan) = do
        (keyToChan1, outputs) <- collectOutputs q keyToChan
        if Map.null keyToChan1
        then
            -- All the folds are done. Do not emit an empty batch, just like
            -- every other state skips when there is nothing to output.
            if null outputs
            then return Stop
            else return $ Yield outputs DemuxStop
        else do
            if null outputs
            then do
                -- Nothing to emit yet, block on the doorbell before looking
                -- at the output queue again.
                liftIO $ takeMVar db
                return $ Skip (DemuxDrain q db keyToChan1)
            else return $ Yield outputs (DemuxDrain q db keyToChan1)
    step _ DemuxStop = return Stop

0 comments on commit 81f4790

Please sign in to comment.