Skip to content

Commit

Permalink
Migration Misc Fixes (#552)
Browse files Browse the repository at this point in the history
* separate error message when adding keySlice to migration tracker

* safely add and remove sessions in migration task store

* fix slotsrange option

* safely deal with multiple dispose calls to MigrateSession

* fix bug when iterating stores

* fix stores window scan tracking when copyOption is not enabled

* remove logger messages

* release version 1.0.18

* add slotsrange data test

* add back some log messages

* nit; change info to trace message
  • Loading branch information
vazois committed Aug 3, 2024
1 parent d156731 commit 2725774
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 97 deletions.
2 changes: 1 addition & 1 deletion .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 45) -- NOTE - these two values need to be the same
######################################
name: 1.0.17
name: 1.0.18
trigger:
branches:
include:
Expand Down
1 change: 1 addition & 0 deletions libs/cluster/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ static class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_MIGRATE_TO_MYSELF => "ERR Can't MIGRATE to myself"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_INCOMPLETESLOTSRANGE => "ERR incomplete slotrange"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SLOTNOTMIGRATING => "ERR slot state not set to MIGRATING state"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_FAILEDTOADDKEY => "ERR Failed to add key for migration tracking"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_PARSING => "ERR Parsing error"u8;

/// <summary>
Expand Down
4 changes: 0 additions & 4 deletions libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@ public void Start()
public void FlushConfig()
{
// Hold the monitor on this instance for the duration of the write, so flushes issued through this method do not overlap.
// NOTE(review): locking on `this` shares the monitor with any external code that locks the same object; confirm whether a private lock object is preferable.
lock (this)
{
logger?.LogTrace("Start FlushConfig {path}", clusterConfigDevice.FileName);
// Serialize the current config to a byte array and pass it to ClusterUtils.WriteInto together with the config device and buffer pool.
// The literal 0 is presumably the target offset on the device; TODO confirm against ClusterUtils.WriteInto.
ClusterUtils.WriteInto(clusterConfigDevice, pool, 0, currentConfig.ToByteArray(), logger: logger);
logger?.LogTrace("End FlushConfig {path}", clusterConfigDevice.FileName);
}
}

/// <summary>
Expand Down
23 changes: 10 additions & 13 deletions libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public bool TryAddSlots(HashSet<int> slots, out int slotAssigned)
break;
}
FlushConfig();
logger?.LogTrace("ADD SLOTS {slots}", GetRange(slots.ToArray()));
logger?.LogTrace("AddSlots {slots}", GetRange(slots.ToArray()));
return true;
}

Expand Down Expand Up @@ -78,7 +78,7 @@ public bool TryRemoveSlots(HashSet<int> slots, out int notLocalSlot)
break;
}
FlushConfig();
logger?.LogTrace("REMOVE SLOTS {slots}", string.Join(",", slots));
logger?.LogTrace("RemoveSlots {slots}", GetRange(slots.ToArray()));
return true;
}

Expand Down Expand Up @@ -143,8 +143,7 @@ public bool TryPrepareSlotForMigration(int slot, string nodeid, out ReadOnlySpan
break;
}
FlushConfig();

logger?.LogInformation("MIGRATE {slot} TO {currentConfig.GetWorkerAddressFromNodeId(nodeid)}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
logger?.LogTrace("SetSlot MIGRATING {slot} TO {nodeId}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
return true;
}

Expand Down Expand Up @@ -211,8 +210,7 @@ public bool TryPrepareSlotsForMigration(HashSet<int> slots, string nodeid, out R
break;
}
FlushConfig();

logger?.LogInformation("MIGRATE {slot} TO {migrating node}", string.Join(' ', slots), currentConfig.GetWorkerAddressFromNodeId(nodeid));
logger?.LogTrace("SetSlotsRange MIGRATING {slot} TO {nodeId}", GetRange(slots.ToArray()), currentConfig.GetWorkerAddressFromNodeId(nodeid));
return true;
}

Expand Down Expand Up @@ -266,8 +264,7 @@ public bool TryPrepareSlotForImport(int slot, string nodeid, out ReadOnlySpan<by
break;
}
FlushConfig();

logger?.LogInformation("IMPORT {slot} FROM {currentConfig.GetWorkerAddressFromNodeId(nodeid)}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
logger?.LogTrace("SetSlot IMPORTING {slot} TO {nodeId}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
return true;
}

Expand Down Expand Up @@ -329,8 +326,8 @@ public bool TryPrepareSlotsForImport(HashSet<int> slots, string nodeid, out Read
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
}

logger?.LogInformation("IMPORT {slot} FROM {importingNode}", string.Join(' ', slots), currentConfig.GetWorkerAddressFromNodeId(nodeid));
FlushConfig();
logger?.LogTrace("SetSlotsRange IMPORTING {slot} TO {nodeId}", GetRange(slots.ToArray()), currentConfig.GetWorkerAddressFromNodeId(nodeid));
return true;
}

