diff --git a/README.md b/README.md
index 54698b9..a783b56 100644
--- a/README.md
+++ b/README.md
@@ -161,6 +161,8 @@ wolff:send(Producers, [Msg], AckFun).
 * `max_linger_ms`: Age in milliseconds a batch can stay in queue when the connection
   is idle (as in no pending acks from kafka). Default=0 (as in send immediately).
 
+* `max_linger_bytes`: Maximum number of bytes to collect before sending to Kafka. Default=0, which resolves to `max_batch_bytes` in mem-only mode, otherwise to 10 times `max_batch_bytes` (capped at 10MB) to optimize disk writes.
+
 * `max_send_ahead`: Number of batches to be sent ahead without receiving ack for
   the last request. Must be 0 if messages must be delivered in strict order.
 
diff --git a/changelog.md b/changelog.md
index 71d5f8e..593256c 100644
--- a/changelog.md
+++ b/changelog.md
@@ -1,5 +1,8 @@
 * 4.0.0
   - Delete global stats (deprecated since 1.9).
+  - Move the linger delay to the front of the buffer queue (messages now linger before they are enqueued).
+    The default value for `max_linger_ms` is `0`, as before.
+    Setting e.g. `max_linger_ms=10` makes disk-write batches larger when the buffer is configured in disk mode or disk-offload mode.
 * 3.0.4
   - Upgrade to kafka_protocol-4.1.8
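Below is a usage sketch (not part of the patch) showing how the two linger options are typically set together when starting producers. `Client` is assumed to be an already-started wolff client, and the option values are illustrative examples, not recommendations:

```erlang
%% Illustrative sketch only: `Client' is an already-started wolff client,
%% and the numbers below are example values.
ProducerCfg = #{max_batch_bytes => 1024 * 1024,  %% upper bound for one produce request
                max_linger_ms => 10,             %% wait up to 10 ms to accumulate a batch
                max_linger_bytes => 0},          %% 0 = derive from max_batch_bytes (see note above)
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
AckFun = fun(_Partition, _BaseOffset) -> ok end,
_ = wolff:send(Producers, [#{key => <<"k">>, value => <<"v">>}], AckFun).
```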
diff --git a/src/wolff.app.src b/src/wolff.app.src
index efb0a11..6d487a1 100644
--- a/src/wolff.app.src
+++ b/src/wolff.app.src
@@ -1,6 +1,6 @@
 {application, wolff,
  [{description, "Kafka's publisher"},
-  {vsn, "4.0.0"},
+  {vsn, "git"},
   {registered, []},
   {applications, [kernel,
diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl
index e3bb5f3..a6c616f 100644
--- a/src/wolff_producer.erl
+++ b/src/wolff_producer.erl
@@ -45,6 +45,7 @@
         ack_timeout |
         max_batch_bytes |
         max_linger_ms |
+        max_linger_bytes |
         max_send_ahead |
         compression |
         drop_if_highmem |
@@ -60,6 +61,7 @@
         ack_timeout => timeout(),
         max_batch_bytes => pos_integer(),
         max_linger_ms => non_neg_integer(),
+        max_linger_bytes => non_neg_integer(),
         max_send_ahead => non_neg_integer(),
         compression => kpro:compress_option(),
         drop_if_highmem => boolean(),
@@ -75,6 +77,7 @@
         ack_timeout => timeout(),
         max_batch_bytes => pos_integer(),
         max_linger_ms => non_neg_integer(),
+        max_linger_bytes => non_neg_integer(),
         max_send_ahead => non_neg_integer(),
         compression => kpro:compress_option(),
         drop_if_highmem => boolean(),
@@ -95,7 +98,9 @@
 -define(ACK_CB(AckCb, Partition), {AckCb, Partition}).
 -define(no_queue_ack, no_queue_ack).
 -define(no_caller_ack, no_caller_ack).
-
+-define(MAX_LINGER_BYTES, (10 bsl 20)).
+-type ack_fun() :: wolff:ack_fun().
+-type send_req() :: ?SEND_REQ(pid() | reference(), [wolff:msg()], ack_fun()).
 -type sent() :: #{req_ref := reference(),
                   q_items := [?Q_ITEM(_CallId, _Ts, _Batch)],
                   q_ack_ref := replayq:ack_ref(),
@@ -106,7 +111,7 @@
         , client_id := wolff:client_id()
         , config := config_state()
         , conn := undefined | _
-        , ?linger_expire_timer := false | timer:tref()
+        , ?linger_expire_timer := false | reference()
         , partition := partition()
         , pending_acks := #{} % CallId => AckCb
         , produce_api_vsn := undefined | _
@@ -115,6 +120,7 @@
         , sent_reqs_count := non_neg_integer()
         , inflight_calls := non_neg_integer()
         , topic := topic()
+        , calls := empty | #{ts := pos_integer(), bytes := pos_integer(), batch_r := [send_req()]}
         }.
 
 %% @doc Start a per-partition producer worker.
@@ -130,6 +136,9 @@
 %%   exact max allowed message size configured in kafka.
 %% * `max_linger_ms': Age in milliseconds a batch can stay in queue when the connection
 %%   is idle (as in no pending acks). Default: 0 (as in send immediately)
+%% * `max_linger_bytes': Maximum number of bytes to collect before sending to Kafka.
+%%   Default: 0, which resolves to `max_batch_bytes' in mem-only mode, otherwise to
+%%   10 times `max_batch_bytes' (capped at 10MB) to optimize disk writes.
 %% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for
 %%   the last request. Must be 0 if messages must be delivered in strict order.
 %% * `compression': `no_compression', `snappy' or `gzip'.
@@ -251,7 +260,8 @@ do_init(#{client_id := ClientId,
                  fun(Meta) -> Meta#{partition_id => Partition} end,
                  #{partition_id => Partition},
                  Config0),
-  Config = maps:without([replayq_dir, replayq_seg_bytes], Config1),
+  Config2 = resolve_max_linger_bytes(Config1, Q),
+  Config = maps:without([replayq_dir, replayq_seg_bytes], Config2),
   wolff_metrics:queuing_set(Config, replayq:count(Q)),
   wolff_metrics:inflight_set(Config, 0),
   St#{replayq => Q,
@@ -263,7 +273,8 @@ do_init(#{client_id := ClientId,
      sent_reqs_count => 0,
      inflight_calls => 0,
      conn := undefined,
-     client_id => ClientId
+     client_id => ClientId,
+     calls => empty
     }.
 
 handle_call(stop, From, St) ->
@@ -275,11 +286,14 @@ handle_call(_Call, _From, St) ->
 handle_info({do_init, St0}, _) ->
   St = do_init(St0),
   {noreply, St};
-handle_info(?linger_expire, St) ->
-  {noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})};
-handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) ->
-  {Calls, _CollectedBytes} = collect_send_calls([Call], batch_bytes(Batch), Limit),
-  St1 = enqueue_calls(Calls, St0),
+handle_info(?linger_expire, St0) ->
+  St1 = enqueue_calls(St0#{?linger_expire_timer => false}, no_linger),
+  St = maybe_send_to_kafka(St1),
+  {noreply, St};
+handle_info(?SEND_REQ(_, Batch, _) = Call, #{calls := Calls0} = St0) ->
+  Bytes = batch_bytes(Batch),
+  Calls = collect_send_calls(Call, Bytes, Calls0),
+  St1 = enqueue_calls(St0#{calls => Calls}, maybe_linger),
   St = maybe_send_to_kafka(St1),
   {noreply, St};
 handle_info({msg, Conn, Rsp}, #{conn := Conn} = St0) ->
@@ -366,11 +380,26 @@ ensure_ts(Batch) ->
 make_call_id(Base) ->
   Base + erlang:unique_integer([positive]).
 
+resolve_max_linger_bytes(#{max_linger_bytes := 0,
+                           max_batch_bytes := Mb
+                          } = Config, Q) ->
+  case replayq:is_mem_only(Q) of
+    true ->
+      Config#{max_linger_bytes => Mb};
+    false ->
+      %% when disk or offload mode, try to linger with more bytes
+      %% to optimize disk write performance
+      Config#{max_linger_bytes => min(?MAX_LINGER_BYTES, Mb * 10)}
+  end;
+resolve_max_linger_bytes(Config, _Q) ->
+  Config.
+
 use_defaults(Config) ->
   use_defaults(Config, [{required_acks, all_isr},
                         {ack_timeout, 10000},
                         {max_batch_bytes, ?WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES},
                         {max_linger_ms, 0},
+                        {max_linger_bytes, 0},
                         {max_send_ahead, 0},
                         {compression, no_compression},
                         {reconnect_delay_ms, 2000}
@@ -451,38 +480,17 @@ maybe_send_to_kafka(#{conn := Conn, replayq := Q} = St) ->
 maybe_send_to_kafka_has_pending(St) ->
   case is_send_ahead_allowed(St) of
-    true -> maybe_send_to_kafka_now(St);
+    true -> send_to_kafka(St);
     false -> St %% reached max inflight limit
   end.
 
-maybe_send_to_kafka_now(#{?linger_expire_timer := LTimer,
-                          replayq := Q,
-                          config := #{max_linger_ms := Max}} = St) ->
-  First = replayq:peek(Q),
-  LingerTimeout = Max - (now_ts() - get_item_ts(First)),
-  case LingerTimeout =< 0 of
-    true ->
-      %% the oldest item is too old, send now
-      send_to_kafka(St); %% send now
-    false when is_reference(LTimer) ->
-      %% timer already started
-      St;
-    false ->
-      %% delay send, try to accumulate more into the batch
-      Ref = erlang:send_after(LingerTimeout, self(), ?linger_expire),
-      St#{?linger_expire_timer := Ref}
-  end.
-
 send_to_kafka(#{sent_reqs := SentReqs,
                 sent_reqs_count := SentReqsCount,
                 inflight_calls := InflightCalls,
                 replayq := Q,
                 config := #{max_batch_bytes := BytesLimit} = Config,
-                conn := Conn,
-                ?linger_expire_timer := LTimer
+                conn := Conn
                } = St0) ->
-  %% timer might have already expired, but should do no harm
-  is_reference(LTimer) andalso erlang:cancel_timer(LTimer),
   {NewQ, QAckRef, Items} = replayq:pop(Q, #{bytes_limit => BytesLimit,
                                             count_limit => 999999999}),
   wolff_metrics:queuing_set(Config, replayq:count(NewQ)),
@@ -491,7 +499,7 @@ send_to_kafka(#{sent_reqs := SentReqs,
   NewInflightCalls = InflightCalls + NrOfCalls,
   _ = wolff_metrics:inflight_set(Config, NewInflightCalls),
   #kpro_req{ref = Ref, no_ack = NoAck} = Req = make_request(Items, St0),
-  St1 = St0#{replayq := NewQ, ?linger_expire_timer := false},
+  St1 = St0#{replayq := NewQ},
   Sent = #{req_ref => Ref,
            q_items => Items,
            q_ack_ref => QAckRef,
@@ -534,8 +542,6 @@ queue_item_marshaller(?Q_ITEM(_, _, _) = I) ->
 queue_item_marshaller(Bin) when is_binary(Bin) ->
   binary_to_term(Bin).
 
-get_item_ts(?Q_ITEM(_, Ts, _)) -> Ts.
-
 get_produce_version(#{conn := Conn} = St) when is_pid(Conn) ->
   Vsn = case kpro:get_api_vsn_range(Conn, produce) of
           {ok, {_Min, Max}} -> Max;
@@ -825,24 +831,69 @@ zz(I) -> (I bsl 1) bxor (I bsr 63).
 request_async(Conn, Req) when is_pid(Conn) ->
   ok = kpro:send(Conn, Req).
 
+collect_send_calls(Call, Bytes, empty) ->
+  Init = #{ts => now_ts(), bytes => 0, batch_r => []},
+  collect_send_calls(Call, Bytes, Init);
+collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR}) ->
+  collect_send_calls2(#{ts => Ts, bytes => Bytes0 + Bytes, batch_r => [Call | BatchR]}).
+
 %% Collect all send requests which are already in process mailbox
-collect_send_calls(Calls, Size, Limit) when Size >= Limit ->
-  {lists:reverse(Calls), Size};
-collect_send_calls(Calls, Size, Limit) ->
+collect_send_calls2(Calls) ->
   receive
     ?SEND_REQ(_, Batch, _) = Call ->
-      collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit)
+      Bytes = batch_bytes(Batch),
+      collect_send_calls(Call, Bytes, Calls)
   after
     0 ->
-      {lists:reverse(Calls), Size}
+      Calls
   end.
 
-enqueue_calls(Calls, #{replayq := Q,
-                       pending_acks := PendingAcks0,
-                       call_id_base := CallIdBase,
-                       partition := Partition,
-                       config := Config0
-                      } = St0) ->
+ensure_linger_expire_timer_start(#{?linger_expire_timer := false} = St, Timeout) ->
+  %% delay enqueue, try to accumulate more into the batch
+  Ref = erlang:send_after(Timeout, self(), ?linger_expire),
+  St#{?linger_expire_timer := Ref};
+ensure_linger_expire_timer_start(St, _Timeout) ->
+  %% timer is already started
+  St.
+
+ensure_linger_expire_timer_cancel(#{?linger_expire_timer := LTimer} = St) ->
+  _ = is_reference(LTimer) andalso erlang:cancel_timer(LTimer),
+  St#{?linger_expire_timer => false}.
+
+%% check if the call collection should continue to linger before enqueue
+is_linger_continue(#{calls := Calls, config := Config}) ->
+  #{max_linger_ms := MaxLingerMs, max_linger_bytes := MaxLingerBytes} = Config,
+  #{ts := Ts, bytes := Bytes} = Calls,
+  TimeLeft = MaxLingerMs - (now_ts() - Ts),
+  case TimeLeft =< 0 of
+    true ->
+      false;
+    false ->
+      (Bytes < MaxLingerBytes) andalso {true, TimeLeft}
+  end.
+
+enqueue_calls(#{calls := empty} = St, _) ->
+  %% no call to enqueue
+  St;
+enqueue_calls(St, maybe_linger) ->
+  case is_linger_continue(St) of
+    {true, Timeout} ->
+      ensure_linger_expire_timer_start(St, Timeout);
+    false ->
+      enqueue_calls(St, no_linger)
+  end;
+enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) ->
+  Calls = lists:reverse(CallsR),
+  St = ensure_linger_expire_timer_cancel(St0),
+  enqueue_calls2(Calls, St#{calls => empty}).
+
+enqueue_calls2(Calls,
+               #{replayq := Q,
+                 pending_acks := PendingAcks0,
+                 call_id_base := CallIdBase,
+                 partition := Partition,
+                 config := Config0
+                } = St0) ->
   {QueueItems, PendingAcks, CallByteSize} =
     lists:foldl(
      fun(?SEND_REQ(_From, Batch, AckFun), {Items, PendingAcksIn, Size}) ->
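As a note on `resolve_max_linger_bytes/2` above: when `max_linger_bytes` is left at 0, the effective limit depends on the replayq mode. The standalone sketch below mirrors that resolution; the module and function names are illustrative only, not part of the patch:

```erlang
%% Illustrative sketch mirroring resolve_max_linger_bytes/2 in this patch:
%% a configured value of 0 means "derive from max_batch_bytes".
-module(linger_bytes_sketch).
-export([effective_max_linger_bytes/2]).

%% Memory-only queue: linger at most one batch worth of bytes.
effective_max_linger_bytes(MaxBatchBytes, mem_only) ->
  MaxBatchBytes;
%% Disk or disk-offload queue: linger up to 10x max_batch_bytes,
%% capped at 10MB, so disk writes are larger.
effective_max_linger_bytes(MaxBatchBytes, _DiskOrOffload) ->
  min(10 bsl 20, MaxBatchBytes * 10).
```

For example, with `max_batch_bytes` of 2MB, a mem-only producer lingers up to 2MB while a disk-backed one lingers up to 10MB (2MB * 10, capped at 10MB).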
diff --git a/src/wolff_producers.erl b/src/wolff_producers.erl
index 068d8b2..0c1edd0 100644
--- a/src/wolff_producers.erl
+++ b/src/wolff_producers.erl
@@ -330,7 +330,7 @@ pick_next_alive(LookupFn, Partition, Count) ->
   pick_next_alive(LookupFn, (Partition + 1) rem Count, Count, _Tried = 1).
 
 pick_next_alive(_LookupFn, _Partition, Count, Count) ->
-  throw(#{cause => all_producers_down});
+  throw(#{cause => all_producers_down, count => Count});
 pick_next_alive(LookupFn, Partition, Count, Tried) ->
   Pid = LookupFn(Partition),
   case is_alive(Pid) of
diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl
index d394a04..4ca1131 100644
--- a/test/wolff_tests.erl
+++ b/test/wolff_tests.erl
@@ -324,20 +324,25 @@ replayq_overflow_test() ->
   Msg = #{key => <<>>, value => <<"12345">>},
   Batch = [Msg, Msg],
   BatchSize = wolff_producer:batch_bytes(Batch),
-  ProducerCfg = #{max_batch_bytes => 1, %% make sure not collecting calls into one batch
+  LingerMs = 100,
+  ProducerCfg = #{max_batch_bytes => 1, %% ensure each send call becomes its own batch
                   replayq_max_total_bytes => BatchSize,
                   required_acks => all_isr,
-                  max_linger_ms => 1000 %% do not send to kafka immediately
+                  max_linger_ms => LingerMs, %% delay enqueue
+                  max_linger_bytes => BatchSize + 1 %% delay enqueue
                  },
   {ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
+  Pid = wolff_producers:lookup_producer(Producers, 0),
+  ?assert(is_process_alive(Pid)),
   TesterPid = self(),
   AckFun1 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_1, BaseOffset}, ok end,
   AckFun2 = fun(_Partition, BaseOffset) -> TesterPid ! {ack_2, BaseOffset}, ok end,
   SendF = fun(AckFun) -> wolff:send(Producers, Batch, AckFun) end,
   %% send two batches to overflow one
-  spawn(fun() -> SendF(AckFun1) end),
-  timer:sleep(1), %% ensure order
-  spawn(fun() -> SendF(AckFun2) end),
+  proc_lib:spawn_link(fun() -> SendF(AckFun1) end),
+  timer:sleep(1), %% ensure order
+  proc_lib:spawn_link(fun() -> SendF(AckFun2) end),
+  timer:sleep(LingerMs * 2),
   try
     %% pushed out of replayq due to overflow
     receive
@@ -363,7 +368,7 @@ replayq_overflow_test() ->
     [1] = get_telemetry_seq(CntrEventsTable, [wolff, dropped_queue_full]),
     ?assert_eq_optional_tail(
        wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
-       [0,1,2,1,0]),
+       [0,2,1,0]),
     ?assert_eq_optional_tail(
        wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
        [0,1,0]),
@@ -610,8 +615,10 @@ test_message_too_large() ->
   %% then it should retry sending one message at a time
   ProducerCfg = #{partitioner => fun(_, _) -> 0 end,
                   max_batch_bytes => MaxMessageBytes * 3,
-                  %% ensure batching
-                  max_linger_ms => 100
+                  %% ensure batching by delaying enqueue by 100 ms
+                  max_linger_ms => 100,
+                  %% ensure linger does not expire by reaching the size limit
+                  max_linger_bytes => MaxMessageBytes * 100
                  },
   {ok, Producers} = wolff:start_producers(Client, TopicBin, ProducerCfg),
   MaxBytesCompensateOverhead = MaxMessageBytes - ?BATCHING_OVERHEAD - 7,
@@ -620,7 +627,7 @@ test_message_too_large() ->
       Ref = make_ref(),
       Self = self(),
      AckFun = {fun ?MODULE:ack_cb/4, [Self, Ref]},
-      _ = wolff:send(Producers, Batch, AckFun),
+      spawn(fun() -> wolff:send(Producers, Batch, AckFun) end),
      fun() -> ?WAIT(5000, {ack, Ref, _Partition, BaseOffset}, BaseOffset) end
    end,
  %% Must be ok to send one message