Skip to content

Commit

Permalink
Merge pull request #1751 from basho/mas-i1750-nodeconfirms
Browse files Browse the repository at this point in the history
Mas i1750 nodeconfirms
  • Loading branch information
martinsumner authored Feb 25, 2020
2 parents 9127669 + 3288532 commit 6e8d145
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 71 deletions.
6 changes: 3 additions & 3 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
{eunit_formatters, ".*", {git, "git://github.com/seancribbs/eunit_formatters", {tag, "0.1.2"}}},
{leveled, ".*", {git, "https://github.com/martinsumner/leveled.git", {tag, "riak_kv-2.9.1"}}},
{kv_index_tictactree, ".*", {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "riak_kv-2.9.1"}}},
{riak_core, ".*", {git, "https://github.com/basho/riak_core.git", {tag, "riak_kv-2.9.1"}}},
{riak_api, ".*", {git, "git://github.com/basho/riak_api.git", {tag, "riak_kv-2.9.1"}}},
{riak_core, ".*", {git, "https://github.com/basho/riak_core.git", {branch, "develop-2.9"}}},
{riak_api, ".*", {git, "git://github.com/basho/riak_api.git", {branch, "develop-2.9"}}},
{hyper, ".*", {git, "git://github.com/basho/hyper", {tag, "1.0.1"}}},
{riakhttpc, ".*", {git, "git://github.com/basho/riak-erlang-http-client", {tag, "riak_kv-2.9.1"}}}
{riakhttpc, ".*", {git, "git://github.com/basho/riak-erlang-http-client", {branch, "develop-2.9"}}}
]}.
99 changes: 73 additions & 26 deletions src/riak_kv_get_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
%%
%% -------------------------------------------------------------------
-module(riak_kv_get_core).
-export([init/9, update_init/2, head_merge/1,
add_result/3, update_result/4, result_shortcode/1,
-export([init/10, update_init/2, head_merge/1,
add_result/4, update_result/5, result_shortcode/1,
enough/1, response/1, has_all_results/1, final_action/1, info/1]).
-export_type([getcore/0, result/0, reply/0, final_action/0]).

Expand Down Expand Up @@ -65,7 +65,9 @@
num_upd = 0 :: non_neg_integer(),
idx_type :: idx_type(),
head_merge = false :: boolean(),
expected_fetchclock = false :: boolean()|vclock:vclock()}).
expected_fetchclock = false :: boolean()|vclock:vclock(),
node_confirms = 0 :: non_neg_integer(),
confirmed_nodes = []}).
-opaque getcore() :: #getcore{}.

%% ====================================================================
Expand All @@ -77,8 +79,10 @@
FailThreshold::pos_integer(), NotFoundOK::boolean(),
AllowMult::boolean(), DeletedVClock::boolean(),
IdxType::idx_type(),
ExpClock::false|vclock:vclock()) -> getcore().
init(N, R, PR, FailThreshold, NotFoundOk, AllowMult, DeletedVClock, IdxType, ExpClock) ->
ExpClock::false|vclock:vclock(),
NodeConfirms::non_neg_integer()) -> getcore().
init(N, R, PR, FailThreshold, NotFoundOk, AllowMult,
DeletedVClock, IdxType, ExpClock, NodeConfirms) ->
#getcore{n = N,
r = case ExpClock of false -> R; _ -> N end,
pr = PR,
Expand All @@ -88,7 +92,8 @@ init(N, R, PR, FailThreshold, NotFoundOk, AllowMult, DeletedVClock, IdxType, Exp
allow_mult = AllowMult,
deletedvclock = DeletedVClock,
idx_type = IdxType,
expected_fetchclock = ExpClock}.
expected_fetchclock = ExpClock,
node_confirms = NodeConfirms}.

