Skip to content

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
harendra-kumar committed Sep 25, 2024
1 parent f29d086 commit ac11942
Showing 1 changed file with 19 additions and 48 deletions.
67 changes: 19 additions & 48 deletions core/src/Streamly/Internal/Data/Ring.hs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import qualified Streamly.Internal.Data.Array.Type as Array
import qualified Streamly.Internal.Data.Fold.Type as Fold
import qualified Streamly.Internal.Data.MutArray.Type as MutArray
import qualified Streamly.Internal.Data.MutByteArray.Type as MutByteArray
import qualified Streamly.Internal.Data.Scanl.Type as Scanl
import qualified Streamly.Internal.Data.Stream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.Type as Stream
Expand Down Expand Up @@ -164,27 +165,11 @@ data Ring a = Ring
, ringSize :: {-# UNPACK #-} !Int -- size of array in bytes
, ringHead :: {-# UNPACK #-} !Int -- byte index in the array
}
-- XXX How do we differentiate between byte-ish and element-ish numbers?
-- This was a problem when I was playing with arrays.

{-
XXX
Maintaining ringHead in the Ring means we need to pass the ring around in the
loop if we ever modify the ringHead.
Consider functions like splitOnSuffixSeq and friends. Currently we only thread
the ringHead in the loop but keeping the ringHead in the Ring means we'll have
to thread the entire ring.
-}

-------------------------------------------------------------------------------
-- Construction
-------------------------------------------------------------------------------

-- XXX We should suffix byte based APIs with *Bytes

-- | Given byte offset relative to the ring head, compute the linear byte
-- offset in the array. Offset can be positive or negative. Invariants:
--
Expand All @@ -200,9 +185,6 @@ unsafeChangeHeadByOffset rh rs i =
else if i1 < 0
then i1 + rs
else i1
-- XXX Petty observation:
-- Don't use the term "change"?
-- When I look at this function used in other functions I get confused.

-- | Convert a byte offset relative to the ring head to a byte offset in the
-- underlying mutable array. Offset can be positive or negative.
Expand Down Expand Up @@ -237,7 +219,6 @@ incrHeadByOffset rh rs n =
in if rh1 >= rs
then 0
else rh1
-- Isn't this the same as "unsafeChangeHeadByOffset"?

-- | Advance the ring head forward by 1 slot, the ring head will now point to
-- the next (newer) item, and the old ring head position will become the latest
Expand Down Expand Up @@ -271,7 +252,6 @@ decrHeadByOffset rh rs n =
moveReverse :: forall a. Unbox a => Ring a -> Ring a
moveReverse rb@Ring{..} =
rb { ringHead = decrHeadByOffset ringHead ringSize (SIZE_OF(a)) }
-- XXX moveBackward?

-------------------------------------------------------------------------------
-- Conversions
Expand Down Expand Up @@ -386,8 +366,6 @@ replace_ rb newVal = do
when (ringSize rb /= 0)
$ liftIO $ pokeAt (ringHead rb) (ringContents rb) newVal
pure $ moveForward rb
-- XXX replace_ does not perform a check done by replace
-- Maybe consider using the prefix unsafe?

-- | Return the element at the specified index without checking the bounds.
--
Expand All @@ -410,8 +388,6 @@ replace rb newVal = do
old <- unsafeGetRawIndex (ringHead rb) rb
liftIO $ pokeAt (ringHead rb) (ringContents rb) newVal
pure (moveForward rb, old)
-- XXX The terminology of oldest and newest is rather confusing
-- Would advancing a pointer make the next element the oldest?

-------------------------------------------------------------------------------
-- Random reads
Expand Down Expand Up @@ -486,25 +462,23 @@ reader = Unfold step inject
x <- unsafeGetHead rb
return $ Yield x (moveForward rb, n - SIZE_OF(a))

-- | Read the entire ring, starting at the item before the ring head i.e. from
-- newest to oldest
-- | Read the entire ring in reverse order, starting at the item before the
-- ring head i.e. from newest to oldest
--
{-# INLINE_NORMAL readerRev #-}
readerRev :: forall m a. (MonadIO m, Unbox a) => Unfold m (Ring a) a
readerRev = Unfold step inject

where

inject rb = return (rb, ringSize rb)
inject rb = return (moveReverse rb, ringSize rb)

step (rb, n) = do
if n <= 0
then return Stop
else do
let rb1 = moveReverse rb
x <- unsafeGetHead rb1
return $ Yield x (rb1, n - SIZE_OF(a))
-- XXX Move the moveReverse to the inject and use the same code as reader?
x <- unsafeGetHead rb
return $ Yield x (moveReverse rb, n - SIZE_OF(a))

-- | Read the entire ring as a stream, starting at the ring head i.e. from
-- oldest to newest.
Expand Down Expand Up @@ -538,24 +512,26 @@ scanRingsOf n = Scanl step initial extract extract

where

rSize = n * SIZE_OF(a)

initial =
if n <= 0
then error "ringsOf: window size must be > 0"
then error "scanRingsOf: window size must be > 0"
else do
arr :: MutArray.MutArray a <- liftIO $ MutArray.emptyOf n
return $ Partial $ Tuple3Fused' (MutArray.arrContents arr) 0 0
mba <- liftIO $ MutByteArray.new rSize
return $ Partial $ Tuple3Fused' mba 0 0

-- XXX Move "n * SIZE_OF(a)" out?
step (Tuple3Fused' mba rh i) a = do
Ring _ _ rh1 <- replace_ (Ring mba (n * SIZE_OF(a)) rh) a
return $ Partial $ Tuple3Fused' mba rh1 (i + 1)
step (Tuple3Fused' mba rh offset) a = do
Ring _ _ rh1 <- replace_ (Ring mba rSize rh) a
let offset1 = offset + SIZE_OF(a)
return $ Partial $ Tuple3Fused' mba rh1 offset1

-- XXX exitify optimization causes a problem here when modular folds are
-- used. Sometimes inlining "extract" is helpful.
{-# INLINE extract #-}
extract (Tuple3Fused' mba rh i) =
let rs = min i n * SIZE_OF(a)
rh1 = if i <= n then 0 else rh
extract (Tuple3Fused' mba rh offset) =
let rs = min offset rSize
rh1 = if offset <= rSize then 0 else rh
in pure $ Ring mba rs rh1

-- | @ringsOf n stream@ groups the input stream into a stream of ring arrays of
Expand Down Expand Up @@ -627,8 +603,7 @@ unsafeCast Ring{..} =
{ ringContents = ringContents
, ringHead = ringHead
, ringSize = ringSize
}
-- XXX Indentation fix
}

-- | Cast a @Ring a@ into a @Ring Word8@.
--
Expand Down Expand Up @@ -775,7 +750,6 @@ unsafeFoldRing !len f z rb = go z 0
| otherwise = do
x <- unsafeGetRawIndex index rb
go (f acc x) (index + SIZE_OF(a))
-- XXX Deprecated message does not say what to replace it with.

-- | Like unsafeFoldRing but with a monadic step function.
{-# DEPRECATED unsafeFoldRingM "This function will be removed in future." #-}
Expand All @@ -792,7 +766,6 @@ unsafeFoldRingM !len f z rb = go z 0
x <- unsafeGetRawIndex index rb
acc1 <- f acc x
go acc1 (index + SIZE_OF(a))
-- XXX Deprecated message does not say what to replace it with.

-- | Fold the entire length of a ring buffer starting at the current ring head.
--
Expand Down Expand Up @@ -829,7 +802,6 @@ foldlM' f z rb = go z rh
unsafeFoldRingFullM :: forall m a b. (MonadIO m, Unbox a)
=> (b -> a -> m b) -> b -> Ring a -> m b
unsafeFoldRingFullM = foldlM'
-- XXX Deprecated message does not say what to replace it with.

-- | Fold @n@ items in the ring starting at the ring head. Won't fold more
-- than the length of the ring even if @n@ is larger.
Expand All @@ -854,7 +826,6 @@ unsafeFoldRingNM count f z rb = go count z rh
if next == rh || n == 0
then return acc'
else go (n - 1) acc' next
-- XXX Deprecated message does not say what to replace it with.

-- | Cast the ring to a mutable array. Return the mutable array as well as the
-- current position of the ring head. Note that the array does not start with
Expand Down

0 comments on commit ac11942

Please sign in to comment.