Skip to content

Commit

Permalink
feat: optimize disk buffer writes
Browse files Browse the repository at this point in the history
Previously, the wolff_producer implementation was aggressive
when trying to enqueue newly received calls, even when max_linger_ms
was set to non-zero, because the linger was implemented at the popping
end of the queue.
This change moves the linger timer to the pushing end of the queue,
that is, the process will delay enqueue to allow a larger collection
of concurrent calls, so the write batch towards disk can be larger.
  • Loading branch information
zmstone committed Sep 7, 2024
1 parent b136784 commit 71844d1
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 55 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ wolff:send(Producers, [Msg], AckFun).
* `max_linger_ms`: Age in milliseconds a batch can stay in queue when the connection
is idle (as in no pending acks from kafka). Default=0 (as in send immediately).

* `max_linger_bytes`: Number of bytes to collect before sending them to Kafka. If set to 0, `max_batch_bytes` is used in mem-only mode; otherwise 10 times `max_batch_bytes` (capped at 10MB) is used, to optimize disk writes.

* `max_send_ahead`: Number of batches to be sent ahead without receiving ack for
the last request. Must be 0 if messages must be delivered in strict order.

Expand Down
145 changes: 98 additions & 47 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
ack_timeout |
max_batch_bytes |
max_linger_ms |
max_linger_bytes |
max_send_ahead |
compression |
drop_if_highmem |
Expand All @@ -60,6 +61,7 @@
ack_timeout => timeout(),
max_batch_bytes => pos_integer(),
max_linger_ms => non_neg_integer(),
max_linger_bytes => non_neg_integer(),
max_send_ahead => non_neg_integer(),
compression => kpro:compress_option(),
drop_if_highmem => boolean(),
Expand All @@ -75,6 +77,7 @@
ack_timeout => timeout(),
max_batch_bytes => pos_integer(),
max_linger_ms => non_neg_integer(),
max_linger_bytes => non_neg_integer(),
max_send_ahead => non_neg_integer(),
compression => kpro:compress_option(),
drop_if_highmem => boolean(),
Expand All @@ -95,7 +98,9 @@
-define(ACK_CB(AckCb, Partition), {AckCb, Partition}).
-define(no_queue_ack, no_queue_ack).
-define(no_caller_ack, no_caller_ack).

-define(MAX_LINGER_BYTES, (10 bsl 20)).
-type ack_fun() :: wolff:ack_fun().
-type send_req() :: ?SEND_REQ(pid() | reference(), [wolff:msg()], ack_fun()).
-type sent() :: #{req_ref := reference(),
q_items := [?Q_ITEM(_CallId, _Ts, _Batch)],
q_ack_ref := replayq:ack_ref(),
Expand All @@ -106,7 +111,7 @@
, client_id := wolff:client_id()
, config := config_state()
, conn := undefined | _
, ?linger_expire_timer := false | timer:tref()
, ?linger_expire_timer := false | reference()
, partition := partition()
, pending_acks := #{} % CallId => AckCb
, produce_api_vsn := undefined | _
Expand All @@ -115,6 +120,7 @@
, sent_reqs_count := non_neg_integer()
, inflight_calls := non_neg_integer()
, topic := topic()
, calls := empty | #{ts := pos_integer(), bytes := pos_integer(), batch_r := [send_req()]}
}.

