From 29aa3455a883c54e6ab0f26e0c47f27bd1e8ff12 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Fri, 30 Aug 2024 13:49:34 -0700 Subject: [PATCH] reuse buffer for replication communication --- libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs | 2 +- .../Server/Replication/PrimaryOps/ReplicaSyncSession.cs | 2 +- .../Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs | 2 +- libs/cluster/Server/Replication/ReplicationManager.cs | 7 +++++++ 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs b/libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs index 24fe3b5f54..5645eae113 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs @@ -131,7 +131,7 @@ public bool TryAddReplicationTask(string remoteNodeId, long startAddress, out Ao this, current.LocalNodeId, remoteNodeId, - new GarnetClientSession(address, port, networkBuffers: new NetworkBuffers(1 << 22), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger), + new GarnetClientSession(address, port, new NetworkBuffers(1 << 22), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger), new CancellationTokenSource(), startAddress, logger); diff --git a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs index 676ddce947..d98c5e3029 100644 --- a/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs +++ b/libs/cluster/Server/Replication/PrimaryOps/ReplicaSyncSession.cs @@ -60,7 +60,7 @@ public async Task SendCheckpoint() return false; } - GarnetClientSession gcs = new(address, port, new NetworkBuffers(1 << 20), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger); + GarnetClientSession gcs = new(address, port, clusterProvider.replicationManager.GetNetworkBuffers, clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger); CheckpointEntry localEntry = default; AofSyncTaskInfo aofSyncTaskInfo = null; diff --git a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs index 08e0710eb4..227da11830 100644 --- a/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs +++ b/libs/cluster/Server/Replication/ReplicaOps/ReplicaReceiveCheckpoint.cs @@ -147,7 +147,7 @@ private async Task InitiateReplicaSync() try { - gcs = new(address, port, new NetworkBuffers(1 << 21), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword); + gcs = new(address, port, new NetworkBuffers(1 << 12), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword); recvCheckpointHandler = new ReceiveCheckpointHandler(clusterProvider, logger); gcs.Connect(); diff --git a/libs/cluster/Server/Replication/ReplicationManager.cs b/libs/cluster/Server/Replication/ReplicationManager.cs index 56c191b58a..c9aee022ab 100644 --- a/libs/cluster/Server/Replication/ReplicationManager.cs +++ b/libs/cluster/Server/Replication/ReplicationManager.cs @@ -22,6 +22,10 @@ internal sealed partial class ReplicationManager : IDisposable readonly CancellationTokenSource ctsRepManager = new(); + readonly NetworkBuffers networkBuffers; + + public NetworkBuffers GetNetworkBuffers => networkBuffers; + readonly ILogger logger; bool _disposed; @@ -89,6 +93,8 @@ public long GetCurrentSafeAofAddress() public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null) { var opts = clusterProvider.serverOptions; + + this.networkBuffers = new NetworkBuffers(new LimitedFixedBufferPool(1 << 22, logger: logger), new LimitedFixedBufferPool(1 << 22, logger: logger)); this.clusterProvider = clusterProvider; this.storeWrapper = clusterProvider.storeWrapper; aofProcessor = new AofProcessor(storeWrapper, recordToAof: false, logger: logger); @@ -177,6 +183,7 @@ public void Dispose() replicaSyncSessionTaskStore.Dispose(); ctsRepManager.Dispose(); aofProcessor?.Dispose(); + networkBuffers.Dispose(); } public void DisposeConnections()