Skip to content

Commit

Permalink
Merge pull request #53 from zmstone/1029-check-topic-with-clinet-pid
Browse files Browse the repository at this point in the history
1029 check topic with clinet pid
  • Loading branch information
zmstone authored Oct 29, 2023
2 parents a582855 + aab147c commit 9518667
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
17 changes: 15 additions & 2 deletions src/wolff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
send_sync/3
]).

-export([check_connectivity/1, check_connectivity/2, check_if_topic_exists/3]).
-export([check_connectivity/1,
check_connectivity/2,
check_if_topic_exists/2,
check_if_topic_exists/3]).

%% for test
-export([get_producer/2]).
Expand Down Expand Up @@ -149,6 +152,16 @@ check_connectivity(Hosts, ConnConfig) ->

%% @doc Check if the cluster is reachable and the topic is created.
-spec check_if_topic_exists([host()], wolff_client:config(), topic()) ->
ok | {error, [{FormatedHostPort :: binary(), any()}]}.
ok | {error, unknown_topic_or_partition | [#{host := binary(), reason := term()}] | any()}.
check_if_topic_exists(Hosts, ConnConfig, Topic) ->
wolff_client:check_if_topic_exists(Hosts, ConnConfig, Topic).

%% @doc Check if a topic exists using a supervised client or a client porcess.
-spec check_if_topic_exists(client_id() | pid(), topic()) -> ok | {error, unknown_topic_or_partition | any()}.
check_if_topic_exists(ClientId, Topic) when is_binary(ClientId) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} -> check_if_topic_exists(Pid, Topic);
{error, Error} -> {error, Error}
end;
check_if_topic_exists(ClientPid, Topic) when is_pid(ClientPid) ->
wolff_client:check_topic_exists_with_client_pid(ClientPid, Topic).
16 changes: 13 additions & 3 deletions src/wolff_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
-export([start_link/3, stop/1]).
-export([get_leader_connections/2, recv_leader_connection/4, get_id/1, delete_producers_metadata/2]).
-export([check_connectivity/1, check_connectivity/2]).
-export([check_if_topic_exists/2, check_if_topic_exists/3]).
-export([check_if_topic_exists/2, check_if_topic_exists/3, check_topic_exists_with_client_pid/2]).

%% gen_server callbacks
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]).
Expand Down Expand Up @@ -104,6 +104,7 @@ check_connectivity(Hosts, ConnConfig) when Hosts =/= [] ->
{error, tr_reasons(Reasons)}
end.

%% @doc Check if a topic exists by creating a temp connecton to any of the seed hosts.
-spec check_if_topic_exists([host()], kpro:conn_config(), topic()) ->
ok | {error, unknown_topic_or_partition | [#{host := binary(), reason := term()}] | any()}.
check_if_topic_exists(Hosts, ConnConfig, Topic) when Hosts =/= [] ->
Expand All @@ -126,6 +127,13 @@ check_if_topic_exists(Pid, Topic) when is_pid(Pid) ->
{error, Errors}
end.

%% @doc Check if a topic exists by calling a already started client process.
%% In contrast, check_if_topic_exists/3 creates a temp connection to do the work.
-spec check_topic_exists_with_client_pid(pid(), topic()) ->
ok | {error, unknown_topic_or_partition | any()}.
check_topic_exists_with_client_pid(Pid, Topic) ->
safe_call(Pid, {check_if_topic_exists, Topic}).

safe_call(Pid, Call) ->
try gen_server:call(Pid, Call, infinity)
catch exit : Reason -> {error, Reason}
Expand All @@ -146,6 +154,8 @@ handle_call(Call, From, #{connect := _Fun} = St) ->
handle_call(Call, From, upgrade(St));
handle_call(get_id, _From, #{client_id := Id} = St) ->
{reply, Id, St};
handle_call({check_if_topic_exists, Topic}, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) ->
{reply, check_if_topic_exists(Hosts, ConnConfig, Topic), St};
handle_call({get_leader_connections, Topic}, _From, St0) ->
case ensure_leader_connections(St0, Topic) of
{ok, St} ->
Expand All @@ -161,8 +171,8 @@ handle_call(stop, From, #{conns := Conns} = St) ->
handle_call(check_connectivity, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) ->
Res = check_connectivity(Hosts, ConnConfig),
{reply, Res, St};
handle_call(_Call, _From, St) ->
{noreply, St}.
handle_call(Call, _From, St) ->
{reply, {error, {unknown_call, Call}}, St}.

handle_info(_Info, St) ->
{noreply, upgrade(St)}.
Expand Down
8 changes: 6 additions & 2 deletions test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ send_test() ->
CntrEventsTable = ets:new(cntr_events, [public]),
install_event_logging(?FUNCTION_NAME, CntrEventsTable, false),
ClientCfg = client_config(),
{ok, Client} = start_client(<<"client-1">>, ?HOSTS, ClientCfg),
ClientId = <<"client-1">>,
Topic = <<"test-topic">>,
{ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg),
?assertEqual(ok, wolff:check_if_topic_exists(ClientId, Topic)),
ProducerCfg = #{partitioner => fun(_, _) -> 0 end},
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg),
{ok, Producers} = wolff:start_producers(Client, Topic, ProducerCfg),
Msg = #{key => ?KEY, value => <<"value">>},
Ref = make_ref(),
Self = self(),
Expand Down Expand Up @@ -579,6 +582,7 @@ test_producers_init_failure() ->
Topic = <<"test-topic">>,
ClientCfg = #{connection_strategy => per_broker},
{ok, ClientPid} = wolff:ensure_supervised_client(ClientId, ?HOSTS, ClientCfg),
?assertEqual({error, {unknown_call, foo}}, gen_server:call(ClientPid, foo)),
%% suspend the client so it will not respond to calls
sys:suspend(ClientPid),
ProducerCfg = #{required_acks => all_isr,
Expand Down

0 comments on commit 9518667

Please sign in to comment.