Skip to content

Commit

Permalink
Updates to intervalsOf family of operations
Browse files Browse the repository at this point in the history
Rename groupsOfTimeout.
Add skeleton of boundedIntervalsOf.
Update docs of intervalsOf.
  • Loading branch information
harendra-kumar committed Oct 7, 2024
1 parent eed1da5 commit 17cba34
Showing 1 changed file with 45 additions and 9 deletions.
54 changes: 45 additions & 9 deletions src/Streamly/Internal/Data/Stream/Time.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ module Streamly.Internal.Data.Stream.Time

-- * Chunking
, intervalsOf
, groupsOfTimeout
, boundedIntervalsOf
, timedGroupsOf

-- * Sampling
, sampleIntervalEnd
Expand All @@ -48,6 +49,9 @@ module Streamly.Internal.Data.Stream.Time
, bufferLatest
, bufferLatestN
, bufferOldestN

-- * Deprecated
, groupsOfTimeout
)
where

Expand Down Expand Up @@ -220,6 +224,13 @@ dropLastInterval = undefined
-- | Group the input stream into windows of @n@ second each and then fold each
-- group using the provided fold function.
--
-- If the fold terminates before the interval is over, the next collection is
-- started, thus multiple collection can happen in the same interval. If the
-- fold does not terminate before the interval is over, the fold will be forced
-- to terminate at the interval end.
--
-- Example:
--
-- >>> twoPerSec = Stream.parEval (Stream.constRate 2) $ Stream.enumerateFrom 1
-- >>> intervals = Stream.intervalsOf 1 Fold.toList twoPerSec
-- >>> Stream.fold Fold.toList $ Stream.take 2 intervals
Expand All @@ -232,28 +243,53 @@ intervalsOf n f xs =
(Fold.takeEndBy isNothing (Fold.catMaybes f))
(interject (return Nothing) n (fmap Just xs))

-- | Like 'intervalsOf' but with an additional argument to limit the number of
-- collected items to a max limit. If the limit is reached, the fold output is
-- emitted and the next interval is started.
--
-- An alternative behavior would be to emit multiple elements in the same
-- interval if the size is exceeded, keeping the intervals fixed. That can be
-- achieved by using 'intervalsOf' and using a fold that terminates on the
-- limit.
--
-- /Unimplemented/
{-# INLINE boundedIntervalsOf #-}
boundedIntervalsOf :: -- MonadAsync m =>
Int -> Double -> Int -> Fold m a b -> Stream m a -> Stream m b
boundedIntervalsOf _len _time _f _xs = undefined

-- XXX This can be implemented more efficiently by sharing a Clock.Timer across
-- parallel threads and resetting it whenever a span is emitted.

-- | Like 'chunksOf' but if the chunk is not completed within the specified
-- time interval then emit whatever we have collected till now. The chunk
-- timeout is reset whenever a chunk is emitted. The granularity of the clock
-- | Like 'groupsOf' but if the group is not completed within the specified
-- time interval then emit whatever we have collected till now. The group
-- timeout is reset whenever a group is emitted. The granularity of the clock
-- is 100 ms.
--
-- Note that it will not emit any output unless at least one item has been
-- collected in the group. The time interval is only for timing out the already
-- collected but not yet complete group.
--
-- >>> s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
-- >>> f = Stream.fold (Fold.drainMapM print) $ Stream.groupsOfTimeout 5 1 Fold.toList s
-- >>> f = Stream.fold (Fold.drainMapM print) $ Stream.timedGroupsOf 1 5 Fold.toList s
--
-- /Pre-release/
{-# INLINE groupsOfTimeout #-}
groupsOfTimeout :: MonadAsync m
=> Int -> Double -> Fold m a b -> Stream m a -> Stream m b
groupsOfTimeout n timeout f =
{-# INLINE timedGroupsOf #-}
timedGroupsOf :: MonadAsync m
=> Double -> Int -> Fold m a b -> Stream m a -> Stream m b
timedGroupsOf timeout n f =
fmap snd
. classifySessionsBy
0.1 False (const (return False)) timeout (Fold.take n f)
. Stream.timestamped
. fmap ((),)

{-# Deprecated groupsOfTimeout "Please use timedGroupsOf instead." #-}
{-# INLINE groupsOfTimeout #-}
groupsOfTimeout :: MonadAsync m
=> Int -> Double -> Fold m a b -> Stream m a -> Stream m b
groupsOfTimeout n timeout = timedGroupsOf timeout n

------------------------------------------------------------------------------
-- Windowed classification
------------------------------------------------------------------------------
Expand Down

0 comments on commit 17cba34

Please sign in to comment.