From a1e6e771ccaf974b35d70505b646c445a3e065fd Mon Sep 17 00:00:00 2001 From: Yoganand Rajasekaran <60369795+yrajas@users.noreply.github.com> Date: Mon, 29 Jul 2024 11:13:35 -0700 Subject: [PATCH] Update to v1.0.16. Fix disposal of migration sessions. (#530) Problem When using KEYS option for slot migration, the migration session removal isn't disposing the session object. This leads to an out of memory exception when migrating a large number of keys. Fix Set slots correctly for the migrate operation. This helps to ensure the slots are correctly reset on removal of migration session. Also, removal of multiple outstanding session objects when node is removed for the FORGET command, will dispose each of those session objects. --- .../azure-pipelines-external-release.yml | 2 +- .../Migration/MigrateSessionTaskStore.cs | 40 +++++++++++++++++-- .../Server/Migration/MigrationManager.cs | 2 +- libs/cluster/Session/MigrateCommand.cs | 2 + libs/host/GarnetServer.cs | 2 +- .../ClusterMigrateTests.cs | 37 ++++++++++------- 6 files changed, 65 insertions(+), 20 deletions(-) diff --git a/.azure/pipelines/azure-pipelines-external-release.yml b/.azure/pipelines/azure-pipelines-external-release.yml index 3dbe44e196..8d8c110f90 100644 --- a/.azure/pipelines/azure-pipelines-external-release.yml +++ b/.azure/pipelines/azure-pipelines-external-release.yml @@ -3,7 +3,7 @@ # 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0) # 2) update \libs\host\GarnetServer.cs readonly string version (~line 45) -- NOTE - these two values need to be the same ###################################### -name: 1.0.15 +name: 1.0.16 trigger: branches: include: diff --git a/libs/cluster/Server/Migration/MigrateSessionTaskStore.cs b/libs/cluster/Server/Migration/MigrateSessionTaskStore.cs index effd1cadf1..9783bfcd42 100644 --- a/libs/cluster/Server/Migration/MigrateSessionTaskStore.cs +++ b/libs/cluster/Server/Migration/MigrateSessionTaskStore.cs @@ -129,23 +129,57 @@ public bool TryAddMigrateSession( return success; } + public bool TryRemove(MigrateSession mSession) + { + try + { + _lock.WriteLock(); + if (_disposed) return false; + + foreach (var slot in mSession.GetSlots) + { + Debug.Assert(sessions[slot] == mSession, "MigrateSession not found in slot"); + sessions[slot] = null; + } + + mSession.Dispose(); + return true; + } + catch (Exception ex) + { + logger?.LogError(ex, "Error at TryRemove"); + return false; + } + finally + { + _lock.WriteUnlock(); + } + } + public bool TryRemove(string targetNodeId) { try { _lock.WriteLock(); if (_disposed) return false; - MigrateSession mSession = null; + HashSet mSessions = null; for (var i = 0; i < sessions.Length; i++) { var s = sessions[i]; if (s != null && s.GetTargetNodeId.Equals(targetNodeId, StringComparison.Ordinal)) { sessions[i] = null; - mSession = s; + mSessions ??= []; + _ = mSessions.Add(s); } } - mSession?.Dispose(); + + if (mSessions != null) + { + foreach (var session in mSessions) + session.Dispose(); + } + return true; } catch (Exception ex) diff --git a/libs/cluster/Server/Migration/MigrationManager.cs b/libs/cluster/Server/Migration/MigrationManager.cs index 094a7cc05e..b4d7af39d7 100644 --- a/libs/cluster/Server/Migration/MigrationManager.cs +++ b/libs/cluster/Server/Migration/MigrationManager.cs @@ -82,7 +82,7 @@ public bool TryAddMigrationTask( /// /// public bool TryRemoveMigrationTask(MigrateSession mSession) - => migrationTaskStore.TryRemove(mSession.GetTargetNodeId); + => migrationTaskStore.TryRemove(mSession); /// /// Remove migration task associated with provided target nodeId diff --git a/libs/cluster/Session/MigrateCommand.cs b/libs/cluster/Session/MigrateCommand.cs index bb8c280a87..ff963632ee 100644 --- a/libs/cluster/Session/MigrateCommand.cs +++ b/libs/cluster/Session/MigrateCommand.cs @@ -182,6 +182,8 @@ private bool TryMIGRATE(int count, byte* ptr) // Add pointer of current parsed key if (!keys.TryAdd(new ArgSlice(keyPtr, ksize), KeyMigrationStatus.QUEUED)) logger?.LogWarning($"Failed to add {{key}}", Encoding.ASCII.GetString(keyPtr, ksize)); + else + _ = slots.Add(slot); } } else if (option.Equals("SLOTS", StringComparison.OrdinalIgnoreCase)) diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index a36b5c0a5f..782f6077c8 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -43,7 +43,7 @@ public class GarnetServer : IDisposable protected StoreWrapper storeWrapper; // IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6. - readonly string version = "1.0.15"; + readonly string version = "1.0.16"; /// /// Resp protocol version diff --git a/test/Garnet.test.cluster/ClusterMigrateTests.cs b/test/Garnet.test.cluster/ClusterMigrateTests.cs index ef13d54d67..c3659d335b 100644 --- a/test/Garnet.test.cluster/ClusterMigrateTests.cs +++ b/test/Garnet.test.cluster/ClusterMigrateTests.cs @@ -1572,6 +1572,8 @@ Task WriteWorkload(IPEndPoint endPoint, byte[] key, int keyLen = 16) } } + [Test, Order(16)] + [Category("CLUSTER")] public void ClusterMigrateForgetTest() { context.logger.LogDebug($"0. ClusterSimpleMigrateSlotsRanges started"); @@ -1585,23 +1587,30 @@ public void ClusterMigrateForgetTest() var sourceNodeId = context.clusterTestUtils.ClusterMyId(sourceNodeIndex, context.logger); var targetNodeId = context.clusterTestUtils.ClusterMyId(targetNodeIndex, context.logger); - var resp = context.clusterTestUtils.SetSlot(sourceNodeIndex, 0, "MIGRATING", targetNodeId, context.logger); - Assert.AreEqual("OK", resp); - - var slotState = context.clusterTestUtils.SlotState(sourceNodeIndex, 0, context.logger); - Assert.AreEqual(3, slotState.Length); - Assert.AreEqual("0", slotState[0]); - Assert.AreEqual(">", slotState[1]); - Assert.AreEqual(targetNodeId, slotState[2]); + var numSlots = 3; + for (var slot = 0; slot < numSlots; slot++) + { + var migresp = context.clusterTestUtils.SetSlot(sourceNodeIndex, slot, "MIGRATING", targetNodeId, context.logger); + Assert.AreEqual("OK", migresp); + + var slotState = context.clusterTestUtils.SlotState(sourceNodeIndex, slot, context.logger); + Assert.AreEqual(3, slotState.Length); + Assert.AreEqual(slot.ToString(), slotState[0]); + Assert.AreEqual(">", slotState[1]); + Assert.AreEqual(targetNodeId, slotState[2]); + } - resp = context.clusterTestUtils.ClusterForget(sourceNodeIndex, targetNodeId, 100, context.logger); + var resp = context.clusterTestUtils.ClusterForget(sourceNodeIndex, targetNodeId, 100, context.logger); Assert.AreEqual("OK", resp); - slotState = context.clusterTestUtils.SlotState(sourceNodeIndex, 0, context.logger); - Assert.AreEqual(3, slotState.Length); - Assert.AreEqual("0", slotState[0]); - Assert.AreEqual("=", slotState[1]); - Assert.AreEqual(sourceNodeId, slotState[2]); + for (var slot = 0; slot < numSlots; slot++) + { + var slotState = context.clusterTestUtils.SlotState(sourceNodeIndex, slot, context.logger); + Assert.AreEqual(3, slotState.Length); + Assert.AreEqual(slot.ToString(), slotState[0]); + Assert.AreEqual("=", slotState[1]); + Assert.AreEqual(sourceNodeId, slotState[2]); + } } } } \ No newline at end of file