Skip to content

Commit

Permalink
Add distributeScan
Browse files Browse the repository at this point in the history
  • Loading branch information
harendra-kumar committed Aug 19, 2024
1 parent e299522 commit 37067c3
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions core/src/Streamly/Internal/Data/Fold/Combinators.hs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ module Streamly.Internal.Data.Fold.Combinators
-- ** Parallel Distribution
, tee
, distribute
, distributeScan
-- , distributeFst
-- , distributeMin

Expand Down Expand Up @@ -2041,6 +2042,45 @@ tee = teeWith (,)
distribute :: Monad m => [Fold m a b] -> Fold m a [b]
distribute = Prelude.foldr (teeWith (:)) (fromPure [])

-- XXX use mutable cells for better performance.

-- | Distribute the input to the folds returned by an effect. The effect is
-- executed every time an input is processed, and the folds returned by it are
-- added to the distribution list. The scan returns the results of the folds as
-- they complete. To avoid adding the same folds repeatedly, the action must
-- return the folds only once e.g. it can be implemented using modifyIORef
-- replacing the original value by an empty list before returning it.
--
{-# INLINE distributeScan #-}
distributeScan :: Monad m => m [Fold m a b] -> Scanl m a [b]
distributeScan getFolds = Scanl consume initial extract final

where

initial = return $ Partial (Tuple' [] [])

run st [] _ = return $ Partial st
run (Tuple' ys zs) (Fold step init extr fin : xs) a = do
res <- init
case res of
Partial fs -> do
r <- step fs a
run (Tuple' (Fold step (return r) extr fin : ys) zs) xs a
Done b -> do
run (Tuple' ys (b : zs)) xs a

consume (Tuple' st _) x = do
xs <- getFolds
xs1 <- Prelude.mapM reduce xs
let st1 = st ++ xs1
run (Tuple' [] []) st1 x

extract (Tuple' _ done) = return done

final (Tuple' st done) = do
Prelude.mapM_ finalM st
return done

------------------------------------------------------------------------------
-- Partitioning
------------------------------------------------------------------------------
Expand Down

0 comments on commit 37067c3

Please sign in to comment.