Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: minimize the reply context for sync-send #79

Merged
merged 3 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions rebar.lock

This file was deleted.

49 changes: 30 additions & 19 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
-define(MIN_DISCARD_LOG_INTERVAL, 5000).

%% APIs
-export([start_link/5, stop/1, send/3, send/4, send_sync/3, ack_cb/4]).
-export([start_link/5, stop/1, send/3, send/4, send_sync/3]).

%% gen_server callbacks
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]).
Expand Down Expand Up @@ -98,14 +98,22 @@
-define(ACK_CB(AckCb, Partition), {AckCb, Partition}).
-define(no_queue_ack, no_queue_ack).
-define(MAX_LINGER_BYTES, (10 bsl 20)).
-define(EMPTY, empty).
-define(SYNC_REFS(Caller, Ref), {Caller, Ref}).
-define(IS_SYNC_REFS(Caller, Ref), ((is_reference(Caller) orelse is_pid(Caller)) andalso is_reference(Ref))).
zmstone marked this conversation as resolved.
Show resolved Hide resolved

-type ack_fun() :: wolff:ack_fun().
-type send_req() :: ?SEND_REQ({pid(), reference()}, [wolff:msg()], ack_fun()).
-type sent() :: #{req_ref := reference(),
q_items := [?Q_ITEM(_CallId, _Ts, _Batch)],
q_ack_ref := replayq:ack_ref(),
attempts := pos_integer()
}.

-type sync_refs() :: {pid() | reference(), reference()}.
-type calls() :: #{ts := pos_integer(),
bytes := pos_integer(),
batch_r := [send_req()]
}.
-type state() :: #{ client_id := wolff:client_id()
, config := config_state()
, conn := undefined | _
Expand All @@ -118,7 +126,7 @@
, sent_reqs_count := non_neg_integer()
, inflight_calls := non_neg_integer()
, topic := topic()
, calls := empty | #{ts := pos_integer(), bytes := pos_integer(), batch_r := [send_req()]}
, calls := ?EMPTY | calls()
}.

%% @doc Start a per-partition producer worker.
Expand Down Expand Up @@ -171,7 +179,7 @@ send(Pid, Batch, AckFun) ->
%% NOTE: It's possible that two or more batches get included into one produce request.
%% But a batch is never split into produce requests.
%% Make sure it will not exceed the `max_batch_bytes' limit when sending a batch.
-spec send(pid(), [wolff:msg()], wolff:ack_fun(), WaitForQueued::wait_for_queued | no_wait_for_queued) -> ok.
-spec send(pid(), [wolff:msg()], wolff:ack_fun() | sync_refs(), WaitForQueued::wait_for_queued | no_wait_for_queued) -> ok.
send(Pid, [_ | _] = Batch0, AckFun, wait_for_queued) ->
Caller = self(),
Mref = erlang:monitor(process, Pid),
Expand All @@ -196,10 +204,11 @@ send_sync(Pid, Batch0, Timeout) ->
Caller = caller(),
Mref = erlang:monitor(process, Pid),
%% synced local usage, safe to use anonymous fun
zmstone marked this conversation as resolved.
Show resolved Hide resolved
AckFun = {fun ?MODULE:ack_cb/4, [Caller, Mref]},
ok = send(Pid, Batch0, AckFun, no_wait_for_queued),
Ack = ?SYNC_REFS(Caller, Mref),
ok = send(Pid, Batch0, Ack, no_wait_for_queued),
receive
{Mref, Partition, BaseOffset} ->
%% sent from eval_ack_cb
erlang:demonitor(Mref, [flush]),
{Partition, BaseOffset};
{'DOWN', Mref, _, _, Reason} ->
Expand All @@ -217,11 +226,6 @@ send_sync(Pid, Batch0, Timeout) ->
end
end.

%% @hidden Callbak exported for send_sync/3.
ack_cb(Partition, BaseOffset, Caller, Mref) ->
_ = erlang:send(Caller, {Mref, Partition, BaseOffset}),
ok.

init(St) ->
erlang:process_flag(trap_exit, true),
%% ensure init/1 can never fail
Expand Down Expand Up @@ -282,7 +286,7 @@ do_init(#{client_id := ClientId,
inflight_calls => 0,
conn := undefined,
client_id => ClientId,
calls => empty
calls => ?EMPTY
}.

handle_call(stop, From, St) ->
Expand Down Expand Up @@ -831,12 +835,16 @@ request_async(Conn, Req) when is_pid(Conn) ->

%% collect send calls which are already sent to mailbox,
%% the collection is size-limited by the max_linger_bytes config.
collect_send_calls(Call, Bytes, empty, Max) ->
collect_send_calls(Call, Bytes, ?EMPTY, Max) ->
Init = #{ts => now_ts(), bytes => 0, batch_r => []},
collect_send_calls(Call, Bytes, Init, Max);
collect_send_calls(Call, Bytes, #{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls, Max) ->
collect_send_calls(Call, Bytes, Calls, Max) ->
#{ts := Ts, bytes := Bytes0, batch_r := BatchR} = Calls,
Sum = Bytes0 + Bytes,
R = Calls#{ts => Ts, bytes => Sum, batch_r => [Call | BatchR]},
R = Calls#{ts => Ts,
bytes => Sum,
batch_r => [Call | BatchR]
},
case Sum < Max of
true ->
collect_send_calls2(R, Max);
Expand Down Expand Up @@ -879,7 +887,7 @@ is_linger_continue(#{calls := Calls, config := Config}) ->
false
end.

enqueue_calls(#{calls := empty} = St, _) ->
enqueue_calls(#{calls := ?EMPTY} = St, _) ->
%% no call to enqueue
St;
enqueue_calls(St, maybe_linger) ->
Expand All @@ -892,7 +900,7 @@ enqueue_calls(St, maybe_linger) ->
enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) ->
Calls = lists:reverse(CallsR),
St = ensure_linger_expire_timer_cancel(St0),
enqueue_calls2(Calls, St#{calls => empty}).
enqueue_calls2(Calls, St#{calls => ?EMPTY}).

enqueue_calls2(Calls,
#{replayq := Q,
Expand Down Expand Up @@ -933,9 +941,12 @@ maybe_reply_queued(?SEND_REQ({Caller, Ref}, _, _)) ->

eval_ack_cb(?ACK_CB(AckFun, Partition), BaseOffset) when is_function(AckFun, 2) ->
ok = AckFun(Partition, BaseOffset); %% backward compatible
eval_ack_cb(?ACK_CB({F, A}, Partition), BaseOffset) ->
eval_ack_cb(?ACK_CB({F, A}, Partition), BaseOffset) when is_function(F) ->
true = is_function(F, length(A) + 2),
ok = erlang:apply(F, [Partition, BaseOffset | A]).
ok = erlang:apply(F, [Partition, BaseOffset | A]);
eval_ack_cb(?ACK_CB({Caller, Ref}, Partition), BaseOffset) when ?IS_SYNC_REFS(Caller, Ref) ->
_ = erlang:send(Caller, {Ref, Partition, BaseOffset}),
ok.

handle_overflow(St, Overflow) when Overflow =< 0 ->
ok = maybe_log_discard(St, 0),
Expand Down
Loading