From f75773c0a0ed66bde9a8b1e5a2fbf1a08ebfd0b1 Mon Sep 17 00:00:00 2001
From: Vasileios Zois <96085550+vazois@users.noreply.github.com>
Date: Mon, 16 Sep 2024 16:25:50 -0700
Subject: [PATCH] Fix Latency Tracking Issue (#659)
* consider INFO and other commands as admin commands for latency tracking
* cleanup process other commands
* move non data commands under other commands
* classify command early for latency calculation
* revert bool return from process admin commands
* nit
* classify command for latency using commandInfo
* inline Process method
* revert back to implicit classification of commands for latency measurements
* bump garnet version
* add safety messages in code
---
.../azure-pipelines-external-release.yml | 2 +-
.../BDN.benchmark/Resp/RespParseStress.cs | 2 +
libs/host/GarnetServer.cs | 2 +-
libs/server/Resp/AdminCommands.cs | 7 +-
libs/server/Resp/RespServerSession.cs | 180 ++++++++++--------
5 files changed, 108 insertions(+), 85 deletions(-)
diff --git a/.azure/pipelines/azure-pipelines-external-release.yml b/.azure/pipelines/azure-pipelines-external-release.yml
index 5dc9fef4c7..24816a5930 100644
--- a/.azure/pipelines/azure-pipelines-external-release.yml
+++ b/.azure/pipelines/azure-pipelines-external-release.yml
@@ -3,7 +3,7 @@
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 53) -- NOTE - these two values need to be the same
######################################
-name: 1.0.22
+name: 1.0.23
trigger:
branches:
include:
diff --git a/benchmark/BDN.benchmark/Resp/RespParseStress.cs b/benchmark/BDN.benchmark/Resp/RespParseStress.cs
index 19ea1f41dd..e1816c0714 100644
--- a/benchmark/BDN.benchmark/Resp/RespParseStress.cs
+++ b/benchmark/BDN.benchmark/Resp/RespParseStress.cs
@@ -62,6 +62,8 @@ public void GlobalSetup()
{
QuietMode = true,
AuthSettings = authSettings,
+ MetricsSamplingFrequency = 5,
+ LatencyMonitor = true,
};
server = new EmbeddedRespServer(opt);
diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs
index 36905847a8..180d44b131 100644
--- a/libs/host/GarnetServer.cs
+++ b/libs/host/GarnetServer.cs
@@ -49,7 +49,7 @@ public class GarnetServer : IDisposable
protected StoreWrapper storeWrapper;
// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6.
- readonly string version = "1.0.22";
+ readonly string version = "1.0.23";
///
/// Resp protocol version
diff --git a/libs/server/Resp/AdminCommands.cs b/libs/server/Resp/AdminCommands.cs
index a3ca0e5083..05cde600c6 100644
--- a/libs/server/Resp/AdminCommands.cs
+++ b/libs/server/Resp/AdminCommands.cs
@@ -22,8 +22,10 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
{
private void ProcessAdminCommands(RespCommand command)
{
- hasAdminCommand = true;
-
+ /*
+ * WARNING: It is safe to add @slow commands here (check how containsSlowCommand is used).
+ */
+ containsSlowCommand = true;
if (_authenticator.CanAuthenticate && !_authenticator.IsAuthenticated)
{
// If the current session is unauthenticated, we stop parsing, because no other commands are allowed
@@ -39,6 +41,7 @@ private void ProcessAdminCommands(RespCommand command)
RespCommand.CONFIG_SET => NetworkCONFIG_SET(),
RespCommand.FAILOVER or
RespCommand.REPLICAOF or
+ RespCommand.MIGRATE or
RespCommand.SECONDARYOF => NetworkProcessClusterCommand(command),
RespCommand.LATENCY_HELP => NetworkLatencyHelp(),
RespCommand.LATENCY_HISTOGRAM => NetworkLatencyHistogram(),
diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs
index c0ffdce91c..0cc8df973f 100644
--- a/libs/server/Resp/RespServerSession.cs
+++ b/libs/server/Resp/RespServerSession.cs
@@ -3,7 +3,6 @@
using System;
using System.Buffers;
-using System.Buffers.Binary;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
@@ -115,8 +114,8 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
///
public IGarnetServer Server { get; set; }
- // Track whether the incoming network batch had some admin command
- bool hasAdminCommand;
+ // Track whether the incoming network batch contains slow commands that should not be counted in the NET_RS histogram
+ bool containsSlowCommand;
readonly CustomCommandManagerSession customCommandManagerSession;
@@ -369,10 +368,10 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
{
if (latencyMetrics != null)
{
- if (hasAdminCommand)
+ if (containsSlowCommand)
{
latencyMetrics.StopAndSwitch(LatencyMetricsType.NET_RS_LAT, LatencyMetricsType.NET_RS_LAT_ADMIN);
- hasAdminCommand = false;
+ containsSlowCommand = false;
}
else
latencyMetrics.Stop(LatencyMetricsType.NET_RS_LAT);
@@ -443,6 +442,10 @@ private void ProcessMessages()
SendAndReset();
}
}
+ else
+ {
+ containsSlowCommand = true;
+ }
// Advance read head variables to process the next command
_origReadHead = readHead = endReadHead;
@@ -496,6 +499,10 @@ private bool MakeUpperCase(byte* ptr)
private bool ProcessBasicCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
+ /*
+ * WARNING: Do not add any command here classified as @slow!
+ * Add only @fast commands; otherwise latency tracking for NET_RS will break (check how containsSlowCommand is used).
+ */
_ = cmd switch
{
RespCommand.GET => NetworkGET(ref storageApi),
@@ -515,6 +522,7 @@ private bool ProcessBasicCommands(RespCommand cmd, ref TGarnetApi st
RespCommand.SETRANGE => NetworkSetRange(ref storageApi),
RespCommand.GETDEL => NetworkGETDEL(ref storageApi),
RespCommand.APPEND => NetworkAppend(ref storageApi),
+ RespCommand.STRLEN => NetworkSTRLEN(ref storageApi),
RespCommand.INCR => NetworkIncrement(RespCommand.INCR, ref storageApi),
RespCommand.INCRBY => NetworkIncrement(RespCommand.INCRBY, ref storageApi),
RespCommand.DECR => NetworkIncrement(RespCommand.DECR, ref storageApi),
@@ -524,7 +532,7 @@ private bool ProcessBasicCommands(RespCommand cmd, ref TGarnetApi st
RespCommand.BITCOUNT => NetworkStringBitCount(ref storageApi),
RespCommand.BITPOS => NetworkStringBitPosition(ref storageApi),
RespCommand.PUBLISH => NetworkPUBLISH(),
- RespCommand.PING => parseState.Count == 0 ? NetworkPING() : ProcessArrayCommands(cmd, ref storageApi),
+ RespCommand.PING => parseState.Count == 0 ? NetworkPING() : NetworkArrayPING(),
RespCommand.ASKING => NetworkASKING(),
RespCommand.MULTI => NetworkMULTI(),
RespCommand.EXEC => NetworkEXEC(),
@@ -534,22 +542,6 @@ private bool ProcessBasicCommands(RespCommand cmd, ref TGarnetApi st
RespCommand.RUNTXP => NetworkRUNTXP(),
RespCommand.READONLY => NetworkREADONLY(),
RespCommand.READWRITE => NetworkREADWRITE(),
- RespCommand.COMMAND => NetworkCOMMAND(),
- RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
- RespCommand.COMMAND_INFO => NetworkCOMMAND_INFO(),
- RespCommand.ECHO => NetworkECHO(),
- RespCommand.INFO => NetworkINFO(),
- RespCommand.HELLO => NetworkHELLO(),
- RespCommand.TIME => NetworkTIME(),
- RespCommand.FLUSHALL => NetworkFLUSHALL(),
- RespCommand.FLUSHDB => NetworkFLUSHDB(),
- RespCommand.AUTH => NetworkAUTH(),
- RespCommand.MEMORY_USAGE => NetworkMemoryUsage(ref storageApi),
- RespCommand.ACL_CAT => NetworkAclCat(),
- RespCommand.ACL_WHOAMI => NetworkAclWhoAmI(),
- RespCommand.ASYNC => NetworkASYNC(),
- RespCommand.MIGRATE => NetworkProcessClusterCommand(cmd),
-
_ => ProcessArrayCommands(cmd, ref storageApi)
};
@@ -559,6 +551,10 @@ private bool ProcessBasicCommands(RespCommand cmd, ref TGarnetApi st
private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
+ /*
+ * WARNING: Do not add any command here classified as @slow!
+ * Add only @fast commands; otherwise latency tracking for NET_RS will break (check how containsSlowCommand is used).
+ */
var success = cmd switch
{
RespCommand.MGET => NetworkMGET(ref storageApi),
@@ -569,13 +565,6 @@ private bool ProcessArrayCommands(RespCommand cmd, ref TGarnetApi st
RespCommand.WATCH => NetworkWATCH(),
RespCommand.WATCH_MS => NetworkWATCH_MS(),
RespCommand.WATCH_OS => NetworkWATCH_OS(),
- RespCommand.STRLEN => NetworkSTRLEN(ref storageApi),
- RespCommand.PING => NetworkArrayPING(),
- //General key commands
- RespCommand.DBSIZE => NetworkDBSIZE(ref storageApi),
- RespCommand.KEYS => NetworkKEYS(ref storageApi),
- RespCommand.SCAN => NetworkSCAN(ref storageApi),
- RespCommand.TYPE => NetworkTYPE(ref storageApi),
// Pub/sub commands
RespCommand.SUBSCRIBE => NetworkSUBSCRIBE(),
RespCommand.PSUBSCRIBE => NetworkPSUBSCRIBE(),
@@ -676,10 +665,6 @@ private bool ProcessArrayCommands(RespCommand cmd, ref TGarnetApi st
RespCommand.SUNIONSTORE => SetUnionStore(ref storageApi),
RespCommand.SDIFF => SetDiff(ref storageApi),
RespCommand.SDIFFSTORE => SetDiffStore(ref storageApi),
- // Script Commands
- RespCommand.SCRIPT => TrySCRIPT(),
- RespCommand.EVAL => TryEVAL(),
- RespCommand.EVALSHA => TryEVALSHA(),
_ => ProcessOtherCommands(cmd, ref storageApi)
};
return success;
@@ -688,7 +673,48 @@ private bool ProcessArrayCommands(RespCommand cmd, ref TGarnetApi st
private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
- if (command == RespCommand.CLIENT_ID)
+ /*
+ * WARNING: It is safe to add @slow commands here (check how containsSlowCommand is used).
+ */
+ containsSlowCommand = true;
+ var success = command switch
+ {
+ RespCommand.AUTH => NetworkAUTH(),
+ RespCommand.MEMORY_USAGE => NetworkMemoryUsage(ref storageApi),
+ RespCommand.CLIENT_ID => NetworkCLIENTID(),
+ RespCommand.CLIENT_INFO => NetworkCLIENTINFO(),
+ RespCommand.CLIENT_LIST => NetworkCLIENTLIST(),
+ RespCommand.CLIENT_KILL => NetworkCLIENTKILL(),
+ RespCommand.COMMAND => NetworkCOMMAND(),
+ RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
+ RespCommand.COMMAND_INFO => NetworkCOMMAND_INFO(),
+ RespCommand.ECHO => NetworkECHO(),
+ RespCommand.HELLO => NetworkHELLO(),
+ RespCommand.TIME => NetworkTIME(),
+ RespCommand.FLUSHALL => NetworkFLUSHALL(),
+ RespCommand.FLUSHDB => NetworkFLUSHDB(),
+ RespCommand.ACL_CAT => NetworkAclCat(),
+ RespCommand.ACL_WHOAMI => NetworkAclWhoAmI(),
+ RespCommand.ASYNC => NetworkASYNC(),
+ RespCommand.RUNTXP => NetworkRUNTXP(),
+ RespCommand.INFO => NetworkINFO(),
+ RespCommand.CustomTxn => NetworkCustomTxn(),
+ RespCommand.CustomRawStringCmd => NetworkCustomRawStringCmd(ref storageApi),
+ RespCommand.CustomObjCmd => NetworkCustomObjCmd(ref storageApi),
+ RespCommand.CustomProcedure => NetworkCustomProcedure(),
+ //General key commands
+ RespCommand.DBSIZE => NetworkDBSIZE(ref storageApi),
+ RespCommand.KEYS => NetworkKEYS(ref storageApi),
+ RespCommand.SCAN => NetworkSCAN(ref storageApi),
+ RespCommand.TYPE => NetworkTYPE(ref storageApi),
+ // Script Commands
+ RespCommand.SCRIPT => TrySCRIPT(),
+ RespCommand.EVAL => TryEVAL(),
+ RespCommand.EVALSHA => TryEVALSHA(),
+ _ => Process(command)
+ };
+
+ bool NetworkCLIENTID()
{
if (parseState.Count != 0)
{
@@ -700,28 +726,8 @@ private bool ProcessOtherCommands(RespCommand command, ref TGarnetAp
return true;
}
- else if (command == RespCommand.CLIENT_INFO)
- {
- return NetworkCLIENTINFO();
- }
- else if (command == RespCommand.CLIENT_LIST)
- {
- return NetworkCLIENTLIST();
- }
- else if (command == RespCommand.CLIENT_KILL)
- {
- return NetworkCLIENTKILL();
- }
- else if (command == RespCommand.SUBSCRIBE)
- {
- while (!RespWriteUtils.WriteInteger(1, ref dcurr, dend))
- SendAndReset();
- }
- else if (command == RespCommand.RUNTXP)
- {
- return NetworkRUNTXP();
- }
- else if (command == RespCommand.CustomTxn)
+
+ bool NetworkCustomTxn()
{
if (!IsCommandArityValid(currentCustomTransaction.NameStr, parseState.Count))
{
@@ -732,32 +738,10 @@ private bool ProcessOtherCommands(RespCommand command, ref TGarnetAp
// Perform the operation
TryTransactionProc(currentCustomTransaction.id, recvBufferPtr + readHead, recvBufferPtr + endReadHead, customCommandManagerSession.GetCustomTransactionProcedure(currentCustomTransaction.id, txnManager, scratchBufferManager).Item1);
currentCustomTransaction = null;
+ return true;
}
- else if (command == RespCommand.CustomRawStringCmd)
- {
- if (!IsCommandArityValid(currentCustomRawStringCommand.NameStr, parseState.Count))
- {
- currentCustomRawStringCommand = null;
- return true;
- }
-
- // Perform the operation
- TryCustomRawStringCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomRawStringCommand.GetRespCommand(), currentCustomRawStringCommand.expirationTicks, currentCustomRawStringCommand.type, ref storageApi);
- currentCustomRawStringCommand = null;
- }
- else if (command == RespCommand.CustomObjCmd)
- {
- if (!IsCommandArityValid(currentCustomObjectCommand.NameStr, parseState.Count))
- {
- currentCustomObjectCommand = null;
- return true;
- }
- // Perform the operation
- TryCustomObjectCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomObjectCommand.GetRespCommand(), currentCustomObjectCommand.subid, currentCustomObjectCommand.type, ref storageApi);
- currentCustomObjectCommand = null;
- }
- else if (command == RespCommand.CustomProcedure)
+ bool NetworkCustomProcedure()
{
if (!IsCommandArityValid(currentCustomProcedure.NameStr, parseState.Count))
{
@@ -769,12 +753,46 @@ private bool ProcessOtherCommands(RespCommand command, ref TGarnetAp
currentCustomProcedure.CustomProcedureImpl);
currentCustomProcedure = null;
+ return true;
}
- else
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ bool Process(RespCommand command)
{
ProcessAdminCommands(command);
return true;
}
+
+ return success;
+ }
+
+ private bool NetworkCustomRawStringCmd<TGarnetApi>(ref TGarnetApi storageApi)
+ where TGarnetApi : IGarnetApi
+ {
+ if (!IsCommandArityValid(currentCustomRawStringCommand.NameStr, parseState.Count))
+ {
+ currentCustomRawStringCommand = null;
+ return true;
+ }
+
+ // Perform the operation
+ TryCustomRawStringCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomRawStringCommand.GetRespCommand(), currentCustomRawStringCommand.expirationTicks, currentCustomRawStringCommand.type, ref storageApi);
+ currentCustomRawStringCommand = null;
+ return true;
+ }
+
+ bool NetworkCustomObjCmd<TGarnetApi>(ref TGarnetApi storageApi)
+ where TGarnetApi : IGarnetApi
+ {
+ if (!IsCommandArityValid(currentCustomObjectCommand.NameStr, parseState.Count))
+ {
+ currentCustomObjectCommand = null;
+ return true;
+ }
+
+ // Perform the operation
+ TryCustomObjectCommand(recvBufferPtr + readHead, recvBufferPtr + endReadHead, currentCustomObjectCommand.GetRespCommand(), currentCustomObjectCommand.subid, currentCustomObjectCommand.type, ref storageApi);
+ currentCustomObjectCommand = null;
return true;
}