Skip to content

Commit

Permalink
Merge pull request #49 from zmstone/0803-producers-should-not-crash-w…
Browse files Browse the repository at this point in the history
…hen-client-is-killed

fix: wolff_producers should not crash when client is killed
  • Loading branch information
zmstone authored Aug 3, 2023
2 parents 9a204e2 + 34b8d95 commit bc1fbf0
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 13 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* 1.5.11
- Fixed the `try ... catch` pattern around the `gen_server` call to the client process; this prevents `wolff_producers` from crashing if `wolff_client` is killed during initialization.
* 1.5.10
- Enhance: use `off_heap` spawn option in producer processes for better gc performance.
* 1.5.9
- Fix: when picking a producer PID, if it was dead, it could lead to an error being raised. [#38](https://github.com/kafka4beam/wolff/pull/38)
* 1.5.8
Expand Down
2 changes: 1 addition & 1 deletion src/wolff.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, wolff,
[{description, "Kafka's publisher"},
{vsn, "1.5.10"},
{vsn, "1.5.11"},
{registered, []},
{applications,
[kernel,
Expand Down
26 changes: 21 additions & 5 deletions src/wolff.appup.src
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
%% -*-: erlang -*-
{"1.5.10",
{"1.5.11",
[
{"1.5.10",
[ {load_module, wolff_client, brutal_purge, soft_purge, []}
, {load_module, wolff_producers, brutal_purge, soft_purge, []}
, {load_module, wolff_producer, brutal_purge, soft_purge, []}
]},
{"1.5.9",
[ {load_module, wolff_producer, brutal_purge, soft_purge, []}
[ {load_module, wolff_client, brutal_purge, soft_purge, []}
, {load_module, wolff_producers, brutal_purge, soft_purge, []}
, {load_module, wolff_producer, brutal_purge, soft_purge, []}
]},
{"1.5.8",
[ {load_module, wolff_producer, brutal_purge, soft_purge, []}
[ {load_module, wolff_client, brutal_purge, soft_purge, []}
, {load_module, wolff_producers, brutal_purge, soft_purge, []}
, {load_module, wolff_producer, brutal_purge, soft_purge, []}
]},
{<<"1\\.5\\.[2-7]">>,
[ {load_module, wolff, brutal_purge, soft_purge, []}
Expand All @@ -26,12 +34,20 @@
}
],
[
{"1.5.10",
[ {load_module, wolff_client, brutal_purge, soft_purge, []}
, {load_module, wolff_producers, brutal_purge, soft_purge, []}
, {load_module, wolff_producer, brutal_purge, soft_purge, []}
]},
{"1.5.9",
[ {load_module, wolff_producer, brutal_purge, soft_purge, []}
[ {load_module, wolff_client, brutal_purge, soft_purge, []}
, {load_module, wolff_producers, brutal_purge, soft_purge, []}
, {load_module, wolff_producer, brutal_purge, soft_purge, []}
]},
{"1.5.8",
[ {load_module, wolff_producer, brutal_purge, soft_purge, []}
[ {load_module, wolff_client, brutal_purge, soft_purge, []}
, {load_module, wolff_producers, brutal_purge, soft_purge, []}
, {load_module, wolff_producer, brutal_purge, soft_purge, []}
]},
{<<"1\\.5\\.[2-7]">>,
[ {load_module, wolff, brutal_purge, soft_purge, []}
Expand Down
2 changes: 1 addition & 1 deletion src/wolff_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ check_connectivity(Hosts, ConnConfig) ->

safe_call(Pid, Call) ->
try gen_server:call(Pid, Call, infinity)
catch error : Reason -> {error, Reason}
catch exit : Reason -> {error, Reason}
end.

%% request client to send Pid the leader connection.
Expand Down
2 changes: 1 addition & 1 deletion src/wolff_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ handle_kafka_ack(#kpro_rsp{api = produce,
do_handle_kafka_ack(Ref, BaseOffset, St);
false ->
#{topic := Topic, partition := Partition} = St,
log_warn(Topic, Partition, "~s-~p: Produce response error-code = ~0p", [ErrorCode]),
log_warn(Topic, Partition, "Produce response error-code = ~0p", [ErrorCode]),
erlang:throw(ErrorCode)
end.

Expand Down
12 changes: 7 additions & 5 deletions src/wolff_producers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ handle_info(?refresh_partition_count, #{refresh_tref := Tref, config := Config}
St = refresh_partition_count(St0),
{noreply, St#{refresh_tref := start_partition_refresh_timer(Config)}};
handle_info(?rediscover_client, #{client_id := ClientId,
client_pid := false
client_pid := false,
topic := Topic
} = St0) ->
St1 = St0#{?rediscover_client_tref => false},
case wolff_client_sup:find_client(ClientId) of
Expand All @@ -224,16 +225,17 @@ handle_info(?rediscover_client, #{client_id := ClientId,
St = maybe_restart_producers(St3),
{noreply, St};
{error, Reason} ->
log_error("Failed to discover client, reason = ~p", [Reason]),
log_error("Failed to discover client ~s for topic ~s, reason: ~p", [ClientId, Topic, Reason]),
{noreply, ensure_rediscover_client_timer(St1)}
end;
handle_info(?init_producers, St) ->
%% this is a retry of last failure when initializing producer procs
{noreply, maybe_init_producers(St)};
handle_info({'DOWN', _, process, Pid, Reason}, #{client_id := ClientId,
client_pid := Pid
client_pid := Pid,
topic := Topic
} = St) ->
log_error("Client ~p (pid = ~p) down, reason: ~p", [ClientId, Pid, Reason]),
log_error("Client ~s for topic ~s down, pid: ~p, reason: ~p", [ClientId, Topic, Pid, Reason]),
%% client down, try to discover it after a delay
%% producers should all monitor client pid,
%% expect their 'EXIT' signals soon
Expand Down Expand Up @@ -363,7 +365,7 @@ refresh_partition_count(#{client_pid := Pid, topic := Topic} = St) ->
{ok, Connections} ->
start_new_producers(St, Connections);
{error, Reason} ->
log_warning("failed_to_refresh_partition_count topic:~s, reason; ~p", [Topic, Reason]),
log_warning("failed_to_refresh_partition_count topic:~s, reason: ~p", [Topic, Reason]),
St
end.

Expand Down
35 changes: 35 additions & 0 deletions test/wolff_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,41 @@ test_leader_restart() ->
end,
ok.

%% If wolff_client has already died by the time wolff_producers tries to
%% initialize its producers, this should not cause wolff_producers_sup to restart.
%% EUnit generator: runs test_producers_init_failure/0 with a 10 second timeout.
producers_init_failure_test_() ->
  TestFun = fun test_producers_init_failure/0,
  {timeout, 10, TestFun}.

%% Starts producers against a suspended client, kills that client while the
%% start call is still pending, and verifies that the caller receives
%% {error, {killed, _}} instead of crashing.
test_producers_init_failure() ->
  %% restart the application so the test begins from a fresh state
  _ = application:stop(wolff),
  {ok, _} = application:ensure_all_started(wolff),
  Id = <<"proucers-init-failure-test">>,
  TopicName = <<"test-topic">>,
  {ok, Client} = wolff:ensure_supervised_client(Id, ?HOSTS, #{connection_strategy => per_broker}),
  %% a suspended client does not respond to calls
  sys:suspend(Client),
  ProducerOpts = #{partition_count_refresh_interval_seconds => 0,
                   required_acks => all_isr
                  },
  %% the start call hangs until the client is killed below; the caller
  %% exits with 'normal' only if it got the expected error tuple back
  StartProducers =
    fun() ->
        {error, {killed, _}} =
          wolff_producers:start_linked_producers(Id, TopicName, ProducerOpts),
        exit(normal)
    end,
  {Caller, _} = spawn_monitor(StartProducers),
  %% give the spawned process a chance to run before killing the client
  timer:sleep(100),
  %% killing the client makes the pending gen_server:call towards it fail
  exit(Client, kill),
  receive
    {'DOWN', _, process, Caller, ExitReason} ->
      ?assertEqual(normal, ExitReason)
  after 2000 ->
    error(timeout)
  end,
  %% cleanup: remove the client and stop the application
  ok = wolff:stop_and_delete_supervised_client(Id),
  ?assertEqual([], supervisor:which_children(wolff_client_sup)),
  ok = application:stop(wolff),
  ok.

%% helpers

%% verify wolff_client state upgrade.
Expand Down

0 comments on commit bc1fbf0

Please sign in to comment.