Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tsavorite allocator - tighten the packing of pages #657

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4cb40c9
Change allocator to enqueue with the invariant that the first record …
badrishc Sep 7, 2024
198e879
fixes based on comments
badrishc Sep 8, 2024
772cb56
add another comment
badrishc Sep 8, 2024
8b5708e
add comments
badrishc Sep 9, 2024
f53f516
fixes - we now always wrap TryAllocate with TryAllocateRetryNow.
badrishc Sep 9, 2024
b0f3886
Add Non-readcache "Insert At Tail" stress test
TedHartMS Sep 9, 2024
fb0a807
support 0% mutable fraction.
badrishc Sep 11, 2024
1c95a5a
Fix InternalUpsert srcRecordInfo setting when found below ReadOnlyAddress
TedHartMS Sep 12, 2024
3958da1
Adjust mutable-page counts in stress test
TedHartMS Sep 12, 2024
56db587
fix typo
TedHartMS Sep 12, 2024
845504b
Enforce at least two pages of memory.
badrishc Sep 12, 2024
2b41b13
nit
badrishc Sep 12, 2024
85ae71e
Merge remote-tracking branch 'origin/main' into badrishc/allocator-en…
badrishc Sep 12, 2024
4e1fdd9
update Garnet to use new allocator logic
badrishc Sep 12, 2024
454b8c0
Fix
badrishc Sep 13, 2024
d38a9a2
update low memory to meet new constraint
badrishc Sep 13, 2024
4867ddb
re-enable warning
badrishc Sep 13, 2024
d009215
handle comments
badrishc Sep 13, 2024
f4df546
fix bitmap tests to use at least 2 pages of memory.
badrishc Sep 13, 2024
5f676f9
fix hll tests
badrishc Sep 13, 2024
f487d5d
more testcase fixes
badrishc Sep 13, 2024
fcf78f0
fix replication logic to handle micro-skips within the same page
badrishc Sep 14, 2024
d00cb3b
PageAlignedShiftHeadAddress should always keep the head address, well…
badrishc Sep 14, 2024
dca9aab
Merge branch 'main' into badrishc/allocator-enqueue-tight
badrishc Sep 14, 2024
4559bbe
Merge branch 'main' into badrishc/allocator-enqueue-tight
badrishc Sep 16, 2024
a8fb59b
Merge branch 'main' into badrishc/allocator-enqueue-tight
badrishc Sep 17, 2024
324bc0c
Merge branch 'main' into badrishc/allocator-enqueue-tight
badrishc Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre

