Skip to content

Commit

Permalink
Merge pull request #397 from IntersectMBO/dcoutts/lookupsIO-writebuffer
Browse files Browse the repository at this point in the history
Refactor write buffer in lookupsIO
  • Loading branch information
dcoutts authored Sep 30, 2024
2 parents bbc4025 + af0beab commit 25ced18
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 109 deletions.
28 changes: 21 additions & 7 deletions bench/macro/lsm-tree-bench-lookups.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import qualified Database.LSMTree.Internal.RunBuilder as RunBuilder
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.Serialise (SerialisedKey,
serialiseKey, serialiseValue)
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import Debug.Trace (traceMarkerIO)
import GHC.Stats
import Numeric
Expand Down Expand Up @@ -197,8 +199,15 @@ benchmarks !caching = withFS $ \hfs hbio -> do
_blookupsIO <-
benchmark "benchLookupsIO"
"Calculate batches of keys, and perform disk lookups for each batch. This is roughly doing the same as benchPrepLookups, but also performing the disk I/O and resolving values. Net time/allocation is the result of subtracting the cost of benchGenKeyBatches."
(benchLookupsIO hbio arenaManager benchmarkResolveSerialisedValue runs blooms indexes handles keyRng0) benchmarkNumLookups
(\n -> do
let wb_unused = WB.empty
wbblobs_unused <- WBB.new hfs (FS.mkFsPath [])
benchLookupsIO hbio arenaManager benchmarkResolveSerialisedValue
wb_unused wbblobs_unused runs blooms indexes handles
keyRng0 n)
benchmarkNumLookups
bgenKeyBatches
--TODO: consider adding benchmarks that also use the write buffer

