Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align Replica Checkpoint Correctly on Attach #571

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
20 changes: 10 additions & 10 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,20 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover

if (storeType is StoreType.Main or StoreType.All)
{
entry.storeVersion = storeWrapper.store.CurrentVersion;
entry.storeHlogToken = storeCheckpointToken;
entry.storeIndexToken = storeCheckpointToken;
entry.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.storePrimaryReplId = replicationManager.PrimaryReplId;
entry.metadata.storeVersion = storeWrapper.store.CurrentVersion;
entry.metadata.storeHlogToken = storeCheckpointToken;
entry.metadata.storeIndexToken = storeCheckpointToken;
entry.metadata.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.metadata.storePrimaryReplId = replicationManager.PrimaryReplId;
}

if (storeType is StoreType.Object or StoreType.All)
{
entry.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion;
entry.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.objectStorePrimaryReplId = replicationManager.PrimaryReplId;
entry.metadata.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion;
entry.metadata.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.metadata.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.metadata.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.metadata.objectStorePrimaryReplId = replicationManager.PrimaryReplId;
}

// Keep track of checkpoints for replica
Expand Down
4 changes: 2 additions & 2 deletions libs/cluster/Server/ClusterUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public static void IOCallback(this ILogger logger, uint errorCode, uint numBytes
{
if (errorCode != 0)
{
string errorMessage = new Win32Exception((int)errorCode).Message;
logger.LogError("OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage);
var errorMessage = new Win32Exception((int)errorCode).Message;
logger.LogError("[ClusterUtils] OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage);
}
((SemaphoreSlim)context).Release();
}
Expand Down
102 changes: 42 additions & 60 deletions libs/cluster/Server/Replication/CheckpointEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,24 @@
using System.IO;
using System.Text;
using Garnet.common;
using Garnet.server;

namespace Garnet.cluster
{
sealed class CheckpointEntry
{
public long storeVersion;
public Guid storeHlogToken;
public Guid storeIndexToken;
public long storeCheckpointCoveredAofAddress;
public string storePrimaryReplId;

public long objectStoreVersion;
public Guid objectStoreHlogToken;
public Guid objectStoreIndexToken;
public long objectCheckpointCoveredAofAddress;
public string objectStorePrimaryReplId;

public CheckpointMetadata metadata;
public SingleWriterMultiReaderLock _lock;
public CheckpointEntry next;

public CheckpointEntry()
{
storeVersion = -1;
storeHlogToken = default;
storeIndexToken = default;
storeCheckpointCoveredAofAddress = long.MaxValue;
storePrimaryReplId = null;

objectStoreVersion = -1;
objectStoreHlogToken = default;
objectStoreIndexToken = default;
objectCheckpointCoveredAofAddress = long.MaxValue;
objectStorePrimaryReplId = null;

metadata = new();
next = null;
}

public long GetMinAofCoveredAddress()
=> Math.Max(Math.Min(storeCheckpointCoveredAofAddress, objectCheckpointCoveredAofAddress), 64);
=> Math.Max(Math.Min(metadata.storeCheckpointCoveredAofAddress, metadata.objectCheckpointCoveredAofAddress), 64);

/// <summary>
/// Indicate addition of new reader by trying to increment reader counter
Expand Down Expand Up @@ -76,26 +55,26 @@ public bool ContainsSharedToken(CheckpointEntry entry, CheckpointFileType fileTy
{
return fileType switch
{
CheckpointFileType.STORE_HLOG => storeHlogToken.Equals(entry.storeHlogToken),
CheckpointFileType.STORE_INDEX => storeIndexToken.Equals(entry.storeIndexToken),
CheckpointFileType.OBJ_STORE_HLOG => objectStoreHlogToken.Equals(entry.objectStoreHlogToken),
CheckpointFileType.OBJ_STORE_INDEX => objectStoreIndexToken.Equals(entry.objectStoreIndexToken),
CheckpointFileType.STORE_HLOG => metadata.storeHlogToken.Equals(entry.metadata.storeHlogToken),
CheckpointFileType.STORE_INDEX => metadata.storeIndexToken.Equals(entry.metadata.storeIndexToken),
CheckpointFileType.OBJ_STORE_HLOG => metadata.objectStoreHlogToken.Equals(entry.metadata.objectStoreHlogToken),
CheckpointFileType.OBJ_STORE_INDEX => metadata.objectStoreIndexToken.Equals(entry.metadata.objectStoreIndexToken),
_ => throw new Exception($"Option {fileType} not supported")
};
}

public string GetCheckpointEntryDump()
{
string dump = $"\n" +
$"storeVersion: {storeVersion}\n" +
$"storeHlogToken: {storeHlogToken}\n" +
$"storeIndexToken: {storeIndexToken}\n" +
$"storeCheckpointCoveredAofAddress: {storeCheckpointCoveredAofAddress}\n" +
$"storeVersion: {metadata.storeVersion}\n" +
$"storeHlogToken: {metadata.storeHlogToken}\n" +
$"storeIndexToken: {metadata.storeIndexToken}\n" +
$"storeCheckpointCoveredAofAddress: {metadata.storeCheckpointCoveredAofAddress}\n" +
$"------------------------------------------------------------------------\n" +
$"objectStoreVersion:{objectStoreVersion}\n" +
$"objectStoreHlogToken:{objectStoreHlogToken}\n" +
$"objectStoreIndexToken:{objectStoreIndexToken}\n" +
$"objectCheckpointCoveredAofAddress:{objectCheckpointCoveredAofAddress}\n" +
$"objectStoreVersion:{metadata.objectStoreVersion}\n" +
$"objectStoreHlogToken:{metadata.objectStoreHlogToken}\n" +
$"objectStoreIndexToken:{metadata.objectStoreIndexToken}\n" +
$"objectCheckpointCoveredAofAddress:{metadata.objectCheckpointCoveredAofAddress}\n" +
$"------------------------------------------------------------------------\n" +
$"activeReaders:{_lock}";
return dump;
Expand All @@ -108,28 +87,28 @@ public byte[] ToByteArray()
byte[] byteBuffer = default;

//Write checkpoint entry data for main store
writer.Write(storeVersion);
byteBuffer = storeHlogToken.ToByteArray();
writer.Write(metadata.storeVersion);
byteBuffer = metadata.storeHlogToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
byteBuffer = storeIndexToken.ToByteArray();
byteBuffer = metadata.storeIndexToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
writer.Write(storeCheckpointCoveredAofAddress);
writer.Write(storePrimaryReplId == null ? 0 : 1);
if (storePrimaryReplId != null) writer.Write(storePrimaryReplId);
writer.Write(metadata.storeCheckpointCoveredAofAddress);
writer.Write(metadata.storePrimaryReplId == null ? 0 : 1);
if (metadata.storePrimaryReplId != null) writer.Write(metadata.storePrimaryReplId);

//Write checkpoint entry data for object store
writer.Write(objectStoreVersion);
byteBuffer = objectStoreHlogToken.ToByteArray();
writer.Write(metadata.objectStoreVersion);
byteBuffer = metadata.objectStoreHlogToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
byteBuffer = objectStoreIndexToken.ToByteArray();
byteBuffer = metadata.objectStoreIndexToken.ToByteArray();
writer.Write(byteBuffer.Length);
writer.Write(byteBuffer);
writer.Write(objectCheckpointCoveredAofAddress);
writer.Write(objectStorePrimaryReplId == null ? 0 : 1);
if (objectStorePrimaryReplId != null) writer.Write(objectStorePrimaryReplId);
writer.Write(metadata.objectCheckpointCoveredAofAddress);
writer.Write(metadata.objectStorePrimaryReplId == null ? 0 : 1);
if (metadata.objectStorePrimaryReplId != null) writer.Write(metadata.objectStorePrimaryReplId);

byte[] byteArray = ms.ToArray();
writer.Dispose();
Expand All @@ -143,17 +122,20 @@ public static CheckpointEntry FromByteArray(byte[] serialized)
var reader = new BinaryReader(ms);
var cEntry = new CheckpointEntry
{
storeVersion = reader.ReadInt64(),
storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeCheckpointCoveredAofAddress = reader.ReadInt64(),
storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default,

objectStoreVersion = reader.ReadInt64(),
objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectCheckpointCoveredAofAddress = reader.ReadInt64(),
objectStorePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default
metadata = new()
{
storeVersion = reader.ReadInt64(),
storeHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
storeCheckpointCoveredAofAddress = reader.ReadInt64(),
storePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default,

objectStoreVersion = reader.ReadInt64(),
objectStoreHlogToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectStoreIndexToken = new Guid(reader.ReadBytes(reader.ReadInt32())),
objectCheckpointCoveredAofAddress = reader.ReadInt64(),
objectStorePrimaryReplId = reader.ReadInt32() > 0 ? reader.ReadString() : default
}
};

reader.Dispose();
Expand Down
Loading