Skip to content

Commit

Permalink
perf: use ets table for client ID registration
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Dec 16, 2023
1 parent 40b9300 commit a1437b0
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 103 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ jobs:
fail-fast: false
matrix:
otp:
- '24.1'
- '23.3.4.7'
- '26.2'
- '25.3'
kafka:
- '2.4'
- '1.1'
Expand Down
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* 1.9.1
- Use ETS (named `wolff_clients_global`) for client ID registration.
When there are thousands of clients, `supervisor:which_children` becomes quite expensive.

* 1.9.0
- No global stats collection by default.
There is a ets table based stats collector to record the number of sent bytes and messages. Consider this feature deprecated.
Expand Down
13 changes: 13 additions & 0 deletions include/wolff.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,18 @@
%% Since Kafka 2.4, it has been extended to 1048588.
%% We keep it backward compatible here.
-define(WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES, 1000000).

%% Table to register ClientID -> Pid mapping.
%% Applications may often need to find the client pid and run
%% some ad-hoc requests e.g. for health check purposes.
%% This talbe helps to avoid calling supervisor:which_children intensively.
-define(WOLFF_CLIENTS_GLOBAL_TABLE, wolff_clients_global).

%% Table to register {ClientID, Topic, Partition} -> Pid mapping.
%% This allows all producers to share this one ETS table for quick
%% partition-worker lookup.
%% A special record {{ClientId, Topic, partition_count}, Count}
%% is inserted to cache the partition count.
-define(WOLFF_PRODUCERS_GLOBAL_TABLE, wolff_producers_global).