Expand Down Expand Up @@ -364,7 +361,7 @@ public bool TryPrepareSlotForOwnershipChange(int slot, string nodeid, out ReadOn
break;
}
FlushConfig();
logger?.LogInformation("SLOT {slot} IMPORTED TO {nodeid}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
logger?.LogTrace("SLOT {slot} MIGRATED TO {nodeid}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
return true;
}
else if (current.GetState((ushort)slot) is SlotState.IMPORTING)
Expand All @@ -385,7 +382,7 @@ public bool TryPrepareSlotForOwnershipChange(int slot, string nodeid, out ReadOn
break;
}
FlushConfig();
logger?.LogInformation("SLOT {slot} IMPORTED FROM {nodeid}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
logger?.LogTrace("SLOT {slot} IMPORTED FROM {nodeid}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
return true;
}
return true;
Expand Down Expand Up @@ -418,7 +415,7 @@ public bool TryPrepareSlotsForOwnershipChange(HashSet<int> slots, string nodeid,
}

FlushConfig();
logger?.LogInformation("SLOT {slot} IMPORTED TO {endpoint}", slots, currentConfig.GetWorkerAddressFromNodeId(nodeid));
logger?.LogTrace("Slots {slot} IMPORTED TO {endpoint}", GetRange(slots.ToArray()), currentConfig.GetWorkerAddressFromNodeId(nodeid));
return true;
}

Expand Down
2 changes: 2 additions & 0 deletions libs/cluster/Server/Migration/MigrateSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ internal sealed unsafe partial class MigrateSession : IDisposable
readonly List<(int, int)> _slotRanges;
readonly Dictionary<ArgSlice, KeyMigrationStatus> _keys;
SingleWriterMultiReaderLock _keyDictLock;
SingleWriterMultiReaderLock _disposed;

readonly HashSet<int> _sslots;
readonly CancellationTokenSource _cts = new();
Expand Down Expand Up @@ -222,6 +223,7 @@ internal MigrateSession(
/// </summary>
public void Dispose()
{
if (!_disposed.TryWriteLock()) return;
_cts?.Cancel();
_cts?.Dispose();
_gcs.Dispose();
Expand Down
6 changes: 2 additions & 4 deletions libs/cluster/Server/Migration/MigrateSessionSlots.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public bool MigrateSlotsDriver()
while (true)
{
// Iterate main store
if (!localServerSession.BasicGarnetApi.IterateMainStore(ref mainStoreGetKeysInSlots, storeTailAddress))
return false;
_ = localServerSession.BasicGarnetApi.IterateMainStore(ref mainStoreGetKeysInSlots, storeTailAddress);

// If did not acquire any keys stop scanning
if (_keys.IsNullOrEmpty())
Expand All @@ -50,8 +49,7 @@ public bool MigrateSlotsDriver()
while (true)
{
// Iterate object store
if (!localServerSession.BasicGarnetApi.IterateObjectStore(ref objectStoreGetKeysInSlots, objectStoreTailAddress))
return false;
_ = localServerSession.BasicGarnetApi.IterateObjectStore(ref objectStoreGetKeysInSlots, objectStoreTailAddress);

// If did not acquire any keys stop scanning
if (_keys.IsNullOrEmpty())
Expand Down
34 changes: 23 additions & 11 deletions libs/cluster/Server/Migration/MigrateSessionTaskStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ public int GetNumSessions()
_lock.ReadLock();
if (_disposed) return 0;

HashSet<MigrateSession> ss = [];
for (var i = 0; i < sessions.Length; i++)
count += sessions[i] != null ? 1 : 0;
{
if (sessions[i] != null)
_ = ss.Add(sessions[i]);
}
count = ss.Count;
}
finally
{
Expand Down Expand Up @@ -101,6 +106,7 @@ public bool TryAddMigrateSession(
_lock.WriteLock();
if (_disposed) return false;

// First iterate and check whether any of the session's slots is already associated with another active migrate session
foreach (var slot in mSession.GetSlots)
{
if (sessions[slot] != null)
Expand All @@ -109,8 +115,12 @@ public bool TryAddMigrateSession(
success = false;
return false;
}
sessions[slot] = mSession;
}

// Reaching this point means none of the slots to be migrated is associated with another session,
// so we can mark them as being associated with this newly added session
foreach (var slot in mSession.GetSlots)
sessions[slot] = mSession;
}
catch (Exception ex)
{
Expand All @@ -129,6 +139,11 @@ public bool TryAddMigrateSession(
return success;
}

/// <summary>
/// Remove only the provided session instance
/// </summary>
/// <param name="mSession"></param>
/// <returns></returns>
public bool TryRemove(MigrateSession mSession)
{
try
Expand Down Expand Up @@ -156,30 +171,27 @@ public bool TryRemove(MigrateSession mSession)
}
}

/// <summary>
/// Remove all sessions associated with the provided targetNodeId
/// </summary>
/// <param name="targetNodeId"></param>
/// <returns></returns>
public bool TryRemove(string targetNodeId)
{
try
{
_lock.WriteLock();
if (_disposed) return false;
HashSet<MigrateSession> mSessions = null;
for (var i = 0; i < sessions.Length; i++)
{
var s = sessions[i];
if (s != null && s.GetTargetNodeId.Equals(targetNodeId, StringComparison.Ordinal))
{
sessions[i] = null;
mSessions ??= [];
_ = mSessions.Add(s);
s.Dispose();
}
}

if (mSessions != null)
{
foreach (var session in mSessions)
session.Dispose();
}

return true;
}
catch (Exception ex)
Expand Down
64 changes: 3 additions & 61 deletions libs/cluster/Server/Migration/MigrationKeyIterationFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,64 +14,6 @@ internal sealed unsafe partial class MigrateSession : IDisposable
{
internal sealed class MigrationKeyIterationFunctions
{
/// <summary>
/// Main-store scan callbacks: every non-expired record whose key hashes into one of the
/// configured slots is handed to the owning <see cref="MigrateSession"/> for writing/sending.
/// </summary>
internal struct MainStoreMigrateSlots : IScanIteratorFunctions<SpanByte, SpanByte>
{
    readonly MigrateSession session;
    readonly HashSet<int> slots;

    /// <summary>
    /// Create the callbacks for the given session and slot set.
    /// </summary>
    /// <param name="session">Session used to write or send each selected key-value pair.</param>
    /// <param name="slots">Hash slots whose keys should be selected.</param>
    internal MainStoreMigrateSlots(MigrateSession session, HashSet<int> slots)
        => (this.session, this.slots) = (session, slots);

    /// <summary>
    /// Process one record of the scan.
    /// </summary>
    /// <returns>
    /// The result of <c>WriteOrSendMainStoreKeyValuePair</c> for a selected record;
    /// true for records that are skipped (slot not requested, or value expired).
    /// </returns>
    public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
    {
        // Always reported as Accept; the value is not used by this scan
        cursorRecordResult = CursorRecordResult.Accept;

        // Key does not belong to any of the slots being migrated
        if (!slots.Contains(HashSlotUtils.HashSlot(ref key)))
            return true;

        // Expired records are skipped rather than transferred
        if (ClusterSession.Expired(ref value))
            return true;

        return session.WriteOrSendMainStoreKeyValuePair(ref key, ref value);
    }

    /// <summary>
    /// Concurrent records are handled exactly like single-reader records.
    /// </summary>
    public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
    {
        return SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
    }

    /// <summary>
    /// No setup is required; always returns true.
    /// </summary>
    public bool OnStart(long beginAddress, long endAddress)
    {
        return true;
    }

    /// <summary>
    /// Nothing to do when the scan stops.
    /// </summary>
    public void OnStop(bool completed, long numberOfRecords)
    {
    }

    /// <summary>
    /// Exceptions raised during the scan are not handled here.
    /// </summary>
    public void OnException(Exception exception, long numberOfRecords)
    {
    }
}

/// <summary>
/// Object-store scan callbacks: every non-expired object whose key hashes into one of the
/// configured slots is serialized and handed to the owning <see cref="MigrateSession"/>.
/// </summary>
internal struct ObjectStoreMigrateSlots : IScanIteratorFunctions<byte[], IGarnetObject>
{
    readonly MigrateSession session;
    readonly HashSet<int> slots;

    /// <summary>
    /// Create the callbacks for the given session and slot set.
    /// </summary>
    /// <param name="session">Session used to write or send each selected key-value pair.</param>
    /// <param name="slots">Hash slots whose keys should be selected.</param>
    internal ObjectStoreMigrateSlots(MigrateSession session, HashSet<int> slots)
        => (this.session, this.slots) = (session, slots);

    /// <summary>
    /// Process one record of the scan.
    /// </summary>
    /// <returns>
    /// The result of <c>WriteOrSendObjectStoreKeyValuePair</c> for a selected record;
    /// true for records that are skipped (slot not requested, or object expired).
    /// </returns>
    public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
    {
        // Always reported as Accept; the value is not used by this scan
        cursorRecordResult = CursorRecordResult.Accept;

        // Skip keys outside the migrating slots as well as expired objects
        var selected = slots.Contains(HashSlotUtils.HashSlot(key)) && !ClusterSession.Expired(ref value);
        if (!selected)
            return true;

        // Serialize the object, then pass it along with its expiration
        byte[] objectData = GarnetObjectSerializer.Serialize(value);
        return session.WriteOrSendObjectStoreKeyValuePair(key, objectData, value.Expiration);
    }

    /// <summary>
    /// Concurrent records are handled exactly like single-reader records.
    /// </summary>
    public bool ConcurrentReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
    {
        return SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
    }

    /// <summary>
    /// No setup is required; always returns true.
    /// </summary>
    public bool OnStart(long beginAddress, long endAddress)
    {
        return true;
    }

    /// <summary>
    /// Nothing to do when the scan stops.
    /// </summary>
    public void OnStop(bool completed, long numberOfRecords)
    {
    }

    /// <summary>
    /// Exceptions raised during the scan are not handled here.
    /// </summary>
    public void OnException(Exception exception, long numberOfRecords)
    {
    }
}

internal unsafe struct MainStoreGetKeysInSlots : IScanIteratorFunctions<SpanByte, SpanByte>
{
MigrationScanIterator iterator;
Expand Down Expand Up @@ -148,7 +90,6 @@ public void OnStop(bool completed, long numberOfRecords) { }
public void OnException(Exception exception, long numberOfRecords) { }
}


internal struct MigrationScanIterator
{
readonly MigrateSession session;
Expand Down Expand Up @@ -198,8 +139,9 @@ public void AdvanceIterator()
/// <returns></returns>
public bool Consume(ref Span<byte> key)
{
// Check if key is within the current processing window
if (currentOffset < offset)
// Check if key is within the current processing window only if _copyOption is set
// in order to skip keys that have been sent over to target node but not deleted locally
if (session._copyOption && currentOffset < offset)
{
currentOffset++;
return true;
Expand Down
12 changes: 10 additions & 2 deletions libs/cluster/Session/MigrateCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ internal enum MigrateCmdParseState : byte
SLOTOUTOFRANGE,
NOTMIGRATING,
MULTI_TRANSFER_OPTION,
FAILEDTOADDKEY
}

private bool HandleCommandParsingErrors(MigrateCmdParseState mpState, string targetAddress, int targetPort, int slotMultiRef)
Expand All @@ -48,6 +49,7 @@ private bool HandleCommandParsingErrors(MigrateCmdParseState mpState, string tar
MigrateCmdParseState.INCOMPLETESLOTSRANGE => CmdStrings.RESP_ERR_GENERIC_INCOMPLETESLOTSRANGE,
MigrateCmdParseState.SLOTOUTOFRANGE => Encoding.ASCII.GetBytes($"ERR Slot {slotMultiRef} out of range."),
MigrateCmdParseState.NOTMIGRATING => CmdStrings.RESP_ERR_GENERIC_SLOTNOTMIGRATING,
MigrateCmdParseState.FAILEDTOADDKEY => CmdStrings.RESP_ERR_GENERIC_FAILEDTOADDKEY,
_ => CmdStrings.RESP_ERR_GENERIC_PARSING,
};
while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend))
Expand Down Expand Up @@ -181,9 +183,12 @@ private bool TryMIGRATE(int count, byte* ptr)

// Add pointer of current parsed key
if (!keys.TryAdd(new ArgSlice(keyPtr, ksize), KeyMigrationStatus.QUEUED))
{
logger?.LogWarning($"Failed to add {{key}}", Encoding.ASCII.GetString(keyPtr, ksize));
else
_ = slots.Add(slot);
pstate = MigrateCmdParseState.FAILEDTOADDKEY;
continue;
}
_ = slots.Add(slot);
}
}
else if (option.Equals("SLOTS", StringComparison.OrdinalIgnoreCase))
Expand Down Expand Up @@ -228,6 +233,9 @@ private bool TryMIGRATE(int count, byte* ptr)
}
else if (option.Equals("SLOTSRANGE", StringComparison.OrdinalIgnoreCase))
{
if (transferOption == TransferOption.KEYS)
pstate = MigrateCmdParseState.MULTI_TRANSFER_OPTION;
transferOption = TransferOption.SLOTS;
slots = [];
if (args == 0 || (args & 0x1) > 0)
{
Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class GarnetServer : IDisposable
protected StoreWrapper storeWrapper;

// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6.
readonly string version = "1.0.17";
readonly string version = "1.0.18";

/// <summary>
/// Resp protocol version
Expand Down
Loading

0 comments on commit 2725774

Please sign in to comment.