Skip to content

Commit

Permalink
Fix Latency Tracking Issue (#659)
Browse files Browse the repository at this point in the history
* consider INFO and other commands as admin commands for latency tracking

* cleanup process other commands

* move non data commands under other commands

* classify command early for latency calculation

* revert bool return from process admin commands

* nit

* classify command for latency using commandInfo

* inline Process method

* revert back to implicit classification of commands for latency measurements

* bump garnet version

* add safety messages in code
  • Loading branch information
vazois committed Sep 16, 2024
1 parent 840997f commit f75773c
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 53) -- NOTE - these two values need to be the same
######################################
name: 1.0.22
name: 1.0.23
trigger:
branches:
include:
Expand Down
2 changes: 2 additions & 0 deletions benchmark/BDN.benchmark/Resp/RespParseStress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public void GlobalSetup()
{
QuietMode = true,
AuthSettings = authSettings,
MetricsSamplingFrequency = 5,
LatencyMonitor = true,
};
server = new EmbeddedRespServer(opt);

Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class GarnetServer : IDisposable
protected StoreWrapper storeWrapper;

// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6.
readonly string version = "1.0.22";
readonly string version = "1.0.23";

/// <summary>
/// Resp protocol version
Expand Down
7 changes: 5 additions & 2 deletions libs/server/Resp/AdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
{
private void ProcessAdminCommands(RespCommand command)
{
hasAdminCommand = true;

/*
* WARNING: Here is safe to add @slow commands (check how containsSlowCommand is used).
*/
containsSlowCommand = true;
if (_authenticator.CanAuthenticate && !_authenticator.IsAuthenticated)
{
// If the current session is unauthenticated, we stop parsing, because no other commands are allowed
Expand All @@ -39,6 +41,7 @@ private void ProcessAdminCommands(RespCommand command)
RespCommand.CONFIG_SET => NetworkCONFIG_SET(),
RespCommand.FAILOVER or
RespCommand.REPLICAOF or
RespCommand.MIGRATE or
RespCommand.SECONDARYOF => NetworkProcessClusterCommand(command),
RespCommand.LATENCY_HELP => NetworkLatencyHelp(),
RespCommand.LATENCY_HISTOGRAM => NetworkLatencyHistogram(),
Expand Down
180 changes: 99 additions & 81 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -115,8 +114,8 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
/// </summary>
public IGarnetServer Server { get; set; }

// Track whether the incoming network batch had some admin command
bool hasAdminCommand;
// Track whether the incoming network batch contains slow commands that should not be counter in NET_RS histogram
bool containsSlowCommand;

readonly CustomCommandManagerSession customCommandManagerSession;

Expand Down Expand Up @@ -369,10 +368,10 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
{
if (latencyMetrics != null)
{
if (hasAdminCommand)
if (containsSlowCommand)
{
latencyMetrics.StopAndSwitch(LatencyMetricsType.NET_RS_LAT, LatencyMetricsType.NET_RS_LAT_ADMIN);
hasAdminCommand = false;
containsSlowCommand = false;
}
else
latencyMetrics.Stop(LatencyMetricsType.NET_RS_LAT);
Expand Down Expand Up @@ -443,6 +442,10 @@ private void ProcessMessages()
SendAndReset();
}
}
else
{
containsSlowCommand = true;
}

// Advance read head variables to process the next command
_origReadHead = readHead = endReadHead;
Expand Down Expand Up @@ -496,6 +499,10 @@ private bool MakeUpperCase(byte* ptr)
private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
/*
* WARNING: Do not add any command here classified as @slow!
* Only @fast commands otherwise latency tracking will break for NET_RS (check how containsSlowCommand is used).
*/
_ = cmd switch
{
RespCommand.GET => NetworkGET(ref storageApi),
Expand All @@ -515,6 +522,7 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.SETRANGE => NetworkSetRange(ref storageApi),
RespCommand.GETDEL => NetworkGETDEL(ref storageApi),
RespCommand.APPEND => NetworkAppend(ref storageApi),
RespCommand.STRLEN => NetworkSTRLEN(ref storageApi),
RespCommand.INCR => NetworkIncrement(RespCommand.INCR, ref storageApi),
RespCommand.INCRBY => NetworkIncrement(RespCommand.INCRBY, ref storageApi),
RespCommand.DECR => NetworkIncrement(RespCommand.DECR, ref storageApi),
Expand All @@ -524,7 +532,7 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.BITCOUNT => NetworkStringBitCount(ref storageApi),
RespCommand.BITPOS => NetworkStringBitPosition(ref storageApi),
RespCommand.PUBLISH => NetworkPUBLISH(),
RespCommand.PING => parseState.Count == 0 ? NetworkPING() : ProcessArrayCommands(cmd, ref storageApi),
RespCommand.PING => parseState.Count == 0 ? NetworkPING() : NetworkArrayPING(),
RespCommand.ASKING => NetworkASKING(),
RespCommand.MULTI => NetworkMULTI(),
RespCommand.EXEC => NetworkEXEC(),
Expand All @@ -534,22 +542,6 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.RUNTXP => NetworkRUNTXP(),
RespCommand.READONLY => NetworkREADONLY(),
RespCommand.READWRITE => NetworkREADWRITE(),
RespCommand.COMMAND => NetworkCOMMAND(),
RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
RespCommand.COMMAND_INFO => NetworkCOMMAND_INFO(),
RespCommand.ECHO => NetworkECHO(),
RespCommand.INFO => NetworkINFO(),
RespCommand.HELLO => NetworkHELLO(),
RespCommand.TIME => NetworkTIME(),
RespCommand.FLUSHALL => NetworkFLUSHALL(),
RespCommand.FLUSHDB => NetworkFLUSHDB(),
RespCommand.AUTH => NetworkAUTH(),
RespCommand.MEMORY_USAGE => NetworkMemoryUsage(ref storageApi),
RespCommand.ACL_CAT => NetworkAclCat(),
RespCommand.ACL_WHOAMI => NetworkAclWhoAmI(),
RespCommand.ASYNC => NetworkASYNC(),
RespCommand.MIGRATE => NetworkProcessClusterCommand(cmd),

_ => ProcessArrayCommands(cmd, ref storageApi)
};

Expand All @@ -559,6 +551,10 @@ private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
/*
* WARNING: Do not add any command here classified as @slow!
* Only @fast commands otherwise latency tracking will break for NET_RS (check how containsSlowCommand is used).
*/
var success = cmd switch
{
RespCommand.MGET => NetworkMGET(ref storageApi),
Expand All @@ -569,13 +565,6 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.WATCH => NetworkWATCH(),
RespCommand.WATCH_MS => NetworkWATCH_MS(),
RespCommand.WATCH_OS => NetworkWATCH_OS(),
RespCommand.STRLEN => NetworkSTRLEN(ref storageApi),
RespCommand.PING => NetworkArrayPING(),
//General key commands
RespCommand.DBSIZE => NetworkDBSIZE(ref storageApi),
RespCommand.KEYS => NetworkKEYS(ref storageApi),
RespCommand.SCAN => NetworkSCAN(ref storageApi),
RespCommand.TYPE => NetworkTYPE(ref storageApi),
// Pub/sub commands
RespCommand.SUBSCRIBE => NetworkSUBSCRIBE(),
RespCommand.PSUBSCRIBE => NetworkPSUBSCRIBE(),
Expand Down Expand Up @@ -676,10 +665,6 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.SUNIONSTORE => SetUnionStore(ref storageApi),
RespCommand.SDIFF => SetDiff(ref storageApi),
RespCommand.SDIFFSTORE => SetDiffStore(ref storageApi),
// Script Commands
RespCommand.SCRIPT => TrySCRIPT(),
RespCommand.EVAL => TryEVAL(),
RespCommand.EVALSHA => TryEVALSHA(),
_ => ProcessOtherCommands(cmd, ref storageApi)
};
return success;
Expand All @@ -688,7 +673,48 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (command == RespCommand.CLIENT_ID)
/*
* WARNING: Here is safe to add @slow commands (check how containsSlowCommand is used).
*/
containsSlowCommand = true;
var success = command switch
{
RespCommand.AUTH => NetworkAUTH(),
RespCommand.MEMORY_USAGE => NetworkMemoryUsage(ref storageApi),
RespCommand.CLIENT_ID => NetworkCLIENTID(),
RespCommand.CLIENT_INFO => NetworkCLIENTINFO(),
RespCommand.CLIENT_LIST => NetworkCLIENTLIST(),
RespCommand.CLIENT_KILL => NetworkCLIENTKILL(),
RespCommand.COMMAND => NetworkCOMMAND(),
RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
RespCommand.COMMAND_INFO => NetworkCOMMAND_INFO(),
RespCommand.ECHO => NetworkECHO(),
RespCommand.HELLO => NetworkHELLO(),
RespCommand.TIME => NetworkTIME(),
RespCommand.FLUSHALL => NetworkFLUSHALL(),
RespCommand.FLUSHDB => NetworkFLUSHDB(),
RespCommand.ACL_CAT => NetworkAclCat(),
RespCommand.ACL_WHOAMI => NetworkAclWhoAmI(),
RespCommand.ASYNC => NetworkASYNC(),
RespCommand.RUNTXP => NetworkRUNTXP(),
RespCommand.INFO => NetworkINFO(),
RespCommand.CustomTxn => NetworkCustomTxn(),
RespCommand.CustomRawStringCmd => NetworkCustomRawStringCmd(ref storageApi),
RespCommand.CustomObjCmd => NetworkCustomObjCmd(ref storageApi),
RespCommand.CustomProcedure => NetworkCustomProcedure(),
//General key commands
RespCommand.DBSIZE => NetworkDBSIZE(ref storageApi),
RespCommand.KEYS => NetworkKEYS(ref storageApi),
RespCommand.SCAN => NetworkSCAN(ref storageApi),
RespCommand.TYPE => NetworkTYPE(ref storageApi),
// Script Commands
RespCommand.SCRIPT => TrySCRIPT(),
RespCommand.EVAL => TryEVAL(),
RespCommand.EVALSHA => TryEVALSHA(),
_ => Process(command)
};

bool NetworkCLIENTID()
{
if (parseState.Count != 0)
{
Expand All @@ -700,28 +726,8 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp

return true;
}
else if (command == RespCommand.CLIENT_INFO)
{
return NetworkCLIENTINFO();
}
else if (command == RespCommand.CLIENT_LIST)
{
return NetworkCLIENTLIST();
}
else if (command == RespCommand.CLIENT_KILL)
{
return NetworkCLIENTKILL();
}
else if (command == RespCommand.SUBSCRIBE)
{
while (!RespWriteUtils.WriteInteger(1, ref dcurr, dend))
SendAndReset();
}
else if (command == RespCommand.RUNTXP)
{
return NetworkRUNTXP();
}
else if (command == RespCommand.CustomTxn)

bool NetworkCustomTxn()
{
if (!IsCommandArityValid(currentCustomTransaction.NameStr, parseState.Count))
{
Expand All @@ -732,32 +738,10 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
// Perform the operation
TryTransactionProc(currentCustomTransaction.id, recvBufferPtr + readHead, recvBufferPtr + endReadHead, customCommandManagerSession.GetCustomTransactionProcedure(currentCustomTransaction.id, txnManager, scratchBufferManager).Item1);
currentCustomTransaction = null;
return true;
}
else if (command == RespCommand.CustomRawStringCmd)
{
if (!IsCommandArityValid(currentCustomRawStringCommand.NameStr, parseState.Count))
{
currentCustomRawStringCommand = null;
return true;
}

// Perform the operation
TryCustomRawStringCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomRawStringCommand.GetRespCommand(), currentCustomRawStringCommand.expirationTicks, currentCustomRawStringCommand.type, ref storageApi);
currentCustomRawStringCommand = null;
}
else if (command == RespCommand.CustomObjCmd)
{
if (!IsCommandArityValid(currentCustomObjectCommand.NameStr, parseState.Count))
{
currentCustomObjectCommand = null;
return true;
}

// Perform the operation
TryCustomObjectCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomObjectCommand.GetRespCommand(), currentCustomObjectCommand.subid, currentCustomObjectCommand.type, ref storageApi);
currentCustomObjectCommand = null;
}
else if (command == RespCommand.CustomProcedure)
bool NetworkCustomProcedure()
{
if (!IsCommandArityValid(currentCustomProcedure.NameStr, parseState.Count))
{
Expand All @@ -769,12 +753,46 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
currentCustomProcedure.CustomProcedureImpl);

currentCustomProcedure = null;
return true;
}
else

[MethodImpl(MethodImplOptions.AggressiveInlining)]
bool Process(RespCommand command)
{
ProcessAdminCommands(command);
return true;
}

return success;
}

private bool NetworkCustomRawStringCmd<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (!IsCommandArityValid(currentCustomRawStringCommand.NameStr, parseState.Count))
{
currentCustomRawStringCommand = null;
return true;
}

// Perform the operation
TryCustomRawStringCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomRawStringCommand.GetRespCommand(), currentCustomRawStringCommand.expirationTicks, currentCustomRawStringCommand.type, ref storageApi);
currentCustomRawStringCommand = null;
return true;
}

bool NetworkCustomObjCmd<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (!IsCommandArityValid(currentCustomObjectCommand.NameStr, parseState.Count))
{
currentCustomObjectCommand = null;
return true;
}

// Perform the operation
TryCustomObjectCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomObjectCommand.GetRespCommand(), currentCustomObjectCommand.subid, currentCustomObjectCommand.type, ref storageApi);
currentCustomObjectCommand = null;
return true;
}

Expand Down

0 comments on commit f75773c

Please sign in to comment.