Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: use ets table for client ID registration #57

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ jobs:
fail-fast: false
matrix:
otp:
- '24.1'
- '23.3.4.7'
- '26.2'
- '25.3'
kafka:
- '2.4'
- '1.1'
Expand Down
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* 1.9.1
- Use ETS (named `wolff_clients_global`) for client ID registration.
When there are thousands of clients, `supervisor:which_children` becomes quite expensive.

* 1.9.0
- No global stats collection by default.
There is a ets table based stats collector to record the number of sent bytes and messages. Consider this feature deprecated.
Expand Down
13 changes: 13 additions & 0 deletions include/wolff.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,18 @@
%% Since Kafka 2.4, it has been extended to 1048588.
%% We keep it backward compatible here.
-define(WOLFF_KAFKA_DEFAULT_MAX_MESSAGE_BYTES, 1000000).

%% Table to register ClientID -> Pid mapping.
%% Applications may often need to find the client pid and run
%% some ad-hoc requests e.g. for health check purposes.
%% This talbe helps to avoid calling supervisor:which_children intensively.
-define(WOLFF_CLIENTS_GLOBAL_TABLE, wolff_clients_global).

%% Table to register {ClientID, Topic, Partition} -> Pid mapping.
%% This allows all producers to share this one ETS table for quick
%% partition-worker lookup.
%% A special record {{ClientId, Topic, partition_count}, Count}
%% is inserted to cache the partition count.
-define(WOLFF_PRODUCERS_GLOBAL_TABLE, wolff_producers_global).

