Skip to content

Commit

Permalink
Update and clean split and compact operations
Browse files Browse the repository at this point in the history
  • Loading branch information
harendra-kumar committed Oct 6, 2024
1 parent 27cecf0 commit 96e26d5
Show file tree
Hide file tree
Showing 35 changed files with 1,765 additions and 730 deletions.
8 changes: 4 additions & 4 deletions benchmark/Streamly/Benchmark/Data/Fold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ toarr = Array.fromList . map (fromIntegral . ord)
fileInfixTakeEndBy_ :: Handle -> IO Int
fileInfixTakeEndBy_ inh =
Stream.fold Fold.length
$ Stream.foldMany1 (FL.takeEndBy_ (== lf) Fold.drain)
$ Stream.foldManyPost (FL.takeEndBy_ (== lf) Fold.drain)
$ Handle.read inh -- >>= print

#ifdef INSPECTION
Expand Down Expand Up @@ -256,7 +256,7 @@ inspect $ 'fileSuffixTakeEndBy `hasNoType` ''MutArray.ArrayUnsafe -- FH.read/A.
splitOnSeq :: String -> Handle -> IO Int
splitOnSeq str inh =
Stream.fold Fold.length
$ Stream.foldMany1 (Fold.takeEndBySeq_ (toarr str) Fold.drain)
$ Stream.foldManyPost (Fold.takeEndBySeq_ (toarr str) Fold.drain)
$ Handle.read inh -- >>= print

#ifdef INSPECTION
Expand All @@ -269,7 +269,7 @@ splitOnSeq100k :: Handle -> IO Int
splitOnSeq100k inh = do
arr <- Stream.fold Array.create $ Stream.replicate 100000 123
Stream.fold Fold.length
$ Stream.foldMany1 (Fold.takeEndBySeq_ arr Fold.drain)
$ Stream.foldManyPost (Fold.takeEndBySeq_ arr Fold.drain)
$ Handle.read inh -- >>= print

-- | Split on suffix sequence.
Expand Down Expand Up @@ -356,7 +356,7 @@ o_1_space_reduce_read_split env =
splitOnSeqUtf8 :: String -> Handle -> IO Int
splitOnSeqUtf8 str inh =
Stream.fold Fold.length
$ Stream.foldMany1 (Fold.takeEndBySeq_ (Array.fromList str) Fold.drain)
$ Stream.foldManyPost (Fold.takeEndBySeq_ (Array.fromList str) Fold.drain)
$ Unicode.decodeUtf8Chunks
$ Handle.readChunks inh -- >>= print

Expand Down
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/Data/Stream/Expand.hs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ inspect $ 'concatMapRepl `hasNoType` ''SPEC
unfoldManyRepl :: Int -> Int -> Int -> IO ()
unfoldManyRepl outer inner n =
drain
$ S.unfoldMany
$ S.unfoldEach
UF.replicateM
(fmap ((inner,) . return) (sourceUnfoldrM outer n))

Expand Down
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ foldMany1 :: Monad m => Stream m Int -> m ()
foldMany1 =
Common.drain
. fmap getSum
. S.foldMany1 (FL.take 2 FL.mconcat)
. S.foldManyPost (FL.take 2 FL.mconcat)
. fmap Sum

