Skip to content

Commit

Permalink
Merge branch 'main' into adding-renamenx-support
Browse files Browse the repository at this point in the history
  • Loading branch information
TalZaccai committed Sep 19, 2024
2 parents f34cf8b + 7d866da commit 71cb473
Show file tree
Hide file tree
Showing 16 changed files with 1,034 additions and 1,845 deletions.
2 changes: 1 addition & 1 deletion .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 53) -- NOTE - these two values need to be the same
######################################
name: 1.0.22
name: 1.0.24
trigger:
branches:
include:
Expand Down
2 changes: 2 additions & 0 deletions benchmark/BDN.benchmark/Resp/RespParseStress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public void GlobalSetup()
{
QuietMode = true,
AuthSettings = authSettings,
MetricsSamplingFrequency = 5,
LatencyMonitor = true,
};
server = new EmbeddedRespServer(opt);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal sealed unsafe partial class MigrateSession : IDisposable
{
internal sealed class MigrationKeyIterationFunctions
{
internal unsafe struct MainStoreGetKeysInSlots : IScanIteratorFunctions<SpanByte, SpanByte>
internal sealed unsafe class MainStoreGetKeysInSlots : IScanIteratorFunctions<SpanByte, SpanByte>
{
MigrationScanIterator iterator;

Expand Down Expand Up @@ -90,7 +90,7 @@ public void OnStop(bool completed, long numberOfRecords) { }
public void OnException(Exception exception, long numberOfRecords) { }
}

internal struct MigrationScanIterator
internal sealed class MigrationScanIterator
{
readonly MigrateSession session;
readonly HashSet<int> slots;
Expand Down
6 changes: 4 additions & 2 deletions libs/cluster/Session/ClusterKeyIterationFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ internal sealed unsafe partial class ClusterSession : IClusterSession
{
internal static class ClusterKeyIterationFunctions
{
internal struct MainStoreCountKeys : IScanIteratorFunctions<SpanByte, SpanByte>
internal sealed class MainStoreCountKeys : IScanIteratorFunctions<SpanByte, SpanByte>
{
// This must be a class as it is passed through pending IO operations
internal int keyCount;
readonly int slot;

Expand All @@ -34,8 +35,9 @@ public void OnStop(bool completed, long numberOfRecords) { }
public void OnException(Exception exception, long numberOfRecords) { }
}

internal struct ObjectStoreCountKeys : IScanIteratorFunctions<byte[], IGarnetObject>
internal sealed class ObjectStoreCountKeys : IScanIteratorFunctions<byte[], IGarnetObject>
{
// This must be a class as it is passed through pending IO operations
internal int keyCount;
readonly int slot;

Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class GarnetServer : IDisposable
protected StoreWrapper storeWrapper;

// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6.
readonly string version = "1.0.22";
readonly string version = "1.0.24";

/// <summary>
/// Resp protocol version
Expand Down
7 changes: 5 additions & 2 deletions libs/server/Resp/AdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
{
private void ProcessAdminCommands(RespCommand command)
{
hasAdminCommand = true;

/*
* WARNING: Here is safe to add @slow commands (check how containsSlowCommand is used).
*/
containsSlowCommand = true;
if (_authenticator.CanAuthenticate && !_authenticator.IsAuthenticated)
{
// If the current session is unauthenticated, we stop parsing, because no other commands are allowed
Expand All @@ -39,6 +41,7 @@ private void ProcessAdminCommands(RespCommand command)
RespCommand.CONFIG_SET => NetworkCONFIG_SET(),
RespCommand.FAILOVER or
RespCommand.REPLICAOF or
RespCommand.MIGRATE or
RespCommand.SECONDARYOF => NetworkProcessClusterCommand(command),
RespCommand.LATENCY_HELP => NetworkLatencyHelp(),
RespCommand.LATENCY_HISTOGRAM => NetworkLatencyHistogram(),
Expand Down
180 changes: 99 additions & 81 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -115,8 +114,8 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
/// </summary>
public IGarnetServer Server { get; set; }

// Track whether the incoming network batch had some admin command
bool hasAdminCommand;
// Track whether the incoming network batch contains slow commands that should not be counter in NET_RS histogram
bool containsSlowCommand;

readonly CustomCommandManagerSession customCommandManagerSession;

Expand Down Expand Up @@ -369,10 +368,10 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
{
if (latencyMetrics != null)
{
if (hasAdminCommand)
if (containsSlowCommand)
{
latencyMetrics.StopAndSwitch(LatencyMetricsType.NET_RS_LAT, LatencyMetricsType.NET_RS_LAT_ADMIN);
hasAdminCommand = false;
containsSlowCommand = false;
}
else
latencyMetrics.Stop(LatencyMetricsType.NET_RS_LAT);
Expand Down Expand Up @@ -443,6 +442,10 @@ private void ProcessMessages()
SendAndReset();
}
}
else
{
containsSlowCommand = true;
}

// Advance read head variables to process the next command
_origReadHead = readHead = endReadHead;
Expand Down Expand Up @@ -496,6 +499,10 @@ private bool MakeUpperCase(byte* ptr)
private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
/*
* WARNING: Do not add any command here classified as @slow!
* Only @fast commands otherwise latency tracking will break for NET_RS (check how containsSlowCommand is used).
*/
_ = cmd switch
{
RespCommand.GET => NetworkGET(ref storageApi),
Expand All @@ -516,6 +523,7 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.SETRANGE => NetworkSetRange(ref storageApi),
RespCommand.GETDEL => NetworkGETDEL(ref storageApi),
RespCommand.APPEND => NetworkAppend(ref storageApi),
RespCommand.STRLEN => NetworkSTRLEN(ref storageApi),
RespCommand.INCR => NetworkIncrement(RespCommand.INCR, ref storageApi),
RespCommand.INCRBY => NetworkIncrement(RespCommand.INCRBY, ref storageApi),
RespCommand.DECR => NetworkIncrement(RespCommand.DECR, ref storageApi),
Expand All @@ -525,7 +533,7 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.BITCOUNT => NetworkStringBitCount(ref storageApi),
RespCommand.BITPOS => NetworkStringBitPosition(ref storageApi),
RespCommand.PUBLISH => NetworkPUBLISH(),
RespCommand.PING => parseState.Count == 0 ? NetworkPING() : ProcessArrayCommands(cmd, ref storageApi),
RespCommand.PING => parseState.Count == 0 ? NetworkPING() : NetworkArrayPING(),
RespCommand.ASKING => NetworkASKING(),
RespCommand.MULTI => NetworkMULTI(),
RespCommand.EXEC => NetworkEXEC(),
Expand All @@ -535,22 +543,6 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.RUNTXP => NetworkRUNTXP(),
RespCommand.READONLY => NetworkREADONLY(),
RespCommand.READWRITE => NetworkREADWRITE(),
RespCommand.COMMAND => NetworkCOMMAND(),
RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
RespCommand.COMMAND_INFO => NetworkCOMMAND_INFO(),
RespCommand.ECHO => NetworkECHO(),
RespCommand.INFO => NetworkINFO(),
RespCommand.HELLO => NetworkHELLO(),
RespCommand.TIME => NetworkTIME(),
RespCommand.FLUSHALL => NetworkFLUSHALL(),
RespCommand.FLUSHDB => NetworkFLUSHDB(),
RespCommand.AUTH => NetworkAUTH(),
RespCommand.MEMORY_USAGE => NetworkMemoryUsage(ref storageApi),
RespCommand.ACL_CAT => NetworkAclCat(),
RespCommand.ACL_WHOAMI => NetworkAclWhoAmI(),
RespCommand.ASYNC => NetworkASYNC(),
RespCommand.MIGRATE => NetworkProcessClusterCommand(cmd),

_ => ProcessArrayCommands(cmd, ref storageApi)
};

Expand All @@ -560,6 +552,10 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
/*
* WARNING: Do not add any command here classified as @slow!
* Only @fast commands otherwise latency tracking will break for NET_RS (check how containsSlowCommand is used).
*/
var success = cmd switch
{
RespCommand.MGET => NetworkMGET(ref storageApi),
Expand All @@ -570,13 +566,6 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.WATCH => NetworkWATCH(),
RespCommand.WATCH_MS => NetworkWATCH_MS(),
RespCommand.WATCH_OS => NetworkWATCH_OS(),
RespCommand.STRLEN => NetworkSTRLEN(ref storageApi),
RespCommand.PING => NetworkArrayPING(),
//General key commands
RespCommand.DBSIZE => NetworkDBSIZE(ref storageApi),
RespCommand.KEYS => NetworkKEYS(ref storageApi),
RespCommand.SCAN => NetworkSCAN(ref storageApi),
RespCommand.TYPE => NetworkTYPE(ref storageApi),
// Pub/sub commands
RespCommand.SUBSCRIBE => NetworkSUBSCRIBE(),
RespCommand.PSUBSCRIBE => NetworkPSUBSCRIBE(),
Expand Down Expand Up @@ -677,10 +666,6 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.SUNIONSTORE => SetUnionStore(ref storageApi),
RespCommand.SDIFF => SetDiff(ref storageApi),
RespCommand.SDIFFSTORE => SetDiffStore(ref storageApi),
// Script Commands
RespCommand.SCRIPT => TrySCRIPT(),
RespCommand.EVAL => TryEVAL(),
RespCommand.EVALSHA => TryEVALSHA(),
_ => ProcessOtherCommands(cmd, ref storageApi)
};
return success;
Expand All @@ -689,7 +674,48 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (command == RespCommand.CLIENT_ID)
/*
* WARNING: Here is safe to add @slow commands (check how containsSlowCommand is used).
*/
containsSlowCommand = true;
var success = command switch
{
RespCommand.AUTH => NetworkAUTH(),
RespCommand.MEMORY_USAGE => NetworkMemoryUsage(ref storageApi),
RespCommand.CLIENT_ID => NetworkCLIENTID(),
RespCommand.CLIENT_INFO => NetworkCLIENTINFO(),
RespCommand.CLIENT_LIST => NetworkCLIENTLIST(),
RespCommand.CLIENT_KILL => NetworkCLIENTKILL(),
RespCommand.COMMAND => NetworkCOMMAND(),
RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
RespCommand.COMMAND_INFO => NetworkCOMMAND_INFO(),
RespCommand.ECHO => NetworkECHO(),
RespCommand.HELLO => NetworkHELLO(),
RespCommand.TIME => NetworkTIME(),
RespCommand.FLUSHALL => NetworkFLUSHALL(),
RespCommand.FLUSHDB => NetworkFLUSHDB(),
RespCommand.ACL_CAT => NetworkAclCat(),
RespCommand.ACL_WHOAMI => NetworkAclWhoAmI(),
RespCommand.ASYNC => NetworkASYNC(),
RespCommand.RUNTXP => NetworkRUNTXP(),
RespCommand.INFO => NetworkINFO(),
RespCommand.CustomTxn => NetworkCustomTxn(),
RespCommand.CustomRawStringCmd => NetworkCustomRawStringCmd(ref storageApi),
RespCommand.CustomObjCmd => NetworkCustomObjCmd(ref storageApi),
RespCommand.CustomProcedure => NetworkCustomProcedure(),
//General key commands
RespCommand.DBSIZE => NetworkDBSIZE(ref storageApi),
RespCommand.KEYS => NetworkKEYS(ref storageApi),
RespCommand.SCAN => NetworkSCAN(ref storageApi),
RespCommand.TYPE => NetworkTYPE(ref storageApi),
// Script Commands
RespCommand.SCRIPT => TrySCRIPT(),
RespCommand.EVAL => TryEVAL(),
RespCommand.EVALSHA => TryEVALSHA(),
_ => Process(command)
};

bool NetworkCLIENTID()
{
if (parseState.Count != 0)
{
Expand All @@ -701,28 +727,8 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp

return true;
}
else if (command == RespCommand.CLIENT_INFO)
{
return NetworkCLIENTINFO();
}
else if (command == RespCommand.CLIENT_LIST)
{
return NetworkCLIENTLIST();
}
else if (command == RespCommand.CLIENT_KILL)
{
return NetworkCLIENTKILL();
}
else if (command == RespCommand.SUBSCRIBE)
{
while (!RespWriteUtils.WriteInteger(1, ref dcurr, dend))
SendAndReset();
}
else if (command == RespCommand.RUNTXP)
{
return NetworkRUNTXP();
}
else if (command == RespCommand.CustomTxn)

bool NetworkCustomTxn()
{
if (!IsCommandArityValid(currentCustomTransaction.NameStr, parseState.Count))
{
Expand All @@ -733,32 +739,10 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
// Perform the operation
TryTransactionProc(currentCustomTransaction.id, recvBufferPtr + readHead, recvBufferPtr + endReadHead, customCommandManagerSession.GetCustomTransactionProcedure(currentCustomTransaction.id, txnManager, scratchBufferManager).Item1);
currentCustomTransaction = null;
return true;
}
else if (command == RespCommand.CustomRawStringCmd)
{
if (!IsCommandArityValid(currentCustomRawStringCommand.NameStr, parseState.Count))
{
currentCustomRawStringCommand = null;
return true;
}

// Perform the operation
TryCustomRawStringCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomRawStringCommand.GetRespCommand(), currentCustomRawStringCommand.expirationTicks, currentCustomRawStringCommand.type, ref storageApi);
currentCustomRawStringCommand = null;
}
else if (command == RespCommand.CustomObjCmd)
{
if (!IsCommandArityValid(currentCustomObjectCommand.NameStr, parseState.Count))
{
currentCustomObjectCommand = null;
return true;
}

// Perform the operation
TryCustomObjectCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomObjectCommand.GetRespCommand(), currentCustomObjectCommand.subid, currentCustomObjectCommand.type, ref storageApi);
currentCustomObjectCommand = null;
}
else if (command == RespCommand.CustomProcedure)
bool NetworkCustomProcedure()
{
if (!IsCommandArityValid(currentCustomProcedure.NameStr, parseState.Count))
{
Expand All @@ -770,12 +754,46 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
currentCustomProcedure.CustomProcedureImpl);

currentCustomProcedure = null;
return true;
}
else

[MethodImpl(MethodImplOptions.AggressiveInlining)]
bool Process(RespCommand command)
{
ProcessAdminCommands(command);
return true;
}

return success;
}

private bool NetworkCustomRawStringCmd<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (!IsCommandArityValid(currentCustomRawStringCommand.NameStr, parseState.Count))
{
currentCustomRawStringCommand = null;
return true;
}

// Perform the operation
TryCustomRawStringCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomRawStringCommand.GetRespCommand(), currentCustomRawStringCommand.expirationTicks, currentCustomRawStringCommand.type, ref storageApi);
currentCustomRawStringCommand = null;
return true;
}

bool NetworkCustomObjCmd<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (!IsCommandArityValid(currentCustomObjectCommand.NameStr, parseState.Count))
{
currentCustomObjectCommand = null;
return true;
}

// Perform the operation
TryCustomObjectCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomObjectCommand.GetRespCommand(), currentCustomObjectCommand.subid, currentCustomObjectCommand.type, ref storageApi);
currentCustomObjectCommand = null;
return true;
}

Expand Down
Loading

0 comments on commit 71cb473

Please sign in to comment.