diff --git a/rebar.config b/rebar.config index a12e328..0a379b1 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ -{deps, [ {kafka_protocol, "4.1.8"} +{deps, [ {kafka_protocol, "4.1.9"} , {replayq, "0.3.4"} , {lc, "0.3.2"} , {telemetry, "1.1.0"} diff --git a/rebar.lock b/rebar.lock deleted file mode 100644 index 9ec4f17..0000000 --- a/rebar.lock +++ /dev/null @@ -1,23 +0,0 @@ -{"1.2.0", -[{<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.8">>},1}, - {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.4">>},0}, - {<<"lc">>,{pkg,<<"lc">>,<<"0.3.2">>},0}, - {<<"replayq">>,{pkg,<<"replayq">>,<<"0.3.4">>},0}, - {<<"snabbkaffe">>,{pkg,<<"snabbkaffe">>,<<"1.0.1">>},1}, - {<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.1.0">>},0}]}. -[ -{pkg_hash,[ - {<<"crc32cer">>, <<"C6C2275C5FB60A95F4935D414F30B50EE9CFED494081C9B36EBB02EDFC2F48DB">>}, - {<<"kafka_protocol">>, <<"59EA25E893BB1BB3AC09EEF6F27F915865C7AB80994639DEC6ACE41AF563F4AF">>}, - {<<"lc">>, <<"51EEA2832670CBFE208E0C99D633758958AF2A63128BB321B78438370E1E7D56">>}, - {<<"replayq">>, <<"50BCECA778E6A239BBDCE6A9A70F7CA42BA8EDEE8121A432E8082447C73F98EF">>}, - {<<"snabbkaffe">>, <<"8E95CCBC90E1445C6B504D7980A35FE87E140BD694CED35851965B87D42BD882">>}, - {<<"telemetry">>, <<"A589817034A27EAB11144AD24D5C0F9FAB1F58173274B1E9BAE7074AF9CBEE51">>}]}, -{pkg_hash_ext,[ - {<<"crc32cer">>, <<"251499085482920DEB6C9B7AADABF9FB4C432F96ADD97AB42AEE4501E5B6F591">>}, - {<<"kafka_protocol">>, <<"8AE0050DB0DFA78147249A150F0EC0D5EF61B965A097478A70A0C242118FDD7C">>}, - {<<"lc">>, <<"49CCD97659956869D78A0F4D580074C421DDDFAD664661BB724D03D08A971A12">>}, - {<<"replayq">>, <<"FC35B84D50AD9118974768936DA1DD321F023106A188C31800AE61EB45A094F7">>}, - {<<"snabbkaffe">>, <<"035812759963897B71EE69A11E8DFF93BBBB0F475055479A557DD3C4FD72FE33">>}, - {<<"telemetry">>, <<"B727B2A1F75614774CFF2D7565B64D0DFA5BD52BA517F16543E6FC7EFCC0DF48">>}]} -]. diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index 0ff8a6d..7a5f02b 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -20,7 +20,7 @@ -define(MIN_DISCARD_LOG_INTERVAL, 5000). %% APIs --export([start_link/5, stop/1, send/3, send/4, send_sync/3, ack_cb/4]). +-export([start_link/5, stop/1, send/3, send/4, send_sync/3]). %% gen_server callbacks -export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]). @@ -98,6 +98,10 @@ -define(ACK_CB(AckCb, Partition), {AckCb, Partition}). -define(no_queue_ack, no_queue_ack). -define(MAX_LINGER_BYTES, (10 bsl 20)). +-define(EMPTY, empty). +-define(SYNC_REF(Caller, Ref), {Caller, Ref}). +-define(IS_SYNC_REF(Caller, Ref), ((is_reference(Caller) orelse is_pid(Caller)) andalso is_reference(Ref))). + -type ack_fun() :: wolff:ack_fun(). -type send_req() :: ?SEND_REQ({pid(), reference()}, [wolff:msg()], ack_fun()). -type sent() :: #{req_ref := reference(), @@ -105,7 +109,11 @@ q_ack_ref := replayq:ack_ref(), attempts := pos_integer() }. - +-type sync_refs() :: {pid() | reference(), reference()}. +-type calls() :: #{ts := pos_integer(), + bytes := pos_integer(), + batch_r := [send_req()] + }. -type state() :: #{ client_id := wolff:client_id() , config := config_state() , conn := undefined | _ @@ -118,7 +126,7 @@ , sent_reqs_count := non_neg_integer() , inflight_calls := non_neg_integer() , topic := topic() - , calls := empty | #{ts := pos_integer(), bytes := pos_integer(), batch_r := [send_req()]} + , calls := ?EMPTY | calls() }. %% @doc Start a per-partition producer worker. @@ -171,7 +179,7 @@ send(Pid, Batch, AckFun) -> %% NOTE: It's possible that two or more batches get included into one produce request. %% But a batch is never split into produce requests. %% Make sure it will not exceed the `max_batch_bytes' limit when sending a batch. --spec send(pid(), [wolff:msg()], wolff:ack_fun(), WaitForQueued::wait_for_queued | no_wait_for_queued) -> ok. +-spec send(pid(), [wolff:msg()], wolff:ack_fun() | sync_refs(), WaitForQueued::wait_for_queued | no_wait_for_queued) -> ok. send(Pid, [_ | _] = Batch0, AckFun, wait_for_queued) -> Caller = self(), Mref = erlang:monitor(process, Pid), @@ -195,11 +203,11 @@ send(Pid, [_ | _] = Batch0, AckFun, no_wait_for_queued) -> send_sync(Pid, Batch0, Timeout) -> Caller = caller(), Mref = erlang:monitor(process, Pid), - %% synced local usage, safe to use anonymous fun - AckFun = {fun ?MODULE:ack_cb/4, [Caller, Mref]}, - ok = send(Pid, Batch0, AckFun, no_wait_for_queued), + Ack = ?SYNC_REF(Caller, Mref), + ok = send(Pid, Batch0, Ack, no_wait_for_queued), receive {Mref, Partition, BaseOffset} -> + %% sent from eval_ack_cb erlang:demonitor(Mref, [flush]), {Partition, BaseOffset}; {'DOWN', Mref, _, _, Reason} -> @@ -217,11 +225,6 @@ send_sync(Pid, Batch0, Timeout) -> end end. -%% @hidden Callbak exported for send_sync/3. -ack_cb(Partition, BaseOffset, Caller, Mref) -> - _ = erlang:send(Caller, {Mref, Partition, BaseOffset}), - ok. - init(St) -> erlang:process_flag(trap_exit, true), %% ensure init/1 can never fail @@ -282,7 +285,7 @@ do_init(#{client_id := ClientId, inflight_calls => 0, conn := undefined, client_id => ClientId, - calls => empty + calls => ?EMPTY }. handle_call(stop, From, St) -> @@ -831,12 +834,16 @@ request_async(Conn, Req) when is_pid(Conn) -> %% collect send calls which are already sent to mailbox, %% the collection is size-limited by the max_linger_bytes config. -collect_send_calls(Call, Bytes, empty, Max) -> +collect_send_calls(Call, Bytes, ?EMPTY, Max) -> Init = #{ts => now_ts(), bytes => 0, batch_r => []}, collect_send_calls(Call, Bytes, Init, Max); -collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls, Max) -> +collect_send_calls(Call, Bytes, Calls, Max) -> + #{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls, Sum = Bytes0 + Bytes, - R = Calls#{ts => Ts, bytes => Sum, batch_r => [Call | BatchR]}, + R = Calls#{ts => Ts, + bytes => Sum, + batch_r => [Call | BatchR] + }, case Sum < Max of true -> collect_send_calls2(R, Max); @@ -879,7 +886,7 @@ is_linger_continue(#{calls := Calls, config := Config}) -> false end. -enqueue_calls(#{calls := empty} = St, _) -> +enqueue_calls(#{calls := ?EMPTY} = St, _) -> %% no call to enqueue St; enqueue_calls(St, maybe_linger) -> @@ -892,7 +899,7 @@ enqueue_calls(St, maybe_linger) -> enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) -> Calls = lists:reverse(CallsR), St = ensure_linger_expire_timer_cancel(St0), - enqueue_calls2(Calls, St#{calls => empty}). + enqueue_calls2(Calls, St#{calls => ?EMPTY}). enqueue_calls2(Calls, #{replayq := Q, @@ -933,9 +940,12 @@ maybe_reply_queued(?SEND_REQ({Caller, Ref}, _, _)) -> eval_ack_cb(?ACK_CB(AckFun, Partition), BaseOffset) when is_function(AckFun, 2) -> ok = AckFun(Partition, BaseOffset); %% backward compatible -eval_ack_cb(?ACK_CB({F, A}, Partition), BaseOffset) -> +eval_ack_cb(?ACK_CB({F, A}, Partition), BaseOffset) when is_function(F) -> true = is_function(F, length(A) + 2), - ok = erlang:apply(F, [Partition, BaseOffset | A]). + ok = erlang:apply(F, [Partition, BaseOffset | A]); +eval_ack_cb(?ACK_CB({Caller, Ref}, Partition), BaseOffset) when ?IS_SYNC_REF(Caller, Ref) -> + _ = erlang:send(Caller, {Ref, Partition, BaseOffset}), + ok. handle_overflow(St, Overflow) when Overflow =< 0 -> ok = maybe_log_discard(St, 0),