Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parTeeWith for folds #2829

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Streamly/Internal/Data/Fold/Channel/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Streamly.Internal.Data.Fold.Channel.Type
, newChannelWith
, newChannelWithScan
, newChannel
, newScanChannel
, sendToWorker
, sendToWorker_
, checkFoldStatus -- XXX collectFoldOutput
Expand Down Expand Up @@ -340,6 +341,8 @@ scanToChannel chan (Scanl step initial extract final) =
r <- initial
case r of
Fold.Partial s -> do
b <- extract s
void $ sendPartialToDriver chan b
return $ Fold.Partial s
Fold.Done b ->
Fold.Done <$> void (sendYieldToDriver chan b)
Expand All @@ -356,6 +359,7 @@ scanToChannel chan (Scanl step initial extract final) =

extract1 _ = return ()

-- XXX Should we not discard the result?
final1 st = void (final st)

{-# INLINABLE newChannelWithScan #-}
Expand Down Expand Up @@ -394,6 +398,16 @@ newChannel modifier f = do
outQMvRev <- liftIO newEmptyMVar
fmap fst (newChannelWith outQRev outQMvRev modifier f)

{-# INLINABLE newScanChannel #-}
{-# SPECIALIZE newScanChannel ::
(Config -> Config) -> Scanl IO a b -> IO (Channel IO a b) #-}
newScanChannel :: (MonadRunInIO m) =>
(Config -> Config) -> Scanl m a b -> m (Channel m a b)
newScanChannel modifier f = do
outQRev <- liftIO $ newIORef ([], 0)
outQMvRev <- liftIO newEmptyMVar
fmap fst (newChannelWithScan outQRev outQMvRev modifier f)

-------------------------------------------------------------------------------
-- Process events received by the driver thread from the fold worker side
-------------------------------------------------------------------------------
Expand Down
85 changes: 84 additions & 1 deletion src/Streamly/Internal/Data/Scanl/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

module Streamly.Internal.Data.Scanl.Concurrent
(
parDistributeScan
parTeeWith
, parDistributeScan
, parDemuxScan
)
where
Expand All @@ -21,9 +22,12 @@ import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import Streamly.Internal.Data.Fold (Step (..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Stream (Stream(..), Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))

import qualified Data.Map.Strict as Map

Expand All @@ -34,6 +38,85 @@ import Streamly.Internal.Data.Channel.Types
-- Concurrent scans
-------------------------------------------------------------------------------

-- | Execute both the scans in a tee concurrently.
--
-- Example:
--
-- >>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Scanl.lmapM delay Scanl.sum
-- >>> c2 = Scanl.lmapM delay Scanl.length
-- >>> dst = Scanl.parTeeWith id (,) c1 c2
-- >>> Stream.toList $ Stream.scanl dst src
-- ...
--
{-# INLINABLE parTeeWith #-}
parTeeWith :: MonadAsync m =>
(Config -> Config)
-> (a -> b -> c)
-> Scanl m x a
-> Scanl m x b
-> Scanl m x c
parTeeWith cfg f c1 c2 = Scanl step initial extract final

where

getResponse ch1 ch2 = do
-- NOTE: We do not need a queue and doorbell mechanism for this, a single
-- MVar should be enough. Also, there is only one writer and it writes
-- only once before we read it.
let db1 = outputDoorBell ch1
let q1 = outputQueue ch1
(xs1, _) <- liftIO $ atomicModifyIORefCAS q1 $ \x -> (([],0), x)
case xs1 of
[] -> do
liftIO $ takeMVar db1
getResponse ch1 ch2
x1 : [] -> do
case x1 of
FoldException _tid ex -> do
-- XXX
-- liftIO $ throwTo ch2Tid ThreadAbort
cleanup ch1
cleanup ch2
liftIO $ throwM ex
FoldDone _tid b -> return (Left b)
FoldPartial b -> return (Right b)
_ -> error "parTeeWith: not expecting more than one msg in q"

processResponses ch1 ch2 r1 r2 =
return $ case r1 of
Left b1 -> do
case r2 of
Left b2 -> Done (f b1 b2)
Right b2 -> Done (f b1 b2)
Right b1 -> do
case r2 of
Left b2 -> Done (f b1 b2)
Right b2 -> Partial $ Tuple3' ch1 ch2 (f b1 b2)

initial = do
ch1 <- newScanChannel cfg c1
ch2 <- newScanChannel cfg c2
r1 <- getResponse ch1 ch2
r2 <- getResponse ch2 ch1
processResponses ch1 ch2 r1 r2

step (Tuple3' ch1 ch2 _) x = do
sendToWorker_ ch1 x
sendToWorker_ ch2 x
r1 <- getResponse ch1 ch2
r2 <- getResponse ch2 ch1
processResponses ch1 ch2 r1 r2

extract (Tuple3' _ _ x) = return x

final (Tuple3' ch1 ch2 x) = do
finalize ch1
finalize ch2
-- XXX generate the final value?
return x

-- There are two ways to implement a concurrent scan.
--
-- 1. Make the scan itself asynchronous, add the input to the queue, and then
Expand Down
Loading