Skip to content

Commit

Permalink
reuse buffer for replication communication
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Sep 3, 2024
1 parent 69316e8 commit 29aa345
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 3 deletions.
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public bool TryAddReplicationTask(string remoteNodeId, long startAddress, out Ao
this,
current.LocalNodeId,
remoteNodeId,
new GarnetClientSession(address, port, networkBuffers: new NetworkBuffers(1 << 22), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger),
new GarnetClientSession(address, port, new NetworkBuffers(1 << 22), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger),
new CancellationTokenSource(),
startAddress,
logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task<bool> SendCheckpoint()
return false;
}

GarnetClientSession gcs = new(address, port, new NetworkBuffers(1 << 20), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger);
GarnetClientSession gcs = new(address, port, clusterProvider.replicationManager.GetNetworkBuffers, clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger);
CheckpointEntry localEntry = default;
AofSyncTaskInfo aofSyncTaskInfo = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private async Task<string> InitiateReplicaSync()

try
{
gcs = new(address, port, new NetworkBuffers(1 << 21), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword);
gcs = new(address, port, new NetworkBuffers(1 << 12), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword);
recvCheckpointHandler = new ReceiveCheckpointHandler(clusterProvider, logger);
gcs.Connect();

Expand Down
7 changes: 7 additions & 0 deletions libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ internal sealed partial class ReplicationManager : IDisposable

readonly CancellationTokenSource ctsRepManager = new();

readonly NetworkBuffers networkBuffers;

public NetworkBuffers GetNetworkBuffers => networkBuffers;

readonly ILogger logger;
bool _disposed;

Expand Down Expand Up @@ -89,6 +93,8 @@ public long GetCurrentSafeAofAddress()
public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null)
{
var opts = clusterProvider.serverOptions;

this.networkBuffers = new NetworkBuffers(new LimitedFixedBufferPool(1 << 22, logger: logger), new LimitedFixedBufferPool(1 << 22, logger: logger));
this.clusterProvider = clusterProvider;
this.storeWrapper = clusterProvider.storeWrapper;
aofProcessor = new AofProcessor(storeWrapper, recordToAof: false, logger: logger);
Expand Down Expand Up @@ -177,6 +183,7 @@ public void Dispose()
replicaSyncSessionTaskStore.Dispose();
ctsRepManager.Dispose();
aofProcessor?.Dispose();
networkBuffers.Dispose();
}

public void DisposeConnections()
Expand Down

0 comments on commit 29aa345

Please sign in to comment.