Skip to content

Commit

Permalink
reuse network pool buffers for replication
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Sep 10, 2024
1 parent a8ae7fb commit 9d65b87
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 6 deletions.
2 changes: 1 addition & 1 deletion libs/client/ClientSession/GarnetClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public GarnetClientSession(
this.port = port;

this.usingManagedNetworkBuffers = networkBuffers.IsAllocated;
this.networkBuffers = usingManagedNetworkBuffers ? networkBuffers : networkBuffers.Allocate(logger);
this.networkBuffers = usingManagedNetworkBuffers ? networkBuffers : networkBuffers.Allocate(logger: logger);
this.bufferSizeDigits = NumUtils.NumDigits(this.networkBuffers.sendMinAllocationSize);

this.logger = logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void Dispose()
{
iter?.Dispose();
cts?.Dispose();
garnetClient?.Dispose();
}

public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddress, long nextAddress)
Expand Down
4 changes: 2 additions & 2 deletions libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public bool TryAddReplicationTask(string remoteNodeId, long startAddress, out Ao
aofSyncTaskInfo = null;

if (startAddress == 0) startAddress = ReplicationManager.kFirstValidAofAddress;
bool success = false;
var success = false;
var current = clusterProvider.clusterManager.CurrentConfig;
var (address, port) = current.GetWorkerAddressFromNodeId(remoteNodeId);

Expand All @@ -131,7 +131,7 @@ public bool TryAddReplicationTask(string remoteNodeId, long startAddress, out Ao
this,
current.LocalNodeId,
remoteNodeId,
new GarnetClientSession(address, port, new NetworkBuffers(1 << 22).Allocate(logger: logger), clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger),
new GarnetClientSession(address, port, clusterProvider.replicationManager.GetNetworkBuffers, clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger),
new CancellationTokenSource(),
startAddress,
logger);
Expand Down
3 changes: 2 additions & 1 deletion libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
{
var opts = clusterProvider.serverOptions;

this.networkBuffers = new NetworkBuffers(1 << 22, 1 << 22).Allocate(logger: logger);
this.networkBuffers = new NetworkBuffers(1 << 22, 1 << 12).Allocate(logger: logger);
this.clusterProvider = clusterProvider;
this.storeWrapper = clusterProvider.storeWrapper;
aofProcessor = new AofProcessor(storeWrapper, recordToAof: false, logger: logger);
Expand Down Expand Up @@ -177,6 +177,7 @@ public void Dispose()
replicationConfigDevice.Dispose();
pool.Free();

aofTaskStore.Dispose();
checkpointStore.WaitForReplicas();
DisposeReplicaSyncSessionTasks();
DisposeConnections();
Expand Down
5 changes: 3 additions & 2 deletions libs/common/NetworkBuffers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ public NetworkBuffers(int sendBufferPoolSize = 1 << 17, int recvBufferPoolSize =
/// <summary>
/// Allocate network buffer pool
/// </summary>
/// <param name="maxEntriesPerLevel"></param>
/// <param name="logger"></param>
/// <returns></returns>
public NetworkBuffers Allocate(ILogger logger = null)
public NetworkBuffers Allocate(int maxEntriesPerLevel = 16, ILogger logger = null)
{
Debug.Assert(bufferPool == null);

Expand All @@ -64,7 +65,7 @@ public NetworkBuffers Allocate(ILogger logger = null)
var levels = maxSize / minSize;
Debug.Assert(levels > 0);
levels = Math.Max(4, levels);
bufferPool = new LimitedFixedBufferPool(sendMinAllocationSize, numLevels: levels, logger: logger);
bufferPool = new LimitedFixedBufferPool(sendMinAllocationSize, maxEntriesPerLevel: maxEntriesPerLevel, numLevels: levels, logger: logger);
return this;
}

Expand Down

0 comments on commit 9d65b87

Please sign in to comment.