feat: optimize disk buffer writes
Previously, the wolff_producer implementation was aggressive when
trying to enqueue newly received calls, even when max_linger_ms was
set to a non-zero value, because the linger was implemented at the
popping end of the queue.
This change moves the linger timer to the pushing end of the queue,
that is, the process delays enqueueing to allow a larger collection
of concurrent calls, so that the write batch towards disk can be larger.
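
To illustrate the idea (this is a minimal, self-contained sketch, not the actual wolff_producer code): incoming calls are accumulated in the producer process until either the linger time expires or a byte threshold is reached, and only then written out in one batch. The module name, the message shape, and the `io:format` "flush" are placeholders for wolff's real replayq append.

```erlang
%% Sketch of push-end lingering (assumed simplification, not wolff's API).
-module(push_linger_sketch).
-export([start/2, send/2]).

start(MaxLingerMs, MaxLingerBytes) ->
    spawn(fun() -> loop(MaxLingerMs, MaxLingerBytes, empty) end).

send(Pid, Bin) when is_binary(Bin) ->
    Pid ! {send, Bin},
    ok.

%% No pending calls: wait for the first call, which opens the linger window.
loop(MaxMs, MaxBytes, empty) ->
    receive
        {send, Bin} ->
            Acc = #{ts => now_ms(), bytes => byte_size(Bin), batch_r => [Bin]},
            loop(MaxMs, MaxBytes, maybe_flush(MaxMs, MaxBytes, Acc))
    end;
%% Pending calls: keep collecting, but only until the linger deadline.
loop(MaxMs, MaxBytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Acc0) ->
    TimeLeft = max(MaxMs - (now_ms() - Ts), 0),
    receive
        {send, Bin} ->
            Acc = Acc0#{bytes := Bytes0 + byte_size(Bin), batch_r := [Bin | BatchR]},
            loop(MaxMs, MaxBytes, maybe_flush(MaxMs, MaxBytes, Acc))
    after TimeLeft ->
        loop(MaxMs, MaxBytes, flush(Acc0))
    end.

maybe_flush(MaxMs, MaxBytes, #{ts := Ts, bytes := Bytes} = Acc) ->
    Expired = now_ms() - Ts >= MaxMs,
    case Expired orelse Bytes >= MaxBytes of
        true -> flush(Acc);
        false -> Acc
    end.

flush(#{batch_r := BatchR, bytes := Bytes}) ->
    %% In wolff this is where one (larger) batch would be appended to replayq.
    io:format("flushing ~p messages (~p bytes) in one enqueue~n",
              [length(BatchR), Bytes]),
    empty.

now_ms() ->
    erlang:system_time(millisecond).
```

With the previous pop-end lingering, each call was appended to the queue immediately and the delay only applied when reading batches back out, so disk writes stayed small even with `max_linger_ms > 0`; push-end lingering delays the append itself, which is what allows a larger disk write batch.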
zmstone committed Sep 7, 2024
1 parent b136784 commit bcfceda
Showing 6 changed files with 119 additions and 56 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -161,6 +161,8 @@ wolff:send(Producers, [Msg], AckFun).
* `max_linger_ms`: Age in milliseconds a batch can stay in queue when the connection
is idle (as in no pending acks from kafka). Default=0 (as in send immediately).

* `max_linger_bytes`: Number of bytes to collect before sending to Kafka. If set to 0, it defaults to `max_batch_bytes` in mem-only mode; otherwise it defaults to 10 times `max_batch_bytes` (capped at 10MB) to optimize disk writes. See the example configuration below.

* `max_send_ahead`: Number of batches to be sent ahead without receiving ack for
the last request. Must be 0 if messages must be delivered in strict order.
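
For illustration only, a producer configuration exercising these options could look like the sketch below; the replayq directory, the byte values, and the already-started `Client` are example assumptions, while the option names are the ones documented above:

```erlang
%% Example only: disk-backed producers with push-end lingering, so concurrent
%% sends are grouped into larger disk writes and Kafka produce requests.
ProducerCfg = #{replayq_dir => "/var/lib/wolff/replayq", %% disk mode (example path)
                max_batch_bytes => 1000000,              %% ~1MB per produce request
                max_linger_ms => 10,                     %% wait up to 10ms before enqueueing
                max_linger_bytes => 0},                  %% 0 = derived from max_batch_bytes
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg).
```

With a disk-backed queue, `max_linger_bytes => 0` resolves to 10 times `max_batch_bytes` (capped at 10MB), so up to 10ms worth of concurrent calls can be written to disk in a single append.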

3 changes: 3 additions & 0 deletions changelog.md
@@ -1,5 +1,8 @@
* 4.0.0
- Delete global stats (deprecated since 1.9).
- Move the linger delay to the front of the buffer queue.
The default value for `max_linger_ms` is `0`, as before.
Setting `max_linger_ms=10` makes the disk write batch larger when the buffer is configured in disk or disk-offload mode.

* 3.0.4
- Upgrade to kafka_protocol-4.1.8
2 changes: 1 addition & 1 deletion src/wolff.app.src
@@ -1,6 +1,6 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "4.0.0"},
{vsn, "git"},
{registered, []},
{applications,
[kernel,
143 changes: 97 additions & 46 deletions src/wolff_producer.erl
@@ -45,6 +45,7 @@
ack_timeout |
max_batch_bytes |
max_linger_ms |
max_linger_bytes |
max_send_ahead |
compression |
drop_if_highmem |
@@ -60,6 +61,7 @@
ack_timeout => timeout(),
max_batch_bytes => pos_integer(),
max_linger_ms => non_neg_integer(),
max_linger_bytes => non_neg_integer(),
max_send_ahead => non_neg_integer(),
compression => kpro:compress_option(),
drop_if_highmem => boolean(),
@@ -75,6 +77,7 @@
ack_timeout => timeout(),
max_batch_bytes => pos_integer(),
max_linger_ms => non_neg_integer(),
max_linger_bytes => non_neg_integer(),
max_send_ahead => non_neg_integer(),
compression => kpro:compress_option(),
drop_if_highmem => boolean(),
@@ -95,7 +98,9 @@
-define(ACK_CB(AckCb, Partition), {AckCb, Partition}).
-define(no_queue_ack, no_queue_ack).
-define(no_caller_ack, no_caller_ack).

-define(MAX_LINGER_BYTES, (10 bsl 20)).
-type ack_fun() :: wolff:ack_fun().
-type send_req() :: ?SEND_REQ(pid() | reference(), [wolff:msg()], ack_fun()).
-type sent() :: #{req_ref := reference(),
q_items := [?Q_ITEM(_CallId, _Ts, _Batch)],
q_ack_ref := replayq:ack_ref(),
@@ -106,7 +111,7 @@
, client_id := wolff:client_id()
, config := config_state()
, conn := undefined | _
, ?linger_expire_timer := false | timer:tref()
, ?linger_expire_timer := false | reference()
, partition := partition()
, pending_acks := #{} % CallId => AckCb
, produce_api_vsn := undefined | _
@@ -115,6 +120,7 @@
, sent_reqs_count := non_neg_integer()
, inflight_calls := non_neg_integer()
, topic := topic()
, calls := empty | #{ts := pos_integer(), bytes := pos_integer(), batch_r := [send_req()]}
}.

%% @doc Start a per-partition producer worker.
@@ -130,6 +136,9 @@
%% exact max allowed message size configured in kafka.
%% * `max_linger_ms': Age in milliseconds a batch can stay in queue when the connection
%% is idle (as in no pending acks). Default: 0 (as in send immediately).
%% * `max_linger_bytes': Number of bytes to collect before sending to Kafka.
%% If set to 0, it defaults to `max_batch_bytes' in mem-only mode; otherwise it defaults to
%% 10 times `max_batch_bytes' (capped at 10MB) to optimize disk writes.
%% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for
%% the last request. Must be 0 if messages must be delivered in strict order.
%% * `compression': `no_compression', `snappy' or `gzip'.
@@ -251,7 +260,8 @@ do_init(#{client_id := ClientId,
fun(Meta) -> Meta#{partition_id => Partition} end,
#{partition_id => Partition},
Config0),
Config = maps:without([replayq_dir, replayq_seg_bytes], Config1),
Config2 = resolve_max_linger_bytes(Config1, Q),
Config = maps:without([replayq_dir, replayq_seg_bytes], Config2),
wolff_metrics:queuing_set(Config, replayq:count(Q)),
wolff_metrics:inflight_set(Config, 0),
St#{replayq => Q,
@@ -263,7 +273,8 @@
sent_reqs_count => 0,
inflight_calls => 0,
conn := undefined,
client_id => ClientId
client_id => ClientId,
calls => empty
}.

handle_call(stop, From, St) ->
@@ -275,11 +286,14 @@ handle_call(_Call, _From, St) ->
handle_info({do_init, St0}, _) ->
St = do_init(St0),
{noreply, St};
handle_info(?linger_expire, St) ->
{noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})};
handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) ->
{Calls, _CollectedBytes} = collect_send_calls([Call], batch_bytes(Batch), Limit),
St1 = enqueue_calls(Calls, St0),
handle_info(?linger_expire, St0) ->
St1 = enqueue_calls(St0#{?linger_expire_timer => false}, no_linger),
St = maybe_send_to_kafka(St1),
{noreply, St};
handle_info(?SEND_REQ(_, Batch, _) = Call, #{calls := Calls0} = St0) ->
Bytes = batch_bytes(Batch),
Calls = collect_send_calls(Call, Bytes, Calls0),
St1 = enqueue_calls(St0#{calls => Calls}, maybe_linger),
St = maybe_send_to_kafka(St1),
{noreply, St};
handle_info({msg, Conn, Rsp}, #{conn := Conn} = St0) ->
@@ -366,11 +380,26 @@ ensure_ts(Batch) ->
make_call_id(Base) ->
Base + erlang:unique_integer([positive]).

resolve_max_linger_bytes(#{max_linger_bytes := 0,
max_batch_bytes := Mb
} = Config, Q) ->
case replayq:is_mem_only(Q) of
true ->
Config#{max_linger_bytes => Mb};
false ->
%% when disk or offload mode, try to linger with more bytes
%% to optimize disk write performance
Config#{max_linger_bytes => min(?MAX_LINGER_BYTES, Mb * 10)}
end;
resolve_max_linger_bytes(Config, _Q) ->
Config.

use_defaults(Config) ->
use_defaults(Config, [{required_acks, all_isr},
{ack_timeout, 10000},
{max_batch_bytes, ?WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES},
{max_linger_ms, 0},
{max_linger_bytes, 0},
{max_send_ahead, 0},
{compression, no_compression},
{reconnect_delay_ms, 2000}
@@ -451,38 +480,17 @@ maybe_send_to_kafka(#{conn := Conn, replayq := Q} = St) ->

maybe_send_to_kafka_has_pending(St) ->
case is_send_ahead_allowed(St) of
true -> maybe_send_to_kafka_now(St);
true -> send_to_kafka(St);
false -> St %% reached max inflight limit
end.

maybe_send_to_kafka_now(#{?linger_expire_timer := LTimer,
replayq := Q,
config := #{max_linger_ms := Max}} = St) ->
First = replayq:peek(Q),
LingerTimeout = Max - (now_ts() - get_item_ts(First)),
case LingerTimeout =< 0 of
true ->
%% the oldest item is too old, send now
send_to_kafka(St); %% send now
false when is_reference(LTimer) ->
%% timer already started
St;
false ->
%% delay send, try to accumulate more into the batch
Ref = erlang:send_after(LingerTimeout, self(), ?linger_expire),
St#{?linger_expire_timer := Ref}
end.

send_to_kafka(#{sent_reqs := SentReqs,
sent_reqs_count := SentReqsCount,
inflight_calls := InflightCalls,
replayq := Q,
config := #{max_batch_bytes := BytesLimit} = Config,
conn := Conn,
?linger_expire_timer := LTimer
conn := Conn
} = St0) ->
%% timer might have already expired, but should do no harm
is_reference(LTimer) andalso erlang:cancel_timer(LTimer),
{NewQ, QAckRef, Items} =
replayq:pop(Q, #{bytes_limit => BytesLimit, count_limit => 999999999}),
wolff_metrics:queuing_set(Config, replayq:count(NewQ)),
@@ -491,7 +499,7 @@ send_to_kafka(#{sent_reqs := SentReqs,
NewInflightCalls = InflightCalls + NrOfCalls,
_ = wolff_metrics:inflight_set(Config, NewInflightCalls),
#kpro_req{ref = Ref, no_ack = NoAck} = Req = make_request(Items, St0),
St1 = St0#{replayq := NewQ, ?linger_expire_timer := false},
St1 = St0#{replayq := NewQ},
Sent = #{req_ref => Ref,
q_items => Items,
q_ack_ref => QAckRef,
@@ -534,8 +542,6 @@ queue_item_marshaller(?Q_ITEM(_, _, _) = I) ->
queue_item_marshaller(Bin) when is_binary(Bin) ->
binary_to_term(Bin).

get_item_ts(?Q_ITEM(_, Ts, _)) -> Ts.

get_produce_version(#{conn := Conn} = St) when is_pid(Conn) ->
Vsn = case kpro:get_api_vsn_range(Conn, produce) of
{ok, {_Min, Max}} -> Max;
@@ -825,24 +831,69 @@ zz(I) -> (I bsl 1) bxor (I bsr 63).
request_async(Conn, Req) when is_pid(Conn) ->
ok = kpro:send(Conn, Req).

collect_send_calls(Call, Bytes, empty) ->
Init = #{ts => now_ts(), bytes => 0, batch_r => []},
collect_send_calls(Call, Bytes, Init);
collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR}) ->
collect_send_calls2(#{ts => Ts, bytes => Bytes0 + Bytes, batch_r => [Call | BatchR]}).

%% Collect all send requests which are already in process mailbox
collect_send_calls(Calls, Size, Limit) when Size >= Limit ->
{lists:reverse(Calls), Size};
collect_send_calls(Calls, Size, Limit) ->
collect_send_calls2(Calls) ->
receive
?SEND_REQ(_, Batch, _) = Call ->
collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit)
Bytes = batch_bytes(Batch),
collect_send_calls(Call, Bytes, Calls)
after
0 ->
{lists:reverse(Calls), Size}
Calls
end.

enqueue_calls(Calls, #{replayq := Q,
pending_acks := PendingAcks0,
call_id_base := CallIdBase,
partition := Partition,
config := Config0
} = St0) ->
ensure_linger_expire_timer_start(#{?linger_expire_timer := false} = St, Timeout) ->
%% delay enqueue, try to accumulate more into the batch
Ref = erlang:send_after(Timeout, self(), ?linger_expire),
St#{?linger_expire_timer := Ref};
ensure_linger_expire_timer_start(St, _Timeout) ->
%% timer is already started
St.

ensure_linger_expire_timer_cancel(#{?linger_expire_timer := LTimer} = St) ->
_ = is_reference(LTimer) andalso erlang:cancel_timer(LTimer),
St#{?linger_expire_timer => false}.

%% check if the call collection should continue to linger before enqueue
is_linger_continue(#{calls := Calls, config := Config}) ->
#{max_linger_ms := MaxLingerMs, max_linger_bytes := MaxLingerBytes} = Config,
#{ts := Ts, bytes := Bytes} = Calls,
TimeLeft = MaxLingerMs - (now_ts() - Ts),
case TimeLeft =< 0 of
true ->
false;
false ->
(Bytes < MaxLingerBytes) andalso {true, TimeLeft}
end.

enqueue_calls(#{calls := empty} = St, _) ->
%% no call to enqueue
St;
enqueue_calls(St, maybe_linger) ->
case is_linger_continue(St) of
{true, Timeout} ->
ensure_linger_expire_timer_start(St, Timeout);
false ->
enqueue_calls(St, no_linger)
end;
enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) ->
Calls = lists:reverse(CallsR),
St = ensure_linger_expire_timer_cancel(St0),
enqueue_calls2(Calls, St#{calls => empty}).

enqueue_calls2(Calls,
#{replayq := Q,
pending_acks := PendingAcks0,
call_id_base := CallIdBase,
partition := Partition,
config := Config0
} = St0) ->
{QueueItems, PendingAcks, CallByteSize} =
lists:foldl(
fun(?SEND_REQ(_From, Batch, AckFun), {Items, PendingAcksIn, Size}) ->
2 changes: 1 addition & 1 deletion src/wolff_producers.erl
@@ -330,7 +330,7 @@ pick_next_alive(LookupFn, Partition, Count) ->
pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, _Tried = 1).

pick_next_alive(_LookupFn, _Partition, Count, Count) ->
throw(#{cause => all_producers_down});
throw(#{cause => all_producers_down, count => Count});
pick_next_alive(LookupFn, Partition, Count, Tried) ->
Pid = LookupFn(Partition),
case is_alive(Pid) of
23 changes: 15 additions & 8 deletions test/wolff_tests.erl
@@ -324,20 +324,25 @@ replayq_overflow_test() ->
Msg = #{key => <<>>, value => <<"12345">>},
Batch = [Msg, Msg],
BatchSize = wolff_producer:batch_bytes(Batch),
ProducerCfg = #{max_batch_bytes => 1, %% make sure not collecting calls into one batch
LingerMs = 100,
ProducerCfg = #{max_batch_bytes => 1, %% ensure send one call at a time
replayq_max_total_bytes => BatchSize,
required_acks => all_isr,
max_linger_ms => 1000 %% do not send to kafka immediately
max_linger_ms => LingerMs, %% delay enqueue
max_linger_bytes => BatchSize + 1 %% delay enqueue
},
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
Pid = wolff_producers:lookup_producer(Producers, 0),
?assert(is_process_alive(Pid)),
TesterPid = self(),
AckFun1 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_1, BaseOffset}, ok end,
AckFun2 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_2, BaseOffset}, ok end,
SendF = fun(AckFun) -> wolff:send(Producers, Batch, AckFun) end,
%% send two batches to overflow one
spawn(fun() -> SendF(AckFun1) end),
proc_lib:spawn_link(fun() -> SendF(AckFun1) end),
timer:sleep(1), %% ensure order
spawn(fun() -> SendF(AckFun2) end),
proc_lib:spawn_link(fun() -> SendF(AckFun2) end),
timer:sleep(LingerMs * 2),
try
%% pushed out of replayq due to overflow
receive
@@ -363,7 +368,7 @@ replayq_overflow_test() ->
[1] = get_telemetry_seq(CntrEventsTable, [wolff, dropped_queue_full]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
[0,1,2,1,0]),
[0,2,1,0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
[0,1,0]),
@@ -610,8 +615,10 @@ test_message_too_large() ->
%% then it should retry sending one message at a time
ProducerCfg = #{partitioner => fun(_, _) -> 0 end,
max_batch_bytes => MaxMessageBytes * 3,
%% ensure batching
max_linger_ms => 100
%% ensure batching by delaying enqueue by 100 milliseconds
max_linger_ms => 100,
%% ensure the linger does not expire early by reaching the byte limit
max_linger_bytes => MaxMessageBytes * 100
},
{ok, Producers} = wolff:start_producers(Client, TopicBin, ProducerCfg),
MaxBytesCompensateOverhead = MaxMessageBytes - ?BATCHING_OVERHEAD - 7,
@@ -620,7 +627,7 @@ test_message_too_large() ->
Ref = make_ref(),
Self = self(),
AckFun = {fun ?MODULE:ack_cb/4, [Self, Ref]},
_ = wolff:send(Producers, Batch, AckFun),
spawn(fun() -> wolff:send(Producers, Batch, AckFun) end),
fun() -> ?WAIT(5000, {ack, Ref, _Partition, BaseOffset}, BaseOffset) end
end,
%% Must be ok to send one message