traceMarkerIO "Cleaning up"
putStrLn "Cleaning up"
Expand Down Expand Up @@ -454,19 +463,24 @@ benchLookupsIO ::
FS.HasBlockIO IO h
-> ArenaManager RealWorld
-> ResolveSerialisedValue
-> WB.WriteBuffer
-> WBB.WriteBufferBlobs IO h
-> V.Vector (Run IO (FS.Handle h))
-> V.Vector (Bloom SerialisedKey)
-> V.Vector IndexCompact
-> V.Vector (FS.Handle h)
-> StdGen
-> Int
-> IO ()
benchLookupsIO !hbio !arenaManager !resolve !rs !bs !ics !hs !keyRng !n
| n <= 0 = pure ()
| otherwise = do
let (!ks, !keyRng') = genLookupBatch keyRng benchmarkGenBatchSize
!_ <- lookupsIO hbio arenaManager resolve rs bs ics hs ks
benchLookupsIO hbio arenaManager resolve rs bs ics hs keyRng' (n-benchmarkGenBatchSize)
benchLookupsIO !hbio !arenaManager !resolve !wb !wbblobs !rs !bs !ics !hs =
    batchLoop
  where
    -- The arguments bound above stay fixed for every iteration; only the key
    -- generator and the remaining count change, so only those two are
    -- threaded through the loop.
    batchLoop !rng !remaining
      | remaining > 0 = do
          -- Draw the next batch of keys, together with the generator to
          -- continue from on the following iteration.
          let (!batch, !rng') = genLookupBatch rng benchmarkGenBatchSize
          -- The lookup result is only forced to WHNF and then discarded.
          !_ <- lookupsIO hbio arenaManager resolve wb wbblobs rs bs ics hs batch
          batchLoop rng' (remaining - benchmarkGenBatchSize)
      | otherwise = pure ()

{-------------------------------------------------------------------------------
Utilities
Expand Down
30 changes: 20 additions & 10 deletions bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ benchmarks = bgroup "Bench.Database.LSMTree.Internal.Lookup" [

benchLookups :: Config -> Benchmark
benchLookups conf@Config{name} =
withEnv $ \ ~(_dir, arenaManager, _hasFS, hasBlockIO, rs, ks) ->
withEnv $ \ ~(_dir, arenaManager, hasFS, hasBlockIO, rs, ks) ->
env ( pure ( V.map Run.runFilter rs
, V.map Run.runIndex rs
, V.map Run.runKOpsFile rs
Expand Down Expand Up @@ -122,21 +122,31 @@ benchLookups conf@Config{name} =
-- only compute WHNF.
, bench "Perform intra-page lookups" $
perRunEnvWithCleanup
( newArena arenaManager >>= \arena ->
stToIO (prepLookups arena blooms indexes kopsFiles ks) >>= \(rkixs, ioops) ->
FS.submitIO hasBlockIO ioops >>= \ioress ->
pure (rkixs, ioops, ioress, arena)
( do arena <- newArena arenaManager
(rkixs, ioops) <- stToIO (prepLookups arena blooms indexes kopsFiles ks)
ioress <- FS.submitIO hasBlockIO ioops
wbblobs <- WBB.new hasFS (FS.mkFsPath [])
pure (rkixs, ioops, ioress, arena, wbblobs)
)
(\(_, _, _, arena) -> closeArena arenaManager arena) $ \ ~(rkixs, ioops, ioress, _) -> do
!_ <- intraPageLookups resolveV rs ks rkixs ioops ioress
pure ()
(\(_, _, _, arena, wbblobs) -> do
closeArena arenaManager arena
WBB.removeReference wbblobs)
(\ ~(rkixs, ioops, ioress, _, wbblobs_unused) -> do
!_ <- intraPageLookups resolveV WB.empty wbblobs_unused
rs ks rkixs ioops ioress
pure ())
-- The whole shebang: lookup preparation, doing the IO, and then
-- performing intra-page-lookups. Again, we evaluate the result to
-- WHNF because it is the same result that intraPageLookups produces
-- (see above).
, bench "Lookups in IO" $
whnfAppIO (\ks' -> lookupsIO hasBlockIO arenaManager resolveV rs blooms indexes kopsFiles ks') ks
, let wb_unused = WB.empty in
env (WBB.new hasFS (FS.mkFsPath [])) $ \wbblobs_unused ->
bench "Lookups in IO" $
whnfAppIO (\ks' -> lookupsIO hasBlockIO arenaManager resolveV
wb_unused wbblobs_unused
rs blooms indexes kopsFiles ks') ks
]
--TODO: consider adding benchmarks that also use the write buffer
where
withEnv = envWithCleanup
(lookupsInBatchesEnv conf)
Expand Down
50 changes: 17 additions & 33 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -664,46 +664,30 @@ close th = do
pure tc
pure TableHandleClosed

{-# SPECIALISE lookups :: ResolveSerialisedValue -> V.Vector SerialisedKey -> TableHandle IO h -> (Maybe (Entry SerialisedValue (WeakBlobRef IO (Handle h))) -> lookupResult) -> IO (V.Vector lookupResult) #-}
{-# SPECIALISE lookups :: ResolveSerialisedValue -> V.Vector SerialisedKey -> TableHandle IO h -> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO (Handle h))))) #-}
-- | See 'Database.LSMTree.Normal.lookups'.
lookups ::
m ~ IO -- TODO: replace by @io-classes@ constraints for IO simulation.
=> ResolveSerialisedValue
-> V.Vector SerialisedKey
-> TableHandle m h
-> (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h))) -> lookupResult)
-- ^ How to map from an entry to a lookup result.
-> m (V.Vector lookupResult)
lookups resolve ks th fromEntry = do
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h)))))
lookups resolve ks th = do
traceWith (tableTracer th) $ TraceLookups (V.length ks)
withOpenTable th $ \thEnv -> do
let arenaManager = tableHandleArenaManager th
RW.withReadAccess (tableContent thEnv) $ \tableContent -> do
let !cache = tableCache tableContent
ioRes <-
lookupsIO
(tableHasBlockIO thEnv)
arenaManager
resolve
(cachedRuns cache)
(cachedFilters cache)
(cachedIndexes cache)
(cachedKOpsFiles cache)
ks
--TODO: this bit is all a bit of a mess, not well factored
--TODO: incorporate write buffer lookups into the lookupsIO
--TODO: reduce allocations involved with converting BlobSpan to BlobRef
-- and Entry to the lookup result. Try one single conversion rather
-- than multiple steps that each allocate.
let !wb = tableWriteBuffer tableContent
!wbblobs = tableWriteBufferBlobs tableContent
toBlobRef <- WBB.mkBlobRef wbblobs
let wbLookup = fmap (fmap (WeakBlobRef . toBlobRef))
. WB.lookup wb
pure $!
V.zipWithStrict
(\k1 e2 -> fromEntry $ Entry.combineMaybe resolve (wbLookup k1) e2)
ks ioRes
withOpenTable th $ \thEnv ->
RW.withReadAccess (tableContent thEnv) $ \tableContent ->
let !cache = tableCache tableContent in
lookupsIO
(tableHasBlockIO thEnv)
(tableHandleArenaManager th)
resolve
(tableWriteBuffer tableContent)
(tableWriteBufferBlobs tableContent)
(cachedRuns cache)
(cachedFilters cache)
(cachedIndexes cache)
(cachedKOpsFiles cache)
ks

{-# SPECIALISE rangeLookup :: ResolveSerialisedValue -> Range SerialisedKey -> TableHandle IO h -> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef IO (Handle h)) -> res) -> IO (V.Vector res) #-}
-- | See 'Database.LSMTree.Normal.rangeLookup'.
Expand Down
49 changes: 42 additions & 7 deletions src/Database/LSMTree/Internal/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import Database.LSMTree.Internal.RawPage
import Database.LSMTree.Internal.Run (Run, mkBlobRefForRun)
import Database.LSMTree.Internal.Serialise
import qualified Database.LSMTree.Internal.Vector as V
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
import System.FS.API (BufferOffset (..), Handle)
import System.FS.BlockIO.API

Expand Down Expand Up @@ -217,6 +219,8 @@ data ByteCountDiscrepancy = ByteCountDiscrepancy {
HasBlockIO IO h
-> ArenaManager RealWorld
-> ResolveSerialisedValue
-> WB.WriteBuffer
-> WBB.WriteBufferBlobs IO h
-> V.Vector (Run IO (Handle h))
-> V.Vector (Bloom SerialisedKey)
-> V.Vector IndexCompact
Expand All @@ -235,16 +239,20 @@ lookupsIO ::
=> HasBlockIO m h
-> ArenaManager (PrimState m)
-> ResolveSerialisedValue
-> WB.WriteBuffer
-> WBB.WriteBufferBlobs m h
-> V.Vector (Run m (Handle h)) -- ^ Runs @rs@
-> V.Vector (Bloom SerialisedKey) -- ^ The bloom filters inside @rs@
-> V.Vector IndexCompact -- ^ The indexes inside @rs@
-> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@
-> V.Vector SerialisedKey
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h)))))
lookupsIO !hbio !mgr !resolveV !rs !blooms !indexes !kopsFiles !ks = assert precondition $ withArena mgr $ \arena -> do
(rkixs, ioops) <- Class.stToIO $ prepLookups arena blooms indexes kopsFiles ks
ioress <- submitIO hbio ioops
intraPageLookups resolveV rs ks rkixs ioops ioress
lookupsIO !hbio !mgr !resolveV !wb !wbblobs !rs !blooms !indexes !kopsFiles !ks =
assert precondition $
withArena mgr $ \arena -> do
(rkixs, ioops) <- Class.stToIO $ prepLookups arena blooms indexes kopsFiles ks
ioress <- submitIO hbio ioops
intraPageLookups resolveV wb wbblobs rs ks rkixs ioops ioress
where
-- we check only that the lengths match, because checking the contents is
-- too expensive.
Expand All @@ -256,29 +264,54 @@ lookupsIO !hbio !mgr !resolveV !rs !blooms !indexes !kopsFiles !ks = assert prec