{-# INLINE refoldMany #-}
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/Data/Stream/StreamD.hs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ inspect $ 'concatMapRepl `hasNoType` ''SPEC
unfoldManyRepl :: Int -> Int -> Int -> IO ()
unfoldManyRepl outer inner n =
S.drain
$ S.unfoldMany
$ S.unfoldEach
UF.replicateM
(S.map ((inner,) . return) (sourceUnfoldrMN outer n))

Expand Down
4 changes: 2 additions & 2 deletions benchmark/Streamly/Benchmark/Data/Stream/Transform.hs
Original file line number Diff line number Diff line change
Expand Up @@ -514,12 +514,12 @@ insertBy value n = composeN n $ Stream.insertBy compare (value + 1)
{-# INLINE interposeSuffix #-}
interposeSuffix :: Monad m => Int -> Int -> Stream m Int -> m ()
interposeSuffix value n =
composeN n $ Stream.interposeSuffix (value + 1) Unfold.identity
composeN n $ Stream.unfoldEachSepBy (value + 1) Unfold.identity

{-# INLINE intercalateSuffix #-}
intercalateSuffix :: Monad m => Int -> Int -> Stream m Int -> m ()
intercalateSuffix value n =
composeN n $ Stream.intercalateSuffix Unfold.identity (value + 1)
composeN n $ Stream.unfoldEachSepBySeq (value + 1) Unfold.identity

o_1_space_inserting :: Int -> [Benchmark]
o_1_space_inserting value =
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/Data/Unfold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ concatCount linearCount =
many :: Monad m => Int -> Int -> m ()
many linearCount start = do
let end = start + concatCount linearCount
UF.fold FL.drain (UF.many (source end) (source end)) start
UF.fold FL.drain (UF.unfoldEach (source end) (source end)) start

-------------------------------------------------------------------------------
-- Benchmarks
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Streamly/Benchmark/FileSystem/Handle/Read.hs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ chunksOfSum n inh =
foldMany1ChunksOfSum :: Int -> Handle -> IO Int
foldMany1ChunksOfSum n inh =
S.fold Fold.length
$ IP.foldMany1 (FL.take n FL.sum) (S.unfold FH.reader inh)
$ IP.foldManyPost (FL.take n FL.sum) (S.unfold FH.reader inh)

foldManyChunksOfSum :: Int -> Handle -> IO Int
foldManyChunksOfSum n inh =
Expand Down
27 changes: 16 additions & 11 deletions core/src/Streamly/Data/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ module Streamly.Data.Stream
-- >>> elemIndices a = findIndices (== a)
-- >>> uniq = Stream.scanMaybe (Fold.uniqBy (==))
-- >>> partition p = Stream.fold (Fold.partition Fold.toList Fold.toList) . fmap (if p then Left else Right)
-- >>> takeLast n s = Stream.fromEffect $ fmap Array.read $ Array.createOfLast n s
-- , scanlMaybe
, take
, takeWhile
Expand Down Expand Up @@ -505,15 +506,15 @@ module Streamly.Data.Stream
-- * Unfold Each
-- Idioms and equivalents of Data.List APIs:
--
-- >>> cycle = Stream.unfoldMany Unfold.fromList . Stream.repeat
-- >>> unlines = Stream.interposeSuffix '\n'
-- >>> unwords = Stream.interpose ' '
-- >>> unlines = Stream.intercalateSuffix Unfold.fromList "\n"
-- >>> unwords = Stream.intercalate Unfold.fromList " "
-- >>> cycle = Stream.unfoldEach Unfold.fromList . Stream.repeat
-- >>> unlines = Stream.unfoldEachEndBy '\n'
-- >>> unwords = Stream.unfoldEachSepBy ' '
-- >>> unlines = Stream.unfoldEachEndBySeq "\n" Unfold.fromList
-- >>> unwords = Stream.unfoldEachSepBySeq " " Unfold.fromList
--
, unfoldMany
, intercalate
, intercalateSuffix
, unfoldEach
, unfoldEachSepBySeq
, unfoldEachEndBySeq

-- * Stream of streams
-- | Stream operations like map and filter represent loops in
Expand Down Expand Up @@ -550,7 +551,6 @@ module Streamly.Data.Stream
-- >>> groupsByRolling eq = Stream.parseMany (Parser.groupByRolling eq Fold.toList)
-- >>> groups = groupBy (==)
, foldMany
, foldMany1
, groupsOf
, parseMany

Expand All @@ -564,8 +564,8 @@ module Streamly.Data.Stream
-- >>> splitAt n = Stream.fold (Fold.splitAt n Fold.toList Fold.toList)
-- >>> span p = Parser.splitWith (,) (Parser.takeWhile p Fold.toList) (Parser.fromFold Fold.toList)
-- >>> break p = span (not . p)
, splitOn
, splitOnSeq
, splitSepBy_
, splitSepBySeq_
, splitEndBySeq
, splitEndBySeq_
, wordsBy
Expand Down Expand Up @@ -673,6 +673,11 @@ module Streamly.Data.Stream
, scan
, scanMaybe
, postscan
, splitOn
, splitOnSeq
, unfoldMany
, intercalate
, intercalateSuffix
)
where

Expand Down
5 changes: 4 additions & 1 deletion core/src/Streamly/Data/Unfold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
-- Fast, composable stream producers with ability to terminate, supporting
-- nested stream fusion. Nested stream operations like
-- 'Streamly.Data.Stream.concatMap' in the "Streamly.Data.Stream" module do not
-- fuse, however, the 'Streamly.Data.Stream.unfoldMany' operation, using the
-- fuse, however, the 'Streamly.Data.Stream.unfoldEach' operation, using the
-- 'Unfold' type, is a fully fusible alternative to
-- 'Streamly.Data.Stream.concatMap'.
--
Expand Down Expand Up @@ -88,6 +88,9 @@ module Streamly.Data.Unfold
, crossWith

-- ** Nesting
, unfoldEach

-- * Deprecated
, many

)
Expand Down
3 changes: 1 addition & 2 deletions core/src/Streamly/Internal/Console/Stdio.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import Streamly.Internal.Data.Fold (Fold)

import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Stream as Stream
(intersperseMSuffix)
import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified Streamly.Internal.FileSystem.Handle as Handle
import qualified Streamly.Internal.Unicode.Stream as Unicode
Expand Down Expand Up @@ -224,5 +223,5 @@ putStrings = putStringsWith Unicode.encodeUtf8
putStringsLn :: MonadIO m => Stream m String -> m ()
putStringsLn =
putChunks
. Stream.intersperseMSuffix (return $ Array.fromList [10])
. Stream.intersperseEndByM (return $ Array.fromList [10])
. Unicode.encodeStrings Unicode.encodeUtf8
Loading

0 comments on commit 96e26d5

Please sign in to comment.