Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse Large Buffers in MigrateSession #623

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b2365e2
add buffer pool to migration manager
vazois Aug 28, 2024
5cb9824
expose separate receive and send buffers from NetworkHandler
vazois Aug 28, 2024
137ed1e
add network buffers wrapper to gcs
vazois Aug 29, 2024
0f0c9c2
push NetworkBuffers to NetworkHandler
vazois Aug 29, 2024
ae4ee08
add networkBuffers instance in migration manager
vazois Aug 30, 2024
5660bbf
add constructor
vazois Aug 30, 2024
466285b
reuse buffer for replication communication
vazois Aug 30, 2024
38aa84d
consolidate network buffer pool
vazois Sep 4, 2024
7316873
nit
vazois Sep 5, 2024
96fbaa8
add gc collect for migration buffer pool
vazois Sep 6, 2024
1f27986
add ACL test for MIGRATEGC
vazois Sep 6, 2024
d4a04ca
make migrategc management commands
vazois Sep 9, 2024
ebcd0e3
add verbose logging for IOCallback
vazois Sep 10, 2024
88d90ed
expose maxEntries from NetworkBuffers.Allocate
vazois Sep 10, 2024
114c8ba
configure GarnetClient for Failover
vazois Sep 10, 2024
09bdac8
cleanup ReplicationManager
vazois Sep 10, 2024
c55512f
Cancel cts before disposing in AofSyncTask
vazois Sep 10, 2024
b4250b3
consolidate buffer pool purge under a single command
vazois Sep 11, 2024
e918085
purge without disposing pool
vazois Sep 11, 2024
5f2a8df
introduce info bpstats metrics
vazois Sep 11, 2024
d9f1129
ensure shared bp uses correct allocation size for send and recv
vazois Sep 11, 2024
663d717
fix bp stats call
vazois Sep 12, 2024
0362142
change format of info bpstats
vazois Sep 12, 2024
016fe16
fix out of bounds level request for LFBP
vazois Sep 17, 2024
fd5e71f
introduce purge for server buffer pool
vazois Sep 17, 2024
9bfe50f
move PURGEBP in server namespace
vazois Sep 17, 2024
5d9409f
fix migrate bench keys option
vazois Sep 17, 2024
dfc86d7
fix formatting
vazois Sep 17, 2024
60e49cb
add outOfBound allocation request metric
vazois Sep 18, 2024
5649cd4
augment migrate bench
vazois Sep 19, 2024
fed65a5
separate pool definition from buffer spec
vazois Sep 19, 2024
d77d1a8
rename to NetworkBufferSettings
vazois Sep 19, 2024
a0dd73f
rename NetworkBuffers to NetworkBufferSettings
vazois Sep 19, 2024
e918347
revert dispose order
vazois Sep 19, 2024
3f8a52e
revert replication networkSettings
vazois Sep 19, 2024
0ade0d9
add timeout to dispose of LFBP
vazois Sep 19, 2024
6174790
add timeout in cluster test TearDown
vazois Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/Resp.benchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ static void Main(string[] args)