{-# SPECIALIZE intraPageLookups ::
ResolveSerialisedValue
-> WB.WriteBuffer
-> WBB.WriteBufferBlobs IO h
-> V.Vector (Run IO (Handle h))
-> V.Vector SerialisedKey
-> VU.Vector (RunIx, KeyIx)
-> V.Vector (IOOp RealWorld h)
-> VU.Vector IOResult
-> IO (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef IO (Handle h)))))
#-}
-- | Intra-page lookups.
-- | Intra-page lookups, and combining lookup results from multiple runs and
-- the write buffer.
--
-- This function assumes that @rkixs@ is ordered such that newer runs are
-- handled first. The order matters for resolving cases where we find the same
-- key in multiple runs.
--
intraPageLookups ::
forall m h. (PrimMonad m, MonadThrow m)
=> ResolveSerialisedValue
-> WB.WriteBuffer
-> WBB.WriteBufferBlobs m h
-> V.Vector (Run m (Handle h))
-> V.Vector SerialisedKey
-> VU.Vector (RunIx, KeyIx)
-> V.Vector (IOOp (PrimState m) h)
-> VU.Vector IOResult
-> m (V.Vector (Maybe (Entry SerialisedValue (WeakBlobRef m (Handle h)))))
intraPageLookups !resolveV !rs !ks !rkixs !ioops !ioress = do
res <- VM.replicate (V.length ks) Nothing
intraPageLookups !resolveV !wb !wbblobs !rs !ks !rkixs !ioops !ioress = do
-- We accumulate results into the 'res' vector. When there are several
-- lookup hits for the same key then we combine the results. The combining
-- operator is associative but not commutative, so we must do this in the
-- right order. We start with the write buffer lookup results and then go
-- through the run lookup results in rkixs, which must be ordered by run.
--
-- TODO: reassess the representation of the result vector to try to reduce
-- intermediate allocations. For example use a less convenient
-- representation with several vectors (e.g. separate blob info) and
-- convert to the final convenient representation in a single pass near
-- the surface API so that all the conversions can be done in one pass
-- without intermediate allocations.
--
toBlobRef <- WBB.mkBlobRef wbblobs
res <- VM.generateM (V.length ks) $ \ki ->
case WB.lookup wb (V.unsafeIndex ks ki) of
Nothing -> pure Nothing
Just e -> pure $! Just $! fmap (WeakBlobRef . toBlobRef) e
-- TODO: ^^ we should be able to avoid this allocation by
-- combining the conversion with other later conversions.
loop res 0
V.unsafeFreeze res
where
Expand Down Expand Up @@ -307,6 +340,8 @@ intraPageLookups !resolveV !rs !ks !rkixs !ioops !ioress = do
LookupEntry e -> do
let e' = bimap copySerialisedValue
(WeakBlobRef . mkBlobRefForRun r) e
-- TODO: ^^ we should be able to avoid this allocation by
-- combining the conversion with other later conversions.
V.unsafeInsertWithMStrict res (combine resolveV) kix e'
-- Laziness ensures that we only compute the appending of the prefix
-- and suffix when the result is needed. We do not use 'force' here,
Expand Down
5 changes: 5 additions & 0 deletions src/Database/LSMTree/Internal/WriteBufferBlobs.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# OPTIONS_GHC -Wno-partial-fields #-}
{- HLINT ignore "Use record patterns" -}