%% @doc Start a per-partition producer worker.
Expand All @@ -130,6 +136,9 @@
%% exact max allowed message size configured in kafka.
%% * `max_linger_ms': Age in milliseconds a batch can stay in queue when the connection
%% is idle (as in no pending acks). Default: 0 (as in send immediately)
%% * `max_linger_bytes': Number of bytes to collect before sending it to Kafka.
%% If set to 0, `max_batch_bytes' is taken for mem-only mode, otherwise it's 10 times
%% `max_batch_bytes' (but never exceeds 10MB) to optimize disk write.
%% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for
%% the last request. Must be 0 if messages must be delivered in strict order.
%% * `compression': `no_compression', `snappy' or `gzip'.
Expand Down Expand Up @@ -251,7 +260,8 @@ do_init(#{client_id := ClientId,
fun(Meta) -> Meta#{partition_id => Partition} end,
#{partition_id => Partition},
Config0),
Config = maps:without([replayq_dir, replayq_seg_bytes], Config1),
Config2 = resolve_max_linger_bytes(Config1, Q),
Config = maps:without([replayq_dir, replayq_seg_bytes], Config2),
wolff_metrics:queuing_set(Config, replayq:count(Q)),
wolff_metrics:inflight_set(Config, 0),
St#{replayq => Q,
Expand All @@ -263,7 +273,8 @@ do_init(#{client_id := ClientId,
sent_reqs_count => 0,
inflight_calls => 0,
conn := undefined,
client_id => ClientId
client_id => ClientId,
calls => empty
}.

handle_call(stop, From, St) ->
Expand All @@ -275,11 +286,14 @@ handle_call(_Call, _From, St) ->
handle_info({do_init, St0}, _) ->
St = do_init(St0),
{noreply, St};
handle_info(?linger_expire, St) ->
{noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})};
handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) ->
{Calls, _CollectedBytes} = collect_send_calls([Call], batch_bytes(Batch), Limit),
St1 = enqueue_calls(Calls, St0),
handle_info(?linger_expire, St0) ->
St1 = enqueue_calls(St0#{?linger_expire_timer => false}, no_linger),
St = maybe_send_to_kafka(St1),
{noreply, St};
handle_info(?SEND_REQ(_, Batch, _) = Call, #{calls := Calls0} = St0) ->
Bytes = batch_bytes(Batch),
Calls = collect_send_calls(Call, Bytes, Calls0),
St1 = enqueue_calls(St0#{calls => Calls}, maybe_linger),
St = maybe_send_to_kafka(St1),
{noreply, St};
handle_info({msg, Conn, Rsp}, #{conn := Conn} = St0) ->
Expand Down Expand Up @@ -366,11 +380,26 @@ ensure_ts(Batch) ->
%% Build a call ID by offsetting `Base' with a positive unique integer
%% obtained from the runtime.
make_call_id(Base) ->
    Unique = erlang:unique_integer([positive]),
    Base + Unique.