static void WaitForServer(Options opts)
{
using var client = new GarnetClientSession(opts.Address, opts.Port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
using var client = new GarnetClientSession(opts.Address, opts.Port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
while (true)
{
try
Expand Down
12 changes: 8 additions & 4 deletions benchmark/Resp.benchmark/RespOnlineBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private void InitializeClients()
{
gcsPool = new AsyncPool<GarnetClientSession>(opts.NumThreads.First(), () =>
{
var c = new GarnetClientSession(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
var c = new GarnetClientSession(address, port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
c.Connect();
if (auth != null)
{
Expand Down Expand Up @@ -573,8 +573,8 @@ public async void OpRunnerGarnetClientSession(int thread_id)
client = new GarnetClientSession(
address,
port,
opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null,
bufferSize: Math.Max(bufferSizeValue, opts.ValueLength * opts.IntraThreadParallelism));
new(Math.Max(bufferSizeValue, opts.ValueLength * opts.IntraThreadParallelism)),
tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
client.Connect();
if (auth != null)
{
Expand Down Expand Up @@ -669,7 +669,11 @@ public async void OpRunnerGarnetClientSessionParallel(int thread_id, int paralle
GarnetClientSession client = null;
if (!opts.Pool)
{
client = new GarnetClientSession(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null, null, null, Math.Max(131072, opts.IntraThreadParallelism * opts.ValueLength));
client = new GarnetClientSession(
address,
port,
new(Math.Max(131072, opts.IntraThreadParallelism * opts.ValueLength)),
vazois marked this conversation as resolved.
Show resolved Hide resolved
tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
client.Connect();
if (auth != null)
{
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Resp.benchmark/RespPerfBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ private void GarnetClientSessionOperateThreadRunner(int NumOps, OpType opType, R
default:
throw new Exception($"opType: {opType} benchmark not supported with GarnetClientSession!");
}
var c = new GarnetClientSession(opts.Address, opts.Port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
var c = new GarnetClientSession(opts.Address, opts.Port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
c.Connect();
if (opts.Auth != null)
{
Expand Down
4 changes: 2 additions & 2 deletions benchmark/Resp.benchmark/TxnPerfBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void Run()
{
gcsPool = new AsyncPool<GarnetClientSession>(opts.NumThreads.First(), () =>
{
var c = new GarnetClientSession(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
var c = new GarnetClientSession(address, port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
c.Connect();
if (auth != null)
{
Expand Down Expand Up @@ -325,7 +325,7 @@ public void OpRunnerSERedis(int thread_id)
public void LoadData()
{
var req = new OnlineReqGen(0, opts.DbSize, true, opts.Zipf, opts.KeyLength, opts.ValueLength);
GarnetClientSession client = new(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
GarnetClientSession client = new(address, port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
client.Connect();
if (auth != null)
{
Expand Down
56 changes: 44 additions & 12 deletions libs/client/ClientSession/GarnetClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
{
readonly string address;
readonly int port;
readonly int bufferSize;
readonly int bufferSizeDigits;
INetworkSender networkSender;
readonly ElasticCircularBuffer<TaskType> tasksTypes = new();
Expand Down Expand Up @@ -61,8 +60,6 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
/// </summary>
public bool IsConnected => socket != null && socket.Connected && !Disposed;

readonly LimitedFixedBufferPool networkPool;

/// <summary>
/// Username to authenticate the session on the server.
/// </summary>
Expand All @@ -73,6 +70,21 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
/// </summary>
readonly string authPassword = null;

/// <summary>
/// Indicating whether this instance is using its own network pool or one that was provided
/// </summary>
readonly bool usingManagedNetworkPool = false;

/// <summary>
/// Instance of network buffer settings describing the send and receive buffer sizes
/// </summary>
readonly NetworkBufferSettings networkBufferSettings;

/// <summary>
/// NetworkPool used to allocate send and receive buffers
/// </summary>
readonly LimitedFixedBufferPool networkPool;

/// <summary>
/// Create client instance
/// </summary>
Expand All @@ -81,16 +93,28 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
/// <param name="tlsOptions">TLS options</param>
/// <param name="authUsername">Username to authenticate with</param>
/// <param name="authPassword">Password to authenticate with</param>
/// <param name="bufferSize">Network buffer size</param>
/// <param name="networkBufferSettings">Settings for send and receive network buffers</param>
/// <param name="networkSendThrottleMax">Max outstanding network sends allowed</param>
/// <param name="logger">Logger</param>
public GarnetClientSession(string address, int port, SslClientAuthenticationOptions tlsOptions = null, string authUsername = null, string authPassword = null, int bufferSize = 1 << 17, int networkSendThrottleMax = 8, ILogger logger = null)
public GarnetClientSession(
string address,
int port,
NetworkBufferSettings networkBufferSettings,
LimitedFixedBufferPool networkPool = null,
SslClientAuthenticationOptions tlsOptions = null,
string authUsername = null,
string authPassword = null,
int networkSendThrottleMax = 8,
ILogger logger = null)
{
this.networkPool = new LimitedFixedBufferPool(bufferSize, logger: logger);
this.address = address;
this.port = port;
this.bufferSize = bufferSize;
this.bufferSizeDigits = NumUtils.NumDigits(bufferSize);

this.usingManagedNetworkPool = networkPool != null;
this.networkBufferSettings = networkBufferSettings;
this.networkPool = networkPool ?? networkBufferSettings.Create();
this.bufferSizeDigits = NumUtils.NumDigits(this.networkBufferSettings.sendBufferSize);

this.logger = logger;
this.sslOptions = tlsOptions;
this.networkSendThrottleMax = networkSendThrottleMax;
Expand All @@ -107,7 +131,15 @@ public GarnetClientSession(string address, int port, SslClientAuthenticationOpti
public void Connect(int timeoutMs = 0, CancellationToken token = default)
{
socket = GetSendSocket(address, port, timeoutMs);
networkHandler = new GarnetClientSessionTcpNetworkHandler(this, socket, networkPool, sslOptions != null, this, networkSendThrottleMax, logger);
networkHandler = new GarnetClientSessionTcpNetworkHandler(
this,
socket,
networkBufferSettings,
networkPool,
sslOptions != null,
messageConsumer: this,
networkSendThrottleMax: networkSendThrottleMax,
logger: logger);
networkHandler.StartAsync(sslOptions, $"{address}:{port}", token).ConfigureAwait(false).GetAwaiter().GetResult();
networkSender = networkHandler.GetNetworkSender();
networkSender.GetResponseObject();
Expand Down Expand Up @@ -159,7 +191,7 @@ public void Dispose()
networkSender?.ReturnResponseObject();
socket?.Dispose();
networkHandler?.Dispose();
networkPool.Dispose();
if (!usingManagedNetworkPool) networkPool.Dispose();
}

/// <summary>
Expand Down Expand Up @@ -259,8 +291,8 @@ public void ExecuteClusterAppendLog(string nodeId, long previousAddress, long cu
}
offset = curr;

if (payloadLength > bufferSize)
throw new Exception($"Payload length {payloadLength} is larger than bufferSize {bufferSize} bytes");
if (payloadLength > networkBufferSettings.sendBufferSize)
throw new Exception($"Payload length {payloadLength} is larger than bufferSize {networkBufferSettings.sendBufferSize} bytes");

while (!RespWriteUtils.WriteBulkString(new Span<byte>((void*)payloadPtr, payloadLength), ref curr, end))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Garnet.client
{
sealed class GarnetClientSessionTcpNetworkHandler : TcpNetworkHandlerBase<GarnetClientSession, GarnetTcpNetworkSender>
{
public GarnetClientSessionTcpNetworkHandler(GarnetClientSession serverHook, Socket socket, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
: base(serverHook, new GarnetTcpNetworkSender(socket, networkPool, networkSendThrottleMax), socket, networkPool, useTLS, messageConsumer, logger)
public GarnetClientSessionTcpNetworkHandler(GarnetClientSession serverHook, Socket socket, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
: base(serverHook, new GarnetTcpNetworkSender(socket, networkBufferSettings, networkPool, networkSendThrottleMax), socket, networkBufferSettings, networkPool, useTLS, messageConsumer: messageConsumer, logger: logger)
{
}

Expand Down
5 changes: 3 additions & 2 deletions libs/client/ClientTcpNetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ public class ClientTcpNetworkSender : GarnetTcpNetworkSender
/// </summary>
/// <param name="socket"></param>
/// <param name="callback"></param>
/// <param name="networkBufferSettings"></param>
/// <param name="networkPool"></param>
/// <param name="networkSendThrottleMax"></param>
public ClientTcpNetworkSender(Socket socket, Action<object> callback, LimitedFixedBufferPool networkPool, int networkSendThrottleMax)
: base(socket, networkPool, networkSendThrottleMax)
public ClientTcpNetworkSender(Socket socket, Action<object> callback, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, int networkSendThrottleMax)
: base(socket, networkBufferSettings, networkPool, networkSendThrottleMax)
{
this.callback = callback;
this.reusableSaea = new SimpleObjectPool<SocketAsyncEventArgs>(() =>
Expand Down
7 changes: 5 additions & 2 deletions libs/client/GarnetClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public sealed partial class GarnetClient : IServerHook, IMessageConsumer, IDispo
readonly string address;
readonly int port;
readonly int sendPageSize;
readonly int bufferSize;
readonly int maxOutstandingTasks;
NetworkWriter networkWriter;
INetworkSender networkSender;
Expand Down Expand Up @@ -133,6 +134,7 @@ public GarnetClient(
string authUsername = null,
string authPassword = null,
int sendPageSize = 1 << 21,
int bufferSize = 1 << 17,
int maxOutstandingTasks = 1 << 19,
int timeoutMilliseconds = 0,
MemoryPool<byte> memoryPool = null,
Expand All @@ -144,6 +146,7 @@ public GarnetClient(
this.address = address;
this.port = port;
this.sendPageSize = (int)Utility.PreviousPowerOf2(sendPageSize);
this.bufferSize = bufferSize;
this.authUsername = authUsername;
this.authPassword = authPassword;

Expand Down Expand Up @@ -186,7 +189,7 @@ public GarnetClient(
public void Connect(CancellationToken token = default)
{
socket = GetSendSocket(timeoutMilliseconds);
networkWriter = new NetworkWriter(this, socket, 1 << 17, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger);
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger);
networkHandler.StartAsync(sslOptions, $"{address}:{port}", token).ConfigureAwait(false).GetAwaiter().GetResult();
networkSender = networkHandler.GetNetworkSender();

Expand Down Expand Up @@ -219,7 +222,7 @@ public void Connect(CancellationToken token = default)
public async Task ConnectAsync(CancellationToken token = default)
{
socket = GetSendSocket(timeoutMilliseconds);
networkWriter = new NetworkWriter(this, socket, 1 << 17, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger);
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger);
await networkHandler.StartAsync(sslOptions, $"{address}:{port}", token).ConfigureAwait(false);
networkSender = networkHandler.GetNetworkSender();

Expand Down
4 changes: 2 additions & 2 deletions libs/client/GarnetClientTcpNetworkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace Garnet.client
{
sealed class GarnetClientTcpNetworkHandler : TcpNetworkHandlerBase<GarnetClient, ClientTcpNetworkSender>
{
public GarnetClientTcpNetworkHandler(GarnetClient serverHook, Action<object> callback, Socket socket, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
: base(serverHook, new ClientTcpNetworkSender(socket, callback, networkPool, networkSendThrottleMax), socket, networkPool, useTLS, messageConsumer, logger)
public GarnetClientTcpNetworkHandler(GarnetClient serverHook, Action<object> callback, Socket socket, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
: base(serverHook, new ClientTcpNetworkSender(socket, callback, networkBufferSettings, networkPool, networkSendThrottleMax), socket, networkBufferSettings, networkPool, useTLS, messageConsumer: messageConsumer, logger: logger)
{
}

Expand Down
8 changes: 5 additions & 3 deletions libs/client/NetworkWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ internal sealed class NetworkWriter : IDisposable
/// </summary>
CompletionEvent FlushEvent;

readonly NetworkBufferSettings networkBufferSettings;
readonly LimitedFixedBufferPool networkPool;
readonly GarnetClientTcpNetworkHandler networkHandler;

Expand All @@ -80,10 +81,11 @@ internal sealed class NetworkWriter : IDisposable
/// </summary>
public NetworkWriter(GarnetClient serverHook, Socket socket, int messageBufferSize, SslClientAuthenticationOptions sslOptions, out GarnetClientTcpNetworkHandler networkHandler, int sendPageSize, int networkSendThrottleMax, ILogger logger = null)
{
this.networkPool = new LimitedFixedBufferPool(messageBufferSize, logger: logger);
this.networkBufferSettings = new NetworkBufferSettings(messageBufferSize, messageBufferSize);
this.networkPool = networkBufferSettings.Create(logger: logger);

if (BufferSize > PageOffset.kPageMask) throw new Exception();
this.networkHandler = networkHandler = new GarnetClientTcpNetworkHandler(serverHook, AsyncFlushPageCallback, socket, networkPool, sslOptions != null, serverHook, networkSendThrottleMax, logger);
this.networkHandler = networkHandler = new GarnetClientTcpNetworkHandler(serverHook, AsyncFlushPageCallback, socket, networkBufferSettings, networkPool, sslOptions != null, serverHook, networkSendThrottleMax: networkSendThrottleMax, logger: logger);
networkSender = networkHandler.GetNetworkSender();

FlushEvent.Initialize();
Expand All @@ -109,7 +111,7 @@ public void Dispose()
FlushEvent.Dispose();
epoch.Dispose();
networkHandler.Dispose();
networkPool.Dispose();
networkPool?.Dispose();
}

/// <summary>
Expand Down
13 changes: 13 additions & 0 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,19 @@ public MetricsItem[] GetGossipStats(bool metricsDisabled)
];
}

public MetricsItem[] GetBufferPoolStats()
=> [new("migration_manager", migrationManager.GetBufferPoolStats()), new("replication_manager", replicationManager.GetBufferPoolStats())];

public void PurgeBufferPool(ManagerType managerType)
{
if (managerType == ManagerType.MM)
migrationManager.Purge();
else if (managerType == ManagerType.RM)
replicationManager.Purge();
else
throw new GarnetException();
}

internal ReplicationLogCheckpointManager GetReplicationLogCheckpointManager(StoreType storeType)
{
Debug.Assert(serverOptions.EnableCluster);
Expand Down
4 changes: 2 additions & 2 deletions libs/cluster/Server/ClusterUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public static void IOCallback(this ILogger logger, uint errorCode, uint numBytes
{
if (errorCode != 0)
{
string errorMessage = new Win32Exception((int)errorCode).Message;
logger.LogError("OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage);
var errorMessage = new Win32Exception((int)errorCode).Message;
logger.LogError("[ClusterUtils] OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage);
}
((SemaphoreSlim)context).Release();
}
Expand Down
2 changes: 2 additions & 0 deletions libs/cluster/Server/Failover/ReplicaFailoverSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ private GarnetClient CreateConnection(string nodeId)
address,
port,
clusterProvider.serverOptions.TlsOptions?.TlsClientOptions,
sendPageSize: 1 << 17,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the 3rd time (so far) I've seen the magic # 17, so best to make it a const int somewhere

maxOutstandingTasks: 8,
authUsername: clusterProvider.ClusterUsername,
authPassword: clusterProvider.ClusterPassword, logger: logger);

Expand Down
Loading