%% Re-initialise a get to a restricted number of vnodes (that must all respond)
-spec update_init(N::pos_integer(), getcore()) -> getcore().
Expand All @@ -108,8 +113,8 @@ head_merge(GetCore) ->
%% arrive is assumed to be preserved in the get core
%% datastructure. i.e. first arriving at the end of the list, latest
%% arrival at the head.
-spec add_result(non_neg_integer(), result(), getcore()) -> getcore().
add_result(Idx, {ok, RObj}, GetCore0) ->
-spec add_result(non_neg_integer(), result(), node(), getcore()) -> getcore().
add_result(Idx, {ok, RObj}, Node, GetCore0) ->
GetCore =
case GetCore0#getcore.expected_fetchclock of
false ->
Expand All @@ -134,21 +139,27 @@ add_result(Idx, {ok, RObj}, GetCore0) ->
results = [{Idx, Result}|GetCore#getcore.results],
merged = undefined,
num_ok = GetCore#getcore.num_ok + 1,
num_deleted = GetCore#getcore.num_deleted + Dels}, Idx);
add_result(Idx, {error, notfound} = Result, GetCore) ->
num_deleted = GetCore#getcore.num_deleted + Dels,
confirmed_nodes =
lists:usort([Node|GetCore#getcore.confirmed_nodes])},
Idx);
add_result(Idx, {error, notfound} = Result, Node, GetCore) ->
case GetCore#getcore.notfound_ok of
true ->
num_pr(GetCore#getcore{
results = [{Idx, Result}|GetCore#getcore.results],
merged = undefined,
num_ok = GetCore#getcore.num_ok + 1}, Idx);
num_ok = GetCore#getcore.num_ok + 1,
confirmed_nodes =
lists:usort([Node|GetCore#getcore.confirmed_nodes])},
Idx);
_ ->
GetCore#getcore{
results = [{Idx, Result}|GetCore#getcore.results],
merged = undefined,
num_notfound = GetCore#getcore.num_notfound + 1}
end;
add_result(Idx, {error, _Reason} = Result, GetCore) ->
add_result(Idx, {error, _Reason} = Result, _Node, GetCore) ->
GetCore#getcore{
results = [{Idx, Result}|GetCore#getcore.results],
merged = undefined,
Expand All @@ -161,8 +172,9 @@ add_result(Idx, {error, _Reason} = Result, GetCore) ->
-spec update_result(non_neg_integer(),
result(),
list(),
node(),
getcore()) -> getcore().
update_result(Idx, Result, IdxList, GetCore) ->
update_result(Idx, Result, IdxList, Node, GetCore) ->
case lists:member(Idx, IdxList) of
true ->
% This results should always be OK
Expand All @@ -180,7 +192,7 @@ update_result(Idx, Result, IdxList, GetCore) ->
% Add them to the result set - the result set will still be used
% for read repair. Will also detect if the last read was actually
% a more upto date object
add_result(Idx, Result, GetCore)
add_result(Idx, Result, Node, GetCore)
end.


Expand All @@ -190,30 +202,47 @@ result_shortcode(_) -> -1.

%% Check if enough results have been added to respond
-spec enough(getcore()) -> boolean().
%% Met quorum
%% Found expected clock
enough(#getcore{expected_fetchclock = true}) ->
true;
%% Met quorum
enough(#getcore{r = R, ur = UR, pr= PR,
num_ok = NumOK, num_pok = NumPOK,
num_upd = NumUPD})
when NumOK >= R andalso NumPOK >= PR andalso NumUPD >= UR ->
num_upd = NumUPD,
node_confirms = RequiredConfirms, confirmed_nodes = Nodes})
when NumOK >= R andalso
NumPOK >= PR andalso
NumUPD >= UR andalso
length(Nodes) >= RequiredConfirms ->
true;
%% too many failures
%% Too many failures
enough(#getcore{fail_threshold = FailThreshold, num_notfound = NumNotFound,
num_fail = NumFail})
when NumNotFound + NumFail >= FailThreshold ->
true;
%% Got all N responses, but unable to satisfy PR
%% Got all N responses, and no updated reads outstanding - not waiting on
%% anything.
%% In this case there has been a failure to satisfy PR or node_confirms
%% but enough is known, so can return true to prompt error via response
%% rather than sit waiting for a timeout.
enough(#getcore{n = N, ur = UR, num_ok = NumOK, num_notfound = NumNotFound,
num_fail = NumFail})
when NumOK + NumNotFound + NumFail >= N andalso UR == 0 ->
true;
%% Awaiting outstanding responses
enough(_) ->
false.

%% Get success/fail response once enough results received
-spec response(getcore()) -> {reply(), getcore()}.
%% Met quorum for a standard get request/response
response(#getcore{node_confirms = RequiredConfirms,
confirmed_nodes = Nodes} = GetCore)
when length(Nodes) < RequiredConfirms ->
check_overload({error,
{insufficient_nodes,
length(Nodes), need, RequiredConfirms}}, GetCore);
%% Insufficient nodes confirmed
response(#getcore{r = R, num_ok = NumOK, pr= PR, num_pok = NumPOK,
expected_fetchclock = ExpClock, head_merge = HM} = GetCore)
when
Expand Down Expand Up @@ -491,13 +520,13 @@ update_test() ->
idx_type = [],
results = [{1, {ok, fake_head1}}, {2, {ok, fake_head2}}]},
GC1 = update_init(1, GC0),
GC2 = update_result(3, {ok, Obj3}, [2], GC1),
GC2 = update_result(3, {ok, Obj3}, [2], node(), GC1),
?assertMatch(3, GC2#getcore.num_ok),
?assertMatch(2, GC2#getcore.r),
?assertMatch(1, GC2#getcore.ur),
?assertMatch(0, GC2#getcore.num_upd),
?assertMatch(3, length(GC2#getcore.results)),
GC3 = update_result(2, {ok, fake_get2}, [2], GC2),
GC3 = update_result(2, {ok, fake_get2}, [2], node(), GC2),
?assertMatch(3, GC3#getcore.num_ok),
?assertMatch(2, GC3#getcore.r),
?assertMatch(1, GC3#getcore.ur),
Expand Down Expand Up @@ -557,23 +586,23 @@ enough_expectedclock_test() ->
results = [],
allow_mult = true,
expected_fetchclock = ExpectedClock},
GC1 = add_result(3, {ok, Obj1}, GC0),
GC1 = add_result(3, {ok, Obj1}, node(), GC0),
?assertEqual(false, enough(GC1)),
?assertEqual({{error, {r_val_unsatisfied, 3, 1}}, GC1}, response(GC1)),

GC2A = add_result(1, {ok, Obj3}, GC1),
GC2A = add_result(1, {ok, Obj3}, node(), GC1),
?assertEqual(true, enough(GC2A)),
?assertEqual({ok, Obj3}, element(1, response(GC2A))),

GC2B = add_result(1, {ok, Obj4}, GC1),
GC2B = add_result(1, {ok, Obj4}, node(), GC1),
?assertEqual(true, enough(GC2B)),
?assertEqual({ok, Obj4}, element(1, response(GC2B))),

GC2C = add_result(1, {ok, Obj2}, GC1),
GC2C = add_result(1, {ok, Obj2}, node(), GC1),
?assertEqual(false, enough(GC2C)),
?assertEqual({{error, {r_val_unsatisfied, 3, 2}}, GC2C}, response(GC2C)),

GC3C = add_result(2, {ok, Obj1}, GC2C),
GC3C = add_result(2, {ok, Obj1}, node(), GC2C),
io:format("GC32 ~w~n", [GC3C]),
?assertEqual(true, enough(GC3C)),
?assertEqual({ok, Obj2}, element(1, response(GC3C))).
Expand Down Expand Up @@ -784,6 +813,24 @@ response_test_() ->
{2, {error, notfound}},
{3, {error, notfound}}]})),
ok
end},
{"Confirms not met",
fun() ->
RObj = riak_object:new(<<"foo">>, <<"bar">>, <<"baz">>),
?assertMatch({{error,
{insufficient_nodes, 1, need, 2}}, _},
response(#getcore{n= 3, r = 3, pr=0,
fail_threshold = 1, num_ok = 3, num_pok = 0,
allow_mult = false,
num_notfound = 0, num_deleted = 0,
num_fail = 0,
node_confirms = 2,
confirmed_nodes = [node()],
results= [
{1, {ok, RObj}},
{2, {ok, RObj}},
{3, {ok, RObj}}]})),
ok
end}
]}.
-endif.
Loading

0 comments on commit 6e8d145

Please sign in to comment.