diff --git a/rebar.config b/rebar.config index a1ae49c3..220ff04a 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,7 @@ {deps, [ {supervisor3, "1.1.11"} , {kafka_protocol, "4.1.0"} , {snappyer, "1.2.8"} + , {telemetry, "~> 1.0"} ]}. {project_plugins, [{rebar3_lint, "~> 1.0.2"}]}. {edoc_opts, [{preprocess, true}, {macros, [{build_brod_cli, true}]}]}. @@ -30,8 +31,9 @@ , {meck, "0.9.2"} , {proper, "1.4.0"} , {snabbkaffe, "1.0.1"} + , {telemetry, "~> 1.0"} ]}, - {erl_opts, [warnings_as_errors, {d, build_brod_cli}]} + {erl_opts, [warnings_as_errors, {d, build_brod_cli}, {d, brod_use_telemetry}]} ]} ]}. {ex_doc, diff --git a/rebar.lock b/rebar.lock index 2d546797..d98fd1cb 100644 --- a/rebar.lock +++ b/rebar.lock @@ -2,16 +2,19 @@ [{<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.8">>},1}, {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.0">>},0}, {<<"snappyer">>,{pkg,<<"snappyer">>,<<"1.2.8">>},0}, - {<<"supervisor3">>,{pkg,<<"supervisor3">>,<<"1.1.11">>},0}]}. + {<<"supervisor3">>,{pkg,<<"supervisor3">>,<<"1.1.11">>},0}, + {<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.1.0">>},0}]}. [ {pkg_hash,[ {<<"crc32cer">>, <<"C6C2275C5FB60A95F4935D414F30B50EE9CFED494081C9B36EBB02EDFC2F48DB">>}, {<<"kafka_protocol">>, <<"53FAC8866969484F783BFF204BD4E41E62A97CE9753C83F802A08D5BFC0E0C4C">>}, {<<"snappyer">>, <<"201CE9067A33C71A6A5087C0C3A49A010B17112D461E6DF696C722DCB6D0934A">>}, - {<<"supervisor3">>, <<"D81CDEC31D102FDE407423E1D05B569572850DEEBED86B951D5233C387CBA80B">>}]}, + {<<"supervisor3">>, <<"D81CDEC31D102FDE407423E1D05B569572850DEEBED86B951D5233C387CBA80B">>}, + {<<"telemetry">>, <<"A589817034A27EAB11144AD24D5C0F9FAB1F58173274B1E9BAE7074AF9CBEE51">>}]}, {pkg_hash_ext,[ {<<"crc32cer">>, <<"251499085482920DEB6C9B7AADABF9FB4C432F96ADD97AB42AEE4501E5B6F591">>}, {<<"kafka_protocol">>, <<"61CB8B80199BF95122CF8073E0F4C0AD62F82515B4D44C54F946A5972C3F5FA5">>}, {<<"snappyer">>, <<"35518E79A28548B56D8FD6AEE2F565F12F51C2D3D053F9CFA817C83BE88C4F3D">>}, - {<<"supervisor3">>, <<"E6C2DEDBCABCBA24995A218ACA12DB5E208B80D3252692B22EF0F1A266104B50">>}]} + {<<"supervisor3">>, <<"E6C2DEDBCABCBA24995A218ACA12DB5E208B80D3252692B22EF0F1A266104B50">>}, + {<<"telemetry">>, <<"B727B2A1F75614774CFF2D7565B64D0DFA5BD52BA517F16543E6FC7EFCC0DF48">>}]} ]. diff --git a/src/brod.app.src b/src/brod.app.src index 97557671..751df9c1 100644 --- a/src/brod.app.src +++ b/src/brod.app.src @@ -3,7 +3,7 @@ [{description,"Apache Kafka Erlang client library"}, {vsn,"git"}, {registered,[]}, - {applications,[kernel,stdlib,kafka_protocol,supervisor3,snappyer]}, + {applications,[kernel,stdlib,kafka_protocol,supervisor3,snappyer,telemetry]}, {env,[]}, {mod, {brod, []}}, {modules,[]}, diff --git a/src/brod_sup.erl b/src/brod_sup.erl index a7a29781..78bff9fc 100644 --- a/src/brod_sup.erl +++ b/src/brod_sup.erl @@ -108,6 +108,7 @@ start_link() -> brod:client_config()) -> ok | {error, any()}. start_client(Endpoints, ClientId, Config) -> ClientSpec = client_spec(Endpoints, ClientId, Config), + io:format(user, "start_client ClientId: ~w~n", [ClientId]), case supervisor3:start_child(?SUP, ClientSpec) of {ok, _Pid} -> ok; Error -> Error @@ -130,6 +131,9 @@ init(clients_sup) -> ClientSpecs = lists:map(fun({ClientId, Args}) -> is_atom(ClientId) orelse exit({bad_client_id, ClientId}), + brod_telemetry:execute([brod, client, init], + #{system_time => erlang:system_time()}, + #{client_id => ClientId, args => Args}), client_spec(ClientId, Args) end, Clients), %% A client may crash and restart due to network failure diff --git a/src/brod_telemetry.erl b/src/brod_telemetry.erl new file mode 100644 index 00000000..8ff37256 --- /dev/null +++ b/src/brod_telemetry.erl @@ -0,0 +1,39 @@ +%%% +%%% Copyright (c) 2017-2021 Klarna Bank AB (publ) +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% + +%% @private +-module(brod_telemetry). +-export([execute/2, + execute/3, + span/3]). + +-ifndef(brod_use_telemetry). + execute(_EventName, _Measurements) -> ok. + execute(_EventName, _Measurements, _Metadata) -> ok. + span(_EventPrefix, _StartMetadata, _SpanFunction) -> ok. +-else. + execute(EventName, Measurements) -> telemetry:execute(EventName, Measurements). + execute(EventName, Measurements, Metadata) -> + telemetry:execute(EventName, Measurements, Metadata). + span(EventPrefix, StartMetadata, SpanFunction) -> + telemetry:span(EventPrefix, StartMetadata, SpanFunction). +-endif. + +%%%_* Emacs ==================================================================== +%%% Local Variables: +%%% allout-layout: t +%%% erlang-indent-level: 2 +%%% End: diff --git a/test/brod_telemetry_SUITE.erl b/test/brod_telemetry_SUITE.erl new file mode 100644 index 00000000..503f45f4 --- /dev/null +++ b/test/brod_telemetry_SUITE.erl @@ -0,0 +1,76 @@ +%%% +%%% Copyright (c) 2019-2021, Klarna Bank AB (publ) +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% + +%% @private +-module(brod_telemetry_SUITE). + +%% Test framework +-export([ init_per_suite/1 + , end_per_suite/1 + , all/0 + , suite/0 + ,handle_event/4 + ]). + +%% Test cases +-export([ t_init_client/1]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(TIMEOUT, 280000). + +%%%_* ct callbacks ============================================================= + +suite() -> [{timetrap, {minutes, 1}}]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(telemetry), + Config. + +end_per_suite(_Config) -> ok. + +all() -> [t_init_client]. + +%%%_* Test functions =========================================================== +handle_event(Event, Measurements, Metadata, #{pid := Pid}) -> Pid! {Event, Measurements, Metadata}. + +t_init_client(Config) when is_list(Config) -> + ok = telemetry:attach_many( + <<"t-init-repo-handler">>, + [[brod, client, init]], + fun ?MODULE:handle_event/4, + #{pid => self()} + ), + application:stop(brod), + ok = application:set_env(brod, clients, [{t_init_client, + [ + {endpoints, [{"localhost", 9092}]} + , {auto_start_producers, true}]}]), + {ok, _} = application:ensure_all_started(brod), + receive + { [brod, client, init] + , #{system_time := _Time} + , #{client_id := t_init_client, args := _Args}} -> ok; + + Msg -> error({bad_message, t_init_client, Msg}) + end. + +%%%_* Emacs ==================================================================== +%%% Local Variables: +%%% allout-layout: t +%%% erlang-indent-level: 2 +%%% End: