Skip to content

Commit

Permalink
Merge pull request #1755 from keynslug/fix/[email protected]
Browse files Browse the repository at this point in the history
Fix spurious lost put coordinator acks
  • Loading branch information
martinsumner authored Jun 15, 2020
2 parents 17302e5 + 1b32ec4 commit db1b2de
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions src/riak_kv_put_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -182,20 +182,24 @@ get_put_coordinator_failure_timeout() ->
app_helper:get_env(riak_kv, put_coordinator_failure_timeout, 3000).

make_ack_options(Options) ->
case (riak_core_capability:get(
{riak_kv, put_fsm_ack_execute}, disabled) == disabled
orelse not
app_helper:get_env(
riak_kv, retry_put_coordinator_failure, true)) of
true ->
AckOption = get_option(ack_execute, Options),
AckCap = riak_core_capability:get({riak_kv, put_fsm_ack_execute}, disabled),
RetryCoord =
app_helper:get_env(riak_kv, retry_put_coordinator_failure, true) andalso
get_option(retry_put_coordinator_failure, Options, true),
case {AckOption, AckCap, RetryCoord} of
{Pid, _, _} when is_pid(Pid) ->
%% Some process (probably on another node) is already waiting
%% for an ack, no need to monitor here.
{false, Options};
false ->
case get_option(retry_put_coordinator_failure, Options, true) of
true ->
{true, [{ack_execute, self()}|Options]};
_Else ->
{false, Options}
end
{undefined, disabled, _} ->
{false, Options};
{undefined, _, false} ->
{false, Options};
{undefined, enabled, true} ->
{true, [
%% ack forwarder
{ack_execute, self()}| Options]}
end.

spawn_coordinator_proc(CoordNode, Mod, Fun, Args) ->
Expand All @@ -211,7 +215,14 @@ monitor_remote_coordinator(false = _UseAckP, _MiddleMan, _CoordNode, StateData)
{stop, normal, StateData};
monitor_remote_coordinator(true = _UseAckP, MiddleMan, CoordNode, StateData) ->
receive
{ack, CoordNode, now_executing} ->
{ack, CoordNodeFinal, now_executing} ->
case CoordNodeFinal of
CoordNode ->
ok;
_ ->
lager:warning("unexpected forward-ack from ~p, expected from ~p",
[CoordNodeFinal, CoordNode])
end,
{stop, normal, StateData}
after StateData#state.coordinator_timeout ->
exit(MiddleMan, kill),
Expand Down Expand Up @@ -1292,9 +1303,7 @@ forward(CoordNode, State) ->
[
%% don't check mbox at new fsm, we
%% picked the "best"
{mbox_check, false},
%% ack forwarder
{ack_execute, self()}
{mbox_check, false}
| Options]),
MiddleMan = spawn_coordinator_proc(
CoordNode, riak_kv_put_fsm, start_link,
Expand Down

0 comments on commit db1b2de

Please sign in to comment.