Skip to content

Commit

Permalink
Merge pull request #54 from zmstone/no-stats-by-default
Browse files Browse the repository at this point in the history
refactor: do not enable global stats collection by default
  • Loading branch information
zmstone authored Nov 1, 2023
2 parents 7d8035d + 830ff47 commit 46a8708
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 13 deletions.
7 changes: 6 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* 1.9.0
- No global stats collection by default.
  There is an ETS-table-based stats collector that records the number of sent bytes and messages. Consider this feature deprecated.
  Since 1.7.0, there is better integration for metrics.
* 1.8.0
- Add wolff:check_if_topic_exists/2 for checking if a topic exists making use of an existing client process. [#52](https://github.com/kafka4beam/wolff/pull/52)
- Improved logs when reporting connection errors. (merged 1.5.12)
Expand All @@ -20,8 +24,9 @@
- Upgrade `kafka_protocol` from version 4.1.0 to 4.1.1 to enable customizing the SNI without needing to set the `verify_peer` option.
* 1.7.1 (merged 1.5.9)
- Fix: when picking a producer PID, if it was dead, it could lead to an error being raised. [#37](https://github.com/kafka4beam/wolff/pull/37)
* 1.6.5
* 1.7.0
  - Upgrade `kafka_protocol` from version 4.0.3 to 4.1.0 for SASL/GSSAPI auth support.
- Also added beam-telemetry for better metrics report.
* 1.6.4 (merged 1.5.8)
* 1.6.3 (merged 1.5.7)
- Stop supervised producer if failed to start. Otherwise the caller may have to call the wolff:stop_and_delete_supervised_producers/3
Expand Down
27 changes: 18 additions & 9 deletions src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
max_send_ahead => non_neg_integer(),
compression => kpro:compress_option(),
drop_if_highmem => boolean(),
telemetry_meta_data => map()
telemetry_meta_data => map(),
enable_global_stats => boolean()
}.

-define(no_timer, no_timer).
Expand Down Expand Up @@ -89,6 +90,7 @@
, sent_reqs_count => non_neg_integer()
, inflight_calls => non_neg_integer()
, topic := topic()
, enable_global_stats := boolean()
}.

%% @doc Start a per-partition producer worker.
Expand All @@ -107,6 +109,9 @@
%% * `max_send_ahead': Number of batches to be sent ahead without receiving ack for
%% the last request. Must be 0 if messages must be delivered in strict order.
%% * `compression': `no_compression', `snappy' or `gzip'.
%% * `enable_global_stats': `true' | `false'.
%%   Introduced in 1.9.0, default is `false'. Set to `true' to enable a global
%% send/receive stats table created in `wolff_stats' module.
-spec start_link(wolff:client_id(), topic(), partition(), pid() | ?conn_down(any()), config()) ->
{ok, pid()} | {error, any()}.
start_link(ClientId, Topic, Partition, MaybeConnPid, Config) ->
Expand Down Expand Up @@ -224,8 +229,9 @@ do_init(#{client_id := ClientId,
sent_reqs_count => 0,
inflight_calls => 0,
conn := undefined,
client_id => ClientId
}.
client_id => ClientId,
enable_global_stats => maps:get(enable_global_stats, Config0, false)
}.

handle_call(stop, From, St) ->
gen_server:reply(From, ok),
Expand All @@ -238,13 +244,9 @@ handle_info({do_init, St0}, _) ->
{noreply, St};
handle_info(?linger_expire, St) ->
{noreply, maybe_send_to_kafka(St#{?linger_expire_timer := false})};
handle_info(?SEND_REQ(_, Batch, _) = Call, #{client_id := ClientId,
topic := Topic,
partition := Partition,
config := #{max_batch_bytes := Limit}
} = St0) ->
handle_info(?SEND_REQ(_, Batch, _) = Call, #{config := #{max_batch_bytes := Limit}} = St0) ->
{Calls, Cnt, Oct} = collect_send_calls([Call], 1, batch_bytes(Batch), Limit),
ok = wolff_stats:recv(ClientId, Topic, Partition, #{cnt => Cnt, oct => Oct}),
ok = recv_stats(St0, #{cnt => Cnt, oct => Oct}),
St1 = enqueue_calls(Calls, St0),
St = maybe_send_to_kafka(St1),
{noreply, St};
Expand Down Expand Up @@ -653,11 +655,18 @@ log_error(Topic, Partition, Msg, Args) ->
%% Thin indirection over the OTP logger so all producer logging goes
%% through a single local call site (easy to stub or redirect in tests).
log(Level, Report) ->
  logger:log(Level, Report).

%% Record the number of messages and estimated bytes sent for this
%% producer in the global `wolff_stats' table.
%% No-op when global stats collection is disabled (the default since 1.9.0).
send_stats(#{enable_global_stats := false}, _Batch) ->
  ok;
send_stats(#{client_id := ClientId, topic := Topic, partition := Partition}, Batch) ->
  Count = length(Batch),
  %% oct/1 estimates the on-wire byte size of one message payload.
  Bytes = lists:sum([oct(Msg) || Msg <- Batch]),
  ok = wolff_stats:sent(ClientId, Topic, Partition, #{cnt => Count, oct => Bytes}).

%% Bump the global receive counters in `wolff_stats' by the given
%% increments (`#{cnt => N, oct => Bytes}').
%% No-op when global stats collection is disabled (the default since 1.9.0).
recv_stats(St, Increments) ->
  case St of
    #{enable_global_stats := false} ->
      ok;
    #{client_id := ClientId, topic := Topic, partition := Partition} ->
      ok = wolff_stats:recv(ClientId, Topic, Partition, Increments)
  end.

%% Estimation of size in bytes of one payload sent to Kafka.
%% According to Kafka protocol, a v2 record consists of below fields:
%% Length => varint # varint_bytes(SizeOfAllRestFields)
Expand Down
3 changes: 2 additions & 1 deletion test/wolff_bench.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ start_producers(Client) ->
ProducerCfg = #{required_acks => all_isr,
max_batch_bytes => 800*1000,
max_linger_ms => 1000,
max_send_ahead => 100
max_send_ahead => 100,
enable_global_stats => true
},
wolff:start_producers(Client, ?TOPIC, ProducerCfg).

Expand Down
4 changes: 3 additions & 1 deletion test/wolff_supervised_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ fetch(Connection, Topic, Partition, Offset, MaxBytes) ->
%% Default client config for these tests: no overrides.
client_config() -> maps:new().

producer_config() ->
#{replayq_dir => "test-data"}.
#{replayq_dir => "test-data",
enable_global_stats => true
}.

key(Name) ->
iolist_to_binary(io_lib:format("~p/~p/~p", [Name, calendar:local_time(),
Expand Down
3 changes: 2 additions & 1 deletion test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,8 @@ to_old_client_state(St0) ->
%% Default client config for these tests: no overrides.
client_config() -> #{}.

producer_config() ->
#{replayq_dir => "test-data"}.
#{replayq_dir => "test-data",
enable_global_stats => true}.

%% Build a per-run message key binary from the test name and the
%% current local time, e.g. <<"name/{{2023,11,1},{12,0,0}}">>.
key(Name) ->
  Formatted = io_lib:format("~p/~p", [Name, calendar:local_time()]),
  iolist_to_binary(Formatted).
Expand Down

0 comments on commit 46a8708

Please sign in to comment.