From 4cb40c9945020da4de57cac0f640454a683a6461 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 6 Sep 2024 17:32:49 -0700 Subject: [PATCH 01/23] Change allocator to enqueue with the invariant that the first record of page (p+1) does not fit at the end of page (p). This allows replication to independently replay records and guarantee that they fit on the log exactly in the same way as the primary. --- .../cs/src/core/Allocator/AllocatorBase.cs | 131 +++++++++++------- 1 file changed, 83 insertions(+), 48 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index 83cf9073c1..cb0f335ee1 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -781,10 +781,19 @@ protected void IncrementAllocatedPageCount() public long GetTailAddress() { var local = TailPageOffset; - if (local.Offset >= PageSize) + + // Handle corner cases during page overflow + while (local.Offset >= PageSize) { - local.Page++; - local.Offset = 0; + if (local.Offset == PageSize) + { + local.Page++; + local.Offset = 0; + break; + } + // Offset is being adjusted by overflow thread, spin-wait + Thread.Yield(); + local = TailPageOffset; } return ((long)local.Page << LogPageSizeBits) | (uint)local.Offset; } @@ -830,6 +839,75 @@ void AllocatePagesWithException(int pageIndex, PageOffset localTailPageOffset) } } + [MethodImpl(MethodImplOptions.NoInlining)] + static void ThrowTsavoriteException(string message) + => throw new TsavoriteException(message); + + void IssueShiftAddress(long pageIndex) + { + // Issue the shift of address + var shiftAddress = pageIndex << LogPageSizeBits; + PageAlignedShiftReadOnlyAddress(shiftAddress); + PageAlignedShiftHeadAddress(shiftAddress); + } + + [MethodImpl(MethodImplOptions.NoInlining)] + long HandlePageOverflow(ref PageOffset localTailPageOffset, int page, int offset, int 
numSlots) + { + int pageIndex = localTailPageOffset.Page + 1; + + // This thread is trying to allocate at an offset past where one or more previous threads + // already overflowed; exit and allow the first overflow thread to proceed + if (offset > PageSize) + { + if (NeedToWait(pageIndex)) + return 0; // RETRY_LATER + return -1; // RETRY_NOW + } + + // Thread that owns the page-increment owns the latch now + if (NeedToWait(pageIndex)) + { + // Reset to previous tail so that next attempt can retry + localTailPageOffset.PageAndOffset -= numSlots; + Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset); + + // Shift only after TailPageOffset is reset to a valid state + IssueShiftAddress(pageIndex); + + // Re-check as the shifting may have allowed us to proceed + if (NeedToWait(pageIndex)) + return 0; // RETRY_LATER + } + + // The thread that "makes" the offset incorrect should allocate next page and set new tail + if (CannotAllocate(pageIndex)) + { + // Reset to previous tail so that next attempt can retry + localTailPageOffset.PageAndOffset -= numSlots; + Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset); + + // Shift only after TailPageOffset is reset to a valid state + IssueShiftAddress(pageIndex); + + // Re-check as the shifting may have allowed us to proceed + if (CannotAllocate(pageIndex)) + return -1; // RETRY_NOW + } + + IssueShiftAddress(pageIndex); + + if (!_wrapper.IsAllocated(pageIndex % BufferSize) || !_wrapper.IsAllocated((pageIndex + 1) % BufferSize)) + AllocatePagesWithException(pageIndex, localTailPageOffset); + + localTailPageOffset.Page++; + localTailPageOffset.Offset = numSlots; + TailPageOffset = localTailPageOffset; + page++; + offset = 0; + return (((long)page) << LogPageSizeBits) | ((long)offset); + } + /// Try allocate, no thread spinning allowed /// Number of slots to allocate /// The allocated logical address, or 0 in case of inability to allocate @@ -837,7 +915,7 @@ 
void AllocatePagesWithException(int pageIndex, PageOffset localTailPageOffset) public long TryAllocate(int numSlots = 1) { if (numSlots > PageSize) - throw new TsavoriteException("Entry does not fit on page"); + ThrowTsavoriteException("Entry does not fit on page"); PageOffset localTailPageOffset = default; localTailPageOffset.PageAndOffset = TailPageOffset.PageAndOffset; @@ -857,53 +935,10 @@ public long TryAllocate(int numSlots = 1) int page = localTailPageOffset.Page; int offset = localTailPageOffset.Offset - numSlots; - #region HANDLE PAGE OVERFLOW if (localTailPageOffset.Offset > PageSize) { - int pageIndex = localTailPageOffset.Page + 1; - - // All overflow threads try to shift addresses - long shiftAddress = ((long)pageIndex) << LogPageSizeBits; - PageAlignedShiftReadOnlyAddress(shiftAddress); - PageAlignedShiftHeadAddress(shiftAddress); - - // This thread is trying to allocate at an offset past where one or more previous threads - // already overflowed; exit and allow the first overflow thread to proceed - if (offset > PageSize) - { - if (NeedToWait(pageIndex)) - return 0; // RETRY_LATER - return -1; // RETRY_NOW - } - - if (NeedToWait(pageIndex)) - { - // Reset to end of page so that next attempt can retry - localTailPageOffset.Offset = PageSize; - Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset); - return 0; // RETRY_LATER - } - - // The thread that "makes" the offset incorrect should allocate next page and set new tail - if (CannotAllocate(pageIndex)) - { - // Reset to end of page so that next attempt can retry - localTailPageOffset.Offset = PageSize; - Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset); - return -1; // RETRY_NOW - } - - if (!_wrapper.IsAllocated(pageIndex % BufferSize) || !_wrapper.IsAllocated((pageIndex + 1) % BufferSize)) - AllocatePagesWithException(pageIndex, localTailPageOffset); - - localTailPageOffset.Page++; - localTailPageOffset.Offset = numSlots; - 
TailPageOffset = localTailPageOffset; - page++; - offset = 0; + return HandlePageOverflow(ref localTailPageOffset, page, offset, numSlots); } - #endregion - return (((long)page) << LogPageSizeBits) | ((long)offset); } From 198e8797ed9e8b09bd08b2ac2a913ec7364768b2 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Sun, 8 Sep 2024 11:40:11 -0700 Subject: [PATCH 02/23] fixes based on comments --- .../cs/src/core/Allocator/AllocatorBase.cs | 50 ++++++++++++------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index cb0f335ee1..5695a91667 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -819,7 +819,8 @@ public long GetTailAddress() /// Get sector size for main hlog device public int GetDeviceSectorSize() => sectorSize; - void AllocatePagesWithException(int pageIndex, PageOffset localTailPageOffset) + [MethodImpl(MethodImplOptions.NoInlining)] + void AllocatePagesWithException(int pageIndex, PageOffset localTailPageOffset, int numSlots) { try { @@ -833,16 +834,27 @@ void AllocatePagesWithException(int pageIndex, PageOffset localTailPageOffset) } catch { - localTailPageOffset.Offset = PageSize; + // Reset to previous tail + localTailPageOffset.PageAndOffset -= numSlots; Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset); throw; } } + /// + /// Throw Tsavorite exception with message. We use a method wrapper so that + /// the caller method can execute inlined. + /// + /// + /// [MethodImpl(MethodImplOptions.NoInlining)] static void ThrowTsavoriteException(string message) => throw new TsavoriteException(message); + /// + /// Shift log addresses when turning the page. 
+ /// + /// The page we are turning to void IssueShiftAddress(long pageIndex) { // Issue the shift of address @@ -852,20 +864,20 @@ void IssueShiftAddress(long pageIndex) } [MethodImpl(MethodImplOptions.NoInlining)] - long HandlePageOverflow(ref PageOffset localTailPageOffset, int page, int offset, int numSlots) + long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) { int pageIndex = localTailPageOffset.Page + 1; // This thread is trying to allocate at an offset past where one or more previous threads - // already overflowed; exit and allow the first overflow thread to proceed - if (offset > PageSize) + // already overflowed; exit and allow the first overflow thread to proceed. + if (localTailPageOffset.Offset - numSlots > PageSize) { if (NeedToWait(pageIndex)) return 0; // RETRY_LATER return -1; // RETRY_NOW } - // Thread that owns the page-increment owns the latch now + // The single thread that "owns" the page-increment proceeds below. if (NeedToWait(pageIndex)) { // Reset to previous tail so that next attempt can retry @@ -895,17 +907,18 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int page, int offset return -1; // RETRY_NOW } - IssueShiftAddress(pageIndex); - if (!_wrapper.IsAllocated(pageIndex % BufferSize) || !_wrapper.IsAllocated((pageIndex + 1) % BufferSize)) - AllocatePagesWithException(pageIndex, localTailPageOffset); + AllocatePagesWithException(pageIndex, localTailPageOffset, numSlots); localTailPageOffset.Page++; localTailPageOffset.Offset = numSlots; TailPageOffset = localTailPageOffset; - page++; - offset = 0; - return (((long)page) << LogPageSizeBits) | ((long)offset); + + // Shift only after TailPageOffset is reset to a valid state + IssueShiftAddress(pageIndex); + + // Offset is zero, for the first allocation on the new page + return ((long)localTailPageOffset.Page) << LogPageSizeBits; } /// Try allocate, no thread spinning allowed @@ -921,7 +934,7 @@ public long TryAllocate(int numSlots = 1) 
localTailPageOffset.PageAndOffset = TailPageOffset.PageAndOffset; // Necessary to check because threads keep retrying and we do not - // want to overflow offset more than once per thread + // want to overflow the offset more than once per thread if (localTailPageOffset.Offset > PageSize) { if (NeedToWait(localTailPageOffset.Page + 1)) @@ -932,14 +945,15 @@ public long TryAllocate(int numSlots = 1) // Determine insertion index. localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots); - int page = localTailPageOffset.Page; - int offset = localTailPageOffset.Offset - numSlots; - + // Slow path when we reach the end of a page. if (localTailPageOffset.Offset > PageSize) { - return HandlePageOverflow(ref localTailPageOffset, page, offset, numSlots); + // Note that TailPageOffset is now unstable -- there may be a GetTailAddress call spinning for + // it to stabilize. Therefore, HandlePageOverflow needs to stabilize TailPageOffset immediately, + // before performing any epoch bumps or system calls. 
+ return HandlePageOverflow(ref localTailPageOffset, numSlots); } - return (((long)page) << LogPageSizeBits) | ((long)offset); + return (((long)localTailPageOffset.Page) << LogPageSizeBits) | ((long)(localTailPageOffset.Offset - numSlots)); } /// Try allocate, spin for RETRY_NOW case From 772cb56726c17f8fa509769cedc4c01efcb72397 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Sun, 8 Sep 2024 11:43:14 -0700 Subject: [PATCH 03/23] add another comment --- libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index 5695a91667..41baeb7e6a 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -783,6 +783,8 @@ public long GetTailAddress() var local = TailPageOffset; // Handle corner cases during page overflow + // The while loop is guaranteed to terminate because HandlePageOverflow + // ensures that it fixes the unstable TailPageOffset immediately. while (local.Offset >= PageSize) { if (local.Offset == PageSize) From 8b5708e1f6e2bdb835e0e89a0a9269c4ffd70b6b Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Mon, 9 Sep 2024 09:53:49 -0700 Subject: [PATCH 04/23] add comments --- .../Tsavorite/cs/src/core/Allocator/AllocatorBase.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index 41baeb7e6a..d710d0fab4 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -879,7 +879,9 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) return -1; // RETRY_NOW } - // The single thread that "owns" the page-increment proceeds below. 
+ // The single thread that "owns" the page-increment proceeds below. This is the thread for which: + // 1. Old image of offset (pre-Interlocked.Increment) is <= PageSize, and + // 2. New image of offset (post-Interlocked.Increment) is > PageSize. if (NeedToWait(pageIndex)) { // Reset to previous tail so that next attempt can retry @@ -894,7 +896,7 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) return 0; // RETRY_LATER } - // The thread that "makes" the offset incorrect should allocate next page and set new tail + // Allocate next page and set new tail if (CannotAllocate(pageIndex)) { // Reset to previous tail so that next attempt can retry From f53f5169742ac5bc1ae7cda6b0feb5b8c0020e23 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Mon, 9 Sep 2024 10:40:49 -0700 Subject: [PATCH 05/23] fixes - we now always wrap TryAllocate with TryAllocateRetryNow. --- .../cs/src/core/Allocator/AllocatorBase.cs | 14 ++++----- .../Tsavorite/Implementation/BlockAllocate.cs | 16 +++------- .../cs/src/core/TsavoriteLog/TsavoriteLog.cs | 29 +++++++------------ 3 files changed, 21 insertions(+), 38 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index d710d0fab4..cb5d135dc1 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -891,9 +891,7 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) // Shift only after TailPageOffset is reset to a valid state IssueShiftAddress(pageIndex); - // Re-check as the shifting may have allowed us to proceed - if (NeedToWait(pageIndex)) - return 0; // RETRY_LATER + return 0; // RETRY_LATER } // Allocate next page and set new tail @@ -906,9 +904,7 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) // Shift only after TailPageOffset is reset to a valid state 
IssueShiftAddress(pageIndex); - // Re-check as the shifting may have allowed us to proceed - if (CannotAllocate(pageIndex)) - return -1; // RETRY_NOW + return -1; // RETRY_NOW } if (!_wrapper.IsAllocated(pageIndex % BufferSize) || !_wrapper.IsAllocated((pageIndex + 1) % BufferSize)) @@ -929,7 +925,7 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) /// Number of slots to allocate /// The allocated logical address, or 0 in case of inability to allocate [MethodImpl(MethodImplOptions.AggressiveInlining)] - public long TryAllocate(int numSlots = 1) + long TryAllocate(int numSlots = 1) { if (numSlots > PageSize) ThrowTsavoriteException("Entry does not fit on page"); @@ -968,7 +964,11 @@ public long TryAllocateRetryNow(int numSlots = 1) { long logicalAddress; while ((logicalAddress = TryAllocate(numSlots)) < 0) + { + _ = TryComplete(); epoch.ProtectAndDrain(); + Thread.Yield(); + } return logicalAddress; } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs index 6fc1b38727..737330e6c2 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs @@ -20,7 +20,7 @@ private static bool TryBlockAllocate( out OperationStatus internalStatus) { pendingContext.flushEvent = allocator.FlushEvent; - logicalAddress = allocator.TryAllocate(recordSize); + logicalAddress = allocator.TryAllocateRetryNow(recordSize); if (logicalAddress > 0) { pendingContext.flushEvent = default; @@ -28,17 +28,9 @@ private static bool TryBlockAllocate( return true; } - if (logicalAddress == 0) - { - // We expect flushEvent to be signaled. - internalStatus = OperationStatus.ALLOCATE_FAILED; - return false; - } - - // logicalAddress is < 0 so we do not expect flushEvent to be signaled; return RETRY_LATER to refresh the epoch. 
- pendingContext.flushEvent = default; - allocator.TryComplete(); - internalStatus = OperationStatus.RETRY_LATER; + Debug.Assert(logicalAddress == 0); + // We expect flushEvent to be signaled. + internalStatus = OperationStatus.ALLOCATE_FAILED; return false; } diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs index 6b7c358251..33989bf0d3 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs @@ -7,7 +7,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; -using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -862,28 +861,20 @@ private long AllocateBlock(int recordSize) while (true) { var flushEvent = allocator.FlushEvent; - var logicalAddress = allocator.TryAllocate(recordSize); + var logicalAddress = allocator.TryAllocateRetryNow(recordSize); if (logicalAddress > 0) return logicalAddress; - - if (logicalAddress == 0) + Debug.Assert(logicalAddress == 0); + epoch.Suspend(); + if (cannedException != null) throw cannedException; + try { - epoch.Suspend(); - if (cannedException != null) throw cannedException; - try - { - flushEvent.Wait(); - } - finally - { - epoch.Resume(); - } + flushEvent.Wait(); + } + finally + { + epoch.Resume(); } - - // logicalAddress is < 0 so we do not expect flushEvent to be signaled; refresh the epoch and retry now - allocator.TryComplete(); - epoch.ProtectAndDrain(); - Thread.Yield(); } } From b0f38860f54d22d731bb87ade19631737522b104 Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Mon, 9 Sep 2024 11:48:42 -0700 Subject: [PATCH 06/23] Add Non-readcache "Insert At Tail" stress test --- .../test/InsertAtTailSpanByteStressTests.cs | 294 ++++++++++++++++++ .../Tsavorite/cs/test/ReadCacheChainTests.cs | 31 -- 
libs/storage/Tsavorite/cs/test/TestUtils.cs | 28 ++ 3 files changed, 322 insertions(+), 31 deletions(-) create mode 100644 libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs diff --git a/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs b/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs new file mode 100644 index 0000000000..d26eae5a54 --- /dev/null +++ b/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs @@ -0,0 +1,294 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading.Tasks; +using NUnit.Framework; +using NUnit.Framework.Legacy; +using Tsavorite.core; +using static Tsavorite.test.TestUtils; + +#pragma warning disable // Add parentheses for clarity + +namespace Tsavorite.test.InsertAtTailStressTests +{ + using SpanByteStoreFunctions = StoreFunctions; + + class SpanByteInsertAtTailChainTests + { + private TsavoriteKV> store; + private IDevice log; + SpanByteComparerModulo comparer; + + const long ValueAdd = 1_000_000_000; + const long NumKeys = 2_000; + + [SetUp] + public void Setup() + { + DeleteDirectory(MethodTestDir, wait: true); + + string filename = Path.Join(MethodTestDir, $"{GetType().Name}.log"); + log = new NullDevice(); + + HashModulo modRange = HashModulo.NoMod; + foreach (var arg in TestContext.CurrentContext.Test.Arguments) + { + if (arg is HashModulo cr) + { + modRange = cr; + continue; + } + } + + // Make the main log mutable region small enough that we force the readonly region to stay close to tail, causing inserts. 
+ int pageBits = 15, memoryBits = 34; + store = new(new() + { + LogDevice = log, + PageSize = 1L << 15, + MemorySize = 1L << 34, + MutableFraction = 1.0 / (1 << memoryBits - pageBits + 2), + }, StoreFunctions.Create(comparer, SpanByteRecordDisposer.Instance) + , (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions) + ); + + comparer = new SpanByteComparerModulo(modRange); + } + + [TearDown] + public void TearDown() + { + store?.Dispose(); + store = null; + log?.Dispose(); + log = null; + DeleteDirectory(MethodTestDir); + } + + internal class RmwSpanByteFunctions : SpanByteFunctions + { + /// + public override bool ConcurrentWriter(ref SpanByte key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref SpanByteAndMemory output, ref UpsertInfo upsertInfo, ref RecordInfo recordInfo) + { + src.CopyTo(ref dst); + src.CopyTo(ref output, memoryPool); + return true; + } + + /// + public override bool SingleWriter(ref SpanByte key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref SpanByteAndMemory output, ref UpsertInfo upsertInfo, WriteReason reason, ref RecordInfo recordInfo) + { + src.CopyTo(ref dst); + src.CopyTo(ref output, memoryPool); + return true; + } + + /// + public override bool CopyUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte oldValue, ref SpanByte newValue, ref SpanByteAndMemory output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) + { + input.CopyTo(ref newValue); + input.CopyTo(ref output, memoryPool); + return true; + } + + /// + public override bool InPlaceUpdater(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) + { + // The default implementation of IPU simply writes input to destination, if there is space + base.InPlaceUpdater(ref key, ref input, ref value, ref output, ref rmwInfo, ref recordInfo); + input.CopyTo(ref output, memoryPool); + return true; + } + + /// + public override bool InitialUpdater(ref 
SpanByte key, ref SpanByte input, ref SpanByte value, ref SpanByteAndMemory output, ref RMWInfo rmwInfo, ref RecordInfo recordInfo) + { + Assert.Fail("For these tests, InitialUpdater should never be called"); + return false; + } + } + + unsafe void PopulateAndSetReadOnlyToTail() + { + using var session = store.NewSession>(new SpanByteFunctions()); + var bContext = session.BasicContext; + + Span keyVec = stackalloc byte[sizeof(long)]; + var key = SpanByte.FromPinnedSpan(keyVec); + + for (long ii = 0; ii < NumKeys; ii++) + { + ClassicAssert.IsTrue(BitConverter.TryWriteBytes(keyVec, ii)); + var status = bContext.Upsert(ref key, ref key); + ClassicAssert.IsTrue(status.Record.Created, status.ToString()); + } + bContext.CompletePending(true); + store.Log.ShiftReadOnlyAddress(store.Log.TailAddress, wait: true); + } + + [Test] + [Category(TsavoriteKVTestCategory)] + [Category(StressTestCategory)] + //[Repeat(300)] + public void SpanByteTailInsertMultiThreadTest([Values] HashModulo modRange, [Values(0, 1, 2, 8)] int numReadThreads, [Values(0, 1, 2, 8)] int numWriteThreads, + [Values(UpdateOp.Upsert, UpdateOp.RMW)] UpdateOp updateOp) + { + if (numReadThreads == 0 && numWriteThreads == 0) + Assert.Ignore("Skipped due to 0 threads for both read and update"); + if ((numReadThreads > 2 || numWriteThreads > 2) && IsRunningAzureTests) + Assert.Ignore("Skipped because > 2 threads when IsRunningAzureTests"); + if (TestContext.CurrentContext.CurrentRepeatCount > 0) + Debug.WriteLine($"*** Current test iteration: {TestContext.CurrentContext.CurrentRepeatCount + 1} ***"); + + // Initial population so we know we can read the keys. 
+ PopulateAndSetReadOnlyToTail(); + + const int numIterations = 10; + unsafe void runReadThread(int tid) + { + using var session = store.NewSession>(new SpanByteFunctions()); + var bContext = session.BasicContext; + + Span keyVec = stackalloc byte[sizeof(long)]; + var key = SpanByte.FromPinnedSpan(keyVec); + + for (var iteration = 0; iteration < numIterations; ++iteration) + { + var numCompleted = 0; + for (var ii = 0; ii < NumKeys; ++ii) + { + SpanByteAndMemory output = default; + + ClassicAssert.IsTrue(BitConverter.TryWriteBytes(keyVec, ii)); + var status = bContext.Read(ref key, ref output); + + var numPending = ii - numCompleted; + if (status.IsPending) + ++numPending; + else + { + ++numCompleted; + + ClassicAssert.IsTrue(status.Found, $"tid {tid}, key {ii}, {status}, wasPending {false}, pt 1"); + ClassicAssert.IsNotNull(output.Memory, $"tid {tid}, key {ii}, wasPending {false}, pt 2"); + long value = BitConverter.ToInt64(output.AsReadOnlySpan()); + ClassicAssert.AreEqual(ii, value % ValueAdd, $"tid {tid}, key {ii}, wasPending {false}, pt 3"); + output.Memory.Dispose(); + } + + if (numPending > 0) + { + bContext.CompletePendingWithOutputs(out var completedOutputs, wait: true); + using (completedOutputs) + { + while (completedOutputs.Next()) + { + ++numCompleted; + + status = completedOutputs.Current.Status; + output = completedOutputs.Current.Output; + // Note: do NOT overwrite 'key' here + long keyLong = BitConverter.ToInt64(completedOutputs.Current.Key.AsReadOnlySpan()); + + ClassicAssert.AreEqual(completedOutputs.Current.RecordMetadata.Address == Constants.kInvalidAddress, status.Record.CopiedToReadCache, $"key {keyLong}: {status}"); + + ClassicAssert.IsTrue(status.Found, $"tid {tid}, key {keyLong}, {status}, wasPending {true}, pt 1"); + ClassicAssert.IsNotNull(output.Memory, $"tid {tid}, key {keyLong}, wasPending {true}, pt 2"); + long value = BitConverter.ToInt64(output.AsReadOnlySpan()); + ClassicAssert.AreEqual(keyLong, value % ValueAdd, $"tid {tid}, key 
{keyLong}, wasPending {true}, pt 3"); + output.Memory.Dispose(); + } + } + } + } + ClassicAssert.AreEqual(NumKeys, numCompleted, "numCompleted"); + } + } + + unsafe void runUpdateThread(int tid) + { + using var session = store.NewSession>(new RmwSpanByteFunctions()); + var bContext = session.BasicContext; + + Span keyVec = stackalloc byte[sizeof(long)]; + var key = SpanByte.FromPinnedSpan(keyVec); + Span inputVec = stackalloc byte[sizeof(long)]; + var input = SpanByte.FromPinnedSpan(inputVec); + + for (var iteration = 0; iteration < numIterations; ++iteration) + { + var numCompleted = 0; + for (var ii = 0; ii < NumKeys; ++ii) + { + SpanByteAndMemory output = default; + + ClassicAssert.IsTrue(BitConverter.TryWriteBytes(keyVec, ii)); + ClassicAssert.IsTrue(BitConverter.TryWriteBytes(inputVec, ii + ValueAdd)); + var status = updateOp == UpdateOp.RMW + ? bContext.RMW(ref key, ref input, ref output) + : bContext.Upsert(ref key, ref input, ref input, ref output); + + var numPending = ii - numCompleted; + if (status.IsPending) + { + ClassicAssert.AreNotEqual(UpdateOp.Upsert, updateOp, "Upsert should not go pending"); + ++numPending; + } + else + { + ++numCompleted; + if (updateOp == UpdateOp.RMW) // Upsert will not try to find records below HeadAddress, but it may find them in-memory + ClassicAssert.IsTrue(status.Found, $"tid {tid}, key {ii}, {status}"); + + long value = BitConverter.ToInt64(output.AsReadOnlySpan()); + ClassicAssert.AreEqual(ii + ValueAdd, value, $"tid {tid}, key {ii}, wasPending {false}"); + + output.Memory?.Dispose(); + } + + if (numPending > 0) + { + bContext.CompletePendingWithOutputs(out var completedOutputs, wait: true); + using (completedOutputs) + { + while (completedOutputs.Next()) + { + ++numCompleted; + + status = completedOutputs.Current.Status; + output = completedOutputs.Current.Output; + // Note: do NOT overwrite 'key' here + long keyLong = BitConverter.ToInt64(completedOutputs.Current.Key.AsReadOnlySpan()); + + if (updateOp == 
UpdateOp.RMW) // Upsert will not try to find records below HeadAddress, but it may find them in-memory + ClassicAssert.IsTrue(status.Found, $"tid {tid}, key {keyLong}, {status}"); + + long value = BitConverter.ToInt64(output.AsReadOnlySpan()); + ClassicAssert.AreEqual(keyLong + ValueAdd, value, $"tid {tid}, key {keyLong}, wasPending {true}"); + + output.Memory?.Dispose(); + } + } + } + } + ClassicAssert.AreEqual(NumKeys, numCompleted, "numCompleted"); + } + } + + List tasks = new(); // Task rather than Thread for propagation of exception. + for (int t = 1; t <= numReadThreads + numWriteThreads; t++) + { + var tid = t; + if (t <= numReadThreads) + tasks.Add(Task.Factory.StartNew(() => runReadThread(tid))); + else + tasks.Add(Task.Factory.StartNew(() => runUpdateThread(tid))); + } + Task.WaitAll(tasks.ToArray()); + } + } +} \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/test/ReadCacheChainTests.cs b/libs/storage/Tsavorite/cs/test/ReadCacheChainTests.cs index 6996c00f34..adefeb6bbd 100644 --- a/libs/storage/Tsavorite/cs/test/ReadCacheChainTests.cs +++ b/libs/storage/Tsavorite/cs/test/ReadCacheChainTests.cs @@ -16,37 +16,6 @@ #pragma warning disable // Add parentheses for clarity -namespace Tsavorite.test.ReadCacheTests -{ - // Must be in a separate block so the "using StructStoreFunctions" is the first line in its namespace declaration. - internal class LongComparerModulo : IKeyComparer - { - readonly long mod; - - internal LongComparerModulo(long mod) => this.mod = mod; - - public bool Equals(ref long k1, ref long k2) => k1 == k2; - - public long GetHashCode64(ref long k) => mod == 0 ? 
k : k % mod; - } - - internal struct SpanByteComparerModulo : IKeyComparer - { - readonly HashModulo modRange; - - internal SpanByteComparerModulo(HashModulo mod) => modRange = mod; - - public readonly bool Equals(ref SpanByte k1, ref SpanByte k2) => SpanByteComparer.StaticEquals(ref k1, ref k2); - - // Force collisions to create a chain - public readonly long GetHashCode64(ref SpanByte k) - { - var value = SpanByteComparer.StaticGetHashCode64(ref k); - return modRange != HashModulo.NoMod ? value % (long)modRange : value; - } - } -} - namespace Tsavorite.test.ReadCacheTests { using LongAllocator = BlittableAllocator>>; diff --git a/libs/storage/Tsavorite/cs/test/TestUtils.cs b/libs/storage/Tsavorite/cs/test/TestUtils.cs index 6997ec34a6..751e2b75b3 100644 --- a/libs/storage/Tsavorite/cs/test/TestUtils.cs +++ b/libs/storage/Tsavorite/cs/test/TestUtils.cs @@ -10,6 +10,7 @@ using NUnit.Framework.Legacy; using Tsavorite.core; using Tsavorite.devices; +using static Tsavorite.test.TestUtils; namespace Tsavorite.test { @@ -269,6 +270,33 @@ internal static unsafe bool FindHashBucketEntryForKey + { + readonly long mod; + + internal LongComparerModulo(long mod) => this.mod = mod; + + public bool Equals(ref long k1, ref long k2) => k1 == k2; + + public long GetHashCode64(ref long k) => mod == 0 ? k : k % mod; + } + + internal struct SpanByteComparerModulo : IKeyComparer + { + readonly HashModulo modRange; + + internal SpanByteComparerModulo(HashModulo mod) => modRange = mod; + + public readonly bool Equals(ref SpanByte k1, ref SpanByte k2) => SpanByteComparer.StaticEquals(ref k1, ref k2); + + // Force collisions to create a chain + public readonly long GetHashCode64(ref SpanByte k) + { + var value = SpanByteComparer.StaticGetHashCode64(ref k); + return modRange != HashModulo.NoMod ? 
value % (long)modRange : value; + } + } + static class StaticTestUtils { internal static (Status status, TOutput output) GetSinglePendingResult( From fb0a8076c7265bdb52fbdfc96bb42113b8617e1c Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Wed, 11 Sep 2024 16:20:21 -0700 Subject: [PATCH 07/23] support 0% mutable fraction. --- .../cs/src/core/Allocator/AllocatorBase.cs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index cb5d135dc1..0c7b5fcca2 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -66,7 +66,7 @@ public abstract partial class AllocatorBaseHow many pages do we leave empty in the in-memory buffer (between 0 and BufferSize-1) private int emptyPageCount; - /// HeadOFfset lag address + /// HeadOffset lag address internal long HeadOffsetLagAddress; /// @@ -861,8 +861,17 @@ void IssueShiftAddress(long pageIndex) { // Issue the shift of address var shiftAddress = pageIndex << LogPageSizeBits; - PageAlignedShiftReadOnlyAddress(shiftAddress); - PageAlignedShiftHeadAddress(shiftAddress); + var tailAddress = GetTailAddress(); + + long desiredReadOnlyAddress = shiftAddress - ReadOnlyLagAddress; + if (desiredReadOnlyAddress > tailAddress) + desiredReadOnlyAddress = tailAddress; + ShiftReadOnlyAddress(desiredReadOnlyAddress); + + long desiredHeadAddress = shiftAddress - HeadOffsetLagAddress; + if (desiredHeadAddress > tailAddress) + desiredHeadAddress = tailAddress; + ShiftHeadAddress(desiredHeadAddress); } [MethodImpl(MethodImplOptions.NoInlining)] From 1c95a5ac908952225920a2d431a1e8124246b8b2 Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Wed, 11 Sep 2024 21:27:57 -0700 Subject: [PATCH 08/23] Fix InternalUpsert srcRecordInfo setting when found below 
ReadOnlyAddress Add some comments --- .../cs/src/core/Allocator/AllocatorBase.cs | 10 +++-- .../Tsavorite/Implementation/BlockAllocate.cs | 8 +++- .../Tsavorite/Implementation/InternalRMW.cs | 12 +++--- .../Implementation/InternalUpsert.cs | 17 ++++++--- .../test/InsertAtTailSpanByteStressTests.cs | 38 +++++++++++++++---- 5 files changed, 59 insertions(+), 26 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index 0c7b5fcca2..f2e60f9d50 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -880,7 +880,9 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) int pageIndex = localTailPageOffset.Page + 1; // This thread is trying to allocate at an offset past where one or more previous threads - // already overflowed; exit and allow the first overflow thread to proceed. + // already overflowed; exit and allow the first overflow thread to proceed. Do not try to remove + // the update to TailPageOffset that was done by this thread; that will be overwritten when + // the first overflow thread finally completes and updates TailPageOffset. if (localTailPageOffset.Offset - numSlots > PageSize) { if (NeedToWait(pageIndex)) @@ -951,7 +953,9 @@ long TryAllocate(int numSlots = 1) return -1; // RETRY_NOW } - // Determine insertion index. + // Determine insertion index. Note that this forms a kind of "lock"; after the first thread does this, other threads that do + // it will see that another thread got there first because the subsequent "back up by numSlots" will still be past PageSize, + // so they will exit and RETRY in HandlePageOverflow; the first thread "owns" the overflow operation and must stabilize it. localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots); // Slow path when we reach the end of a page. 
@@ -965,7 +969,7 @@ long TryAllocate(int numSlots = 1) return (((long)localTailPageOffset.Page) << LogPageSizeBits) | ((long)(localTailPageOffset.Offset - numSlots)); } - /// Try allocate, spin for RETRY_NOW case + /// Try allocate, spin for RETRY_NOW (logicalAddress < 0) case /// Number of slots to allocate /// The allocated logical address, or 0 in case of inability to allocate [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs index 737330e6c2..f03b1b323a 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs @@ -34,10 +34,14 @@ private static bool TryBlockAllocate( return false; } + /// Options for TryAllocateRecord. internal struct AllocateOptions { + /// If true, use the non-revivification recycling of records that failed to CAS and are carried in PendingContext through RETRY. internal bool Recycle; - internal bool IgnoreHeiAddress; + + /// If true, the source record is elidable so we can try to elide from the tag chain (and transfer it to the FreeList if we're doing Revivification). 
+ internal bool ElideSourceRecord; }; [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -56,7 +60,7 @@ bool TryAllocateRecord(TSes return true; if (RevivificationManager.UseFreeRecordPool) { - if (!options.IgnoreHeiAddress && stackCtx.hei.Address >= minMutableAddress) + if (!options.ElideSourceRecord && stackCtx.hei.Address >= minMutableAddress) minRevivAddress = stackCtx.hei.Address; if (sessionFunctions.Ctx.IsInV1) { diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs index ce614bd852..0652f03c90 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/InternalRMW.cs @@ -406,9 +406,7 @@ private OperationStatus CreateNewRecordRMW(sessionFunctions, ref stackCtx, ref srcRecordInfo) + ElideSourceRecord = stackCtx.recSrc.HasMainLogSrc && CanElide(sessionFunctions, ref stackCtx, ref srcRecordInfo) }; if (!TryAllocateRecord(sessionFunctions, ref pendingContext, ref stackCtx, actualSize, ref allocatedSize, keySize, allocOptions, @@ -416,7 +414,7 @@ private OperationStatus CreateNewRecordRMW= hlogBase.HeadAddress) + { + // Safe Read-Only Region: Create a record in the mutable region, but set srcRecordInfo in case we are eliding. + if (stackCtx.recSrc.HasMainLogSrc) + srcRecordInfo = ref stackCtx.recSrc.GetInfo(); + goto CreateNewRecord; + } // No record exists, or readonly or below. Drop through to create new record. 
Debug.Assert(!sessionFunctions.IsManualLocking || LockTable.IsLockedExclusive(ref stackCtx.hei), "A Lockable-session Upsert() of an on-disk or non-existent key requires a LockTable lock"); @@ -303,9 +310,7 @@ private OperationStatus CreateNewRecordUpsert(sessionFunctions, ref stackCtx, ref srcRecordInfo) + ElideSourceRecord = stackCtx.recSrc.HasMainLogSrc && CanElide(sessionFunctions, ref stackCtx, ref srcRecordInfo) }; if (!TryAllocateRecord(sessionFunctions, ref pendingContext, ref stackCtx, actualSize, ref allocatedSize, keySize, allocOptions, @@ -313,7 +318,7 @@ private OperationStatus CreateNewRecordUpsert; + // Number of mutable pages for this test + public enum MutablePages + { + Zero, + Eight + } + class SpanByteInsertAtTailChainTests { private TsavoriteKV> store; @@ -26,6 +33,13 @@ class SpanByteInsertAtTailChainTests const long ValueAdd = 1_000_000_000; const long NumKeys = 2_000; + long GetMutablePageCount(MutablePages mp) => mp switch + { + MutablePages.Zero => 0, + MutablePages.Eight => 8, + _ => 8 + }; + [SetUp] public void Setup() { @@ -35,6 +49,7 @@ public void Setup() log = new NullDevice(); HashModulo modRange = HashModulo.NoMod; + long mutablePages = GetMutablePageCount(MutablePages.Eight); foreach (var arg in TestContext.CurrentContext.Test.Arguments) { if (arg is HashModulo cr) @@ -42,17 +57,24 @@ public void Setup() modRange = cr; continue; } + if (arg is MutablePages mp) + { + mutablePages = GetMutablePageCount(mp); + continue; + } } // Make the main log mutable region small enough that we force the readonly region to stay close to tail, causing inserts. 
int pageBits = 15, memoryBits = 34; - store = new(new() - { - LogDevice = log, - PageSize = 1L << 15, - MemorySize = 1L << 34, - MutableFraction = 1.0 / (1 << memoryBits - pageBits + 2), - }, StoreFunctions.Create(comparer, SpanByteRecordDisposer.Instance) + KVSettings kvSettings = new() + { + LogDevice = log, + PageSize = 1L << pageBits, + MemorySize = 1L << memoryBits, + MutableFraction = 8.0 / (1 << (memoryBits - pageBits)), + }; + store = new(kvSettings + , StoreFunctions.Create(comparer, SpanByteRecordDisposer.Instance) , (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions) ); @@ -135,7 +157,7 @@ unsafe void PopulateAndSetReadOnlyToTail() [Category(StressTestCategory)] //[Repeat(300)] public void SpanByteTailInsertMultiThreadTest([Values] HashModulo modRange, [Values(0, 1, 2, 8)] int numReadThreads, [Values(0, 1, 2, 8)] int numWriteThreads, - [Values(UpdateOp.Upsert, UpdateOp.RMW)] UpdateOp updateOp) + [Values(UpdateOp.Upsert, UpdateOp.RMW)] UpdateOp updateOp, [Values] MutablePages mutablePages) { if (numReadThreads == 0 && numWriteThreads == 0) Assert.Ignore("Skipped due to 0 threads for both read and update"); From 3958da1aacfcf3c7dbf20a703eb463595e3f1dca Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Thu, 12 Sep 2024 09:39:23 -0700 Subject: [PATCH 09/23] Adjust mutable-page counts in stress test --- .../Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs b/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs index 8b22bde239..d1c9ee25f1 100644 --- a/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs +++ b/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs @@ -21,7 +21,8 @@ namespace Tsavorite.test.InsertAtTailStressTests public enum MutablePages { Zero, - Eight + One, + Two } class 
SpanByteInsertAtTailChainTests @@ -36,7 +37,8 @@ class SpanByteInsertAtTailChainTests long GetMutablePageCount(MutablePages mp) => mp switch { MutablePages.Zero => 0, - MutablePages.Eight => 8, + MutablePages.One => 0, + MutablePages.Two => 2, _ => 8 }; @@ -49,7 +51,7 @@ public void Setup() log = new NullDevice(); HashModulo modRange = HashModulo.NoMod; - long mutablePages = GetMutablePageCount(MutablePages.Eight); + long mutablePages = GetMutablePageCount(MutablePages.Two); foreach (var arg in TestContext.CurrentContext.Test.Arguments) { if (arg is HashModulo cr) From 56db5871d42928b49a865d135faef66b5e3dd6e9 Mon Sep 17 00:00:00 2001 From: TedHartMS <15467143+TedHartMS@users.noreply.github.com> Date: Thu, 12 Sep 2024 09:40:49 -0700 Subject: [PATCH 10/23] fix typo --- .../Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs b/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs index d1c9ee25f1..30722bdcf4 100644 --- a/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs +++ b/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs @@ -37,7 +37,7 @@ class SpanByteInsertAtTailChainTests long GetMutablePageCount(MutablePages mp) => mp switch { MutablePages.Zero => 0, - MutablePages.One => 0, + MutablePages.One => 1, MutablePages.Two => 2, _ => 8 }; From 845504be1a71516000e6fce50ff0d9120c20930d Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 12 Sep 2024 15:05:40 -0700 Subject: [PATCH 11/23] Enforce at least two pages of memory. 
--- .../cs/src/core/Allocator/AllocatorBase.cs | 4 ++-- libs/storage/Tsavorite/cs/test/LogResumeTests.cs | 16 ++++++++-------- libs/storage/Tsavorite/cs/test/LogTests.cs | 2 +- .../Tsavorite/cs/test/NeedCopyUpdateTests.cs | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index f2e60f9d50..73e06617f5 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -577,8 +577,8 @@ private protected AllocatorBase(LogSettings settings, TStoreFunctions storeFunct if (SegmentSize < PageSize) throw new TsavoriteException($"Segment ({SegmentSize}) must be at least of page size ({PageSize})"); - if ((LogTotalSizeBits != 0) && (LogTotalSizeBytes < PageSize)) - throw new TsavoriteException($"Memory size ({LogTotalSizeBytes}) must be configured to be either 1 (i.e., 0 bits) or at least page size ({PageSize})"); + if ((LogTotalSizeBits != 0) && (LogTotalSizeBytes < PageSize * 2)) + throw new TsavoriteException($"Memory size ({LogTotalSizeBytes}) must be at least twice the page size ({PageSize})"); // Readonlymode has MemorySizeBits 0 => skip the check if (settings.MemorySizeBits > 0 && settings.MinEmptyPageCount > MaxEmptyPageCount) diff --git a/libs/storage/Tsavorite/cs/test/LogResumeTests.cs b/libs/storage/Tsavorite/cs/test/LogResumeTests.cs index 49a201a6f5..8f6766afde 100644 --- a/libs/storage/Tsavorite/cs/test/LogResumeTests.cs +++ b/libs/storage/Tsavorite/cs/test/LogResumeTests.cs @@ -44,7 +44,7 @@ public async Task TsavoriteLogResumePersistedReaderSpec([Values] LogChecksumType var input3 = new byte[] { 11, 12 }; string readerName = "abc"; - using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum })) + using (var l = new TsavoriteLog(new TsavoriteLogSettings { 
LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum })) { await l.EnqueueAsync(input1, cancellationToken); await l.EnqueueAsync(input2); @@ -58,7 +58,7 @@ public async Task TsavoriteLogResumePersistedReaderSpec([Values] LogChecksumType await l.CommitAsync(); } - using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum })) + using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum })) { using var recoveredIterator = l.Scan(0, long.MaxValue, readerName); ClassicAssert.IsTrue(recoveredIterator.GetNext(out byte[] outBuf, out _, out _, out _)); @@ -77,7 +77,7 @@ public async Task TsavoriteLogResumeViaCompleteUntilRecordAtSpec([Values] LogChe var input3 = new byte[] { 11, 12 }; string readerName = "abc"; - using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum })) + using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum })) { await l.EnqueueAsync(input1, cancellationToken); await l.EnqueueAsync(input2); @@ -91,7 +91,7 @@ public async Task TsavoriteLogResumeViaCompleteUntilRecordAtSpec([Values] LogChe await l.CommitAsync(); } - using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum })) + using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum })) { using var recoveredIterator = l.Scan(0, long.MaxValue, readerName); ClassicAssert.IsTrue(recoveredIterator.GetNext(out byte[] outBuf, out _, out _, out _)); @@ -112,7 +112,7 @@ public async Task TsavoriteLogResumePersistedReader2([Values] LogChecksumType lo { long 
originalCompleted; - using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager })) + using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum, LogCommitManager = logCommitManager })) { await l.EnqueueAsync(input1); await l.CommitAsync(); @@ -129,7 +129,7 @@ public async Task TsavoriteLogResumePersistedReader2([Values] LogChecksumType lo originalCompleted = originalIterator.CompletedUntilAddress; } - using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager })) + using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum, LogCommitManager = logCommitManager })) { using var recoveredIterator = l.Scan(0, long.MaxValue, readerName); ClassicAssert.IsTrue(recoveredIterator.GetNext(out byte[] outBuf, out _, out _, out _)); @@ -155,7 +155,7 @@ public async Task TsavoriteLogResumePersistedReader3([Values] LogChecksumType lo { long originalCompleted; - using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager })) + using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum, LogCommitManager = logCommitManager })) { await l.EnqueueAsync(input1); await l.CommitAsync(); @@ -180,7 +180,7 @@ public async Task TsavoriteLogResumePersistedReader3([Values] LogChecksumType lo originalCompleted = originalIterator.CompletedUntilAddress; } - using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, 
LogChecksum = logChecksum, LogCommitManager = logCommitManager })) + using (var l = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 17, LogChecksum = logChecksum, LogCommitManager = logCommitManager })) { using var recoveredIterator = l.Scan(0, l.TailAddress, readerName); diff --git a/libs/storage/Tsavorite/cs/test/LogTests.cs b/libs/storage/Tsavorite/cs/test/LogTests.cs index cfb61398b9..ea597590dc 100644 --- a/libs/storage/Tsavorite/cs/test/LogTests.cs +++ b/libs/storage/Tsavorite/cs/test/LogTests.cs @@ -618,7 +618,7 @@ public async ValueTask EnqueueAndWaitForCommitAsyncBasicTest([Values] LogChecksu { LogDevice = device, PageSizeBits = 16, - MemorySizeBits = 16, + MemorySizeBits = 17, LogChecksum = logChecksum, LogCommitManager = manager, SegmentSizeBits = 22 diff --git a/libs/storage/Tsavorite/cs/test/NeedCopyUpdateTests.cs b/libs/storage/Tsavorite/cs/test/NeedCopyUpdateTests.cs index 7eb1d0fa56..2eeab310e6 100644 --- a/libs/storage/Tsavorite/cs/test/NeedCopyUpdateTests.cs +++ b/libs/storage/Tsavorite/cs/test/NeedCopyUpdateTests.cs @@ -187,7 +187,7 @@ public void Setup() IndexSize = 1L << 13, LogDevice = log, MutableFraction = 0.1, - MemorySize = 1L << PageSizeBits, + MemorySize = 1L << (PageSizeBits + 1), PageSize = 1L << PageSizeBits }, StoreFunctions.Create(LongKeyComparer.Instance) , (allocatorSettings, storeFunctions) => new(allocatorSettings, storeFunctions) From 2b41b1372206c8e7f5891c720db87dbef41521a8 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 12 Sep 2024 15:14:37 -0700 Subject: [PATCH 12/23] nit --- .../cs/test/InsertAtTailSpanByteStressTests.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs b/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs index 30722bdcf4..747b9a2353 100644 --- a/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs +++ 
b/libs/storage/Tsavorite/cs/test/InsertAtTailSpanByteStressTests.cs @@ -19,7 +19,7 @@ namespace Tsavorite.test.InsertAtTailStressTests // Number of mutable pages for this test public enum MutablePages - { + { Zero, One, Two @@ -35,12 +35,12 @@ class SpanByteInsertAtTailChainTests const long NumKeys = 2_000; long GetMutablePageCount(MutablePages mp) => mp switch - { - MutablePages.Zero => 0, - MutablePages.One => 1, - MutablePages.Two => 2, - _ => 8 - }; + { + MutablePages.Zero => 0, + MutablePages.One => 1, + MutablePages.Two => 2, + _ => 8 + }; [SetUp] public void Setup() From 4e1fdd9eba57e8585054cb456f10fa22909b0614 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 12 Sep 2024 16:18:49 -0700 Subject: [PATCH 13/23] update Garnet to use new allocator logic --- .../ReplicaOps/ReplicationReplicaAofSync.cs | 15 +---- .../cs/src/core/Allocator/AllocatorBase.cs | 63 +------------------ .../cs/src/core/TsavoriteLog/TsavoriteLog.cs | 11 ---- 3 files changed, 4 insertions(+), 85 deletions(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs index 669ac99637..81e40677d2 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs @@ -38,9 +38,9 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre if (clusterProvider.serverOptions.MainMemoryReplication) { - var firstRecordLength = GetFirstAofEntryLength(record); - if (previousAddress > ReplicationOffset || - currentAddress >= previousAddress + firstRecordLength) + // If the incoming AOF chunk fits in the space between previousAddress and currentAddress (ReplicationOffset), + // an enqueue will result in an offset mismatch. So, we have to first reset the AOF to point to currentAddress. 
+ if (currentAddress >= previousAddress + recordLength) { logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress); storeWrapper.appendOnlyFile.Initialize(currentAddress, currentAddress); @@ -56,15 +56,6 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre throw new GarnetException($"Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning, clientResponse: false); } - // If there is a gap between the local tail and incoming currentAddress, try to skip local AOF to the next page - if (currentAddress >= storeWrapper.appendOnlyFile.TailAddress + recordLength - && storeWrapper.appendOnlyFile.GetPage(currentAddress) == storeWrapper.appendOnlyFile.GetPage(storeWrapper.appendOnlyFile.TailAddress) + 1) - { - logger?.LogWarning("SkipPage from {previousAddress} to {currentAddress}, tail is {tailAddress}", previousAddress, currentAddress, storeWrapper.appendOnlyFile.TailAddress); - storeWrapper.appendOnlyFile.UnsafeSkipPage(); - logger?.LogWarning("New tail after SkipPage is {tailAddress}", storeWrapper.appendOnlyFile.TailAddress); - } - // Enqueue to AOF _ = clusterProvider.storeWrapper.appendOnlyFile?.UnsafeEnqueueRaw(new Span(record, recordLength), noCommit: clusterProvider.serverOptions.EnableFastCommit); diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index 26ce34597a..7af38d1e47 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -969,7 +969,7 @@ long TryAllocate(int numSlots = 1) return (((long)localTailPageOffset.Page) << LogPageSizeBits) | ((long)(localTailPageOffset.Offset - numSlots)); } - /// Try allocate, spin for RETRY_NOW (logicalAddress < 0) 
case + /// Try allocate, spin for RETRY_NOW (logicalAddress is less than 0) case /// Number of slots to allocate /// The allocated logical address, or 0 in case of inability to allocate [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -985,67 +985,6 @@ public long TryAllocateRetryNow(int numSlots = 1) return logicalAddress; } - /// Skip the rest of the current page - public void SkipPage() - { - PageOffset localTailPageOffset = default; - localTailPageOffset.PageAndOffset = TailPageOffset.PageAndOffset; - - // Force page overflow - int numSlots = PageSize + 1; - - // Determine insertion index. - localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots); - - int page = localTailPageOffset.Page; - int offset = localTailPageOffset.Offset - numSlots; - - #region HANDLE PAGE OVERFLOW - if (localTailPageOffset.Offset > PageSize) - { - int pageIndex = localTailPageOffset.Page + 1; - - // All overflow threads try to shift addresses - long shiftAddress = ((long)pageIndex) << LogPageSizeBits; - PageAlignedShiftReadOnlyAddress(shiftAddress); - PageAlignedShiftHeadAddress(shiftAddress); - - // This thread is trying to allocate at an offset past where one or more previous threads - // already overflowed; exit and allow the first overflow thread to proceed - if (offset > PageSize) - { - if (NeedToWait(pageIndex)) - return; // RETRY_LATER - return; // RETRY_NOW - } - - if (NeedToWait(pageIndex)) - { - // Reset to end of page so that next attempt can retry - localTailPageOffset.Offset = PageSize; - Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset); - return; // RETRY_LATER - } - - // The thread that "makes" the offset incorrect should allocate next page and set new tail - if (CannotAllocate(pageIndex)) - { - // Reset to end of page so that next attempt can retry - localTailPageOffset.Offset = PageSize; - Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset); - 
return; // RETRY_NOW - } - - if (!_wrapper.IsAllocated(pageIndex % BufferSize) || !_wrapper.IsAllocated((pageIndex + 1) % BufferSize)) - AllocatePagesWithException(pageIndex, localTailPageOffset); - - localTailPageOffset.Page++; - localTailPageOffset.Offset = 0; - TailPageOffset = localTailPageOffset; - } - #endregion - } - /// /// If the page we are trying to allocate is past the last page with an unclosed address region, /// then we can retry immediately because this is called after NeedToWait, so we know we've diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs index b96404eff3..7659c6af1c 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs @@ -433,17 +433,6 @@ public void UnsafeCommitMetadataOnly(TsavoriteLogRecoveryInfo info) public int UnsafeGetLogPageSizeBits() => allocator.LogPageSizeBits; - /// - /// Get page number for given address - /// - /// - /// - public long GetPage(long logicalAddress) - => allocator.GetPage(logicalAddress); - - public void UnsafeSkipPage() - => allocator.SkipPage(); - /// /// Get read only lag address /// From 454b8c02de122d787f2e3a0d410ab5ab750f0039 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 13 Sep 2024 12:49:48 -0700 Subject: [PATCH 14/23] Fix --- .../Replication/PrimaryOps/AofTaskStore.cs | 2 +- libs/server/Resp/RespServerSession.cs | 3 +- .../Resp/RespServerSessionSlotVerify.cs | 5 +- .../cs/src/core/Allocator/AllocatorBase.cs | 72 ++++++++++++++----- .../src/core/Index/Tsavorite/LogAccessor.cs | 2 +- .../cs/src/core/TsavoriteLog/TsavoriteLog.cs | 4 +- 6 files changed, 61 insertions(+), 27 deletions(-) diff --git a/libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs b/libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs index 218241297f..9f1e8635c9 100644 --- 
a/libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs @@ -41,7 +41,7 @@ public AofTaskStore(ClusterProvider clusterProvider, int initialSize = 1, ILogge logPageSizeMask = logPageSize - 1; if (clusterProvider.serverOptions.MainMemoryReplication) clusterProvider.storeWrapper.appendOnlyFile.SafeTailShiftCallback = SafeTailShiftCallback; - TruncateLagAddress = clusterProvider.storeWrapper.appendOnlyFile.UnsafeGetReadOnlyLagAddress() - 2 * logPageSize; + TruncateLagAddress = clusterProvider.storeWrapper.appendOnlyFile.UnsafeGetReadOnlyAddressLagOffset() - 2 * logPageSize; } TruncatedUntil = 0; } diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index c0ffdce91c..36d3a4d507 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -3,7 +3,6 @@ using System; using System.Buffers; -using System.Buffers.Binary; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -433,7 +432,7 @@ private void ProcessMessages() } else { - if (CanServeSlot(cmd)) + if (clusterSession == null || CanServeSlot(cmd)) _ = ProcessBasicCommands(cmd, ref basicGarnetApi); } } diff --git a/libs/server/Resp/RespServerSessionSlotVerify.cs b/libs/server/Resp/RespServerSessionSlotVerify.cs index 9497329c98..44b7f60d43 100644 --- a/libs/server/Resp/RespServerSessionSlotVerify.cs +++ b/libs/server/Resp/RespServerSessionSlotVerify.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. 
using System; +using System.Diagnostics; using Garnet.common; namespace Garnet.server @@ -33,9 +34,7 @@ bool NetworkKeyArraySlotVerify(Span keys, bool readOnly, int count = - bool CanServeSlot(RespCommand cmd) { - // If cluster is disable all commands - if (clusterSession == null) - return true; + Debug.Assert(clusterSession != null); // Verify slot for command if it falls into data command category if (!cmd.IsDataCommand()) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index 7af38d1e47..2508f4199c 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -66,8 +66,8 @@ public abstract partial class AllocatorBaseHow many pages do we leave empty in the in-memory buffer (between 0 and BufferSize-1) private int emptyPageCount; - /// HeadOffset lag address - internal long HeadOffsetLagAddress; + /// HeadAddress offset from tail (currently page-aligned) + internal long HeadAddressLagOffset; /// /// Number of or @@ -78,8 +78,8 @@ public abstract partial class AllocatorBaseLog mutable fraction protected readonly double LogMutableFraction; - /// ReadOnlyOffset lag (from tail) - protected long ReadOnlyLagAddress; + /// ReadOnlyAddress offset from tail (currently page-aligned) + protected long ReadOnlyAddressLagOffset; #endregion @@ -502,7 +502,7 @@ internal void WriteAsync(IntPtr alignedSourceAddress, ulong alignedDes } } - internal long GetReadOnlyLagAddress() => ReadOnlyLagAddress; + internal long GetReadOnlyAddressLagOffset() => ReadOnlyAddressLagOffset; protected readonly ILogger logger; @@ -733,9 +733,9 @@ public int EmptyPageCount emptyPageCount = value; headOffsetLagSize -= emptyPageCount; - // Lag addresses are the number of pages "behind" TailPageOffset (the tail in the circular buffer). 
- ReadOnlyLagAddress = (long)(LogMutableFraction * headOffsetLagSize) << LogPageSizeBits; - HeadOffsetLagAddress = (long)headOffsetLagSize << LogPageSizeBits; + // Address lag offsets correspond to the number of pages "behind" TailPageOffset (the tail in the circular buffer). + ReadOnlyAddressLagOffset = (long)(LogMutableFraction * headOffsetLagSize) << LogPageSizeBits; + HeadAddressLagOffset = (long)headOffsetLagSize << LogPageSizeBits; } // Force eviction now if empty page count has increased @@ -853,6 +853,38 @@ void AllocatePagesWithException(int pageIndex, PageOffset localTailPageOffset, i static void ThrowTsavoriteException(string message) => throw new TsavoriteException(message); + /// + /// Whether we need to shift addresses when turning the page. + /// + /// The page we are turning to + /// Local copy of PageOffset (includes the addition of numSlots) + /// Size of new allocation + /// + bool NeedToShiftAddress(long pageIndex, PageOffset localTailPageOffset, int numSlots) + { + var tailAddress = (((long)localTailPageOffset.Page) << LogPageSizeBits) | ((long)(localTailPageOffset.Offset - numSlots)); + var shiftAddress = pageIndex << LogPageSizeBits; + + // Check whether we need to shift ROA + var desiredReadOnlyAddress = shiftAddress - ReadOnlyAddressLagOffset; + if (desiredReadOnlyAddress > tailAddress) + desiredReadOnlyAddress = tailAddress; + if (desiredReadOnlyAddress > ReadOnlyAddress) + return true; + + // Check whether we need to shift HA + var desiredHeadAddress = shiftAddress - HeadAddressLagOffset; + var currentFlushedUntilAddress = FlushedUntilAddress; + if (desiredHeadAddress > currentFlushedUntilAddress) + desiredHeadAddress = currentFlushedUntilAddress; + if (desiredHeadAddress > tailAddress) + desiredHeadAddress = tailAddress; + if (desiredHeadAddress > HeadAddress) + return true; + + return false; + } + /// /// Shift log addresses when turning the page. 
/// @@ -863,12 +895,12 @@ void IssueShiftAddress(long pageIndex) var shiftAddress = pageIndex << LogPageSizeBits; var tailAddress = GetTailAddress(); - long desiredReadOnlyAddress = shiftAddress - ReadOnlyLagAddress; + long desiredReadOnlyAddress = shiftAddress - ReadOnlyAddressLagOffset; if (desiredReadOnlyAddress > tailAddress) desiredReadOnlyAddress = tailAddress; ShiftReadOnlyAddress(desiredReadOnlyAddress); - long desiredHeadAddress = shiftAddress - HeadOffsetLagAddress; + long desiredHeadAddress = shiftAddress - HeadAddressLagOffset; if (desiredHeadAddress > tailAddress) desiredHeadAddress = tailAddress; ShiftHeadAddress(desiredHeadAddress); @@ -905,8 +937,12 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) return 0; // RETRY_LATER } - // Allocate next page and set new tail - if (CannotAllocate(pageIndex)) + // We next verify that: + // 1. The next page (pageIndex) is ready to use (i.e., closed) + // 2. We have issued any necessary address shifting at the page-turn boundary. + // If either cannot be verified, we can ask the caller to retry now (immediately), because it is + // an ephemeral state. 
+ if (CannotAllocate(pageIndex) || NeedToShiftAddress(pageIndex, localTailPageOffset, numSlots)) { // Reset to previous tail so that next attempt can retry localTailPageOffset.PageAndOffset -= numSlots; @@ -918,6 +954,7 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) return -1; // RETRY_NOW } + // Allocate next page and set new tail if (!_wrapper.IsAllocated(pageIndex % BufferSize) || !_wrapper.IsAllocated((pageIndex + 1) % BufferSize)) AllocatePagesWithException(pageIndex, localTailPageOffset, numSlots); @@ -925,8 +962,7 @@ long HandlePageOverflow(ref PageOffset localTailPageOffset, int numSlots) localTailPageOffset.Offset = numSlots; TailPageOffset = localTailPageOffset; - // Shift only after TailPageOffset is reset to a valid state - IssueShiftAddress(pageIndex); + // At this point, the slot is allocated and we are not allowed to refresh epochs any longer. // Offset is zero, for the first allocation on the new page return ((long)localTailPageOffset.Page) << LogPageSizeBits; @@ -1214,7 +1250,7 @@ private void DebugPrintAddresses() private void PageAlignedShiftReadOnlyAddress(long currentTailAddress) { long pageAlignedTailAddress = currentTailAddress & ~PageSizeMask; - long desiredReadOnlyAddress = pageAlignedTailAddress - ReadOnlyLagAddress; + long desiredReadOnlyAddress = pageAlignedTailAddress - ReadOnlyAddressLagOffset; if (Utility.MonotonicUpdate(ref ReadOnlyAddress, desiredReadOnlyAddress, out _)) { // Debug.WriteLine("Allocate: Moving read-only offset from {0:X} to {1:X}", oldReadOnlyAddress, desiredReadOnlyAddress); @@ -1229,7 +1265,7 @@ private void PageAlignedShiftReadOnlyAddress(long currentTailAddress) /// [MethodImpl(MethodImplOptions.AggressiveInlining)] private void PageAlignedShiftHeadAddress(long currentTailAddress) - => ShiftHeadAddress((currentTailAddress & ~PageSizeMask) - HeadOffsetLagAddress); + => ShiftHeadAddress((currentTailAddress & ~PageSizeMask) - HeadAddressLagOffset); /// /// Tries to shift head address to 
specified value @@ -1237,11 +1273,11 @@ private void PageAlignedShiftHeadAddress(long currentTailAddress) /// public long ShiftHeadAddress(long desiredHeadAddress) { - //obtain local values of variables that can change + // Obtain local values of variables that can change long currentFlushedUntilAddress = FlushedUntilAddress; long newHeadAddress = desiredHeadAddress; - if (currentFlushedUntilAddress < newHeadAddress) + if (newHeadAddress > currentFlushedUntilAddress) newHeadAddress = currentFlushedUntilAddress; if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out _)) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/LogAccessor.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/LogAccessor.cs index 5d7c6d3eab..087de8917e 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/LogAccessor.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/LogAccessor.cs @@ -92,7 +92,7 @@ public void SetEmptyPageCount(int pageCount, bool wait = false) allocatorBase.EmptyPageCount = pageCount; if (wait) { - long newHeadAddress = (allocatorBase.GetTailAddress() & ~allocatorBase.PageSizeMask) - allocatorBase.HeadOffsetLagAddress; + long newHeadAddress = (allocatorBase.GetTailAddress() & ~allocatorBase.PageSizeMask) - allocatorBase.HeadAddressLagOffset; ShiftHeadAddress(newHeadAddress, wait); } } diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs index 7659c6af1c..34ad771a3a 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs @@ -436,8 +436,8 @@ public int UnsafeGetLogPageSizeBits() /// /// Get read only lag address /// - public long UnsafeGetReadOnlyLagAddress() - => allocator.GetReadOnlyLagAddress(); + public long UnsafeGetReadOnlyAddressLagOffset() + => allocator.GetReadOnlyAddressLagOffset(); /// /// Enqueue batch of entries to log (in memory) - no 
guarantee of flush/commit From d38a9a23dba4cc72e1681937952d737651bca616 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 13 Sep 2024 13:19:17 -0700 Subject: [PATCH 15/23] update low memory to meet new constraint --- .../Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs | 2 +- test/Garnet.test/TestUtils.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs index 81e40677d2..1b83354ff4 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs @@ -42,7 +42,7 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre // an enqueue will result in an offset mismatch. So, we have to first reset the AOF to point to currentAddress. if (currentAddress >= previousAddress + recordLength) { - logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress); + //logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress); storeWrapper.appendOnlyFile.Initialize(currentAddress, currentAddress); ReplicationOffset = currentAddress; } diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs index f6191a10bc..f7a3fdd591 100644 --- a/test/Garnet.test/TestUtils.cs +++ b/test/Garnet.test/TestUtils.cs @@ -504,7 +504,7 @@ public static GarnetServerOptions GetGarnetServerOptions( if (lowMemory) { - opts.MemorySize = opts.ObjectStoreLogMemorySize = MemorySize == default ? "512" : MemorySize; + opts.MemorySize = opts.ObjectStoreLogMemorySize = MemorySize == default ? "1024" : MemorySize; opts.PageSize = opts.ObjectStorePageSize = PageSize == default ? 
"512" : PageSize; } From 4867ddb54bed770a015fe916f89aebbc1f582082 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 13 Sep 2024 13:19:53 -0700 Subject: [PATCH 16/23] re-enable warning --- .../Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs index 1b83354ff4..81e40677d2 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs @@ -42,7 +42,7 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre // an enqueue will result in an offset mismatch. So, we have to first reset the AOF to point to currentAddress. if (currentAddress >= previousAddress + recordLength) { - //logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress); + logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress); storeWrapper.appendOnlyFile.Initialize(currentAddress, currentAddress); ReplicationOffset = currentAddress; } From d00921503e2dc5b6a9866c71e39ba5ddd62327c3 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 13 Sep 2024 13:21:47 -0700 Subject: [PATCH 17/23] handle comments --- .../src/core/Index/Tsavorite/Implementation/BlockAllocate.cs | 1 + .../storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs | 3 +++ 2 files changed, 4 insertions(+) diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs index f03b1b323a..9b0173c6ab 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs +++ 
b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/BlockAllocate.cs @@ -28,6 +28,7 @@ private static bool TryBlockAllocate( return true; } + // logicalAddress less than 0 (RETRY_NOW) should already have been handled Debug.Assert(logicalAddress == 0); // We expect flushEvent to be signaled. internalStatus = OperationStatus.ALLOCATE_FAILED; diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs index 34ad771a3a..13b5638131 100644 --- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs +++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs @@ -864,7 +864,10 @@ private long AllocateBlock(int recordSize) var logicalAddress = allocator.TryAllocateRetryNow(recordSize); if (logicalAddress > 0) return logicalAddress; + + // logicalAddress less than 0 (RETRY_NOW) should already have been handled Debug.Assert(logicalAddress == 0); + epoch.Suspend(); if (cannedException != null) throw cannedException; try From f4df546381090acba841386027012f36d9a7f8a6 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 13 Sep 2024 13:55:29 -0700 Subject: [PATCH 18/23] fix bitmap tests to use at least 2 pages of memory. 
--- test/Garnet.test/GarnetBitmapTests.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/Garnet.test/GarnetBitmapTests.cs b/test/Garnet.test/GarnetBitmapTests.cs index 25d910f929..88ad67c31c 100644 --- a/test/Garnet.test/GarnetBitmapTests.cs +++ b/test/Garnet.test/GarnetBitmapTests.cs @@ -168,7 +168,7 @@ public void BitmapSetGetBitTest_LTM(bool preSet) server.Dispose(); server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: (bitmapBytes << 1).ToString(), + MemorySize: (bitmapBytes << 2).ToString(), PageSize: (bitmapBytes << 1).ToString()); server.Start(); using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); @@ -450,7 +450,7 @@ public void BitmapBitCountTest_LTM() server.Dispose(); server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: (bitmapBytes << 1).ToString(), + MemorySize: (bitmapBytes << 2).ToString(), PageSize: (bitmapBytes << 1).ToString()); server.Start(); using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); @@ -647,7 +647,7 @@ public void BitmapBitPosTest_LTM() server.Dispose(); server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: (bitmapBytes << 1).ToString(), + MemorySize: (bitmapBytes << 2).ToString(), PageSize: (bitmapBytes << 1).ToString()); server.Start(); using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); @@ -1271,7 +1271,7 @@ public void BitmapBitfieldGetTest_LTM() server.Dispose(); server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: (bitmapBytes << 1).ToString(), + MemorySize: (bitmapBytes << 2).ToString(), PageSize: (bitmapBytes << 1).ToString()); //MemorySize: "16g", //PageSize: "32m"); @@ -1472,7 +1472,7 @@ public void BitmapBitfieldSetTest_LTM() server.Dispose(); server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: (bitmapBytes << 1).ToString(), + 
MemorySize: (bitmapBytes << 2).ToString(), PageSize: (bitmapBytes << 1).ToString()); //MemorySize: "16g", //PageSize: "32m"); @@ -1938,7 +1938,7 @@ public void BitmapBitfieldIncrTest_LTM() server.Dispose(); server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: (bitmapBytes << 1).ToString(), + MemorySize: (bitmapBytes << 2).ToString(), PageSize: (bitmapBytes << 1).ToString()); //MemorySize: "16g", //PageSize: "32m"); From 5f676f99e4261f846b7bb92285cc44a2fb93d5b3 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 13 Sep 2024 13:58:21 -0700 Subject: [PATCH 19/23] fix hll tests --- test/Garnet.test/HyperLogLogTests.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/Garnet.test/HyperLogLogTests.cs b/test/Garnet.test/HyperLogLogTests.cs index 0aa761d2b3..c32f1cf5cb 100644 --- a/test/Garnet.test/HyperLogLogTests.cs +++ b/test/Garnet.test/HyperLogLogTests.cs @@ -571,13 +571,13 @@ public void HyperLogLogPFADD_LTM(int seqSize) if (seqSize < 128) server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: "512", + MemorySize: "1024", PageSize: "512"); else server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: "16384", - PageSize: "16384"); + MemorySize: "32k", + PageSize: "16k"); server.Start(); using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); @@ -689,7 +689,7 @@ public void HyperLogLogTestPFMERGE_LTM_SparseToSparse() server.Dispose(); server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: "512", + MemorySize: "1024", PageSize: "512"); server.Start(); @@ -798,8 +798,8 @@ public void HyperLogLogTestPFMERGE_LTM_SparseToDense(bool reverse) { server.Dispose(); server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, - MemorySize: "16384", - PageSize: "16384"); + MemorySize: "32k", + PageSize: "16k"); server.Start(); using var redis = 
ConnectionMultiplexer.Connect(TestUtils.GetConfig()); @@ -908,8 +908,8 @@ public void HyperLogLogTestPFMERGE_LTM_DenseToDense() server.Dispose(); server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, - MemorySize: "16384", - PageSize: "16384"); + MemorySize: "32k", + PageSize: "16k"); server.Start(); using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); From f487d5db0818c60d761dc4b59ae37f61e6408cda Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 13 Sep 2024 14:10:11 -0700 Subject: [PATCH 20/23] more testcase fixes --- test/Garnet.test/RespAdminCommandsTests.cs | 6 +++--- test/Garnet.test/TestUtils.cs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/Garnet.test/RespAdminCommandsTests.cs b/test/Garnet.test/RespAdminCommandsTests.cs index d3deb1cefc..2e7284a09a 100644 --- a/test/Garnet.test/RespAdminCommandsTests.cs +++ b/test/Garnet.test/RespAdminCommandsTests.cs @@ -306,7 +306,7 @@ static void ValidateServerData(IDatabase db, string strKey, string strValue, str [Test] [TestCase(63, 15, 1)] - [TestCase(63, 1, 1)] + [TestCase(63, 2, 1)] [TestCase(16, 16, 1)] [TestCase(5, 64, 1)] public void SeSaveRecoverMultipleObjectsTest(int memorySize, int recoveryMemorySize, int pageSize) @@ -363,7 +363,7 @@ public void SeSaveRecoverMultipleKeysTest(string memorySize, string recoveryMemo bool disableObj = true; server.Dispose(); - server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, DisableObjects: disableObj, lowMemory: true, MemorySize: memorySize, PageSize: "1k", enableAOF: true); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, DisableObjects: disableObj, lowMemory: true, MemorySize: memorySize, PageSize: "512", enableAOF: true); server.Start(); using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) @@ -402,7 +402,7 @@ public void SeSaveRecoverMultipleKeysTest(string memorySize, string recoveryMemo } server.Dispose(false); - server = 
TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, DisableObjects: disableObj, tryRecover: true, lowMemory: true, MemorySize: recoveryMemorySize, PageSize: "1k", enableAOF: true); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, DisableObjects: disableObj, tryRecover: true, lowMemory: true, MemorySize: recoveryMemorySize, PageSize: "512", enableAOF: true); server.Start(); using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true))) diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs index f7a3fdd591..247d23f49a 100644 --- a/test/Garnet.test/TestUtils.cs +++ b/test/Garnet.test/TestUtils.cs @@ -264,7 +264,7 @@ public static GarnetServer CreateGarnetServer( if (lowMemory) { - opts.MemorySize = opts.ObjectStoreLogMemorySize = MemorySize == default ? "512" : MemorySize; + opts.MemorySize = opts.ObjectStoreLogMemorySize = MemorySize == default ? "1024" : MemorySize; opts.PageSize = opts.ObjectStorePageSize = PageSize == default ? "512" : PageSize; } From fcf78f0037c5a85d4920c3586a1db22d9e3bf3b2 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 13 Sep 2024 18:34:04 -0700 Subject: [PATCH 21/23] fix replication logic to handle micro-skips within the same page --- .../ReplicaOps/ReplicationReplicaAofSync.cs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs index 81e40677d2..446eb15491 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicationReplicaAofSync.cs @@ -40,11 +40,17 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre { // If the incoming AOF chunk fits in the space between previousAddress and currentAddress (ReplicationOffset), // an enqueue will result in an offset mismatch. 
So, we have to first reset the AOF to point to currentAddress. - if (currentAddress >= previousAddress + recordLength) + if (currentAddress > previousAddress) { - logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress); - storeWrapper.appendOnlyFile.Initialize(currentAddress, currentAddress); - ReplicationOffset = currentAddress; + if ( + (currentAddress % (1 << storeWrapper.appendOnlyFile.UnsafeGetLogPageSizeBits()) != 0) || // the skip was to a non-page-boundary + (currentAddress >= previousAddress + recordLength) // the skip will not be auto-handled by the AOF enqueue + ) + { + logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress); + storeWrapper.appendOnlyFile.Initialize(currentAddress, currentAddress); + ReplicationOffset = currentAddress; + } } } From d00cb3bc824cfcaf0b7fa80307337160ae4c6a77 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Fri, 13 Sep 2024 18:41:30 -0700 Subject: [PATCH 22/23] PageAlignedShiftHeadAddress should always keep the head address, well, aligned. 
--- .../cs/src/core/Allocator/AllocatorBase.cs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index 2508f4199c..5fc6cb5cf0 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -1265,7 +1265,20 @@ private void PageAlignedShiftReadOnlyAddress(long currentTailAddress) /// [MethodImpl(MethodImplOptions.AggressiveInlining)] private void PageAlignedShiftHeadAddress(long currentTailAddress) - => ShiftHeadAddress((currentTailAddress & ~PageSizeMask) - HeadAddressLagOffset); + { + var desiredHeadAddress = (currentTailAddress & ~PageSizeMask) - HeadAddressLagOffset; + + // Obtain local values of variables that can change + var currentFlushedUntilAddress = FlushedUntilAddress; + if (desiredHeadAddress > currentFlushedUntilAddress) + desiredHeadAddress = currentFlushedUntilAddress & ~PageSizeMask; + + if (Utility.MonotonicUpdate(ref HeadAddress, desiredHeadAddress, out _)) + { + // Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress); + epoch.BumpCurrentEpoch(() => OnPagesClosed(desiredHeadAddress)); + } + } /// /// Tries to shift head address to specified value @@ -1280,6 +1293,10 @@ public long ShiftHeadAddress(long desiredHeadAddress) if (newHeadAddress > currentFlushedUntilAddress) newHeadAddress = currentFlushedUntilAddress; + if (newHeadAddress % (1 << LogPageSizeBits) != 0) + { + + } if (Utility.MonotonicUpdate(ref HeadAddress, newHeadAddress, out _)) { // Debug.WriteLine("Allocate: Moving head offset from {0:X} to {1:X}", oldHeadAddress, newHeadAddress); From 0b73dbb563fc2f5c494e68ab3f285116665de544 Mon Sep 17 00:00:00 2001 From: Badrish Chandramouli Date: Thu, 19 Sep 2024 16:58:51 -0700 Subject: [PATCH 23/23] update version to 1.0.25 --- 
.azure/pipelines/azure-pipelines-external-release.yml | 2 +- libs/host/GarnetServer.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.azure/pipelines/azure-pipelines-external-release.yml b/.azure/pipelines/azure-pipelines-external-release.yml index 6b9ee3a732..fa1b7ad8a7 100644 --- a/.azure/pipelines/azure-pipelines-external-release.yml +++ b/.azure/pipelines/azure-pipelines-external-release.yml @@ -3,7 +3,7 @@ # 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0) # 2) update \libs\host\GarnetServer.cs readonly string version (~line 53) -- NOTE - these two values need to be the same ###################################### -name: 1.0.24 +name: 1.0.25 trigger: branches: include: diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index d35ee40b6b..4c699a63d4 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -49,7 +49,7 @@ public class GarnetServer : IDisposable protected StoreWrapper storeWrapper; // IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6. - readonly string version = "1.0.24"; + readonly string version = "1.0.25"; /// /// Resp protocol version