Skip to content

Commit

Permalink
Merge pull request #77 from zmstone/0906-optimize-iops
Browse files Browse the repository at this point in the history
feat: optimize disk buffer writes
  • Loading branch information
zmstone authored Sep 9, 2024
2 parents b136784 + 40446ac commit 40deb8a
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 69 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ wolff:send(Producers, [Msg], AckFun).
* `max_linger_ms`: Age in milliseconds a batch can stay in queue when the connection
is idle (as in no pending acks from kafka). Default=0 (as in send immediately).

* `max_linger_bytes`: Number of bytes to collect before sending it to Kafka. If set to 0, `max_batch_bytes` is taken for mem-only mode, otherwise it's 10 times `max_batch_bytes` (but never exceeds 10MB) to optimize disk write.

* `max_send_ahead`: Number of batches to be sent ahead without receiving ack for
the last request. Must be 0 if messages must be delivered in strict order.

Expand Down
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
* 4.0.0
- Delete global stats (deprecated since 1.9).
- Move linger delay to front of the buffer queue.
The default value for `max_linger_ms` is `0` as before.
Setting `max_linger_ms=10` will make the disk write batch larger when buffer is configured to disk mode or disk-offload mode.

* 3.0.4
- Upgrade to kafka_protocol-4.1.8
Expand Down
2 changes: 1 addition & 1 deletion src/wolff.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "4.0.0"},
{vsn, "git"},
{registered, []},
{applications,
[kernel,
Expand Down
24 changes: 21 additions & 3 deletions src/wolff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@

%% Messaging APIs
-export([send/3,
send_sync/3
send_sync/3,
cast/3
]).

%% Messaging APIs of dynamic producer.
-export([send2/4,
cast2/4,
send_sync2/4,
add_topic/2,
remove_topic/2
Expand Down Expand Up @@ -120,8 +122,8 @@ stop_and_delete_supervised_producers(Producers) ->
%% In case `required_acks' is configured to `none', the callback is evaluated immediately after send.
%% The partition number and the per-partition worker pid are returned in a tuple to caller,
%% so it may use them to correlate the future `AckFun' evaluation.
%% NOTE: This API has no backpressure,
%% high produce rate may cause execussive ram and disk usage.
%% NOTE: This API is blocked until the batch is enqueued to the producer buffer, otherwise no backpressure.
%% High produce rate may cause excessive ram and disk usage.
%% NOTE: In case producers are configured with `required_acks = none',
%% the second arg for callback function will always be `?UNKNOWN_OFFSET' (`-1').
-spec send(producers(), [msg()], ack_fun()) -> {partition(), pid()}.
Expand All @@ -137,6 +139,22 @@ send2(Producers, Topic, Batch, AckFun) ->
ok = wolff_producer:send(ProducerPid, Batch, AckFun),
{Partition, ProducerPid}.

%% @doc Cast a batch to a partition producer.
%% Even less backpressure than `send/3'.
%% It does not wait for the batch to be enqueued to the producer buffer.
-spec cast(producers(), [msg()], ack_fun()) -> {partition(), pid()}.
cast(Producers, Batch, AckFun) ->
{Partition, ProducerPid} = wolff_producers:pick_producer(Producers, Batch),
ok = wolff_producer:send(ProducerPid, Batch, AckFun, no_wait_for_queued),
{Partition, ProducerPid}.

%% @doc Topic as argument for dynamic producers, otherwise equivalent to `cast/3'.
-spec cast2(producers(), topic(), [msg()], ack_fun()) -> {partition(), pid()}.
cast2(Producers, Topic, Batch, AckFun) ->
{Partition, ProducerPid} = wolff_producers:pick_producer2(Producers, Topic, Batch),
ok = wolff_producer:send(ProducerPid, Batch, AckFun, no_wait_for_queued),
{Partition, ProducerPid}.

%% @doc Pick a partition producer and send a batch synchronously.
%% Raise error exception in case produce pid is down or when timed out.
%% NOTE: In case producers are configured with `required_acks => none',
Expand Down
Loading

0 comments on commit 40deb8a

Please sign in to comment.