Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Aug 8, 2024
1 parent f393573 commit 07b5431
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
89 changes: 89 additions & 0 deletions test/Garnet.test.cluster/ClusterReplicationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,5 +1048,94 @@ public void ClusterReplicateFails()
var exc = Assert.Throws<RedisServerException>(() => replicaServer.Execute("CLUSTER", ["REPLICATE", Guid.NewGuid().ToString()], flags: CommandFlags.NoRedirect));
Assert.IsTrue(exc.Message.StartsWith("ERR I don't know about node "));
}

//[Test, Order(22), Timeout(testTimeout)]
public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW)
{
var replica_count = 1;// Per primary
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
var primaryNodeIndex = 0;
var replicaNodeIndex = 1;
Assert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, disableObjects: false, MainMemoryReplication: true, CommitFrequencyMs: -1, OnDemandCheckpoint: true, enableAOF: true, useTLS: useTLS);
context.CreateConnection(useTLS: useTLS);
_ = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);

var keyLength = 32;
var kvpairCount = keyCount;
var addCount = 5;
context.kvPairs = [];

// Populate Primary
if (!performRMW)
context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex);
else
context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex, addCount);

context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger);
for (var i = 0; i < 5; i++)
{
var primaryLastSaveTime = context.clusterTestUtils.LastSave(primaryNodeIndex, logger: context.logger);
var replicaLastSaveTime = context.clusterTestUtils.LastSave(replicaNodeIndex, logger: context.logger);
context.clusterTestUtils.Checkpoint(primaryNodeIndex);
context.clusterTestUtils.WaitCheckpoint(primaryNodeIndex, primaryLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitCheckpoint(replicaNodeIndex, replicaLastSaveTime, logger: context.logger);
context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger);
}

var primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger);
var replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger);
Assert.AreEqual(6, primaryVersion);
Assert.AreEqual(primaryVersion, replicaVersion);

// Dispose primary and delete data
context.nodes[primaryNodeIndex].Dispose(true);
// Dispose primary but do not delete data
context.nodes[replicaNodeIndex].Dispose(false);

// Restart primary and do not recover
context.nodes[primaryNodeIndex] = context.CreateInstance(
context.clusterTestUtils.GetEndPoint(primaryNodeIndex).Port,
disableObjects: true,
tryRecover: false,
enableAOF: true,
MainMemoryReplication: true,
CommitFrequencyMs: -1,
OnDemandCheckpoint: true,
timeout: timeout,
useTLS: useTLS,
cleanClusterConfig: true);
context.nodes[primaryNodeIndex].Start();

// Restart secondary and recover
context.nodes[replicaNodeIndex] = context.CreateInstance(
context.clusterTestUtils.GetEndPoint(replicaNodeIndex).Port,
disableObjects: true,
tryRecover: true,
enableAOF: true,
MainMemoryReplication: true,
CommitFrequencyMs: -1,
OnDemandCheckpoint: true,
timeout: timeout,
useTLS: useTLS,
cleanClusterConfig: true);
context.nodes[replicaNodeIndex].Start();
context.CreateConnection(useTLS: useTLS);

// Assert primary version is 1 and replica has recovered to previous checkpoint
primaryVersion = context.clusterTestUtils.GetInfo(primaryNodeIndex, "store", "CurrentVersion", logger: context.logger);
replicaVersion = context.clusterTestUtils.GetInfo(replicaNodeIndex, "store", "CurrentVersion", logger: context.logger);
Assert.AreEqual(1, primaryVersion);
Assert.AreEqual(6, replicaVersion);

Assert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger));

context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger);
context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger);
context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger);

context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger);
}
}
}
24 changes: 24 additions & 0 deletions test/Garnet.test.cluster/ClusterTestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2639,6 +2639,30 @@ public string GetFailoverState(IPEndPoint endPoint, ILogger logger = null)
return items;
}

public string GetInfo(int nodeIndex, string section, string segment, ILogger logger = null)
=> GetInfo(endpoints[nodeIndex].ToIPEndPoint(), section, segment, logger);

public string GetInfo(IPEndPoint endPoint, string section, string segment, ILogger logger = null)
{
try
{
var server = redis.GetServer(endPoint);
var result = server.Info(section);
Assert.AreEqual(1, result.Length, "section does not exist");
foreach (var item in result[0])
if (item.Key.Equals(segment))
return item.Value;
Assert.Fail($"Segment not available for {section} section");
return "";
}
catch (Exception ex)
{
logger?.LogError(ex, "An error has occurred; GetFailoverState");
Assert.Fail(ex.Message);
return null;
}
}

public void WaitForReplicaAofSync(int primaryIndex, int secondaryIndex, ILogger logger = null)
{
long primaryReplicationOffset;
Expand Down

0 comments on commit 07b5431

Please sign in to comment.