Skip to content

Commit

Permalink
Add parTeeWith for folds
Browse files Browse the repository at this point in the history
  • Loading branch information
harendra-kumar committed Aug 27, 2024
1 parent 32350bc commit dbf921b
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 1 deletion.
14 changes: 14 additions & 0 deletions src/Streamly/Internal/Data/Fold/Channel/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Streamly.Internal.Data.Fold.Channel.Type
, newChannelWith
, newChannelWithScan
, newChannel
, newScanChannel
, sendToWorker
, sendToWorker_
, checkFoldStatus -- XXX collectFoldOutput
Expand Down Expand Up @@ -340,6 +341,8 @@ scanToChannel chan (Scanl step initial extract final) =
r <- initial
case r of
Fold.Partial s -> do
b <- extract s
void $ sendPartialToDriver chan b
return $ Fold.Partial s
Fold.Done b ->
Fold.Done <$> void (sendYieldToDriver chan b)
Expand All @@ -356,6 +359,7 @@ scanToChannel chan (Scanl step initial extract final) =

extract1 _ = return ()

-- XXX Should we not discard the result?
final1 st = void (final st)

{-# INLINABLE newChannelWithScan #-}
Expand Down Expand Up @@ -394,6 +398,16 @@ newChannel modifier f = do
outQMvRev <- liftIO newEmptyMVar
fmap fst (newChannelWith outQRev outQMvRev modifier f)

{-# INLINABLE newScanChannel #-}
{-# SPECIALIZE newScanChannel ::
(Config -> Config) -> Scanl IO a b -> IO (Channel IO a b) #-}
newScanChannel :: (MonadRunInIO m) =>
(Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel modifier f = do
outQRev <- liftIO $ newIORef ([], 0)
outQMvRev <- liftIO newEmptyMVar
fmap fst (newChannelWithScan outQRev outQMvRev modifier f)

-------------------------------------------------------------------------------
-- Process events received by the driver thread from the fold worker side
-------------------------------------------------------------------------------
Expand Down
85 changes: 84 additions & 1 deletion src/Streamly/Internal/Data/Scanl/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

module Streamly.Internal.Data.Scanl.Concurrent
(
parDistributeScan
parTeeWith
, parDistributeScan
, parDemuxScan
)
where
Expand All @@ -21,9 +22,12 @@ import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import Streamly.Internal.Data.Fold (Step (..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Stream (Stream(..), Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))

import qualified Data.Map.Strict as Map

Expand All @@ -34,6 +38,85 @@ import Streamly.Internal.Data.Channel.Types
-- Concurrent scans
-------------------------------------------------------------------------------

-- | Execute both the scans in a tee concurrently.
--
-- Example:
--
-- >>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Scanl.lmapM delay Scanl.sum
-- >>> c2 = Scanl.lmapM delay Scanl.length
-- >>> dst = Scanl.parTeeWith id (,) c1 c2
-- >>> Stream.toList $ Stream.scanl dst src
-- ...
--
{-# INLINABLE parTeeWith #-}
parTeeWith :: MonadAsync m =>
(Config -> Config)
-> (a -> b -> c)
-> Scanl m x a
-> Scanl m x b
-> Scanl m x c
parTeeWith cfg f c1 c2 = Scanl step initial extract final

where

getResponse ch1 ch2 = do
-- NOTE: We do not need a queue and doorbell mechanism for this, a single
-- MVar should be enough. Also, there is only one writer and it writes
-- only once before we read it.
let db1 = outputDoorBell ch1
let q1 = outputQueue ch1
(xs1, _) <- liftIO $ atomicModifyIORefCAS q1 $ \x -> (([],0), x)
case xs1 of
[] -> do
liftIO $ takeMVar db1
getResponse ch1 ch2
x1 : [] -> do
case x1 of
FoldException _tid ex -> do
-- XXX
-- liftIO $ throwTo ch2Tid ThreadAbort
cleanup ch1
cleanup ch2
liftIO $ throwM ex
FoldDone _tid b -> return (Left b)
FoldPartial b -> return (Right b)
_ -> error "parTeeWith: not expecting more than one msg in q"

processResponses ch1 ch2 r1 r2 =
return $ case r1 of
Left b1 -> do
case r2 of
Left b2 -> Done (f b1 b2)
Right b2 -> Done (f b1 b2)
Right b1 -> do
case r2 of
Left b2 -> Done (f b1 b2)
Right b2 -> Partial $ Tuple3' ch1 ch2 (f b1 b2)

initial = do
ch1 <- newScanChannel cfg c1
ch2 <- newScanChannel cfg c2
r1 <- getResponse ch1 ch2
r2 <- getResponse ch2 ch1
processResponses ch1 ch2 r1 r2

step (Tuple3' ch1 ch2 _) x = do
sendToWorker_ ch1 x
sendToWorker_ ch2 x
r1 <- getResponse ch1 ch2
r2 <- getResponse ch2 ch1
processResponses ch1 ch2 r1 r2

extract (Tuple3' _ _ x) = return x

final (Tuple3' ch1 ch2 x) = do
finalize ch1
finalize ch2
-- XXX generate the final value?
return x

-- There are two ways to implement a concurrent scan.
--
-- 1. Make the scan itself asynchronous, add the input to the queue, and then
Expand Down

0 comments on commit dbf921b

Please sign in to comment.