From 45d71ed08b139c8d29125b075061ed8c687d57b0 Mon Sep 17 00:00:00 2001 From: zmstone Date: Tue, 10 Sep 2024 01:45:53 +0200 Subject: [PATCH] refactor: compact pending-acks --- src/wolff_pendack.erl | 124 ++++++++++++++++++++++++++++ src/wolff_producer.erl | 37 +++------ test/wolff_dynamic_topics_tests.erl | 75 ++++++++++++++++- test/wolff_tests.erl | 47 +++++++++++ 4 files changed, 255 insertions(+), 28 deletions(-) create mode 100644 src/wolff_pendack.erl diff --git a/src/wolff_pendack.erl b/src/wolff_pendack.erl new file mode 100644 index 0000000..0697a1a --- /dev/null +++ b/src/wolff_pendack.erl @@ -0,0 +1,124 @@ +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + + +%% @doc Implement a data structure to hold pending acks towards `send' or `cast' +%% callers. +%% The pending acks are stored in a queue, oldest first. +%% Each item in the queue is a pair of a key and a callback, where the key is either a single call ID or a `{MinId, MaxId}' range of consecutive call IDs which share an identical callback. +%% The call ID is an integer which increases by one per insert, starting from the current +%% time in microseconds. +-module(wolff_pendack). + +-export([new/0, count/1, insert/2, take/2]). + +-export_type([acks/0]). + +-define(EMPTY, empty). + +-type call_id() :: pos_integer(). +-type key() :: call_id() | {call_id(), call_id()}. +-type cb() :: term(). +-opaque acks() :: #{next_id := integer(), + cbs := queue:queue({key(), cb()})}. + +%% @doc Initialize a new data structure. 
+new() -> + %% Use a timestamp as the call ID base, so that the items recovered from disk + %% cannot clash with calls of a newer generation after + %% the process crashed or node restarted. + Now = erlang:system_time(microsecond), + #{next_id => Now, + cbs => queue:new() + }. + +%% @doc Count the total number of pending acks (a range item counts once per call ID in the range). +-spec count(acks()) -> non_neg_integer(). +count(#{cbs := Cbs}) -> + sum(queue:to_list(Cbs), 0). + +sum([], Acc) -> + Acc; +sum([{CallId, _} | Rest], Acc) when is_integer(CallId) -> + sum(Rest, Acc + 1); +sum([{{MinCallId, MaxCallId}, _} | Rest], Acc) -> + sum(Rest, Acc + MaxCallId - MinCallId + 1). + +%% @doc Insert a callback into the data structure; return the call ID assigned to it and the updated data structure. +-spec insert(acks(), cb()) -> {call_id(), acks()}. +insert(#{next_id := Id, cbs := Cbs} = X, Cb) -> + NewCbs = insert_cb(Cbs, Id, Cb), + {Id, X#{next_id => Id + 1, cbs => NewCbs}}. + +insert_cb(Cbs, Id, Cb) -> + case queue:out_r(Cbs) of + {empty, _} -> + queue:in({Id, Cb}, Cbs); + {{value, {Key1, Cb1}}, Cbs1} -> + insert_cb1(Cbs1, Key1, Cb1, Id, Cb) + end. + +%% If the callback is identical to the previous (newest) one, then just extend the +%% call ID range of that item. +%% Otherwise, put the previous item back and insert the new callback after it. +insert_cb1(Cbs, Key, Cb, Id, Cb1) when Cb =:= Cb1 -> + Key1 = expand_id(Key, Id), + queue:in({Key1, Cb1}, Cbs); +insert_cb1(Cbs, Key, Cb, Id, Cb1) -> + queue:in({Id, Cb1}, queue:in({Key, Cb}, Cbs)). + +%% Extend a key by one call ID; a single integer ID is first expanded to a range. The new ID must directly follow the current maximum, otherwise an error is raised. +expand_id(Id0, Id) when is_integer(Id0) -> + Id =:= Id0 + 1 orelse error({unexpected_id, Id0, Id}), + expand_id({Id0, Id0}, Id); +expand_id({MinId, MaxId}, Id) -> + Id =:= MaxId + 1 orelse error({unexpected_id, {MinId, MaxId}, Id}), + {MinId, Id}. + +%% @doc Take the callback for a given call ID. +%% The ID is expected to be the oldest in the queue. +%% Return `{ok, Callback, NewAcks}', or `false' for a stale ack (nothing is pending, or the ID is older than the oldest pending ID); an ID newer than the oldest pending ID raises an error. +-spec take(acks(), call_id()) -> {ok, cb(), acks()} | false. 
+take(#{cbs := Cbs} = X, Id) -> + case take1(Cbs, Id) of + false -> + %% stale ack: nothing is pending, or Id is older than the oldest pending ID + false; + {ok, Cb, Cbs1} -> + {ok, Cb, X#{cbs => Cbs1}} + end. + +take1(Cbs0, Id) -> + case queue:out(Cbs0) of + {empty, _} -> + false; + {{value, {Key, Cb}}, Cbs} -> + take2(Cbs, Key, Cb, Id) + end. + +take2(Cbs, Id0, Cb, Id) when is_integer(Id0) -> + take2(Cbs, {Id0, Id0}, Cb, Id); +take2(_Cbs, {MinId, _MaxId}, _Cb, Id) when Id < MinId -> + %% stale ack: Id is older than the oldest pending ID + false; +take2(Cbs, {MinId, MaxId}, Cb, Id) when Id =:= MinId -> + %% ack the oldest ID; if the item is a range, put the remainder {Id + 1, MaxId} back at the front + case MaxId =:= MinId of + true -> + {ok, Cb, Cbs}; + false -> + {ok, Cb, queue:in_r({{Id + 1, MaxId}, Cb}, Cbs)} + end; +take2(_Cbs, {MinId, MaxId}, _Cb, Id) -> + error(#{cause => unexpected_id, min => MinId, max => MaxId, got => Id}). diff --git a/src/wolff_producer.erl b/src/wolff_producer.erl index 1c05d99..6244dc2 100644 --- a/src/wolff_producer.erl +++ b/src/wolff_producer.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2018-2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2018-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -97,7 +97,6 @@ -define(no_queued_reply, no_queued_reply). -define(ACK_CB(AckCb, Partition), {AckCb, Partition}). -define(no_queue_ack, no_queue_ack). --define(no_caller_ack, no_caller_ack). -define(MAX_LINGER_BYTES, (10 bsl 20)). -type ack_fun() :: wolff:ack_fun(). -type send_req() :: ?SEND_REQ({pid(), reference()}, [wolff:msg()], ack_fun()). @@ -107,13 +106,12 @@ attempts := pos_integer() }. 
--type state() :: #{ call_id_base := pos_integer() - , client_id := wolff:client_id() +-type state() :: #{ client_id := wolff:client_id() , config := config_state() , conn := undefined | _ , ?linger_expire_timer := false | reference() , partition := partition() - , pending_acks := #{} % CallId => AckCb + , pending_acks := wolff_pendack:acks() , produce_api_vsn := undefined | _ , replayq := replayq:q() , sent_reqs := queue:queue(sent()) @@ -277,8 +275,7 @@ do_init(#{client_id := ClientId, wolff_metrics:inflight_set(Config, 0), St#{replayq => Q, config := Config, - call_id_base => erlang:system_time(microsecond), - pending_acks => #{}, % CallId => AckCb + pending_acks => wolff_pendack:new(), produce_api_vsn => undefined, sent_reqs => queue:new(), % {kpro:req(), replayq:ack_ref(), [{CallId, MsgCount}]} sent_reqs_count => 0, @@ -388,9 +385,6 @@ ensure_ts(Batch) -> (Msg) -> Msg#{ts => now_ts()} end, Batch). -make_call_id(Base) -> - Base + erlang:unique_integer([positive]). - resolve_max_linger_bytes(#{max_linger_bytes := 0, max_batch_bytes := Mb } = Config, Q) -> @@ -568,12 +562,7 @@ get_batch_from_queue_items([], Acc) -> get_batch_from_queue_items([?Q_ITEM(_CallId, _Ts, Batch) | Items], Acc) -> get_batch_from_queue_items(Items, lists:reverse(Batch, Acc)). -count_calls([]) -> - 0; -count_calls([?Q_ITEM(?no_caller_ack, _Ts, _Batch) | Rest]) -> - count_calls(Rest); -count_calls([?Q_ITEM(_CallId, _Ts, _Batch) | Rest]) -> - 1 + count_calls(Rest). +count_calls(Items) -> length(Items). count_msgs([]) -> 0; @@ -761,16 +750,14 @@ ensure_delayed_reconnect(St, _Delay) -> St. 
evaluate_pending_ack_funs(PendingAcks, [], _BaseOffset) -> PendingAcks; -evaluate_pending_ack_funs(PendingAcks, [{?no_caller_ack, BatchSize} | Rest], BaseOffset) -> - evaluate_pending_ack_funs(PendingAcks, Rest, offset(BaseOffset, BatchSize)); evaluate_pending_ack_funs(PendingAcks, [{CallId, BatchSize} | Rest], BaseOffset) -> NewPendingAcks = - case maps:get(CallId, PendingAcks, false) of - false -> - PendingAcks; - AckCb -> + case wolff_pendack:take(PendingAcks, CallId) of + {ok, AckCb, PendingAcks1} -> ok = eval_ack_cb(AckCb, BaseOffset), - maps:without([CallId], PendingAcks) + PendingAcks1; + false -> + PendingAcks end, evaluate_pending_ack_funs(NewPendingAcks, Rest, offset(BaseOffset, BatchSize)). @@ -910,18 +897,16 @@ enqueue_calls(#{calls := #{batch_r := CallsR}} = St0, no_linger) -> enqueue_calls2(Calls, #{replayq := Q, pending_acks := PendingAcks0, - call_id_base := CallIdBase, partition := Partition, config := Config0 } = St0) -> {QueueItems, PendingAcks, CallByteSize} = lists:foldl( fun(?SEND_REQ(_From, Batch, AckFun), {Items, PendingAcksIn, Size}) -> - CallId = make_call_id(CallIdBase), %% keep callback funs in memory, do not seralize it into queue because %% saving anonymous function to disk may easily lead to badfun exception %% in case of restart on newer version beam. 
- PendingAcksOut = PendingAcksIn#{CallId => ?ACK_CB(AckFun, Partition)}, + {CallId, PendingAcksOut} = wolff_pendack:insert(PendingAcksIn, ?ACK_CB(AckFun, Partition)), NewItems = [make_queue_item(CallId, Batch) | Items], {NewItems, PendingAcksOut, Size + batch_bytes(Batch)} end, {[], PendingAcks0, 0}, Calls), diff --git a/test/wolff_dynamic_topics_tests.erl b/test/wolff_dynamic_topics_tests.erl index 1603e69..04513b4 100644 --- a/test/wolff_dynamic_topics_tests.erl +++ b/test/wolff_dynamic_topics_tests.erl @@ -28,8 +28,16 @@ dynamic_topics_test() -> T1 = <<"test-topic">>, T2 = <<"test-topic-2">>, T3 = <<"test-topic-3">>, - ?assertMatch({0, Pid} when is_pid(Pid), wolff:send2(Producers, T1, [Msg], AckFun)), - receive acked -> ok end, + N = 100, + %% send N messages + lists:foreach(fun(_I) -> + ?assertMatch({0, Pid} when is_pid(Pid), wolff:send2(Producers, T1, [Msg], AckFun)) + end, lists:seq(1, N)), + %% wait for N acks + lists:foreach(fun(_I) -> + receive acked -> ok end + end, lists:seq(1, N)), + ok = assert_producers_state(Producers, [T1, T2, T3]), ?assertMatch({0, Offset} when is_integer(Offset), send(Producers, T2, Msg)), ?assertMatch({0, Offset} when is_integer(Offset), cast(Producers, T3, Msg)), ?assertMatch(#{metadata_ts := #{T1 := _, T2 := _, T3 := _}, @@ -51,6 +59,69 @@ dynamic_topics_test() -> ok = application:stop(wolff), ok. 
+ack_cb_interlave_test() -> + _ = application:stop(wolff), %% ensure stopped + {ok, _} = application:ensure_all_started(wolff), + ClientId = <<"ack_cb_interleave_tes">>, + ClientCfg = client_config(), + {ok, _ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg), + Group = atom_to_binary(?FUNCTION_NAME), + ProducerCfg = #{required_acks => all_isr, + group => Group, + partitioner => 0 + }, + {ok, Producers} = start(ClientId, ProducerCfg), + ?assertEqual({ok, Producers}, start(ClientId, ProducerCfg)), + Children = supervisor:which_children(wolff_producers_sup), + ?assertMatch([_], Children), + %% Two distinct ack callbacks, which are alternated in the calls below. + Msg = #{key => ?KEY, value => <<"value">>}, + Self = self(), + AckFun1 = fun(_Partition, _BaseOffset) -> Self ! acked1, ok end, + AckFun2 = fun(_Partition, _BaseOffset) -> Self ! acked2, ok end, + T1 = <<"test-topic">>, + N = 10, + {0, Pid} = wolff:send2(Producers, T1, [Msg], AckFun1), + {0, Pid} = wolff:send2(Producers, T1, [Msg], AckFun2), + receive acked1 -> ok end, + receive acked2 -> ok end, + %% suspend the connection, to make sure the acks will be pending + #{conn := Conn} = sys:get_state(Pid), + sys:suspend(Conn), + %% cast N pairs of messages (2*N in total), alternating the two callbacks + lists:foreach(fun(_I) -> + ?assertMatch({0, Pid} when is_pid(Pid), wolff:cast2(Producers, T1, [Msg], AckFun1)), + ?assertMatch({0, Pid} when is_pid(Pid), wolff:cast2(Producers, T1, [Msg], AckFun2)) + end, lists:seq(1, N)), + %% inspect the pending acks: adjacent callbacks always differ, so expect one queue item per call + #{conn := Conn, pending_acks := Acks} = sys:get_state(Pid), + ?assertEqual(N * 2, wolff_pendack:count(Acks)), + #{cbs := Cbs} = Acks, + ?assertEqual(N * 2, queue:len(Cbs)), + %% resume the connection + sys:resume(Conn), + %% wait for 2*N acks + lists:foreach(fun(_I) -> receive acked1 -> ok end end, lists:seq(1, N)), + lists:foreach(fun(_I) -> receive acked2 -> ok end end, lists:seq(1, N)), + %% Clean up: stop the producers, the client and the application. 
+ ok = wolff:stop_and_delete_supervised_producers(Producers), + ok = wolff:stop_and_delete_supervised_client(ClientId), + ok = application:stop(wolff), + ok. + +assert_producers_state(_Producers, []) -> + ok; +assert_producers_state(Producers, [Topic | Topics]) -> + ok = assert_producer_state(Producers, Topic), + assert_producers_state(Producers, Topics). + +assert_producer_state(Producers, Topic) -> + {0, Pid} = wolff_producers:pick_producer2(Producers, Topic, [#{value => <<"a">>}]), + #{pending_acks := Acks, sent_reqs := SentReqs} = sys:get_state(Pid), + ?assertEqual(0, wolff_pendack:count(Acks)), + ?assertEqual(0, queue:len(SentReqs)), + ok. + send(Producers, Topic, Message) -> wolff:send_sync2(Producers, Topic, [Message], 10_000). diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index b80b965..0b6c4cd 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -472,6 +472,53 @@ mem_only_replayq_test() -> ets:delete(CntrEventsTable), deinstall_event_logging(?FUNCTION_NAME). +recover_from_replayq_test() -> + ClientCfg = client_config(), + {ok, Client} = start_client(<<"client-2">>, ?HOSTS, ClientCfg), + ProducerCfg = #{replayq_dir => "test-data-2"}, + {ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg), + Msg = #{key => ?KEY, value => <<"value">>}, + {0, BaseOffset} = wolff:send_sync(Producers, [Msg], 3000), + ProducerPid = wolff_producers:lookup_producer(Producers, 0), + #{conn := Conn} = sys:get_state(ProducerPid), + %% suspend the connection process to make sure the messages will remain in the replayq + sys:suspend(Conn), + %% do not expect any ack: the callback raises if it is ever evaluated. + AckFun = fun(_Partition, _BaseOffset) -> error(unexpected) end, + N = 10, + L = lists:seq(1, N), + lists:foreach(fun(_) -> wolff:cast(Producers, [Msg], AckFun) end, L), + %% ensure the messages are in replayq + ProducerPid ! 
linger_expire, + #{pending_acks := Acks} = sys:get_state(ProducerPid), + ?assertEqual(N, wolff_pendack:count(Acks)), + %% kill the processes + unlink(ProducerPid), + exit(ProducerPid, kill), + exit(Conn, kill), + %% restart the producers + {ok, Producers1} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg), + ProducerPid1 = wolff_producers:lookup_producer(Producers1, 0), + retry(fun() -> + #{pending_acks := Acks1, replayq := Q} = sys:get_state(ProducerPid1), + ?assertEqual(0, wolff_pendack:count(Acks1)), + ?assertEqual(0, replayq:count(Q)) + end, 3000), + {0, BaseOffset1} = wolff:send_sync(Producers1, [Msg], 2000), + ?assertEqual(BaseOffset + N + 1, BaseOffset1), + ok = wolff:stop_producers(Producers1), + ok = stop_client(Client). + +retry(_F, T) when T < 0 -> + error(retry_failed); +retry(F, T) -> + try + F() + catch _:_ -> + timer:sleep(100), + retry(F, T - 100) + end. + replayq_offload_test() -> CntrEventsTable = ets:new(cntr_events, [public]), install_event_logging(?FUNCTION_NAME, CntrEventsTable, false),