if (clusterProvider.serverOptions.MainMemoryReplication)
{
var firstRecordLength = GetFirstAofEntryLength(record);
if (previousAddress > ReplicationOffset ||
currentAddress >= previousAddress + firstRecordLength)
// If the incoming AOF chunk fits in the space between previousAddress and currentAddress (ReplicationOffset),
// an enqueue will result in an offset mismatch. So, we have to first reset the AOF to point to currentAddress.
if (currentAddress >= previousAddress + recordLength)
{
logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress);
storeWrapper.appendOnlyFile.Initialize(currentAddress, currentAddress);
Expand All @@ -56,15 +56,6 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
throw new GarnetException($"Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning, clientResponse: false);
}

// If there is a gap between the local tail and incoming currentAddress, try to skip local AOF to the next page
if (currentAddress >= storeWrapper.appendOnlyFile.TailAddress + recordLength
&& storeWrapper.appendOnlyFile.GetPage(currentAddress) == storeWrapper.appendOnlyFile.GetPage(storeWrapper.appendOnlyFile.TailAddress) + 1)
{
logger?.LogWarning("SkipPage from {previousAddress} to {currentAddress}, tail is {tailAddress}", previousAddress, currentAddress, storeWrapper.appendOnlyFile.TailAddress);
storeWrapper.appendOnlyFile.UnsafeSkipPage();
logger?.LogWarning("New tail after SkipPage is {tailAddress}", storeWrapper.appendOnlyFile.TailAddress);
}

// Enqueue to AOF
_ = clusterProvider.storeWrapper.appendOnlyFile?.UnsafeEnqueueRaw(new Span<byte>(record, recordLength), noCommit: clusterProvider.serverOptions.EnableFastCommit);

Expand Down
249 changes: 127 additions & 122 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,28 @@ private static bool TryBlockAllocate<TInput, TOutput, TContext>(
out OperationStatus internalStatus)
{
pendingContext.flushEvent = allocator.FlushEvent;
logicalAddress = allocator.TryAllocate(recordSize);
logicalAddress = allocator.TryAllocateRetryNow(recordSize);
if (logicalAddress > 0)
{
pendingContext.flushEvent = default;
internalStatus = OperationStatus.SUCCESS;
return true;
}

if (logicalAddress == 0)
{
// We expect flushEvent to be signaled.
internalStatus = OperationStatus.ALLOCATE_FAILED;
return false;
}

// logicalAddress is < 0 so we do not expect flushEvent to be signaled; return RETRY_LATER to refresh the epoch.
pendingContext.flushEvent = default;
allocator.TryComplete();
internalStatus = OperationStatus.RETRY_LATER;
Debug.Assert(logicalAddress == 0);
badrishc marked this conversation as resolved.
Show resolved Hide resolved
// We expect flushEvent to be signaled.
internalStatus = OperationStatus.ALLOCATE_FAILED;
return false;
}

/// <summary>Options for TryAllocateRecord.</summary>
internal struct AllocateOptions
{
/// <summary>If true, use the non-revivification recycling of records that failed to CAS and are carried in PendingContext through RETRY.</summary>
internal bool Recycle;
internal bool IgnoreHeiAddress;

/// <summary>If true, the source record is elidable so we can try to elide from the tag chain (and transfer it to the FreeList if we're doing Revivification).</summary>
internal bool ElideSourceRecord;
};

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -64,7 +60,7 @@ bool TryAllocateRecord<TInput, TOutput, TContext, TSessionFunctionsWrapper>(TSes
return true;
if (RevivificationManager.UseFreeRecordPool)
{
if (!options.IgnoreHeiAddress && stackCtx.hei.Address >= minMutableAddress)
if (!options.ElideSourceRecord && stackCtx.hei.Address >= minMutableAddress)
minRevivAddress = stackCtx.hei.Address;
if (sessionFunctions.Ctx.IsInV1)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,17 +406,15 @@ private OperationStatus CreateNewRecordRMW<TInput, TOutput, TContext, TSessionFu
AllocateOptions allocOptions = new()
{
Recycle = true,

// If the source record is elidable we can try to elide from the chain and transfer it to the FreeList if we're doing Revivification
IgnoreHeiAddress = stackCtx.recSrc.HasMainLogSrc && CanElide<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions, ref stackCtx, ref srcRecordInfo)
ElideSourceRecord = stackCtx.recSrc.HasMainLogSrc && CanElide<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions, ref stackCtx, ref srcRecordInfo)
};

if (!TryAllocateRecord(sessionFunctions, ref pendingContext, ref stackCtx, actualSize, ref allocatedSize, keySize, allocOptions,
out long newLogicalAddress, out long newPhysicalAddress, out OperationStatus status))
return status;

ref RecordInfo newRecordInfo = ref WriteNewRecordInfo(ref key, hlogBase, newPhysicalAddress, inNewVersion: sessionFunctions.Ctx.InNewVersion, stackCtx.recSrc.LatestLogicalAddress);
if (allocOptions.IgnoreHeiAddress)
if (allocOptions.ElideSourceRecord)
newRecordInfo.PreviousAddress = srcRecordInfo.PreviousAddress;
stackCtx.SetNewRecord(newLogicalAddress);

Expand Down Expand Up @@ -453,7 +451,7 @@ private OperationStatus CreateNewRecordRMW<TInput, TOutput, TContext, TSessionFu
// Do not elide (restore newRecordInfo.PreviousAddress to its original WriteNewRecordInfo state) if requested to preserve the source record.
if (rmwInfo.PreserveCopyUpdaterSourceRecord)
{
allocOptions.IgnoreHeiAddress = false;
allocOptions.ElideSourceRecord = false;
newRecordInfo.PreviousAddress = stackCtx.recSrc.LatestLogicalAddress;
}
goto DoCAS;
Expand Down Expand Up @@ -528,9 +526,9 @@ private OperationStatus CreateNewRecordRMW<TInput, TOutput, TContext, TSessionFu
}
}

// IgnoreHeiAddress means we have verified that the old source record is elidable and now that CAS has replaced it in the HashBucketEntry with
// ElideSourceRecord means we have verified that the old source record is elidable and now that CAS has replaced it in the HashBucketEntry with
// the new source record that does not point to the old source record, we have elided it, so try to transfer to freelist.
if (allocOptions.IgnoreHeiAddress)
if (allocOptions.ElideSourceRecord)
{
// Success should always Seal the old record. This may be readcache, readonly, or the temporary recordInfo, which is OK and saves the cost of an "if".
srcRecordInfo.SealAndInvalidate(); // The record was elided, so Invalidate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ internal OperationStatus InternalUpsert<TInput, TOutput, TContext, TSessionFunct
// ConcurrentWriter failed (e.g. insufficient space, another thread set Tombstone, etc). Write a new record, but track that we have to seal and unlock this one.
goto CreateNewRecord;
}
if (stackCtx.recSrc.LogicalAddress >= hlogBase.HeadAddress)
{
// Safe Read-Only Region: Create a record in the mutable region, but set srcRecordInfo in case we are eliding.
if (stackCtx.recSrc.HasMainLogSrc)
srcRecordInfo = ref stackCtx.recSrc.GetInfo();
goto CreateNewRecord;
}

