From 306376532054ca9319e22c937b74454e6bab5d30 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 7 Feb 2024 16:13:10 +0100 Subject: [PATCH 1/2] fix(wolff_client): keep metadata connection alive --- changelog.md | 5 +++ src/wolff.app.src | 2 +- src/wolff.appup.src | 6 +-- src/wolff_client.erl | 103 ++++++++++++++++++++++++++++++------------- test/wolff_tests.erl | 36 +++++++++++++++ 5 files changed, 118 insertions(+), 34 deletions(-) diff --git a/changelog.md b/changelog.md index a4ac8ca..419fe4e 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,8 @@ +* 1.5.13 + - Use long-lived metadata connection. + This is to avoid having to excessively re-establish connection when there are many concurrent connectivity checks. +* 1.5.12 + - Fix connection error reason translation, the error log is now more compact when e.g. connect timeout happens. * 1.5.11 - Fixed a try catch pattern in `gen_server` call towards client process, this should prevent `wolff_producers` from crash if `wolff_client` is killed during initialization. * 1.5.10 diff --git a/src/wolff.app.src b/src/wolff.app.src index 81d4716..ef8954b 100644 --- a/src/wolff.app.src +++ b/src/wolff.app.src @@ -1,6 +1,6 @@ {application, wolff, [{description, "Kafka's publisher"}, - {vsn, "1.5.12"}, + {vsn, "1.5.13"}, {registered, []}, {applications, [kernel, diff --git a/src/wolff.appup.src b/src/wolff.appup.src index 969f474..092a91d 100644 --- a/src/wolff.appup.src +++ b/src/wolff.appup.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- -{"1.5.12", +{"1.5.13", [ - {"1.5.11", + {<<"1\\.5\\.1[1-2]">>, [ {load_module, wolff_client, brutal_purge, soft_purge, []} ]}, {"1.5.10", @@ -37,7 +37,7 @@ } ], [ - {"1.5.11", + {<<"1\\.5\\.1[1-2]">>, [ {load_module, wolff_client, brutal_purge, soft_purge, []} ]}, {"1.5.10", diff --git a/src/wolff_client.erl b/src/wolff_client.erl index 21ae393..91d4d87 100644 --- a/src/wolff_client.erl +++ b/src/wolff_client.erl @@ -40,6 +40,7 @@ config := config(), conn_config := kpro:conn_config(), conns := #{conn_id() => connection()}, + metadata_conn := pid() | not_initialized | down, metadata_ts := #{topic() => erlang:timestamp()}, %% only applicable when connection strategy is per_broker %% because in this case the connections are keyed by host() @@ -70,6 +71,7 @@ start_link(ClientId, Hosts, Config) -> config => MyCfg, conn_config => ConnCfg, conns => #{}, + metadata_conn => not_initialized, metadata_ts => #{}, leaders => #{} }, @@ -89,19 +91,21 @@ get_id(Pid) -> get_leader_connections(Client, Topic) -> safe_call(Client, {get_leader_connections, Topic}). +%% @doc Check if client has a metadata connection alive. +%% Trigger a reconnect if the connection is down for whatever reason. -spec check_connectivity(pid()) -> ok | {error, any()}. check_connectivity(Pid) -> safe_call(Pid, check_connectivity). +%% @doc Connect to any host in the list and immediately disconnect. -spec check_connectivity([host()], kpro:conn_config()) -> ok | {error, any()}. check_connectivity(Hosts, ConnConfig) -> - case kpro:connect_any(Hosts, ConnConfig) of - {ok, Conn} -> - close_connection(Conn), - ok; - {error, Reasons} -> - {error, tr_reasons(Reasons)} - end. + case kpro:connect_any(Hosts, ConnConfig) of + {ok, Conn} -> + ok = close_connection(Conn); + {error, Reasons} -> + {error, tr_reasons(Reasons)} + end. safe_call(Pid, Call) -> try gen_server:call(Pid, Call, infinity) @@ -135,9 +139,21 @@ handle_call(stop, From, #{conns := Conns} = St) -> ok = close_connections(Conns), gen_server:reply(From, ok), {stop, normal, St#{conns := #{}}}; -handle_call(check_connectivity, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) -> - Res = check_connectivity(Hosts, ConnConfig), - {reply, Res, St}; +handle_call(check_connectivity, _From, + #{seed_hosts := Hosts, + conn_config := ConnConfig} = St) -> + Pid = maps:get(metadata_conn, St, none), + case is_pid(Pid) andalso erlang:is_process_alive(Pid) of + true -> + {reply, ok, St}; + false -> + case kpro:connect_any(Hosts, ConnConfig) of + {ok, NewPid} -> + {reply, ok, St#{metadata_conn => NewPid}}; + {error, Reasons} -> + {reply, {error, tr_reasons(Reasons)}, St} + end + end; handle_call(_Call, _From, St) -> {noreply, St}. @@ -171,7 +187,9 @@ code_change(_OldVsn, St, _Extra) -> {ok, St}. terminate(_, #{conns := Conns} = St) -> + MetadataConn = maps:get(metadata_conn, St, none), ok = close_connections(Conns), + ok = close_connection(MetadataConn), {ok, St#{conns := #{}}}. %% == internals ====================================================== @@ -228,24 +246,38 @@ is_metadata_fresh(#{metadata_ts := Topics, config := Config}, Topic) -> ensure_leader_connections(St, Topic) -> case is_metadata_fresh(St, Topic) of true -> {ok, St}; - false -> do_ensure_leader_connections(St, Topic) + false -> ensure_leader_connections2(St, Topic) end. -do_ensure_leader_connections(#{conn_config := ConnConfig, - seed_hosts := SeedHosts, - metadata_ts := MetadataTs - } = St0, Topic) -> - case get_metadata(SeedHosts, ConnConfig, Topic, []) of +ensure_leader_connections2(#{metadata_conn := Pid} = St, Topic) when is_pid(Pid) -> + case do_get_metadata(Pid, Topic) of {ok, {Brokers, PartitionMetaList}} -> - St = lists:foldl(fun(PartitionMeta, StIn) -> - ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta) - end, St0, PartitionMetaList), - {ok, St#{metadata_ts := MetadataTs#{Topic => erlang:timestamp()}}}; + ensure_leader_connections3(St, Topic, Pid, Brokers, PartitionMetaList); + {error, _Reason} -> + %% ensure metadata connection is down, try to establish a new one in the next clause, + %% reason is discarded here, because the next clause will log error if the immediate retry fails + exit(Pid, kill), + ensure_leader_connections2(St#{metadata_conn => down}, Topic) + end; +ensure_leader_connections2(#{conn_config := ConnConfig, + seed_hosts := SeedHosts} = St, Topic) -> + case get_metadata(SeedHosts, ConnConfig, Topic, []) of + {ok, {ConnPid, {Brokers, PartitionMetaList}}} -> + ensure_leader_connections3(St, Topic, ConnPid, Brokers, PartitionMetaList); {error, Reason} -> log_warn("Failed to get metadata\nreason: ~p", [Reason]), {error, failed_to_fetch_metadata} end. +ensure_leader_connections3(#{metadata_ts := MetadataTs} = St0, Topic, + ConnPid, Brokers, PartitionMetaList) -> + St = lists:foldl(fun(PartitionMeta, StIn) -> + ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta) + end, St0, PartitionMetaList), + {ok, St#{metadata_ts := MetadataTs#{Topic => erlang:timestamp()}, + metadata_conn => ConnPid + }}. + %% This function ensures each Topic-Partition pair has a connection record %% either a pid when the leader is healthy, or the error reason %% if failed to discover the leader or failed to connect to the leader @@ -354,23 +386,32 @@ split_config(Config) -> {maps:from_list(ConnCfg), maps:from_list(MyCfg)}. get_metadata([], _ConnectFun, _Topic, Errors) -> - %% failed to connect to ALL seed hosts, crash instead of return {error, Reason} {error, Errors}; get_metadata([Host | Rest], ConnConfig, Topic, Errors) -> case do_connect(Host, ConnConfig) of {ok, Pid} -> - try - {ok, Vsns} = kpro:get_api_versions(Pid), - {_, Vsn} = maps:get(metadata, Vsns), - do_get_metadata(Vsn, Pid, Topic) - after - _ = close_connection(Pid) + case do_get_metadata(Pid, Topic) of + {ok, Result} -> + {ok, {Pid, Result}}; + {error, Reason} -> + %% failed to fetch metadata, make sure this connection is closed + ok = close_connection(Pid), + {error, Reason} end; {error, Reason} -> get_metadata(Rest, ConnConfig, Topic, [Reason | Errors]) end. -do_get_metadata(Vsn, Connection, Topic) -> +do_get_metadata(Connection, Topic) -> + case kpro:get_api_versions(Connection) of + {ok, Vsns} -> + {_, Vsn} = maps:get(metadata, Vsns), + do_get_metadata2(Vsn, Connection, Topic); + {error, Reason} -> + {error, Reason} + end. + +do_get_metadata2(Vsn, Connection, Topic) -> Req = kpro:make_request(metadata, Vsn, [{topics, [Topic]}, {allow_auto_topic_creation, false}]), case kpro:request_sync(Connection, Req, ?DEFAULT_METADATA_TIMEOUT) of @@ -381,8 +422,10 @@ do_get_metadata(Vsn, Connection, Topic) -> ErrorCode = kpro:find(error_code, TopicMeta), Partitions = kpro:find(partition_metadata, TopicMeta), case ErrorCode =:= ?no_error of - true -> {ok, {Brokers, Partitions}}; - false -> {error, ErrorCode} %% no such topic ? + true -> + {ok, {Brokers, Partitions}}; + false -> + {error, ErrorCode} %% no such topic ? end; {error, Reason} -> {error, Reason} diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index 47f35d9..d1c60cb 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -33,6 +33,42 @@ ack_cb(Partition, Offset, Self, Ref) -> Self ! {ack, Ref, Partition, Offset}, ok. +metadata_connection_restart_test() -> + ClientCfg = client_config(), + ClientId = <<"client-1">>, + {ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg), + GetMetadataConn = fun() -> + ok = wolff:check_connectivity(ClientId), + ok = wolff_client:check_connectivity(Client), + State = sys:get_state(Client), + Pid = maps:get(metadata_conn, State), + ?assert(is_process_alive(Pid)), + Pid + end, + Pid1 = GetMetadataConn(), + exit(Pid1, kill), + Pid2 = GetMetadataConn(), + ok = stop_client(Client), + ?assertNot(is_process_alive(Pid2)). + +metadata_connection_restart2_test() -> + ClientCfg0 = client_config(), + ClientCfg = ClientCfg0#{min_metadata_refresh_interval => 0}, + ClientId = <<"client-1">>, + {ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg), + GetMetadataConn = fun() -> + ?assertMatch({ok, _}, wolff_client:get_leader_connections(Client, <<"test-topic">>)), + State = sys:get_state(Client), + Pid = maps:get(metadata_conn, State), + ?assert(is_process_alive(Pid)), + Pid + end, + Pid1 = GetMetadataConn(), + exit(Pid1, kill), + Pid2 = GetMetadataConn(), + ok = stop_client(Client), + ?assertNot(is_process_alive(Pid2)). + send_test() -> ClientCfg = client_config(), {ok, Client} = start_client(<<"client-1">>, ?HOSTS, ClientCfg), From 6ceb221d9f7496976f921d6ed4275365508e243e Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 7 Feb 2024 21:11:56 +0100 Subject: [PATCH 2/2] fix(wolff_client): change internal export deprecation to a comment --- src/wolff_client.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/wolff_client.erl b/src/wolff_client.erl index 06da4fc..908caa6 100644 --- a/src/wolff_client.erl +++ b/src/wolff_client.erl @@ -32,7 +32,7 @@ -export_type([config/0]). --deprecated({check_if_topic_exists, 3}). +%% deprecated -export([check_if_topic_exists/3]). -type config() :: map().