Skip to content

Commit

Permalink
refactor: compact pending-acks
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Sep 10, 2024
1 parent 77b1a14 commit c49ad22
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 28 deletions.
124 changes: 124 additions & 0 deletions src/wolff_pendack.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.


%% @doc Implement a data structure to hold pending acks towards `send' or `cast'
%% callers.
%% The pending acks are stored in a queue,
%% each item in the queue is a pair of call IDs and callback.
%% The call ID is a monotonically increasing integer, starting from current
%% time in microseconds.
-module(wolff_pendack).

-export([new/0, count/1, insert/2, take/2]).

-export_type([acks/0]).

-define(EMPTY, empty).

-type call_id() :: pos_integer().
-type key() :: call_id() | {call_id(), call_id()}.
-type cb() :: term().
-opaque acks() :: #{next_id := integer(),
cbs := queue:queue({key(), cb()})}.

%% @doc Initialize a new data structure.
new() ->
%% use a timestamp for call ID base so the items recovered from disk
%% will not be possible to clash with newer generation call after
%% the process crashed or node restarted.
Now = erlang:system_time(microsecond),
#{next_id => Now,
cbs => queue:new()
}.

%% @doc count the total number of pending acks.
-spec count(acks()) -> non_neg_integer().
count(#{cbs := Cbs}) ->
sum(queue:to_list(Cbs), 0).

sum([], Acc) ->
Acc;
sum([{CallId, _} | Rest], Acc) when is_integer(CallId) ->
sum(Rest, Acc + 1);
sum([{{MinCallId, MaxCallId}, _} | Rest], Acc) ->
sum(Rest, Acc + MaxCallId - MinCallId + 1).

%% @doc insert a callback into the data structure.
-spec insert(acks(), cb()) -> {call_id(), acks()}.
insert(#{next_id := Id, cbs := Cbs} = X, Cb) ->
NewCbs = insert_cb(Cbs, Id, Cb),
{Id, X#{next_id => Id + 1, cbs => NewCbs}}.

insert_cb(Cbs, Id, Cb) ->
case queue:out_r(Cbs) of
{empty, _} ->
queue:in({Id, Cb}, Cbs);
{{value, {Key1, Cb1}}, Cbs1} ->
insert_cb1(Cbs1, Key1, Cb1, Id, Cb)
end.

%% If the callback is identical to the previous one, then just update the
%% call ID range.
%% Otherwise, insert the new callback.
insert_cb1(Cbs, Key, Cb, Id, Cb1) when Cb =:= Cb1 ->
Key1 = exmapnd_id(Key, Id),
queue:in({Key1, Cb1}, Cbs);
insert_cb1(Cbs, Key, Cb, Id, Cb1) ->
queue:in({Id, Cb1}, queue:in({Key, Cb}, Cbs)).

%% If the ID is a single integer, then expand it to a range.
exmapnd_id(Id0, Id) when is_integer(Id0) ->
Id =:= Id0 + 1 orelse error({unexpected_id, Id0, Id}),
exmapnd_id({Id0, Id0}, Id);
exmapnd_id({MinId, MaxId}, Id) ->
Id =:= MaxId + 1 orelse error({unexpected_id, {MinId, MaxId}, Id}),
{MinId, Id}.

%% @doc Take the callback for a given call ID.
%% The ID is expected to be the oldest in the queue.
%% Return the callback and the updated data structure.
-spec take(acks(), call_id()) -> {ok, cb(), acks()} | false.
take(#{cbs := Cbs} = X, Id) ->
case take1(Cbs, Id) of
false ->
%% stale ack
false;
{ok, Cb, Cbs1} ->
{ok, Cb, X#{cbs => Cbs1}}
end.

take1(Cbs0, Id) ->
case queue:out(Cbs0) of
{empty, _} ->
false;
{{value, {Key, Cb}}, Cbs} ->
take2(Cbs, Key, Cb, Id)
end.

take2(Cbs, Id0, Cb, Id) when is_integer(Id0) ->
take2(Cbs, {Id0, Id0}, Cb, Id);
take2(_Cbs, {MinId, _MaxId}, _Cb, Id) when Id < MinId ->
%% stale ack
false;
take2(Cbs, {MinId, MaxId}, Cb, Id) when Id =:= MinId ->
%% ack the oldest item
case MaxId =:= MinId of
true ->
{ok, Cb, Cbs};
false ->
{ok, Cb, queue:in_r({{Id + 1, MaxId}, Cb}, Cbs)}
end;
take2(_Cbs, {MinId, MaxId}, _Cb, Id) ->
error(#{cause => unexpected_id, min => MinId, max => MaxId, got => Id}).
37 changes: 11 additions & 26 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%% Copyright (c) 2018-2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -97,7 +97,6 @@
-define(no_queued_reply, no_queued_reply).
-define(ACK_CB(AckCb, Partition), {AckCb, Partition}).
-define(no_queue_ack, no_queue_ack).
-define(no_caller_ack, no_caller_ack).
-define(MAX_LINGER_BYTES, (10 bsl 20)).
-type ack_fun() :: wolff:ack_fun().
-type send_req() :: ?SEND_REQ({pid(), reference()}, [wolff:msg()], ack_fun()).
Expand All @@ -107,13 +106,12 @@
attempts := pos_integer()
}.

-type state() :: #{ call_id_base := pos_integer()
, client_id := wolff:client_id()
-type state() :: #{ client_id := wolff:client_id()
, config := config_state()
, conn := undefined | _
, ?linger_expire_timer := false | reference()
, partition := partition()
, pending_acks := #{} % CallId => AckCb
, pending_acks := wolff_pendack:acks()
, produce_api_vsn := undefined | _
, replayq := replayq:q()
, sent_reqs := queue:queue(sent())
Expand Down Expand Up @@ -277,8 +275,7 @@ do_init(#{client_id := ClientId,
wolff_metrics:inflight_set(Config, 0),
St#{replayq => Q,
config := Config,
call_id_base => erlang:system_time(microsecond),
pending_acks => #{}, % CallId => AckCb
pending_acks => wolff_pendack:new(),
produce_api_vsn => undefined,
sent_reqs => queue:new(), % {kpro:req(), replayq:ack_ref(), [{CallId, MsgCount}]}
sent_reqs_count => 0,
Expand Down Expand Up @@ -388,9 +385,6 @@ ensure_ts(Batch) ->
(Msg) -> Msg#{ts => now_ts()}
end, Batch).

make_call_id(Base) ->
Base + erlang:unique_integer([positive]).

resolve_max_linger_bytes(#{max_linger_bytes := 0,
max_batch_bytes := Mb
} = Config, Q) ->
Expand Down Expand Up @@ -568,12 +562,7 @@ get_batch_from_queue_items([], Acc) ->
get_batch_from_queue_items([?Q_ITEM(_CallId, _Ts, Batch) | Items], Acc) ->
get_batch_from_queue_items(Items, lists:reverse(Batch, Acc)).

count_calls([]) ->
0;
count_calls([?Q_ITEM(?no_caller_ack, _Ts, _Batch) | Rest]) ->
count_calls(Rest);
count_calls([?Q_ITEM(_CallId, _Ts, _Batch) | Rest]) ->
1 + count_calls(Rest).
count_calls(Items) -> length(Items).

count_msgs([]) ->
0;
Expand Down Expand Up @@ -761,16 +750,14 @@ ensure_delayed_reconnect(St, _Delay) ->
St.

evaluate_pending_ack_funs(PendingAcks, [], _BaseOffset) -> PendingAcks;
evaluate_pending_ack_funs(PendingAcks, [{?no_caller_ack, BatchSize} | Rest], BaseOffset) ->
evaluate_pending_ack_funs(PendingAcks, Rest, offset(BaseOffset, BatchSize));
evaluate_pending_ack_funs(PendingAcks, [{CallId, BatchSize} | Rest], BaseOffset) ->
NewPendingAcks =
case maps:get(CallId, PendingAcks, false) of
false ->
PendingAcks;
AckCb ->
case wolff_pendack:take(PendingAcks, CallId) of
{ok, AckCb, PendingAcks1} ->
ok = eval_ack_cb(AckCb, BaseOffset),
maps:without([CallId], PendingAcks)
PendingAcks1;
false ->
PendingAcks
end,
evaluate_pending_ack_funs(NewPendingAcks, Rest, offset(BaseOffset, BatchSize)).

Expand Down Expand Up @@ -910,18 +897,16 @@ enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) ->
enqueue_calls2(Calls,
#{replayq := Q,
pending_acks := PendingAcks0,
call_id_base := CallIdBase,
partition := Partition,
config := Config0
} = St0) ->
{QueueItems, PendingAcks, CallByteSize} =
lists:foldl(
fun(?SEND_REQ(_From, Batch, AckFun), {Items, PendingAcksIn, Size}) ->
CallId = make_call_id(CallIdBase),
%% keep callback funs in memory, do not seralize it into queue because
%% saving anonymous function to disk may easily lead to badfun exception
%% in case of restart on newer version beam.
PendingAcksOut = PendingAcksIn#{CallId => ?ACK_CB(AckFun, Partition)},
{CallId, PendingAcksOut} = wolff_pendack:insert(PendingAcksIn, ?ACK_CB(AckFun, Partition)),
NewItems = [make_queue_item(CallId, Batch) | Items],
{NewItems, PendingAcksOut, Size + batch_bytes(Batch)}
end, {[], PendingAcks0, 0}, Calls),
Expand Down
75 changes: 73 additions & 2 deletions test/wolff_dynamic_topics_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ dynamic_topics_test() ->
T1 = <<"test-topic">>,
T2 = <<"test-topic-2">>,
T3 = <<"test-topic-3">>,
?assertMatch({0, Pid} when is_pid(Pid), wolff:send2(Producers, T1, [Msg], AckFun)),
receive acked -> ok end,
N = 100,
%% send N messages
lists:foreach(fun(_I) ->
?assertMatch({0, Pid} when is_pid(Pid), wolff:send2(Producers, T1, [Msg], AckFun))
end, lists:seq(1, N)),
%% wait for N acks
lists:foreach(fun(_I) ->
receive acked -> ok end
end, lists:seq(1, N)),
ok = assert_producers_state(Producers, [T1, T2, T3]),
?assertMatch({0, Offset} when is_integer(Offset), send(Producers, T2, Msg)),
?assertMatch({0, Offset} when is_integer(Offset), cast(Producers, T3, Msg)),
?assertMatch(#{metadata_ts := #{T1 := _, T2 := _, T3 := _},
Expand All @@ -51,6 +59,69 @@ dynamic_topics_test() ->
ok = application:stop(wolff),
ok.

ack_cb_interlave_test() ->
_ = application:stop(wolff), %% ensure stopped
{ok, _} = application:ensure_all_started(wolff),
ClientId = <<"ack_cb_interleave_tes">>,
ClientCfg = client_config(),
{ok, _ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg),
Group = atom_to_binary(?FUNCTION_NAME),
ProducerCfg = #{required_acks => all_isr,
group => Group,
partitioner => 0
},
{ok, Producers} = start(ClientId, ProducerCfg),
?assertEqual({ok, Producers}, start(ClientId, ProducerCfg)),
Children = supervisor:which_children(wolff_producers_sup),
?assertMatch([_], Children),
%% We can send from each producer.
Msg = #{key => ?KEY, value => <<"value">>},
Self = self(),
AckFun1 = fun(_Partition, _BaseOffset) -> Self ! acked1, ok end,
AckFun2 = fun(_Partition, _BaseOffset) -> Self ! acked2, ok end,
T1 = <<"test-topic">>,
N = 10,
{0, Pid} = wolff:send2(Producers, T1, [Msg], AckFun1),
{0, Pid} = wolff:send2(Producers, T1, [Msg], AckFun2),
receive acked1 -> ok end,
receive acked2 -> ok end,
%% suspend the connection, to make sure the acks will be pending
#{conn := Conn} = sys:get_state(Pid),
sys:suspend(Conn),
%% send N messages
lists:foreach(fun(_I) ->
?assertMatch({0, Pid} when is_pid(Pid), wolff:cast2(Producers, T1, [Msg], AckFun1)),
?assertMatch({0, Pid} when is_pid(Pid), wolff:cast2(Producers, T1, [Msg], AckFun2))
end, lists:seq(1, N)),
%% inspect the pending acks
#{conn := Conn, pending_acks := Acks} = sys:get_state(Pid),
?assertEqual(N * 2, wolff_pendack:count(Acks)),
#{cbs := Cbs} = Acks,
?assertEqual(N * 2, queue:len(Cbs)),
%% resume the connection
sys:resume(Conn),
%% wait for 2*N acks
lists:foreach(fun(_I) -> receive acked1 -> ok end end, lists:seq(1, N)),
lists:foreach(fun(_I) -> receive acked2 -> ok end end, lists:seq(1, N)),
%% We now stop one of the producers. The other should keep working.
ok = wolff:stop_and_delete_supervised_producers(Producers),
ok = wolff:stop_and_delete_supervised_client(ClientId),
ok = application:stop(wolff),
ok.

assert_producers_state(_Producers, []) ->
ok;
assert_producers_state(Producers, [Topic | Topics]) ->
ok = assert_producer_state(Producers, Topic),
assert_producers_state(Producers, Topics).

assert_producer_state(Producers, Topic) ->
{0, Pid} = wolff_producers:pick_producer2(Producers, Topic, [#{value => <<"a">>}]),
#{pending_acks := Acks, sent_reqs := SentReqs} = sys:get_state(Pid),
?assertEqual(0, wolff_pendack:count(Acks)),
?assertEqual(0, queue:len(SentReqs)),
ok.

send(Producers, Topic, Message) ->
wolff:send_sync2(Producers, Topic, [Message], 10_000).

Expand Down
47 changes: 47 additions & 0 deletions test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,53 @@ mem_only_replayq_test() ->
ets:delete(CntrEventsTable),
deinstall_event_logging(?FUNCTION_NAME).

recover_from_replayq_test() ->
ClientCfg = client_config(),
{ok, Client} = start_client(<<"client-2">>, ?HOSTS, ClientCfg),
ProducerCfg = #{replayq_dir => "test-data-2"},
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
Msg = #{key => ?KEY, value => <<"value">>},
{0, BaseOffset} = wolff:send_sync(Producers, [Msg], 3000),
ProducerPid = wolff_producers:lookup_producer(Producers, 0),
#{conn := Conn} = sys:get_state(ProducerPid),
%% suspend the connection process so to make sure the messages will remain in the replayq
sys:suspend(Conn),
%% do not expect any ack.
AckFun = fun(_Partition, _BaseOffset) -> error(unexpected) end,
N = 10,
L = lists:seq(1, 10),
lists:foreach(fun(_) -> wolff:cast(Producers, [Msg], AckFun) end, L),
%% ensure the messages are in replayq
ProducerPid ! linger_expire,
#{pending_acks := Acks} = sys:get_state(ProducerPid),
?assertEqual(N, wolff_pendack:count(Acks)),
%% kill the processes
unlink(ProducerPid),
ok = wolff_producer:stop(ProducerPid),
exit(Conn, kill),
%% restart the producers
{ok, Producers1} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
ProducerPid1 = wolff_producers:lookup_producer(Producers1, 0),
retry(fun() ->
#{pending_acks := Acks1, replayq := Q} = sys:get_state(ProducerPid1),
?assertEqual(0, wolff_pendack:count(Acks1)),
?assertEqual(0, replayq:count(Q))
end, 3000),
{0, BaseOffset1} = wolff:send_sync(Producers1, [Msg], 2000),
?assertEqual(BaseOffset + N + 1, BaseOffset1),
ok = wolff:stop_producers(Producers1),
ok = stop_client(Client).

retry(_F, T) when T < 0 ->
error(retry_failed);
retry(F, T) ->
try
F()
catch _:_ ->
timer:sleep(100),
retry(F, T - 100)
end.

replayq_offload_test() ->
CntrEventsTable = ets:new(cntr_events, [public]),
install_event_logging(?FUNCTION_NAME, CntrEventsTable, false),
Expand Down

0 comments on commit c49ad22

Please sign in to comment.