Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parDistributeScan using folds #2826

Merged
merged 3 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

## Unreleased

* Add the following modules
- Streamly.Data.Scan
* Add several concurrent combinators for folds in `Streamly.Data.Fold.Prelude`.
* Split the `Fold` type in two, `Fold` and `Scanl`. The `Streamly.Data.Scanl`
module is added for the new `Scanl` type.
* Add a `Path` type for representing file system paths. The following modules
are added:
- Streamly.FileSystem.Path
- Streamly.FileSystem.Path.LocSeg
- Streamly.FileSystem.Path.FileDir
Expand Down
6 changes: 6 additions & 0 deletions core/src/Streamly/Internal/Data/Fold/Combinators.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,12 @@ distribute = Prelude.foldr (teeWith (:)) (fromPure [])
-- return the folds only once e.g. it can be implemented using modifyIORef
-- replacing the original value by an empty list before returning it.
--
-- >>> import Data.IORef
-- >>> ref <- newIORef [Fold.take 2 Fold.sum, Fold.take 2 Fold.length :: Fold IO Int Int]
-- >>> gen = atomicModifyIORef ref (\xs -> ([], xs))
-- >>> Stream.toList $ Stream.scanl (Fold.distributeScan gen) (Stream.enumerateFromTo 1 10)
-- [[],[],[],[2,3],[],[],[],[],[],[],[]]
--
{-# INLINE distributeScan #-}
distributeScan :: Monad m => m [Fold m a b] -> Scanl m a [b]
distributeScan getFolds = Scanl consume initial extract final
Expand Down
4 changes: 2 additions & 2 deletions src/Streamly/Internal/Data/Channel/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,8 @@ incrementYieldLimit remaining =
-- and 0 count.
{-# INLINE readOutputQBasic #-}
readOutputQBasic ::
IORef ([ChildEvent a], Int) -- ^ The channel output queue
-> IO ([ChildEvent a], Int) -- ^ (events, count)
IORef ([a], Int) -- ^ The channel output queue
-> IO ([a], Int) -- ^ (events, count)
readOutputQBasic q = atomicModifyIORefCAS q $ \x -> (([],0), x)

-- | Same as 'readOutputQBasic' but additionally update the max output queue
Expand Down
4 changes: 2 additions & 2 deletions src/Streamly/Internal/Data/Channel/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ isBeyondMaxYield cnt winfo =
-- consumer thread.
{-# INLINE sendEvent #-}
sendEvent ::
IORef ([ChildEvent a], Int) -- ^ Queue where the event is added
IORef ([a], Int) -- ^ Queue where the event is added
-> MVar () -- ^ Door bell to ring
-> ChildEvent a -- ^ The event to be added
-> a -- ^ The event to be added
-> IO Int -- ^ Length of the queue before adding this event
sendEvent q bell msg = do
-- XXX can the access to outputQueue be made faster somehow?
Expand Down
109 changes: 84 additions & 25 deletions src/Streamly/Internal/Data/Fold/Channel/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Streamly.Internal.Data.Fold.Channel.Type
(
-- ** Type
Channel (..)
, OutEvent (..)

-- ** Configuration
, Config
Expand All @@ -18,9 +19,11 @@ module Streamly.Internal.Data.Fold.Channel.Type
, inspect

-- ** Operations
, newChannelWith
, newChannel
, sendToWorker
, checkFoldStatus
, sendToWorker_
, checkFoldStatus -- XXX collectFoldOutput
, dumpChannel
)
where
Expand Down Expand Up @@ -53,6 +56,10 @@ import Streamly.Internal.Data.Channel.Types
-- queue the accumulator and it will be picked by the next worker to accumulate
-- the next value.

-- | Events queued by a fold worker on the channel's 'outputQueue' and read by
-- the fold driver. Every event carries the 'ThreadId' of the thread that
-- queued it (the senders obtain it via 'myThreadId').
data OutEvent b =
FoldException ThreadId SomeException -- ^ An exception reported by the fold worker; 'checkFoldStatus' rethrows it in the driver.
| FoldDone ThreadId b -- ^ The result of the fold, sent by the worker; 'checkFoldStatus' returns it as 'Just'.

-- | The fold driver thread queues the input of the fold in the 'inputQueue'
-- The driver rings the doorbell when the queue transitions from empty to
-- non-empty state.
Expand Down Expand Up @@ -102,7 +109,10 @@ data Channel m a b = Channel

-- | Final output and exceptions, if any, queued by the fold and read by
-- the fold driver.
, outputQueue :: IORef ([ChildEvent b], Int)
--
-- [LOCKING] atomicModifyIORef. Output is queued infrequently by the fold
-- and read frequently by the driver.
, outputQueue :: IORef ([OutEvent b], Int)

-- | Doorbell for the 'outputQueue', rung by the fold when the queue
-- transitions from empty to non-empty.
Expand Down Expand Up @@ -177,7 +187,7 @@ dumpChannel sv = do
-- Process events received by a fold worker from a fold driver
-------------------------------------------------------------------------------

sendToDriver :: Channel m a b -> ChildEvent b -> IO Int
sendToDriver :: Channel m a b -> OutEvent b -> IO Int
sendToDriver sv msg = do
-- In case the producer stream is blocked on pushing to the fold buffer
-- then wake it up so that it can check for the stop event or exception
Expand All @@ -188,13 +198,14 @@ sendToDriver sv msg = do

sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver sv res = liftIO $ do
void $ sendToDriver sv (ChildYield res)
tid <- myThreadId
void $ sendToDriver sv (FoldDone tid res)

{-# NOINLINE sendExceptionToDriver #-}
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver sv e = do
tid <- myThreadId
void $ sendToDriver sv (ChildStop tid (Just e))
void $ sendToDriver sv (FoldException tid e)

data FromSVarState m a b =
FromSVarRead (Channel m a b)
Expand All @@ -217,6 +228,9 @@ fromInputQueue svar = D.Stream step (FromSVarRead svar)
step _ (FromSVarLoop sv []) = return $ D.Skip $ FromSVarRead sv
step _ (FromSVarLoop sv (ev : es)) = do
case ev of
-- XXX Separate input and output events. Input events cannot have
-- Stop event and output events cannot have ChildStopChannel
-- event.
ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
ChildStopChannel -> return D.Stop
_ -> undefined
Expand Down Expand Up @@ -247,12 +261,14 @@ readInputQWithDB chan = do
_ <- tryPutMVar (inputSpaceDoorBell chan) ()
return r

mkNewChannel :: forall m a b. MonadIO m => Config -> IO (Channel m a b)
mkNewChannel cfg = do
mkNewChannelWith :: forall m a b. MonadIO m =>
IORef ([OutEvent b], Int)
-> MVar ()
-> Config
-> IO (Channel m a b)
mkNewChannelWith outQRev outQMvRev cfg = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
outQRev <- newIORef ([], 0)
outQMvRev <- newEmptyMVar
bufferMv <- newEmptyMVar

stats <- newSVarStats
Expand All @@ -275,18 +291,26 @@ mkNewChannel cfg = do

let sv = getSVar sv in return sv

{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
(Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
newChannel :: (MonadRunInIO m) =>
(Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel modifier f = do
{-# INLINABLE newChannelWith #-}
{-# SPECIALIZE newChannelWith ::
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold IO a b
-> IO (Channel IO a b, ThreadId) #-}
newChannelWith :: (MonadRunInIO m) =>
IORef ([OutEvent b], Int)
-> MVar ()
-> (Config -> Config)
-> Fold m a b
-> m (Channel m a b, ThreadId)
newChannelWith outq outqDBell modifier f = do
let config = modifier defaultConfig
sv <- liftIO $ mkNewChannel config
sv <- liftIO $ mkNewChannelWith outq outqDBell config
mrun <- askRunInIO
void $ doForkWith
tid <- doForkWith
(getBound config) (work sv) mrun (sendExceptionToDriver sv)
return sv
return (sv, tid)

where

Expand All @@ -295,6 +319,16 @@ newChannel modifier f = do
let f1 = Fold.rmapM (void . sendYieldToDriver chan) f
in D.fold f1 $ fromInputQueue chan

-- | Create a fold channel that owns its own output queue and output doorbell.
--
-- A fresh, empty output queue and an empty doorbell 'MVar' are allocated and
-- handed to 'newChannelWith' along with the config modifier and the fold.
-- 'newChannelWith' returns the channel paired with the 'ThreadId' of the
-- worker it forks; the 'ThreadId' is discarded here and only the channel is
-- returned.
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
    (Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
newChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel modifier f = do
    -- Allocate the queue first and the doorbell second, as a single IO step.
    (outQ, outQBell) <-
        liftIO $ (,) <$> newIORef ([], 0) <*> newEmptyMVar
    fst <$> newChannelWith outQ outQBell modifier f

-------------------------------------------------------------------------------
-- Process events received by the driver thread from the fold worker side
-------------------------------------------------------------------------------
Expand Down Expand Up @@ -326,9 +360,8 @@ checkFoldStatus sv = do
processEvents [] = return Nothing
processEvents (ev : _) = do
case ev of
ChildStop _ e -> maybe undefined throwM e
ChildStopChannel -> undefined
ChildYield b -> return (Just b)
FoldException _ e -> throwM e
FoldDone _ b -> return (Just b)

{-# INLINE isBufferAvailable #-}
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
Expand All @@ -340,10 +373,11 @@ isBufferAvailable sv = do
(_, n) <- liftIO $ readIORef (inputQueue sv)
return $ fromIntegral lim > n

-- | Push values from a driver to a fold worker via a Channel. Before pushing a
-- value to the Channel it polls for events received from the fold worker. If a
-- stop event is received then it returns 'True' otherwise false. Propagates
-- exceptions received from the fold wroker.
-- | Push values from a driver to a fold worker via a Channel. Blocks if no
-- space is available in the buffer. Before pushing a value to the Channel it
-- polls for events received from the fold worker. If the fold has finished
-- then it returns 'Just' the fold result, otherwise 'Nothing'. Propagates
-- exceptions received from the fold worker.
--
{-# INLINE sendToWorker #-}
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
Expand Down Expand Up @@ -375,3 +409,28 @@ sendToWorker chan a = go
else do
() <- liftIO $ takeMVar (inputSpaceDoorBell chan)
go

-- | Like 'sendToWorker' but only sends; it does not poll for or receive any
-- events from the fold.
--
-- This function does not block. If 'isBufferAvailable' reports that the input
-- buffer has no space, it calls 'error' instead of waiting.
--
-- XXX Blocking for space (taking 'inputSpaceDoorBell' and retrying) is
-- currently disabled. If it is re-enabled this becomes a recursive loop again
-- and we should consider using SPEC.
{-# INLINE sendToWorker_ #-}
sendToWorker_ :: MonadAsync m => Channel m a b -> a -> m ()
sendToWorker_ chan a = do
    hasSpace <- isBufferAvailable chan
    if hasSpace
        then
            -- Queue the item and ring the input doorbell; the previous queue
            -- length returned by 'sendEvent' is not needed here.
            liftIO $ void $
                sendEvent (inputQueue chan) (inputItemDoorBell chan) (ChildYield a)
        else error "sendToWorker_: No space available in the buffer"
Loading
Loading