From 89ab927f2c9fc5b7a2ce05c59f07c40b12110f16 Mon Sep 17 00:00:00 2001 From: urmastalimaa Date: Tue, 4 Jun 2024 13:22:16 +0300 Subject: [PATCH 1/4] Add missing messages to debugging `get_api_vsns` and `{ssl, Sock, Bin}` were missing from print_msg/3 cases and were reported as "unknown messages". --- src/kpro_connection.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/kpro_connection.erl b/src/kpro_connection.erl index 1df45ed..bdd0885 100644 --- a/src/kpro_connection.erl +++ b/src/kpro_connection.erl @@ -500,8 +500,12 @@ format_status(Opt, Status) -> print_msg(Device, {_From, {send, Request}}, State) -> do_print_msg(Device, "send: ~p", [Request], State); +print_msg(Device, {_From, {get_api_vsns, Request}}, State) -> + do_print_msg(Device, "get_api_vsns", [Request], State); print_msg(Device, {tcp, _Sock, Bin}, State) -> do_print_msg(Device, "tcp: ~p", [Bin], State); +print_msg(Device, {ssl, _Sock, Bin}, State) -> + do_print_msg(Device, "ssl: ~p", [Bin], State); print_msg(Device, {tcp_closed, _Sock}, State) -> do_print_msg(Device, "tcp_closed", [], State); print_msg(Device, {tcp_error, _Sock, Reason}, State) -> From 09af52dbe0b9e10dd7a1eb087767d33b4488ed73 Mon Sep 17 00:00:00 2001 From: urmastalimaa Date: Fri, 9 Aug 2024 14:16:26 +0300 Subject: [PATCH 2/4] Allow SASL callback modules to return {ok, ServerResponse} This expansion of the callback return values allows `kpro_connection` to interrogate the server response message, in preparation for re-authenticating SASL connections before session lifetime expires. Authentication was moved to a separate function to allow repeating authentication flow, which also required storing connection configuration in process state. --- src/kpro_auth_backend.erl | 10 ++++++---- src/kpro_connection.erl | 31 +++++++++++++++++++++++-------- src/kpro_sasl.erl | 2 ++ 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/kpro_auth_backend.erl b/src/kpro_auth_backend.erl index 568cbf6..60c9f3e 100644 --- a/src/kpro_auth_backend.erl +++ b/src/kpro_auth_backend.erl @@ -18,15 +18,17 @@ -export([auth/8]). +-type server_auth_response() :: term(). + -callback auth(Host :: string(), Sock :: gen_tcp:socket() | ssl:sslsocket(), Mod :: gen_tcp | ssl, ClientName :: binary(), Timeout :: pos_integer(), SaslOpts :: term()) -> - ok | {error, Reason :: term()}. + ok | {ok, server_auth_response()} | {error, Reason :: term()}. -callback auth(Host :: string(), Sock :: gen_tcp:socket() | ssl:sslsocket(), HandShakeVsn :: non_neg_integer(), Mod :: gen_tcp | ssl, ClientName :: binary(), Timeout :: pos_integer(), SaslOpts :: term()) -> - ok | {error, Reason :: term()}. + ok | {ok, server_auth_response()} | {error, Reason :: term()}. -optional_callbacks([auth/6]). @@ -34,7 +36,7 @@ Sock :: gen_tcp:socket() | ssl:sslsocket(), Mod :: gen_tcp | ssl, ClientName :: binary(), Timeout :: pos_integer(), SaslOpts :: term()) -> - ok | {error, Reason :: term()}. + ok | {ok, server_auth_response()} | {error, Reason :: term()}. auth(CallbackModule, Host, Sock, Mod, ClientName, Timeout, SaslOpts) -> CallbackModule:auth(Host, Sock, Mod, ClientName, Timeout, SaslOpts). @@ -43,7 +45,7 @@ auth(CallbackModule, Host, Sock, Mod, ClientName, Timeout, SaslOpts) -> HandShakeVsn :: non_neg_integer(), Mod :: gen_tcp | ssl, ClientName :: binary(), Timeout :: pos_integer(), SaslOpts :: term()) -> - ok | {error, Reason :: term()}. + ok | {ok, server_auth_response()} | {error, Reason :: term()}. auth(CallbackModule, Host, Sock, Vsn, Mod, ClientName, Timeout, SaslOpts) -> case is_exported(CallbackModule, auth, 7) of true -> diff --git a/src/kpro_connection.erl b/src/kpro_connection.erl index bdd0885..82eb5d3 100644 --- a/src/kpro_connection.erl +++ b/src/kpro_connection.erl @@ -93,6 +93,7 @@ -record(state, { client_id :: client_id() , parent :: pid() + , config :: config() , remote :: kpro:endpoint() , sock :: gen_tcp:socket() | ssl:sslsocket() , mod :: ?undef | gen_tcp | ssl @@ -229,6 +230,7 @@ connect(Parent, Host, Port, Config) -> State = #state{ client_id = get_client_id(Config) , parent = Parent , remote = {Host, Port} + , config = Config , sock = Sock }, init_connection(State, Config, Deadline); @@ -260,14 +262,8 @@ init_connection(#state{ client_id = ClientId #{query_api_versions := false} -> ?undef; _ -> query_api_versions(NewSock, Mod, ClientId, Deadline) end, - HandshakeVsn = case Versions of - #{sasl_handshake := {_, V}} -> V; - _ -> 0 - end, - SaslOpts = get_sasl_opt(Config), - ok = kpro_sasl:auth(Host, NewSock, Mod, ClientId, - timeout(Deadline), SaslOpts, HandshakeVsn), - State#state{mod = Mod, sock = NewSock, api_vsns = Versions}. + State1 = State#state{mod = Mod, sock = NewSock, api_vsns = Versions}, + sasl_authenticate(State1). query_api_versions(Sock, Mod, ClientId, Deadline) -> Req = kpro_req_lib:make(api_versions, 0, []), @@ -476,6 +472,25 @@ handle_msg(Msg, #state{} = State, Debug) -> [?MODULE, self(), Msg]), ?MODULE:loop(State, Debug). +sasl_authenticate(#state{client_id = ClientId, mod = Mod, sock = Sock, remote = {Host, _Port}, api_vsns = Versions, config = Config} = State) -> + Timeout = get_connect_timeout(Config), + Deadline = deadline(Timeout), + SaslOpts = get_sasl_opt(Config), + HandshakeVsn = case Versions of + #{sasl_handshake := {_, V}} -> V; + _ -> 0 + end, + ok = setopts(Sock, Mod, [{active, false}]), + case kpro_sasl:auth(Host, Sock, Mod, ClientId, + timeout(Deadline), SaslOpts, HandshakeVsn) of + ok -> + ok; + {ok, _ServerResponse} -> + ok + end, + ok = setopts(Sock, Mod, [{active, once}]), + State. + cast(Pid, Msg) -> try Pid ! Msg, diff --git a/src/kpro_sasl.erl b/src/kpro_sasl.erl index ec274a0..f09814a 100644 --- a/src/kpro_sasl.erl +++ b/src/kpro_sasl.erl @@ -39,6 +39,8 @@ auth(Host, Sock, Mod, ClientId, Timeout, ClientId, Timeout, Opts) of ok -> ok; + {ok, ServerResponse} -> + {ok, ServerResponse}; {error, Reason} -> ?ERROR(Reason) end; From 0cfeaf46a06e2e5d2ed0fdc87ecf45e2235dae3b Mon Sep 17 00:00:00 2001 From: urmastalimaa Date: Fri, 9 Aug 2024 14:31:28 +0300 Subject: [PATCH 3/4] Reauthenticate SASL connections based on session lifetime The broker response to a SASL authentication request can contain a maximum session lifetime (see the [KIP][kip]). Session lifetime is returned by the broker in [Version 1 SaslAuthenticate Response][sasl_authenticate_protocol]. When a SASL authentication callback returns `{ok, ServerResponse}` and the ServerResponse contains a larger than 0 session lifetime, kpro_connection automatically sets a timer to re-authenticate in half the session lifetime. As kpro_sasl mechanisms are synchronous, in-flight requests must first be drained to ensure that kpro_sasl receives a response to its own SASL request. The draining algorithm behaves as follows: * `sasl_authenticate` message handler adds the message onto the backlog and immediately flush the backlog if there are no in-flight requests. * `{From, {send, Request}}` handler adds the request onto the backlog if the backlog has any items to allow in-flight requests to drain. * Inbound message handler flushes the backlog if in-flight requests are empty. [kip]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-368%3A+Allow+SASL+Connections+to+Periodically+Re-Authenticate [sasl_authenticate_protocol]: https://kafka.apache.org/protocol#The_Messages_SaslAuthenticate --- src/kpro_connection.erl | 108 +++++++++++++++++++++++---------- src/kpro_sent_reqs.erl | 4 ++ test/kpro_connection_tests.erl | 29 +++++++++ 3 files changed, 109 insertions(+), 32 deletions(-) diff --git a/src/kpro_connection.erl b/src/kpro_connection.erl index 82eb5d3..8aca105 100644 --- a/src/kpro_connection.erl +++ b/src/kpro_connection.erl @@ -100,6 +100,7 @@ , req_timeout :: ?undef | timeout() , api_vsns :: ?undef | kpro:vsn_ranges() , requests :: ?undef | requests() + , backlog :: false | queue:queue() }). -type state() :: #state{}. @@ -227,11 +228,12 @@ connect(Parent, Host, Port, Config) -> SockOpts = [{active, false}, binary] ++ get_extra_sock_opts(Config), case gen_tcp:connect(Host, Port, SockOpts, Timeout) of {ok, Sock} -> - State = #state{ client_id = get_client_id(Config) - , parent = Parent - , remote = {Host, Port} - , config = Config - , sock = Sock + State = #state{ client_id = get_client_id(Config) + , parent = Parent + , remote = {Host, Port} + , config = Config + , sock = Sock + , backlog = false }, init_connection(State, Config, Deadline); {error, Reason} -> @@ -404,7 +406,8 @@ handle_msg({_, Sock, Bin}, #state{ sock = Sock Rsp = kpro_rsp_lib:decode(API, Vsn, Body, Ref), ok = cast(Caller, {msg, self(), Rsp}), NewRequests = kpro_sent_reqs:del(Requests, CorrId), - ?MODULE:loop(State#state{requests = NewRequests}, Debug); + State1 = maybe_flush_backlog(State#state{requests = NewRequests}), + ?MODULE:loop(State1, Debug); handle_msg(assert_max_req_age, #state{ requests = Requests , req_timeout = ReqTimeout } = State, Debug) -> @@ -422,12 +425,41 @@ handle_msg({tcp_error, Sock, Reason}, #state{sock = Sock}, _) -> exit({tcp_error, Reason}); handle_msg({ssl_error, Sock, Reason}, #state{sock = Sock}, _) -> exit({ssl_error, Reason}); -handle_msg({From, {send, Request}}, - #state{ client_id = ClientId - , mod = Mod - , sock = Sock - , requests = Requests - } = State, Debug) -> +handle_msg({_From, {send, _}} = Msg, #state{backlog = false} = State, Debug) -> + State1 = send_request(Msg, State), + ?MODULE:loop(State1, Debug); +handle_msg({_From, {send, _}} = Msg, #state{backlog = Q} = State, Debug) -> + %% Avoid sending new requests until in-flight requests have been resolved + State1 = State#state{backlog = queue:in(Msg, Q)}, + ?MODULE:loop(State1, Debug); +handle_msg({From, get_api_vsns}, State, Debug) -> + maybe_reply(From, {ok, State#state.api_vsns}), + ?MODULE:loop(State, Debug); +handle_msg({From, get_endpoint}, State, Debug) -> + maybe_reply(From, {ok, State#state.remote}), + ?MODULE:loop(State, Debug); +handle_msg({From, get_tcp_sock}, State, Debug) -> + maybe_reply(From, {ok, State#state.sock}), + ?MODULE:loop(State, Debug); +handle_msg({From, stop}, #state{mod = Mod, sock = Sock}, _Debug) -> + Mod:close(Sock), + maybe_reply(From, ok), + ok; +handle_msg(sasl_authenticate, State, Debug) -> + State1 = State#state{backlog = queue:from_list([sasl_authenticate])}, + State2 = maybe_flush_backlog(State1), + ?MODULE:loop(State2, Debug); +handle_msg(Msg, #state{} = State, Debug) -> + error_logger:warning_msg("[~p] ~p got unrecognized message: ~p", + [?MODULE, self(), Msg]), + ?MODULE:loop(State, Debug). + +send_request({From, {send, Request}}, + #state{ client_id = ClientId + , mod = Mod + , sock = Sock + , requests = Requests + } = State) -> {Caller, _Ref} = From, #kpro_req{api = API, vsn = Vsn} = Request, {CorrId, NewRequests} = @@ -453,24 +485,25 @@ handle_msg({From, {send, Request}}, ], exit({send_error, Reason}) end, - ?MODULE:loop(State#state{requests = NewRequests}, Debug); -handle_msg({From, get_api_vsns}, State, Debug) -> - maybe_reply(From, {ok, State#state.api_vsns}), - ?MODULE:loop(State, Debug); -handle_msg({From, get_endpoint}, State, Debug) -> - maybe_reply(From, {ok, State#state.remote}), - ?MODULE:loop(State, Debug); -handle_msg({From, get_tcp_sock}, State, Debug) -> - maybe_reply(From, {ok, State#state.sock}), - ?MODULE:loop(State, Debug); -handle_msg({From, stop}, #state{mod = Mod, sock = Sock}, _Debug) -> - Mod:close(Sock), - maybe_reply(From, ok), - ok; -handle_msg(Msg, #state{} = State, Debug) -> - error_logger:warning_msg("[~p] ~p got unrecognized message: ~p", - [?MODULE, self(), Msg]), - ?MODULE:loop(State, Debug). + State#state{requests = NewRequests}. + +maybe_flush_backlog(#state{backlog = false} = State) -> + State; +maybe_flush_backlog(#state{requests = Requests, backlog = Backlog} = State) -> + case kpro_sent_reqs:is_empty(Requests) of + true -> + NewState = case queue:out(Backlog) of + {{value, sasl_authenticate}, RemainingBacklog} -> + sasl_authenticate(State#state{backlog = RemainingBacklog}); + {{value, {_From, {send, _}} = Msg}, RemainingBacklog} -> + send_request(Msg, State#state{backlog = RemainingBacklog}); + {empty, _} -> + State#state{backlog = false} + end, + maybe_flush_backlog(NewState); + false -> + State + end. sasl_authenticate(#state{client_id = ClientId, mod = Mod, sock = Sock, remote = {Host, _Port}, api_vsns = Versions, config = Config} = State) -> Timeout = get_connect_timeout(Config), @@ -485,8 +518,17 @@ sasl_authenticate(#state{client_id = ClientId, mod = Mod, sock = Sock, remote = timeout(Deadline), SaslOpts, HandshakeVsn) of ok -> ok; - {ok, _ServerResponse} -> - ok + {ok, ServerResponse} -> + case find(session_lifetime_ms, ServerResponse) of + Lifetime when is_integer(Lifetime) andalso Lifetime > 0 -> + %% Broker can report back a maximal session lifetime: https://kafka.apache.org/protocol#The_Messages_SaslAuthenticate. + %% Respect the session lifetime by draining in-flight requests and re-authenticating in half the time. + ReauthenticationDeadline = Lifetime div 2, + _ = erlang:send_after(ReauthenticationDeadline, self(), sasl_authenticate), + ok; + _ -> + ok + end end, ok = setopts(Sock, Mod, [{active, once}]), State. @@ -517,6 +559,8 @@ print_msg(Device, {_From, {send, Request}}, State) -> do_print_msg(Device, "send: ~p", [Request], State); print_msg(Device, {_From, {get_api_vsns, Request}}, State) -> do_print_msg(Device, "get_api_vsns", [Request], State); +print_msg(Device, sasl_authenticate, State) -> + do_print_msg(Device, "sasl_authenticate", [], State); print_msg(Device, {tcp, _Sock, Bin}, State) -> do_print_msg(Device, "tcp: ~p", [Bin], State); print_msg(Device, {ssl, _Sock, Bin}, State) -> diff --git a/src/kpro_sent_reqs.erl b/src/kpro_sent_reqs.erl index 2dc09f9..8dc8161 100644 --- a/src/kpro_sent_reqs.erl +++ b/src/kpro_sent_reqs.erl @@ -33,6 +33,7 @@ , get_corr_id/1 , increment_corr_id/1 , scan_for_max_age/1 + , is_empty/1 ]). -export_type([requests/0]). @@ -56,6 +57,9 @@ -spec new() -> requests(). new() -> #requests{}. +-spec is_empty(requests()) -> boolean(). +is_empty(#requests{sent = Sent}) -> maps:size(Sent) == 0. + %% @doc Add a new request to sent collection. %% Return the last corrlation ID and the new collection. -spec add(requests(), pid(), reference(), kpro:api(), kpro:vsn()) -> diff --git a/test/kpro_connection_tests.erl b/test/kpro_connection_tests.erl index 44c0f16..0ee17d3 100644 --- a/test/kpro_connection_tests.erl +++ b/test/kpro_connection_tests.erl @@ -17,6 +17,8 @@ -include_lib("eunit/include/eunit.hrl"). -include("kpro_private.hrl"). +-export([ auth/7 ]). + plaintext_test() -> Config = kpro_test_lib:connection_config(plaintext), {ok, Pid} = connect(Config), @@ -57,6 +59,33 @@ sasl_file_test() -> ok = kpro_connection:stop(Pid) end. +% SASL callback implementation for subsequent tests +auth(_Host, _Sock, _Vsn, _Mod, _ClientName, _Timeout, #{test_pid := TestPid} = SaslOpts) -> + case SaslOpts of + #{response_session_lifetime_ms := ResponseSessionLifeTimeMs} -> + TestPid ! sasl_authenticated, + {ok, #{session_lifetime_ms => ResponseSessionLifeTimeMs}}; + _ -> + ok + end. + +sasl_callback_test() -> + Config0 = kpro_test_lib:connection_config(sasl_ssl), + case kpro_test_lib:get_kafka_version() of + ?KAFKA_0_9 -> + ok; + _ -> + Config = Config0#{sasl => {callback, ?MODULE, #{response_session_lifetime_ms => 51, test_pid => self()}}}, + {ok, Pid} = connect(Config), + + % initial authentication + receive sasl_authenticated -> ok end, + % repeated authentication as session expires + receive sasl_authenticated -> ok end, + + ok = kpro_connection:stop(Pid) + end. + no_api_version_query_test() -> Config = #{query_api_versions => false}, {ok, Pid} = connect(Config), From 49ccc86bc057197c870ad4cb68340cc2e51aa0ed Mon Sep 17 00:00:00 2001 From: urmastalimaa Date: Tue, 13 Aug 2024 17:14:50 +0300 Subject: [PATCH 4/4] docs: add changelog for 4.1.7 --- changelog.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/changelog.md b/changelog.md index 231f7f4..cde7431 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,9 @@ +* 4.1.7 + - Automatically re-authenticate before session lifetime expires if SASL + authentication module returns `{ok, ServerResponse}` and ServerResponse + contains a non-zero `session_timeout_ms`. + https://github.com/kafka4beam/kafka_protocol/pull/122 + * 4.1.6 - Fix docs. PR #120