-endif.
5 changes: 3 additions & 2 deletions src/wolff.app.src
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "1.8.0"},
{vsn, "1.9.1"},
{registered, []},
{applications,
[kernel,
stdlib,
kafka_protocol,
replayq,
telemetry
telemetry,
lc
]},
{env,[]},
{mod, {wolff_app, start}},
Expand Down
2 changes: 1 addition & 1 deletion src/wolff.appup.src
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% -*- mode: erlang; -*-
{"1.8.0",
{"1.9.1",
[
],
[
Expand Down
16 changes: 12 additions & 4 deletions src/wolff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
%% Supervised producer management APIs
-export([ensure_supervised_producers/3,
stop_and_delete_supervised_producers/1,
stop_and_delete_supervised_producers/2,
%% /3 is deprecated, call /2 instead
stop_and_delete_supervised_producers/3
]).

Expand All @@ -44,7 +46,9 @@
-export([get_producer/2]).

-export_type([client_id/0, host/0, producers/0, msg/0, ack_fun/0, partitioner/0,
name/0, offset_reply/0]).
name/0, offset_reply/0, topic/0]).

-deprecated({stop_and_delete_supervised_producers, 3}).

-type client_id() :: binary().
-type host() :: kpro:endpoint().
Expand Down Expand Up @@ -92,10 +96,14 @@ stop_producers(Producers) ->
ensure_supervised_producers(ClientId, Topic, ProducerCfg) ->
wolff_producers:start_supervised(ClientId, Topic, ProducerCfg).

%% @doc Ensure supervised producers are stopped then deleted.
%% @hidden Deprecated.
-spec stop_and_delete_supervised_producers(client_id(), topic(), name()) -> ok.
stop_and_delete_supervised_producers(ClientId, Topic, Name) ->
wolff_producers:stop_supervised(ClientId, Topic, Name).
stop_and_delete_supervised_producers(ClientId, Topic, _Name) ->
stop_and_delete_supervised_producers(ClientId, Topic).

%% @doc Ensure supervised producers are stopped then deleted.
stop_and_delete_supervised_producers(ClientId, Topic) ->
wolff_producers:stop_supervised(ClientId, Topic).

%% @doc Ensure supervised producers are stopped then deleted.
-spec stop_and_delete_supervised_producers(wolff_producers:producers()) -> ok.
Expand Down
6 changes: 4 additions & 2 deletions src/wolff_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ recv_leader_connection(Client, Topic, Partition, Pid) ->
delete_producers_metadata(Client, Topic) ->
gen_server:cast(Client, {delete_producers_metadata, Topic}).

init(St) ->
init(#{client_id := ClientID} = St) ->
erlang:process_flag(trap_exit, true),
ok = wolff_client_sup:register_client(ClientID),
{ok, St}.

handle_call(Call, From, #{connect := _Fun} = St) ->
Expand Down Expand Up @@ -203,7 +204,8 @@ handle_cast(_Cast, St) ->
code_change(_OldVsn, St, _Extra) ->
{ok, St}.

terminate(_, #{conns := Conns} = St) ->
terminate(_, #{client_id := ClientID, conns := Conns} = St) ->
ok = wolff_client_sup:deregister_client(ClientID),
ok = close_connections(Conns),
{ok, St#{conns := #{}}}.

Expand Down
58 changes: 44 additions & 14 deletions src/wolff_client_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
-export([start_link/0, init/1]).

-export([ensure_present/3, ensure_absence/1, find_client/1]).
-export([register_client/1, deregister_client/1]).

-include("wolff.hrl").

-define(SUPERVISOR, ?MODULE).

Expand All @@ -29,10 +32,11 @@ init([]) ->
intensity => 10,
period => 5
},
ok = create_clients_table(),
Children = [], %% dynamically added/stopped
{ok, {SupFlags, Children}}.

%% ensure a client started under supervisor
%% @doc Ensure a client started under supervisor.
-spec ensure_present(wolff:client_id(), [wolff:host()], wolff_client:config()) ->
{ok, pid()} | {error, client_not_running}.
ensure_present(ClientId, Hosts, Config) ->
Expand All @@ -43,31 +47,57 @@ ensure_present(ClientId, Hosts, Config) ->
{error, already_present} -> {error, client_not_running}
end.

%% ensure client stopped and deleted under supervisor
%% @doc Ensure client stopped and deleted under supervisor.
-spec ensure_absence(wolff:client_id()) -> ok.
ensure_absence(ClientId) ->
case supervisor:terminate_child(?SUPERVISOR, ClientId) of
ok -> ok = supervisor:delete_child(?SUPERVISOR, ClientId);
{error, not_found} -> ok
end.
end,
%% wolff_client process' terminate callback deregisters itself
%% but we make sure it's deregistered in case the client is killed
ok = deregister_client(ClientId).

%% find client pid from client id
%% @doc Find client pid from client id.
-spec find_client(wolff:client_id()) -> {ok, pid()} | {error, any()}.
find_client(ClientId) ->
Children = supervisor:which_children(?SUPERVISOR),
case lists:keyfind(ClientId, 1, Children) of
{ClientId, Client, _, _} when is_pid(Client) ->
{ok, Client};
{ClientId, Restarting, _, _} ->
{error, Restarting};
false ->
{error, no_such_client}
end.
find_client(ClientID) ->
try
case ets:lookup(?WOLFF_CLIENTS_GLOBAL_TABLE, ClientID) of
[{ClientID, Pid}] ->
{ok, Pid};
[] ->
{error, no_such_client}
end
catch
error : badarg ->
{error, client_supervisor_not_initialized}
end.


%% @private Make supervisor child spec.
child_spec(ClientId, Hosts, Config) ->
#{id => ClientId,
start => {wolff_client, start_link, [ClientId, Hosts, Config]},
restart => transient,
type => worker,
modules => [wolff_client]
}.

%% @doc Create a ets table which is used for client registration.
%% Records are of format: {ClientId, Pid}
create_clients_table() ->
EtsName = ?WOLFF_CLIENTS_GLOBAL_TABLE,
EtsOpts = [named_table, public, ordered_set, {read_concurrency, true}],
EtsName = ets:new(EtsName, EtsOpts),
ok.

%% @doc Insert the client in the registration table.
register_client(ClientId) ->
Pid = self(),
true = ets:insert(?WOLFF_CLIENTS_GLOBAL_TABLE, {ClientId, Pid}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe use ets:insert_new to avoid overwriting a previous entry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should always overwrite.
client process may restart (by supervisor) after crash

ok.

%% @doc Delete the client from the registration table.
deregister_client(ClientId) ->
_ = ets:delete(?WOLFF_CLIENTS_GLOBAL_TABLE, ClientId),
ok.
2 changes: 1 addition & 1 deletion src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
-define(ACK_CB(AckCb, Partition), {AckCb, Partition}).

-type queue_item() :: {kpro:req(), replayq:ack_ref(), [{_CallId, _MsgCount}]}.
-type state() :: #{ call_id_base := timer:time()
-type state() :: #{ call_id_base := pos_integer()
, client_id := wolff:client_id()
, config := config()
, conn := undefined | _
Expand Down
Loading
Loading