-endif.
5 changes: 3 additions & 2 deletions src/wolff.app.src
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "1.8.0"},
{vsn, "1.9.1"},
{registered, []},
{applications,
[kernel,
stdlib,
kafka_protocol,
replayq,
telemetry
telemetry,
lc
]},
{env,[]},
{mod, {wolff_app, start}},
Expand Down
2 changes: 1 addition & 1 deletion src/wolff.appup.src
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% -*- mode: erlang; -*-
{"1.8.0",
{"1.9.1",
[
],
[
Expand Down
16 changes: 12 additions & 4 deletions src/wolff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
%% Supervised producer management APIs
-export([ensure_supervised_producers/3,
stop_and_delete_supervised_producers/1,
stop_and_delete_supervised_producers/2,
%% /3 is deprecated, call /2 instead
stop_and_delete_supervised_producers/3
]).

Expand All @@ -44,7 +46,9 @@
-export([get_producer/2]).

-export_type([client_id/0, host/0, producers/0, msg/0, ack_fun/0, partitioner/0,
name/0, offset_reply/0]).
name/0, offset_reply/0, topic/0]).

-deprecated({stop_and_delete_supervised_producers, 3}).

-type client_id() :: binary().
-type host() :: kpro:endpoint().
Expand Down Expand Up @@ -92,10 +96,14 @@ stop_producers(Producers) ->
ensure_supervised_producers(ClientId, Topic, ProducerCfg) ->
wolff_producers:start_supervised(ClientId, Topic, ProducerCfg).

%% @doc Ensure supervised producers are stopped then deleted.
%% @hidden Deprecated.
-spec stop_and_delete_supervised_producers(client_id(), topic(), name()) -> ok.
stop_and_delete_supervised_producers(ClientId, Topic, Name) ->
wolff_producers:stop_supervised(ClientId, Topic, Name).
stop_and_delete_supervised_producers(ClientId, Topic, _Name) ->
stop_and_delete_supervised_producers(ClientId, Topic).

%% @doc Ensure supervised producers are stopped then deleted.
stop_and_delete_supervised_producers(ClientId, Topic) ->
wolff_producers:stop_supervised(ClientId, Topic).

%% @doc Ensure supervised producers are stopped then deleted.
-spec stop_and_delete_supervised_producers(wolff_producers:producers()) -> ok.
Expand Down
6 changes: 4 additions & 2 deletions src/wolff_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ recv_leader_connection(Client, Topic, Partition, Pid) ->
delete_producers_metadata(Client, Topic) ->
gen_server:cast(Client, {delete_producers_metadata, Topic}).

init(St) ->
init(#{client_id := ClientID} = St) ->
erlang:process_flag(trap_exit, true),
ok = wolff_client_sup:register_client(ClientID),
{ok, St}.

handle_call(Call, From, #{connect := _Fun} = St) ->
Expand Down Expand Up @@ -203,7 +204,8 @@ handle_cast(_Cast, St) ->
code_change(_OldVsn, St, _Extra) ->
{ok, St}.

terminate(_, #{conns := Conns} = St) ->
terminate(_, #{client_id := ClientID, conns := Conns} = St) ->
ok = wolff_client_sup:deregister_client(ClientID),
ok = close_connections(Conns),
{ok, St#{conns := #{}}}.

Expand Down
58 changes: 44 additions & 14 deletions src/wolff_client_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
-export([start_link/0, init/1]).

-export([ensure_present/3, ensure_absence/1, find_client/1]).
-export([register_client/1, deregister_client/1]).

-include("wolff.hrl").

-define(SUPERVISOR, ?MODULE).

Expand All @@ -29,10 +32,11 @@ init([]) ->
intensity => 10,
period => 5
},
ok = create_clients_table(),
Children = [], %% dynamically added/stopped
{ok, {SupFlags, Children}}.

%% ensure a client started under supervisor
%% @doc Ensure a client started under supervisor.
-spec ensure_present(wolff:client_id(), [wolff:host()], wolff_client:config()) ->
{ok, pid()} | {error, client_not_running}.
ensure_present(ClientId, Hosts, Config) ->
Expand All @@ -43,31 +47,57 @@ ensure_present(ClientId, Hosts, Config) ->
{error, already_present} -> {error, client_not_running}
end.

%% ensure client stopped and deleted under supervisor
%% @doc Ensure client stopped and deleted under supervisor.
-spec ensure_absence(wolff:client_id()) -> ok.
ensure_absence(ClientId) ->
case supervisor:terminate_child(?SUPERVISOR, ClientId) of
ok -> ok = supervisor:delete_child(?SUPERVISOR, ClientId);
{error, not_found} -> ok
end.
end,
%% wolff_client process' terminate callback deregisters itself
%% but we make sure it's deregistered in case the client is killed
ok = deregister_client(ClientId).

%% find client pid from client id
%% @doc Find client pid from client id.
-spec find_client(wolff:client_id()) -> {ok, pid()} | {error, any()}.
find_client(ClientId) ->
Children = supervisor:which_children(?SUPERVISOR),
case lists:keyfind(ClientId, 1, Children) of
{ClientId, Client, _, _} when is_pid(Client) ->
{ok, Client};
{ClientId, Restarting, _, _} ->
{error, Restarting};
false ->
{error, no_such_client}
end.
find_client(ClientID) ->
try
case ets:lookup(?WOLFF_CLIENTS_GLOBAL_TABLE, ClientID) of
[{ClientID, Pid}] ->
{ok, Pid};
[] ->
{error, no_such_client}
end
catch
error : badarg ->
{error, client_supervisor_not_initialized}
end.


%% @private Make supervisor child spec.
child_spec(ClientId, Hosts, Config) ->
#{id => ClientId,
start => {wolff_client, start_link, [ClientId, Hosts, Config]},
restart => transient,
type => worker,
modules => [wolff_client]
}.

%% @doc Create a ets table which is used for client registration.
%% Records are of format: {ClientId, Pid}
create_clients_table() ->
EtsName = ?WOLFF_CLIENTS_GLOBAL_TABLE,
EtsOpts = [named_table, public, ordered_set, {read_concurrency, true}],
EtsName = ets:new(EtsName, EtsOpts),
ok.

%% @doc Insert the client in the registration table.
register_client(ClientId) ->
Pid = self(),
true = ets:insert(?WOLFF_CLIENTS_GLOBAL_TABLE, {ClientId, Pid}),
ok.

%% @doc Delete the client from the registration table.
deregister_client(ClientId) ->
_ = ets:delete(?WOLFF_CLIENTS_GLOBAL_TABLE, ClientId),
ok.
2 changes: 1 addition & 1 deletion src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
-define(ACK_CB(AckCb, Partition), {AckCb, Partition}).

-type queue_item() :: {kpro:req(), replayq:ack_ref(), [{_CallId, _MsgCount}]}.
-type state() :: #{ call_id_base := timer:time()
-type state() :: #{ call_id_base := pos_integer()
, client_id := wolff:client_id()
, config := config()
, conn := undefined | _
Expand Down
Loading

0 comments on commit a1437b0

Please sign in to comment.