diff --git a/changelog.md b/changelog.md index b619c0e..84e1757 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,8 @@ +* 1.10.2 (merge 1.5.13) + - Use long-lived metadata connection. + This avoids excessively re-establishing the connection when there are many concurrent connectivity checks. + - Fix connection error reason translation; the error log is now more compact when, e.g., a connect timeout happens. + * 1.10.1 - Add `max_partitions` producer config to limit the number of partition producers so the client side is also possible to have control over resource utilization. diff --git a/src/wolff.app.src b/src/wolff.app.src index 5d0f201..8866472 100644 --- a/src/wolff.app.src +++ b/src/wolff.app.src @@ -1,6 +1,6 @@ {application, wolff, [{description, "Kafka's publisher"}, - {vsn, "1.10.1"}, + {vsn, "1.10.2"}, {registered, []}, {applications, [kernel, diff --git a/src/wolff.appup.src b/src/wolff.appup.src index 312eb38..5a5a3a5 100644 --- a/src/wolff.appup.src +++ b/src/wolff.appup.src @@ -1,5 +1,5 @@ %% -*- mode: erlang; -*- -{"1.9.1", +{"1.10.2", [ ], [ diff --git a/src/wolff.erl b/src/wolff.erl index bd0a62f..0491b74 100644 --- a/src/wolff.erl +++ b/src/wolff.erl @@ -48,6 +48,7 @@ -export_type([client_id/0, host/0, producers/0, msg/0, ack_fun/0, partitioner/0, name/0, offset_reply/0, topic/0]). +-deprecated({check_if_topic_exists, 3}). -deprecated({stop_and_delete_supervised_producers, 3}). -type client_id() :: binary(). @@ -158,7 +159,7 @@ check_connectivity(ClientId) -> check_connectivity(Hosts, ConnConfig) -> wolff_client:check_connectivity(Hosts, ConnConfig). -%% @doc Check if the cluster is reachable and the topic is created. +%% @hidden Deprecated. Check if the cluster is reachable and the topic is created. -spec check_if_topic_exists([host()], wolff_client:config(), topic()) -> ok | {error, unknown_topic_or_partition | [#{host := binary(), reason := term()}] | any()}. 
check_if_topic_exists(Hosts, ConnConfig, Topic) -> diff --git a/src/wolff_client.erl b/src/wolff_client.erl index 78f1638..908caa6 100644 --- a/src/wolff_client.erl +++ b/src/wolff_client.erl @@ -25,13 +25,16 @@ get_id/1, delete_producers_metadata/2]). -export([check_connectivity/1, check_connectivity/2]). --export([check_if_topic_exists/2, check_if_topic_exists/3, check_topic_exists_with_client_pid/2]). +-export([check_topic_exists_with_client_pid/2]). %% gen_server callbacks -export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1, terminate/2]). -export_type([config/0]). +%% deprecated +-export([check_if_topic_exists/3]). + -type config() :: map(). -type topic() :: kpro:topic(). -type partition() :: kpro:partition(). @@ -45,6 +48,7 @@ config := config(), conn_config := kpro:conn_config(), conns := #{conn_id() => connection()}, + metadata_conn := pid() | not_initialized | down, metadata_ts := #{topic() => erlang:timestamp()}, %% only applicable when connection strategy is per_broker %% because in this case the connections are keyed by host() @@ -75,6 +79,7 @@ start_link(ClientId, Hosts, Config) -> config => MyCfg, conn_config => ConnCfg, conns => #{}, + metadata_conn => not_initialized, metadata_ts => #{}, leaders => #{} }, @@ -99,39 +104,29 @@ get_leader_connections(Client, Topic) -> get_leader_connections(Client, Topic, MaxPartitions) -> safe_call(Client, {get_leader_connections, Topic, MaxPartitions}). +%% @doc Check if client has a metadata connection alive. +%% Trigger a reconnect if the connection is down for whatever reason. -spec check_connectivity(pid()) -> ok | {error, any()}. check_connectivity(Pid) -> safe_call(Pid, check_connectivity). +%% @doc Connect to any host in the list and immediately disconnect. -spec check_connectivity([host()], kpro:conn_config()) -> ok | {error, any()}. 
check_connectivity(Hosts, ConnConfig) when Hosts =/= [] -> - case kpro:connect_any(Hosts, ConnConfig) of - {ok, Conn} -> - close_connection(Conn), - ok; - {error, Reasons} -> - {error, tr_reasons(Reasons)} - end. + case kpro:connect_any(Hosts, ConnConfig) of + {ok, Conn} -> + ok = close_connection(Conn); + {error, Reasons} -> + {error, tr_reasons(Reasons)} + end. -%% @doc Check if a topic exists by creating a temp connecton to any of the seed hosts. +%% @hidden Deprecated. Check if a topic exists by creating a temporary connection to any of the seed hosts. -spec check_if_topic_exists([host()], kpro:conn_config(), topic()) -> ok | {error, unknown_topic_or_partition | [#{host := binary(), reason := term()}] | any()}. check_if_topic_exists(Hosts, ConnConfig, Topic) when Hosts =/= [] -> case get_metadata(Hosts, ConnConfig, Topic) of - {ok, _} -> - ok; - {error, Errors} -> - {error, Errors} - end. - --spec check_if_topic_exists(pid(), topic()) -> - ok | {error, unknown_topic_or_partition | [#{host := binary(), reason := term()}] | any()}. -check_if_topic_exists(Pid, Topic) when is_pid(Pid) -> - {ok, Vsns} = kpro:get_api_versions(Pid), - {_, Vsn} = maps:get(metadata, Vsns), - case do_get_metadata(Vsn, Pid, Topic) of - {ok, _} -> - ok; + {ok, {Pid, _}} -> + ok = close_connection(Pid); {error, Errors} -> {error, Errors} end. 
@@ -164,8 +159,14 @@ handle_call(Call, From, #{connect := _Fun} = St) -> handle_call(Call, From, upgrade(St)); handle_call(get_id, _From, #{client_id := Id} = St) -> {reply, Id, St}; -handle_call({check_if_topic_exists, Topic}, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) -> - {reply, check_if_topic_exists(Hosts, ConnConfig, Topic), St}; +handle_call({check_if_topic_exists, Topic}, _From, #{conn_config := ConnConfig} = St0) -> + case ensure_metadata_conn(St0) of + {ok, #{metadata_conn := ConnPid} = St} -> + Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT), + {reply, check_if_topic_exists2(ConnPid, Topic, Timeout), St}; + {error, Reason} -> + {reply, {error, Reason}, St0} + end; handle_call({get_leader_connections, Topic, MaxPartitions}, _From, St0) -> case ensure_leader_connections(St0, Topic, MaxPartitions) of {ok, St} -> @@ -178,9 +179,13 @@ handle_call(stop, From, #{conns := Conns} = St) -> ok = close_connections(Conns), gen_server:reply(From, ok), {stop, normal, St#{conns := #{}}}; -handle_call(check_connectivity, _From, #{seed_hosts := Hosts, conn_config := ConnConfig} = St) -> - Res = check_connectivity(Hosts, ConnConfig), - {reply, Res, St}; +handle_call(check_connectivity, _From, St0) -> + case ensure_metadata_conn(St0) of + {ok, St} -> + {reply, ok, St}; + {error, Reason} -> + {reply, {error, Reason}, St0} + end; handle_call(Call, _From, St) -> {reply, {error, {unknown_call, Call}}, St}. @@ -215,11 +220,34 @@ code_change(_OldVsn, St, _Extra) -> terminate(_, #{client_id := ClientID, conns := Conns} = St) -> ok = wolff_client_sup:deregister_client(ClientID), + MetadataConn = maps:get(metadata_conn, St, none), ok = close_connections(Conns), + ok = close_connection(MetadataConn), {ok, St#{conns := #{}}}. 
%% == internals ====================================================== +ensure_metadata_conn(#{seed_hosts := Hosts, conn_config := ConnConfig, metadata_conn := Pid} = St) -> + case is_alive(Pid) of + true -> + {ok, St}; + false -> + case kpro:connect_any(Hosts, ConnConfig) of + {ok, NewPid} -> + {ok, St#{metadata_conn => NewPid}}; + {error, Reasons} -> + {error, tr_reasons(Reasons)} + end + end. + +check_if_topic_exists2(Pid, Topic, Timeout) when is_pid(Pid) -> + case do_get_metadata(Pid, Topic, Timeout) of + {ok, _} -> + ok; + {error, Reason} -> + {error, Reason} + end. + close_connections(Conns) -> lists:foreach(fun({_, Pid}) -> close_connection(Pid) end, maps:to_list(Conns)). @@ -272,25 +300,40 @@ is_metadata_fresh(#{metadata_ts := Topics, config := Config}, Topic) -> ensure_leader_connections(St, Topic, MaxPartitions) -> case is_metadata_fresh(St, Topic) of true -> {ok, St}; - false -> do_ensure_leader_connections(St, Topic, MaxPartitions) + false -> ensure_leader_connections2(St, Topic, MaxPartitions) end. 
-do_ensure_leader_connections(#{conn_config := ConnConfig, - seed_hosts := SeedHosts, - metadata_ts := MetadataTs - } = St0, Topic, MaxPartitions) -> - case get_metadata(SeedHosts, ConnConfig, Topic) of - {ok, {Brokers, PartitionMetaList0}} -> - PartitionMetaList = limit_partitions_count(PartitionMetaList0, MaxPartitions), - St = lists:foldl(fun(PartitionMeta, StIn) -> - ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta) - end, St0, PartitionMetaList), - {ok, St#{metadata_ts := MetadataTs#{Topic => erlang:timestamp()}}}; +ensure_leader_connections2(#{metadata_conn := Pid, conn_config := ConnConfig} = St, Topic, MaxPartitions) when is_pid(Pid) -> + Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT), + case do_get_metadata(Pid, Topic, Timeout) of + {ok, {Brokers, PartitionMetaList}} -> + ensure_leader_connections3(St, Topic, Pid, Brokers, PartitionMetaList, MaxPartitions); + {error, _Reason} -> + %% ensure metadata connection is down, try to establish a new one in the next clause, + %% reason is discarded here, because the next clause will log error if the immediate retry fails + exit(Pid, kill), + ensure_leader_connections2(St#{metadata_conn => down}, Topic, MaxPartitions) + end; +ensure_leader_connections2(#{conn_config := ConnConfig, + seed_hosts := SeedHosts} = St, Topic, MaxPartitions) -> + case get_metadata(SeedHosts, ConnConfig, Topic, []) of + {ok, {ConnPid, {Brokers, PartitionMetaList}}} -> + ensure_leader_connections3(St, Topic, ConnPid, Brokers, PartitionMetaList, MaxPartitions); {error, Errors} -> log_warn(failed_to_fetch_metadata, #{topic => Topic, errors => Errors}), {error, failed_to_fetch_metadata} end. 
+ensure_leader_connections3(#{metadata_ts := MetadataTs} = St0, Topic, + ConnPid, Brokers, PartitionMetaList0, MaxPartitions) -> + PartitionMetaList = limit_partitions_count(PartitionMetaList0, MaxPartitions), + St = lists:foldl(fun(PartitionMeta, StIn) -> + ensure_leader_connection(StIn, Brokers, Topic, PartitionMeta) + end, St0, PartitionMetaList), + {ok, St#{metadata_ts := MetadataTs#{Topic => erlang:timestamp()}, + metadata_conn => ConnPid + }}. + limit_partitions_count(PartitionMetaList, Max) when is_integer(Max) andalso Max < length(PartitionMetaList) -> lists:sublist(PartitionMetaList, Max); limit_partitions_count(PartitionMetaList, _) -> @@ -409,27 +452,33 @@ get_metadata(Hosts, ConnectFun, Topic) -> get_metadata(Hosts, ConnectFun, Topic, []). get_metadata([], _ConnectFun, _Topic, Errors) -> - %% failed to connect to ALL seed hosts, crash instead of return {error, Reason} {error, Errors}; get_metadata([Host | Rest], ConnConfig, Topic, Errors) -> case do_connect(Host, ConnConfig) of {ok, Pid} -> - try - {ok, Vsns} = kpro:get_api_versions(Pid), - {_, Vsn} = maps:get(metadata, Vsns), - Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT), - do_get_metadata(Vsn, Pid, Topic, Timeout) - after - _ = close_connection(Pid) + Timeout = maps:get(request_timeout, ConnConfig, ?DEFAULT_METADATA_TIMEOUT), + case do_get_metadata(Pid, Topic, Timeout) of + {ok, Result} -> + {ok, {Pid, Result}}; + {error, Reason} -> + %% failed to fetch metadata, make sure this connection is closed + ok = close_connection(Pid), + {error, Reason} end; {error, Reason} -> get_metadata(Rest, ConnConfig, Topic, [Reason | Errors]) end. -do_get_metadata(Vsn, Connection, Topic) -> - do_get_metadata(Vsn, Connection, Topic, ?DEFAULT_METADATA_TIMEOUT). 
+do_get_metadata(Connection, Topic, Timeout) -> + case kpro:get_api_versions(Connection) of + {ok, Vsns} -> + {_, Vsn} = maps:get(metadata, Vsns), + do_get_metadata2(Vsn, Connection, Topic, Timeout); + {error, Reason} -> + {error, Reason} + end. -do_get_metadata(Vsn, Connection, Topic, Timeout) -> +do_get_metadata2(Vsn, Connection, Topic, Timeout) -> Req = kpro_req_lib:metadata(Vsn, [Topic], _IsAutoCreateAllowed = false), case kpro:request_sync(Connection, Req, Timeout) of {ok, #kpro_rsp{msg = Meta}} -> @@ -439,8 +488,10 @@ do_get_metadata(Vsn, Connection, Topic, Timeout) -> ErrorCode = kpro:find(error_code, TopicMeta), Partitions = kpro:find(partitions, TopicMeta), case ErrorCode =:= ?no_error of - true -> {ok, {Brokers, Partitions}}; - false -> {error, ErrorCode} %% no such topic ? + true -> + {ok, {Brokers, Partitions}}; + false -> + {error, ErrorCode} %% no such topic ? end; {error, Reason} -> {error, Reason} diff --git a/test/wolff_tests.erl b/test/wolff_tests.erl index 9cefc5e..4f2869a 100644 --- a/test/wolff_tests.erl +++ b/test/wolff_tests.erl @@ -45,6 +45,42 @@ ack_cb(Partition, Offset, Self, Ref) -> Self ! {ack, Ref, Partition, Offset}, ok. +metadata_connection_restart_test() -> + ClientCfg = client_config(), + ClientId = <<"client-1">>, + {ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg), + GetMetadataConn = fun() -> + ok = wolff:check_connectivity(ClientId), + ok = wolff_client:check_connectivity(Client), + State = sys:get_state(Client), + Pid = maps:get(metadata_conn, State), + ?assert(is_process_alive(Pid)), + Pid + end, + Pid1 = GetMetadataConn(), + exit(Pid1, kill), + Pid2 = GetMetadataConn(), + ok = stop_client(Client), + ?assertNot(is_process_alive(Pid2)). 
+ +metadata_connection_restart2_test() -> + ClientCfg0 = client_config(), + ClientCfg = ClientCfg0#{min_metadata_refresh_interval => 0}, + ClientId = <<"client-1">>, + {ok, Client} = start_client(ClientId, ?HOSTS, ClientCfg), + GetMetadataConn = fun() -> + ?assertMatch({ok, _}, wolff_client:get_leader_connections(Client, <<"test-topic">>)), + State = sys:get_state(Client), + Pid = maps:get(metadata_conn, State), + ?assert(is_process_alive(Pid)), + Pid + end, + Pid1 = GetMetadataConn(), + exit(Pid1, kill), + Pid2 = GetMetadataConn(), + ok = stop_client(Client), + ?assertNot(is_process_alive(Pid2)). + send_test() -> CntrEventsTable = ets:new(cntr_events, [public]), install_event_logging(?FUNCTION_NAME, CntrEventsTable, false),