// No record exists, or readonly or below. Drop through to create new record.
Debug.Assert(!sessionFunctions.IsManualLocking || LockTable.IsLockedExclusive(ref stackCtx.hei), "A Lockable-session Upsert() of an on-disk or non-existent key requires a LockTable lock");
Expand Down Expand Up @@ -303,17 +310,15 @@ private OperationStatus CreateNewRecordUpsert<TInput, TOutput, TContext, TSessio
AllocateOptions allocOptions = new()
{
Recycle = true,

// If the source record is elidable we can try to elide from the chain and transfer it to the FreeList if we're doing Revivification
IgnoreHeiAddress = stackCtx.recSrc.HasMainLogSrc && CanElide<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions, ref stackCtx, ref srcRecordInfo)
ElideSourceRecord = stackCtx.recSrc.HasMainLogSrc && CanElide<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions, ref stackCtx, ref srcRecordInfo)
};

if (!TryAllocateRecord(sessionFunctions, ref pendingContext, ref stackCtx, actualSize, ref allocatedSize, keySize, allocOptions,
out long newLogicalAddress, out long newPhysicalAddress, out OperationStatus status))
return status;

ref RecordInfo newRecordInfo = ref WriteNewRecordInfo(ref key, hlogBase, newPhysicalAddress, inNewVersion: sessionFunctions.Ctx.InNewVersion, stackCtx.recSrc.LatestLogicalAddress);
if (allocOptions.IgnoreHeiAddress)
if (allocOptions.ElideSourceRecord)
newRecordInfo.PreviousAddress = srcRecordInfo.PreviousAddress;
stackCtx.SetNewRecord(newLogicalAddress);

Expand Down Expand Up @@ -353,9 +358,9 @@ private OperationStatus CreateNewRecordUpsert<TInput, TOutput, TContext, TSessio

sessionFunctions.PostSingleWriter(ref key, ref input, ref value, ref newRecordValue, ref output, ref upsertInfo, WriteReason.Upsert, ref newRecordInfo);

// IgnoreHeiAddress means we have verified that the old source record is elidable and now that CAS has replaced it in the HashBucketEntry with
// ElideSourceRecord means we have verified that the old source record is elidable and now that CAS has replaced it in the HashBucketEntry with
// the new source record that does not point to the old source record, we have elided it, so try to transfer to freelist.
if (allocOptions.IgnoreHeiAddress)
if (allocOptions.ElideSourceRecord)
{
// Success should always Seal the old record. This may be readcache, readonly, or the temporary recordInfo, which is OK and saves the cost of an "if".
srcRecordInfo.SealAndInvalidate(); // The record was elided, so Invalidate
Expand Down
39 changes: 10 additions & 29 deletions libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -433,17 +433,6 @@ public void UnsafeCommitMetadataOnly(TsavoriteLogRecoveryInfo info)
public int UnsafeGetLogPageSizeBits()
=> allocator.LogPageSizeBits;

/// <summary>
/// Get page number for given address
/// </summary>
/// <param name="logicalAddress"></param>
/// <returns></returns>
public long GetPage(long logicalAddress)
=> allocator.GetPage(logicalAddress);

public void UnsafeSkipPage()
=> allocator.SkipPage();

/// <summary>
/// Get read only lag address
/// </summary>
Expand Down Expand Up @@ -872,28 +861,20 @@ private long AllocateBlock(int recordSize)
while (true)
{
var flushEvent = allocator.FlushEvent;
var logicalAddress = allocator.TryAllocate(recordSize);
var logicalAddress = allocator.TryAllocateRetryNow(recordSize);
if (logicalAddress > 0)
return logicalAddress;

if (logicalAddress == 0)
Debug.Assert(logicalAddress == 0);
badrishc marked this conversation as resolved.
Show resolved Hide resolved
epoch.Suspend();
if (cannedException != null) throw cannedException;
try
{
epoch.Suspend();
if (cannedException != null) throw cannedException;
try
{
flushEvent.Wait();
}
finally
{
epoch.Resume();
}
flushEvent.Wait();
}
finally
{
epoch.Resume();
}

// logicalAddress is < 0 so we do not expect flushEvent to be signaled; refresh the epoch and retry now
allocator.TryComplete();
epoch.ProtectAndDrain();
Thread.Yield();
}
}

Expand Down
Loading
Loading