diff --git a/core/src/Streamly/Internal/Data/Fold/Combinators.hs b/core/src/Streamly/Internal/Data/Fold/Combinators.hs index e1337f70ef..ec6bb9cafa 100644 --- a/core/src/Streamly/Internal/Data/Fold/Combinators.hs +++ b/core/src/Streamly/Internal/Data/Fold/Combinators.hs @@ -182,6 +182,7 @@ module Streamly.Internal.Data.Fold.Combinators -- ** Parallel Distribution , tee , distribute + , distributeScan -- , distributeFst -- , distributeMin @@ -2041,6 +2042,45 @@ tee = teeWith (,) distribute :: Monad m => [Fold m a b] -> Fold m a [b] distribute = Prelude.foldr (teeWith (:)) (fromPure []) +-- XXX use mutable cells for better performance. + +-- | Distribute the input to the folds returned by an effect. The effect is +-- executed every time an input is processed, and the folds returned by it are +-- added to the distribution list. The scan returns the results of the folds as +-- they complete. To avoid adding the same folds repeatedly, the action must +-- return the folds only once e.g. it can be implemented using modifyIORef +-- replacing the original value by an empty list before returning it. +-- +{-# INLINE distributeScan #-} +distributeScan :: Monad m => m [Fold m a b] -> Scanl m a [b] +distributeScan getFolds = Scanl consume initial extract final + + where + + initial = return $ Partial (Tuple' [] []) + + run st [] _ = return $ Partial st + run (Tuple' ys zs) (Fold step init extr fin : xs) a = do + res <- init + case res of + Partial fs -> do + r <- step fs a + run (Tuple' (Fold step (return r) extr fin : ys) zs) xs a + Done b -> do + run (Tuple' ys (b : zs)) xs a + + consume (Tuple' st _) x = do + xs <- getFolds + xs1 <- Prelude.mapM reduce xs + let st1 = st ++ xs1 + run (Tuple' [] []) st1 x + + extract (Tuple' _ done) = return done + + final (Tuple' st done) = do + Prelude.mapM_ finalM st + return done + ------------------------------------------------------------------------------ -- Partitioning ------------------------------------------------------------------------------