%% Resolve the effective `max_linger_bytes' when it is configured as 0.
%% * mem-only queue: linger up to `max_batch_bytes'.
%% * disk or offload queue: linger up to 10 times `max_batch_bytes',
%%   capped at ?MAX_LINGER_BYTES, so that disk writes get larger batches.
%% Any other configuration is returned untouched.
resolve_max_linger_bytes(#{max_linger_bytes := 0, max_batch_bytes := MaxBatchBytes} = Config, Q) ->
    IsMemOnly = replayq:is_mem_only(Q),
    LingerBytes =
        case IsMemOnly of
            true -> MaxBatchBytes;
            false -> min(?MAX_LINGER_BYTES, MaxBatchBytes * 10)
        end,
    Config#{max_linger_bytes => LingerBytes};
resolve_max_linger_bytes(Config, _Q) ->
    Config.

use_defaults(Config) ->
use_defaults(Config, [{required_acks, all_isr},
{ack_timeout, 10000},
{max_batch_bytes, ?WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES},
{max_linger_ms, 0},
{max_linger_bytes, 0},
{max_send_ahead, 0},
{compression, no_compression},
{reconnect_delay_ms, 2000}
Expand Down Expand Up @@ -451,38 +480,17 @@ maybe_send_to_kafka(#{conn := Conn, replayq := Q} = St) ->

maybe_send_to_kafka_has_pending(St) ->
case is_send_ahead_allowed(St) of
true -> maybe_send_to_kafka_now(St);
true -> send_to_kafka(St);
false -> St %% reached max inflight limit
end.

%% Decide whether the head of the queue has lingered long enough.
%% * If the first queued item is at least `max_linger_ms' old, send now.
%% * Otherwise, if a linger timer is already running, keep waiting.
%% * Otherwise, start a linger timer for the remaining time so that more
%%   items can accumulate into the batch.
maybe_send_to_kafka_now(#{?linger_expire_timer := LTimer,
                          replayq := Q,
                          config := #{max_linger_ms := Max}} = St) ->
    Oldest = replayq:peek(Q),
    Age = now_ts() - get_item_ts(Oldest),
    Remaining = Max - Age,
    if
        Remaining =< 0 ->
            %% the oldest item is too old, send now
            send_to_kafka(St);
        is_reference(LTimer) ->
            %% timer already started
            St;
        true ->
            %% delay send, try to accumulate more into the batch
            St#{?linger_expire_timer := erlang:send_after(Remaining, self(), ?linger_expire)}
    end.

send_to_kafka(#{sent_reqs := SentReqs,
sent_reqs_count := SentReqsCount,
inflight_calls := InflightCalls,
replayq := Q,
config := #{max_batch_bytes := BytesLimit} = Config,
conn := Conn,
?linger_expire_timer := LTimer
conn := Conn
} = St0) ->
%% timer might have already expired, but should do no harm
is_reference(LTimer) andalso erlang:cancel_timer(LTimer),
{NewQ, QAckRef, Items} =
replayq:pop(Q, #{bytes_limit => BytesLimit, count_limit => 999999999}),
wolff_metrics:queuing_set(Config, replayq:count(NewQ)),
Expand All @@ -491,7 +499,7 @@ send_to_kafka(#{sent_reqs := SentReqs,
NewInflightCalls = InflightCalls + NrOfCalls,
_ = wolff_metrics:inflight_set(Config, NewInflightCalls),
#kpro_req{ref = Ref, no_ack = NoAck} = Req = make_request(Items, St0),
St1 = St0#{replayq := NewQ, ?linger_expire_timer := false},
St1 = St0#{replayq := NewQ},
Sent = #{req_ref => Ref,
q_items => Items,
q_ack_ref => QAckRef,
Expand Down Expand Up @@ -534,8 +542,6 @@ queue_item_marshaller(?Q_ITEM(_, _, _) = I) ->
queue_item_marshaller(Bin) when is_binary(Bin) ->
binary_to_term(Bin).

%% Extract the timestamp (the second field) of a ?Q_ITEM queue item.
get_item_ts(?Q_ITEM(_, Ts, _)) -> Ts.

get_produce_version(#{conn := Conn} = St) when is_pid(Conn) ->
Vsn = case kpro:get_api_vsn_range(Conn, produce) of
{ok, {_Min, Max}} -> Max;
Expand Down Expand Up @@ -825,24 +831,70 @@ zz(I) -> (I bsl 1) bxor (I bsr 63).
%% Hand the request over to the connection process via kpro:send/2.
%% Anything other than `ok' from kpro:send/2 crashes with `badmatch'.
request_async(Conn, Req) when is_pid(Conn) ->
    SendResult = kpro:send(Conn, Req),
    ok = SendResult.

%% Add one send call to the collection of not-yet-enqueued calls.
%% An `empty' collection is first initialised with the current timestamp,
%% which marks the start of the linger period. The caller is notified via
%% maybe_reply_queued/1 before the call is pushed onto the (reversed)
%% batch list; collection then continues with collect_send_calls2/1.
collect_send_calls(Call, Bytes, empty) ->
    Started = now_ts(),
    collect_send_calls(Call, Bytes, #{ts => Started, bytes => 0, batch_r => []});
collect_send_calls(Call, Bytes, #{ts := Since, bytes := Collected, batch_r := Pending}) ->
    _ = maybe_reply_queued(Call),
    Acc = #{ts => Since,
            bytes => Collected + Bytes,
            batch_r => [Call | Pending]},
    collect_send_calls2(Acc).

%% Collect all send requests which are already in process mailbox
collect_send_calls(Calls, Size, Limit) when Size >= Limit ->
{lists:reverse(Calls), Size};
collect_send_calls(Calls, Size, Limit) ->
collect_send_calls2(Calls) ->
receive
?SEND_REQ(_, Batch, _) = Call ->
collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit)
Bytes = batch_bytes(Batch),
collect_send_calls(Call, Bytes, Calls)
after
0 ->
{lists:reverse(Calls), Size}
Calls
end.

enqueue_calls(Calls, #{replayq := Q,
pending_acks := PendingAcks0,
call_id_base := CallIdBase,
partition := Partition,
config := Config0
} = St0) ->
%% Make sure a linger timer is running.
%% A new timer is started only when none is recorded in the state
%% (the timer field is `false'); it delivers ?linger_expire to this
%% process after `Timeout' milliseconds. Otherwise the state is returned
%% unchanged.
ensure_linger_expire_timer_start(St, Timeout) ->
    case St of
        #{?linger_expire_timer := false} ->
            %% delay enqueue, try to accumulate more into the batch
            TimerRef = erlang:send_after(Timeout, self(), ?linger_expire),
            St#{?linger_expire_timer := TimerRef};
        _ ->
            %% timer is already started
            St
    end.

%% Cancel the pending linger timer (if any) and mark it as not running.
%%
%% When erlang:cancel_timer/1 returns `false' the timer has already fired,
%% so its ?linger_expire message may already be in this process' mailbox.
%% That stale message is flushed here (best effort, non-blocking);
%% otherwise it would later be handled as a genuine expiry and force the
%% next collection of calls to be enqueued before its own linger period
%% has elapsed.
ensure_linger_expire_timer_cancel(#{?linger_expire_timer := LTimer} = St) when is_reference(LTimer) ->
    case erlang:cancel_timer(LTimer) of
        false ->
            %% already expired: drop the timeout message if it has arrived
            receive
                ?linger_expire -> ok
            after 0 ->
                ok
            end;
        _TimeLeft ->
            ok
    end,
    St#{?linger_expire_timer => false};
ensure_linger_expire_timer_cancel(#{?linger_expire_timer := _NotStarted} = St) ->
    St#{?linger_expire_timer => false}.

%% Tell whether the collected calls should keep lingering before enqueue.
%% Returns `{true, Remaining}' (milliseconds left) while the collection is
%% both younger than `max_linger_ms' and smaller than `max_linger_bytes';
%% returns `false' as soon as either limit is reached.
is_linger_continue(#{calls := Pending, config := Cfg}) ->
    #{max_linger_ms := MaxMs, max_linger_bytes := MaxBytes} = Cfg,
    #{ts := Since, bytes := Collected} = Pending,
    Elapsed = now_ts() - Since,
    Remaining = MaxMs - Elapsed,
    if
        Remaining =< 0 -> false;
        Collected >= MaxBytes -> false;
        true -> {true, Remaining}
    end.

%% Move the collected calls into the queue, optionally lingering first.
%% * Nothing collected: the state is returned as is.
%% * `maybe_linger': keep collecting (with a linger timer running) while
%%   is_linger_continue/1 allows it, otherwise enqueue right away.
%% * `no_linger': stop the linger timer, reset the collection to `empty'
%%   and enqueue the calls in their arrival order.
enqueue_calls(#{calls := empty} = St, _Mode) ->
    %% no call to enqueue
    St;
enqueue_calls(St, maybe_linger) ->
    case is_linger_continue(St) of
        false ->
            enqueue_calls(St, no_linger);
        {true, Timeout} ->
            ensure_linger_expire_timer_start(St, Timeout)
    end;
enqueue_calls(#{calls := #{batch_r := Reversed}} = St0, no_linger) ->
    InOrder = lists:reverse(Reversed),
    St1 = ensure_linger_expire_timer_cancel(St0),
    St2 = St1#{calls => empty},
    enqueue_calls2(InOrder, St2).

enqueue_calls2(Calls,
#{replayq := Q,
pending_acks := PendingAcks0,
call_id_base := CallIdBase,
partition := Partition,
config := Config0
} = St0) ->
{QueueItems, PendingAcks, CallByteSize} =
lists:foldl(
fun(?SEND_REQ(_From, Batch, AckFun), {Items, PendingAcksIn, Size}) ->
Expand All @@ -856,7 +908,6 @@ enqueue_calls(Calls, #{replayq := Q,
end, {[], PendingAcks0, 0}, Calls),
NewQ = replayq:append(Q, lists:reverse(QueueItems)),
wolff_metrics:queuing_set(Config0, replayq:count(NewQ)),
lists:foreach(fun maybe_reply_queued/1, Calls),
Overflow = case maps:get(drop_if_highmem, Config0, false)
andalso replayq:is_mem_only(NewQ)
andalso load_ctl:is_high_mem() of
Expand Down
2 changes: 1 addition & 1 deletion src/wolff_producers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ pick_next_alive(LookupFn, Partition, Count) ->
pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, _Tried = 1).

pick_next_alive(_LookupFn, _Partition, Count, Count) ->
throw(#{cause => all_producers_down});
throw(#{cause => all_producers_down, count => Count});
pick_next_alive(LookupFn, Partition, Count, Tried) ->
Pid = LookupFn(Partition),
case is_alive(Pid) of
Expand Down
21 changes: 14 additions & 7 deletions test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -324,20 +324,25 @@ replayq_overflow_test() ->
Msg = #{key => <<>>, value => <<"12345">>},
Batch = [Msg, Msg],
BatchSize = wolff_producer:batch_bytes(Batch),
ProducerCfg = #{max_batch_bytes => 1, %% make sure not collecting calls into one batch
LingerMs = 100,
ProducerCfg = #{max_batch_bytes => 1, %% ensure send one call at a time
replayq_max_total_bytes => BatchSize,
required_acks => all_isr,
max_linger_ms => 1000 %% do not send to kafka immediately
max_linger_ms => LingerMs, %% delay enqueue
max_linger_bytes => BatchSize + 1 %% delay enqueue
},
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
Pid = wolff_producers:lookup_producer(Producers, 0),
?assert(is_process_alive(Pid)),
TesterPid = self(),
AckFun1 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_1, BaseOffset}, ok end,
AckFun2 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_2, BaseOffset}, ok end,
SendF = fun(AckFun) -> wolff:send(Producers, Batch, AckFun) end,
%% send two batches to overflow one
spawn(fun() -> SendF(AckFun1) end),
proc_lib:spawn_link(fun() -> SendF(AckFun1) end),
timer:sleep(1), %% ensure order
spawn(fun() -> SendF(AckFun2) end),
proc_lib:spawn_link(fun() -> SendF(AckFun2) end),
timer:sleep(LingerMs * 2),
try
%% pushed out of replayq due to overflow
receive
Expand All @@ -363,7 +368,7 @@ replayq_overflow_test() ->
[1] = get_telemetry_seq(CntrEventsTable, [wolff, dropped_queue_full]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
[0,1,2,1,0]),
[0,2,1,0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
[0,1,0]),
Expand Down Expand Up @@ -610,8 +615,10 @@ test_message_too_large() ->
%% then it should retry sending one message at a time
ProducerCfg = #{partitioner => fun(_, _) -> 0 end,
max_batch_bytes => MaxMessageBytes * 3,
%% ensure batching
max_linger_ms => 100
%% ensure batching by delaying enqueue by 100 milliseconds
max_linger_ms => 100,
%% ensure linger is not expired by reaching size
max_linger_bytes => MaxMessageBytes * 100
},
{ok, Producers} = wolff:start_producers(Client, TopicBin, ProducerCfg),
MaxBytesCompensateOverhead = MaxMessageBytes - ?BATCHING_OVERHEAD - 7,
Expand Down

0 comments on commit 71844d1

Please sign in to comment.