Skip to content

Commit

Permalink
refactor: delete wolff_stats
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Sep 6, 2024
1 parent 6ac6047 commit 6566ee7
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 108 deletions.
36 changes: 7 additions & 29 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
compression => kpro:compress_option(),
drop_if_highmem => boolean(),
telemetry_meta_data => map(),
enable_global_stats => boolean(),
max_partitions => pos_integer()
}.

Expand All @@ -80,7 +79,6 @@
compression => kpro:compress_option(),
drop_if_highmem => boolean(),
telemetry_meta_data => map(),
enable_global_stats => boolean(),
max_partitions => pos_integer()
}.

Expand Down Expand Up @@ -117,7 +115,6 @@
, sent_reqs_count := non_neg_integer()
, inflight_calls := non_neg_integer()
, topic := topic()
, enable_global_stats := boolean()
}.

%% @doc Start a per-partition producer worker.
Expand All @@ -136,9 +133,6 @@
%% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for
%% the last request. Must be 0 if messages must be delivered in strict order.
%% * `compression': `no_compression', `snappy' or `gzip'.
%% * `enable_global_stats': `true' | `false'.
%% Introduced in 1.9.0, default is `false'. Set to `true' to enalbe a global
%% send/receive stats table created in `wolff_stats' module.
-spec start_link(wolff:client_id(), topic(), partition(), pid() | ?conn_down(any()), config_in()) ->
{ok, pid()} | {error, any()}.
start_link(ClientId, Topic, Partition, MaybeConnPid, Config) ->
Expand Down Expand Up @@ -269,8 +263,7 @@ do_init(#{client_id := ClientId,
sent_reqs_count => 0,
inflight_calls => 0,
conn := undefined,
client_id => ClientId,
enable_global_stats => maps:get(enable_global_stats, Config0, false)
client_id => ClientId
}.

handle_call(stop, From, St) ->
Expand All @@ -285,8 +278,7 @@ handle_info({do_init, St0}, _) ->
handle_info(?linger_expire, St) ->
{noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})};
handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) ->
{Calls, Cnt, Oct} = collect_send_calls([Call], 1, batch_bytes(Batch), Limit),
ok = recv_stats(St0, #{cnt => Cnt, oct => Oct}),
{Calls, _CollectedBytes} = collect_send_calls([Call], batch_bytes(Batch), Limit),
St1 = enqueue_calls(Calls, St0),
St = maybe_send_to_kafka(St1),
{noreply, St};
Expand Down Expand Up @@ -510,7 +502,6 @@ send_to_kafka(#{sent_reqs := SentReqs,
inflight_calls := NewInflightCalls
},
ok = request_async(Conn, Req),
ok = send_stats(St2, Items),
St3 = maybe_fake_kafka_ack(NoAck, Sent, St2),
maybe_send_to_kafka(St3).

Expand Down Expand Up @@ -781,19 +772,6 @@ log_error(Topic, Partition, Msg, Args) ->
log(Level, Report) ->
logger:log(Level, Report).

send_stats(#{enable_global_stats := false}, _) ->
ok;
send_stats(#{client_id := ClientId, topic := Topic, partition := Partition}, Items) ->
Batch = get_batch_from_queue_items(Items),
{Cnt, Oct} =
lists:foldl(fun(Msg, {C, O}) -> {C + 1, O + oct(Msg)} end, {0, 0}, Batch),
ok = wolff_stats:sent(ClientId, Topic, Partition, #{cnt => Cnt, oct => Oct}).

recv_stats(#{enable_global_stats := false}, _) ->
ok;
recv_stats(#{client_id := ClientId, topic := Topic, partition := Partition}, Increments) ->
ok = wolff_stats:recv(ClientId, Topic, Partition, Increments).

%% Estimation of size in bytes of one payload sent to Kafka.
%% According to Kafka protocol, a v2 record consists of below fields:
%% Length => varint # varint_bytes(SizeOfAllRestFields)
Expand Down Expand Up @@ -848,15 +826,15 @@ request_async(Conn, Req) when is_pid(Conn) ->
ok = kpro:send(Conn, Req).

%% Collect all send requests which are already in process mailbox
collect_send_calls(Calls, Count, Size, Limit) when Size >= Limit ->
{lists:reverse(Calls), Count, Size};
collect_send_calls(Calls, Count, Size, Limit) ->
collect_send_calls(Calls, Size, Limit) when Size >= Limit ->
{lists:reverse(Calls), Size};
collect_send_calls(Calls, Size, Limit) ->
receive
?SEND_REQ(_, Batch, _) = Call ->
collect_send_calls([Call | Calls], Count + 1, Size + batch_bytes(Batch), Limit)
collect_send_calls([Call | Calls], Size + batch_bytes(Batch), Limit)
after
0 ->
{lists:reverse(Calls), Count, Size}
{lists:reverse(Calls), Size}
end.

enqueue_calls(Calls, #{replayq := Q,
Expand Down
11 changes: 1 addition & 10 deletions src/wolff_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,10 @@ init([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 10,
period => 5},
Children = [stats_worker(), client_sup(), producers_sup()],
Children = [client_sup(), producers_sup()],
ets:new(?WOLFF_PRODUCERS_GLOBAL_TABLE, [named_table, public, ordered_set, {read_concurrency, true}]),
{ok, {SupFlags, Children}}.

stats_worker() ->
#{id => wolff_stats,
start => {wolff_stats, start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => [wolff_stats]
}.

client_sup() ->
#{id => wolff_client_sup,
start => {wolff_client_sup, start_link, []},
Expand Down
20 changes: 2 additions & 18 deletions test/wolff_bench.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ start(WorkersCnt) ->
SendFun = fun(Msgs) ->
{_, _} = wolff:send_sync(Producers, Msgs, timer:seconds(10))
end,
ok = spawn_workers(SendFun, WorkersCnt),
ok = spawn_reporter(ClientId, maps:size(Producers)).
ok = spawn_workers(SendFun, WorkersCnt).

start_producers(Client) ->
ProducerCfg = #{required_acks => all_isr,
max_batch_bytes => 800*1000,
max_linger_ms => 1000,
max_send_ahead => 100,
enable_global_stats => true
max_send_ahead => 100
},
wolff:start_producers(Client, ?TOPIC, ProducerCfg).

Expand All @@ -41,17 +39,3 @@ worker_loop(SendFun) ->
Msgs = [#{key => <<I>>, value => Value} || I <- lists:seq(1,100)],
SendFun(Msgs),
worker_loop(SendFun).

spawn_reporter(ClientId, Partitions) ->
_ = spawn_link(fun() -> reporter_loop(ClientId, Partitions, 0, 0) end),
ok.

reporter_loop(ClientId, Partitions, LastCnt, LastOct) ->
IntervalSec = 5,
#{send_cnt := Cnt, send_oct := Oct} = wolff_stats:getstat(),
io:format("count=~p/s bytes=~p/s\n", [(Cnt - LastCnt) / IntervalSec,
(Oct - LastOct) / IntervalSec]),
timer:sleep(timer:seconds(IntervalSec)),
reporter_loop(ClientId, Partitions, Cnt, Oct).


6 changes: 0 additions & 6 deletions test/wolff_supervised_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,11 @@ supervised_client_test() ->
{Partition, BaseOffset} = wolff:send_sync(Producers, [Msg], 3000),
io:format(user, "\nmessage produced to partition ~p at offset ~p\n",
[Partition, BaseOffset]),
?assertMatch(#{send_oct := O, send_cnt := C} when O > 0 andalso C > 0,
wolff_stats:getstat()),
?assertMatch(#{send_oct := O, send_cnt := C} when O > 0 andalso C > 0,
wolff_stats:getstat(ClientId, <<"test-topic">>, Partition)),
ok = wolff:stop_producers(Producers),
ok = wolff:stop_and_delete_supervised_client(ClientId),
?assertEqual([], supervisor:which_children(wolff_client_sup)),
ok = application:stop(wolff),
?assertEqual(undefined, whereis(wolff_sup)),
?assertEqual(undefined, whereis(wolff_stats)),
assert_last_event_is_zero(queuing, CntrEventsTable),
assert_last_event_is_zero(inflight, CntrEventsTable),
[1] = get_telemetry_seq(CntrEventsTable, [wolff,success]),
Expand Down Expand Up @@ -531,7 +526,6 @@ client_config() -> #{}.

producer_config(Name) ->
#{replayq_dir => "test-data",
enable_global_stats => true,
name => Name
}.

Expand Down
48 changes: 3 additions & 45 deletions test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -491,49 +491,8 @@ replayq_offload_test() ->
ets:delete(CntrEventsTable),
deinstall_event_logging(?FUNCTION_NAME).

stats_test() ->
CntrEventsTable = ets:new(cntr_events, [public]),
install_event_logging(?FUNCTION_NAME, CntrEventsTable, false),
ClientId = <<"client-stats-test">>,
_ = application:stop(wolff), %% ensure stopped
{ok, _} = application:ensure_all_started(wolff),
?assertMatch(#{send_oct := 0, send_cnt := 0, recv_cnt := 0, recv_oct := 0},
wolff_stats:getstat()),
?assertMatch(#{send_oct := 0, send_cnt := 0, recv_cnt := 0, recv_oct := 0},
wolff_stats:getstat(ClientId, <<"nonexisting-topic">>, 0)),
ClientCfg = client_config(),
{ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg),
ProducerCfg0 = producer_config(),
ProducerCfg = ProducerCfg0#{required_acks => leader_only},
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
Msg = #{key => ?KEY, value => <<"value">>},
{Partition, BaseOffset} = wolff:send_sync(Producers, [Msg], 3000),
io:format(user, "\nmessage produced to partition ~p at offset ~p\n",
[Partition, BaseOffset]),
?assertMatch(#{send_oct := O, send_cnt := C,
recv_oct := O, recv_cnt := C} when O > 0 andalso C > 0,
wolff_stats:getstat()),
?assertMatch(#{send_oct := O, send_cnt := C,
recv_oct := O, recv_cnt := C} when O > 0 andalso C > 0,
wolff_stats:getstat(ClientId, <<"test-topic">>, Partition)),
ok = wolff:stop_producers(Producers),
ok = stop_client(Client),
ok = application:stop(wolff),
?assertEqual(undefined, whereis(wolff_sup)),
?assertEqual(undefined, whereis(wolff_stats)),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, queuing])),
[0, 1, 0]),
?assert_eq_optional_tail(
wolff_test_utils:dedup_list(get_telemetry_seq(CntrEventsTable, [wolff, inflight])),
[0, 1, 0]),
[1] = get_telemetry_seq(CntrEventsTable, [wolff, success]),
ets:delete(CntrEventsTable),
deinstall_event_logging(?FUNCTION_NAME),
ok.

check_connectivity_test() ->
ClientId = <<"client-stats-test">>,
ClientId = <<"client-connectivity-test">>,
_ = application:stop(wolff), %% ensure stopped
{ok, _} = application:ensure_all_started(wolff),
ClientCfg = client_config(),
Expand All @@ -551,7 +510,7 @@ check_connectivity_test() ->
ok = application:stop(wolff).

client_state_upgrade_test() ->
ClientId = <<"client-stats-test">>,
ClientId = <<"client-state-upgrade-test">>,
_ = application:stop(wolff), %% ensure stopped
{ok, _} = application:ensure_all_started(wolff),
ClientCfg = client_config(),
Expand Down Expand Up @@ -772,8 +731,7 @@ to_old_client_state(St0) ->
client_config() -> #{}.

producer_config() ->
#{replayq_dir => "test-data",
enable_global_stats => true}.
#{replayq_dir => "test-data"}.

key(Name) ->
iolist_to_binary(io_lib:format("~p/~p", [Name, calendar:local_time()])).
Expand Down

0 comments on commit 6566ee7

Please sign in to comment.