-- | An on-disk store for blobs for the write buffer.
--
Expand Down Expand Up @@ -33,6 +34,7 @@ module Database.LSMTree.Internal.WriteBufferBlobs (
FilePointer (..)
) where

import Control.DeepSeq (NFData (..))
import Control.Monad.Class.MonadThrow
import Control.Monad.Primitive (PrimMonad, PrimState)
import qualified Control.RefCount as RC
Expand Down Expand Up @@ -123,6 +125,9 @@ data BlobFileState m h =
, blobFilePointer :: !(FilePointer m)
}

-- | Forces the second and third constructor fields to normal form. The first
-- field is deliberately left untouched (presumably because it has no 'NFData'
-- instance -- TODO confirm against the data declaration).
instance NFData (WriteBufferBlobs m h) where
  rnf (WriteBufferBlobs _ second third) =
    rnf second `seq` rnf third

new :: PrimMonad m
=> HasFS m h
-> FS.FsPath
Expand Down
18 changes: 8 additions & 10 deletions src/Database/LSMTree/Monoidal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ close (Internal.MonoidalTable th) = Internal.close th
-------------------------------------------------------------------------------}

{-# SPECIALISE lookups :: (SerialiseKey k, SerialiseValue v, ResolveValue v) => V.Vector k -> TableHandle IO k v -> IO (V.Vector (LookupResult v)) #-}
{-# INLINEABLE lookups #-}
-- | Perform a batch of lookups.
--
-- Lookups can be performed concurrently from multiple Haskell threads.
Expand All @@ -217,21 +218,18 @@ lookups ::
-> TableHandle m k v
-> m (V.Vector (LookupResult v))
lookups ks (Internal.MonoidalTable th) =
V.mapStrict (fmap Internal.deserialiseValue) <$!>
V.map toLookupResult <$>
Internal.lookups
(resolve @v Proxy)
(V.map Internal.serialiseKey ks)
th
toMonoidalLookupResult

toMonoidalLookupResult :: Maybe (Entry.Entry v b) -> LookupResult v
toMonoidalLookupResult = \case
Just e -> case e of
Entry.Insert v -> Found v
Entry.InsertWithBlob _ _ -> error "toMonoidalLookupResult: InsertWithBlob unexpected"
Entry.Mupdate v -> Found v
where
toLookupResult (Just e) = case e of
Entry.Insert v -> Found (Internal.deserialiseValue v)
Entry.InsertWithBlob _ _ -> error "Monoidal.lookups: unexpected InsertWithBlob"
Entry.Mupdate v -> Found (Internal.deserialiseValue v)
Entry.Delete -> NotFound
Nothing -> NotFound
toLookupResult Nothing = NotFound

{-# SPECIALISE rangeLookup :: (SerialiseKey k, SerialiseValue v, ResolveValue v) => Range k -> TableHandle IO k v -> IO (V.Vector (QueryResult k v)) #-}
-- | Perform a range lookup.
Expand Down
20 changes: 10 additions & 10 deletions src/Database/LSMTree/Normal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ close (Internal.NormalTable th) = Internal.close th
-------------------------------------------------------------------------------}

{-# SPECIALISE lookups :: (SerialiseKey k, SerialiseValue v) => V.Vector k -> TableHandle IO k v blob -> IO (V.Vector (LookupResult v (BlobRef IO blob))) #-}
{-# INLINEABLE lookups #-}
-- | Perform a batch of lookups.
--
-- Lookups can be performed concurrently from multiple Haskell threads.
Expand All @@ -289,17 +290,16 @@ lookups ::
-> TableHandle m k v blob
-> m (V.Vector (LookupResult v (BlobRef m blob)))
lookups ks (Internal.NormalTable th) =
V.mapStrict (bimap Internal.deserialiseValue BlobRef) <$!>
Internal.lookups const (V.map Internal.serialiseKey ks) th toNormalLookupResult

toNormalLookupResult :: Maybe (Entry.Entry v b) -> LookupResult v b
toNormalLookupResult = \case
Just e -> case e of
Entry.Insert v -> Found v
Entry.InsertWithBlob v br -> FoundWithBlob v br
Entry.Mupdate _ -> error "toNormalLookupResult: Mupdate unexpected"
V.map toLookupResult <$>
Internal.lookups const (V.map Internal.serialiseKey ks) th
where
toLookupResult (Just e) = case e of
Entry.Insert v -> Found (Internal.deserialiseValue v)
Entry.InsertWithBlob v br -> FoundWithBlob (Internal.deserialiseValue v)
(BlobRef br)
Entry.Mupdate _ -> error "Normal.lookups: unexpected Mupdate"
Entry.Delete -> NotFound
Nothing -> NotFound
toLookupResult Nothing = NotFound

{-# SPECIALISE rangeLookup :: (SerialiseKey k, SerialiseValue v) => Range k -> TableHandle IO k v blob -> IO (V.Vector (QueryResult k v (BlobRef IO blob))) #-}
-- | Perform a range lookup.
Expand Down
4 changes: 2 additions & 2 deletions test/Test/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ prop_interimOpenTable dat = ioProperty $
let snap = fromMaybe (error "invalid name") $ mkSnapshotName "snap"
numRunsSnapped <- snapshot const snap "someLabel" th
th' <- open sesh "someLabel" configNoOverride snap
lhs <- lookups const ks th id
rhs <- lookups const ks th' id
lhs <- lookups const ks th
rhs <- lookups const ks th'
close th
close th'
-- TODO: checking lookups is a simple check, but we could have stronger
Expand Down
Loading

0 comments on commit 25ced18

Please sign in to comment.