Skip to content

Commit

Permalink
Nhse develop d30upd (#15)
Browse files Browse the repository at this point in the history
* Merge pull request #1 from nhs-riak/nhse-contrib-kv1871

KV i1871 - Handle timeout on remote connection

* Trigger batch correctly at each size (#4)

* Force timeout to trigger (#3)

Previously, the inactivity timeout on handle_continue could be cancelled by a call to riak_kv_replrtq_snk (e.g. from riak_kv_replrtq_peer).  This might lead to the log_stats loop never being triggered.

* Configurable $key query on leveled (#8)

Can be configured to ignore tombstone keys by default.

* Allow nextgenrepl to real-time replicate reaps (#6)

* Allow nextgenrepl to real-time replicate reaps

This is to address the issue of reaping across sync'd clusters.  Without this feature it is necessary to disable full-sync whilst independently replicating on each cluster.

Now if reaping via riak_kv_reaper the reap will be replicated assuming the `riak_kv.repl_reap` flag has been enabled.  At the receiving cluster the reap will not be replicated any further.

There are some API changes to support this.  The `find_tombs` aae_fold will now return Keys/Clocks and not Keys/DeleteHash.  The ReapReference for riak_kv_reaper will now expect a clock (version vector) not a DeleteHash, and will also now expect an additional boolean to indicate if this reap is a replication candidate (it will be false for all pushed reaps).

The object encoding for nextgenrepl now has a flag to indicate a reap, with a special encoding for reap references.

* Update riak_object.erl

Clarify specs

* Take timestamp at correct point (after push)

* Updates following review

* Update rebar.config

* Make current_peers empty when disabled (#10)

* Make current_peers empty when disabled

* Peer discovery to recognise suspend and disable of sink

* Update src/riak_kv_replrtq_peer.erl

Co-authored-by: Thomas Arts <[email protected]>

* Update src/riak_kv_replrtq_peer.erl

Co-authored-by: Thomas Arts <[email protected]>

---------

Co-authored-by: Thomas Arts <[email protected]>

* De-lager

* Add support for v0 object in parallel-mode AAE (#11)

* Add support for v0 object in parallel-mode AAE

Cannot assume that v0 objects will not happen - capability negotiation down to v0 on 3.0 Riak during failure scenarios

* Update following review

As ?MAGIC is distinctive constant, then it should be the one on the pattern match - with everything else assume to be convertible by term_to_binary.

* Update src/riak_object.erl

Co-authored-by: Thomas Arts <[email protected]>

---------

Co-authored-by: Thomas Arts <[email protected]>

* Update riak_kv_ttaaefs_manager.erl (#13)

For bucket-based full-sync `{tree_compare, 0}` is the return on success.

* Correct log macro typo

---------

Co-authored-by: Thomas Arts <[email protected]>
  • Loading branch information
martinsumner and ThomasArts authored Feb 13, 2024
1 parent bde345e commit 8922222
Show file tree
Hide file tree
Showing 14 changed files with 496 additions and 267 deletions.
20 changes: 18 additions & 2 deletions priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1337,13 +1337,20 @@
{default, "q1_ttaaefs:block_rtq"}
]}.

%% @doc Enable this node zlib compress objects over the wire
%% @doc Enable this node to zlib compress objects over the wire
{mapping, "replrtq_compressonwire", "riak_kv.replrtq_compressonwire", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
{commented, enabled}
]}.

%% @doc Enable this node to replicate reap requests to other clusters
{mapping, "repl_reap", "riak_kv.repl_reap", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
{commented, enabled}
]}.

%% @doc Enable this node to act as a sink and consume from a src cluster
{mapping, "replrtq_enablesink", "riak_kv.replrtq_enablesink", [
{datatype, {flag, enabled, disabled}},
Expand Down Expand Up @@ -1478,7 +1485,7 @@
%% @doc Choose to read repair to primary vnodes only
%% When fallback vnodes are elected, then read repair will by default repair
%% any missing data from the vnode - i.e. every GET while the fallback is in
%% play will lead to a PUT to add the rewuested object to the fallback vnode,
%% play will lead to a PUT to add the requested object to the fallback vnode,
%% as the fallback by default starts empty.
%% If the expectation is that failed vnodes are replaced quickly, as would be
%% possible in a Cloud scenario, this may not be desirable. Read repair to
Expand Down Expand Up @@ -1508,4 +1515,13 @@
{mapping, "handoff_deletes", "riak_kv.handoff_deletes", [
{datatype, {flag, enabled, disabled}},
{default, disabled}
]}.

%% @doc For $key index queries, should keys which are tombstones be returned.
%% This config will only make a difference with the leveled backend, it is
%% ignored on other backends. Disable to change default behaviour and stop
%% returning keys of tombstones in $key queries
{mapping, "dollarkey_readtombs", "riak_kv.dollarkey_readtombs", [
{datatype, {flag, enabled, disabled}},
{default, enabled}
]}.
47 changes: 36 additions & 11 deletions src/riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ replrtq_reset_all_workercounts(WorkerC, PerPeerL) ->
{ok, riak_object:riak_object()} |
{ok, queue_empty} |
{ok, {deleted, vclock:vclock(), riak_object:riak_object()}} |
{ok, {reap, {riak_object:bucket(), riak_object:key(), vclock:vclock(), erlang:timestamp()}}}|
{error, timeout} |
{error, not_yet_implemented} |
{error, Err :: term()}.
Expand Down Expand Up @@ -223,10 +224,11 @@ fetch(QueueName, {?MODULE, [Node, _ClientId]}) ->
-spec push(riak_object:riak_object()|binary(),
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{ok, reap} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
push(RObjMaybeBin, IsDeleted, Opts, RiakClient) ->
RObj =
case riak_object:is_robject(RObjMaybeBin) of
% May get pushed a riak object, or a riak object as a binary, but
Expand All @@ -236,6 +238,25 @@ push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
false ->
riak_object:nextgenrepl_decode(RObjMaybeBin)
end,
case RObj of
{reap, {B, K, TC, LMD}} ->
{repl_reap(B, K, TC), LMD};
RObj ->
repl_push(RObj, IsDeleted, Opts, RiakClient)
end.

%% @doc Forward a reap which was replicated from another cluster to this
%% node's riak_kv_reaper.  The final `false' in the reap reference marks
%% the reap as not being a replication candidate, so a pushed reap is not
%% replicated onwards to further clusters.
-spec repl_reap(
riak_object:bucket(), riak_object:key(), vclock:vclock()) -> ok.
repl_reap(B, K, TC) ->
riak_kv_reaper:request_reap({{B, K}, TC, false}).

-spec repl_push(riak_object:riak_object()|binary(),
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
repl_push(RObj, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
Bucket = riak_object:bucket(RObj),
Key = riak_object:key(RObj),
Me = self(),
Expand Down Expand Up @@ -579,26 +600,30 @@ consistent_delete(Bucket, Key, Options, _Timeout, {?MODULE, [Node, _ClientId]})
end.


-spec reap(riak_object:bucket(), riak_object:key(), riak_client())
-> boolean().
-spec reap(
riak_object:bucket(), riak_object:key(), riak_client()) -> boolean().
reap(Bucket, Key, Client) ->
case normal_get(Bucket, Key, [deletedvclock], Client) of
{error, {deleted, TombstoneVClock}} ->
DeleteHash = riak_object:delete_hash(TombstoneVClock),
reap(Bucket, Key, DeleteHash, Client);
reap(Bucket, Key, TombstoneVClock, Client);
_Unexpected ->
false
end.

-spec reap(riak_object:bucket(), riak_object:key(), pos_integer(),
riak_client()) -> boolean().
reap(Bucket, Key, DeleteHash, {?MODULE, [Node, _ClientId]}) ->
-spec reap(
riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client())
-> boolean().
reap(Bucket, Key, TombClock, {?MODULE, [Node, _ClientId]}) ->
case node() of
Node ->
riak_kv_reaper:direct_reap({{Bucket, Key}, DeleteHash});
riak_kv_reaper:direct_reap({{Bucket, Key}, TombClock, true});
_ ->
riak_core_util:safe_rpc(Node, riak_kv_reaper, direct_reap,
[{{Bucket, Key}, DeleteHash}])
riak_core_util:safe_rpc(
Node,
riak_kv_reaper,
direct_reap,
[{{Bucket, Key}, TombClock, true}]
)
end.

%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client()) ->
Expand Down
12 changes: 7 additions & 5 deletions src/riak_kv_clusteraae_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,8 @@ json_encode_results(find_keys, Result) ->
Keys = {struct, [{<<"results">>, [{struct, encode_find_key(Key, Int)} || {_Bucket, Key, Int} <- Result]}
]},
mochijson2:encode(Keys);
json_encode_results(find_tombs, Result) ->
json_encode_results(find_keys, Result);
json_encode_results(find_tombs, KeysNClocks) ->
encode_keys_and_clocks(KeysNClocks);
json_encode_results(reap_tombs, Count) ->
mochijson2:encode({struct, [{<<"dispatched_count">>, Count}]});
json_encode_results(erase_keys, Count) ->
Expand Down Expand Up @@ -620,7 +620,7 @@ pb_encode_results(merge_branch_nval, _QD, Branches) ->
level_two = L2
};
pb_encode_results(fetch_clocks_nval, _QD, KeysNClocks) ->
#rpbaaefoldkeyvalueresp{
#rpbaaefoldkeyvalueresp{
response_type = atom_to_binary(clock, unicode),
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(merge_tree_range, QD, Tree) ->
Expand Down Expand Up @@ -666,8 +666,10 @@ pb_encode_results(find_keys, _QD, Results) ->
end,
#rpbaaefoldkeycountresp{response_type = <<"find_keys">>,
keys_count = lists:map(KeyCountMap, Results)};
pb_encode_results(find_tombs, QD, Results) ->
pb_encode_results(find_keys, QD, Results);
pb_encode_results(find_tombs, _QD, KeysNClocks) ->
#rpbaaefoldkeyvalueresp{
response_type = atom_to_binary(clock, unicode),
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(reap_tombs, _QD, Count) ->
#rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>,
keys_count =
Expand Down
7 changes: 7 additions & 0 deletions src/riak_kv_get_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ queue_fetch(timeout, StateData) ->
Msg = {ReqID, {ok, {deleted, ExpectedClock, Obj}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData};
{Bucket, Key, TombClock, {reap, LMD}} ->
% A reap request was queued - so there is no need to fetch
% A tombstone was queued - so there is no need to fetch
Msg = {ReqID, {ok, {reap, {Bucket, Key, TombClock, LMD}}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData}
end.

Expand Down
108 changes: 74 additions & 34 deletions src/riak_kv_leveled_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,14 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{bookie=Bookie}) ->
if
Index /= false ->
{index, QBucket, Q} = Index,
?KV_INDEX_Q{filter_field=Field,
start_key=StartKey0,
start_term=StartTerm,
end_term=EndTerm,
return_terms=ReturnTerms,
start_inclusive=StartInc,
term_regex=TermRegex} = riak_index:upgrade_query(Q),
?KV_INDEX_Q{
filter_field=Field,
start_key=StartKey0,
start_term=StartTerm,
end_term=EndTerm,
return_terms=ReturnTerms,
start_inclusive=StartInc,
term_regex=TermRegex} = riak_index:upgrade_query(Q),

StartKey =
case StartInc of
Expand All @@ -339,44 +340,50 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{bookie=Bookie}) ->
% If this is a $key index query, the start key is assumed
% to mean the start of the range, and so we want to use
% this start key inclusively (and so don't advance it to
% the next_key.
% the next_key).

case Field of
<<"$bucket">> ->
leveled_bookie:book_keylist(Bookie,
?RIAK_TAG,
QBucket,
{StartKey, null},
{FoldKeysFun, Acc},
TermRegex);
leveled_bookie:book_keylist(
Bookie,
?RIAK_TAG,
QBucket,
{StartKey, null},
{FoldKeysFun, Acc},
TermRegex);
<<"$key">> ->
leveled_bookie:book_keylist(Bookie,
?RIAK_TAG,
QBucket,
{StartKey, EndTerm},
{FoldKeysFun, Acc},
TermRegex);
ReadTombs =
application:get_env(
riak_kv, dollarkey_readtombs, true),
FoldHeadsFun =
dollarkey_foldfun(
FoldKeysFun, ReadTombs, TermRegex),
leveled_bookie:book_headfold(
Bookie,
?RIAK_TAG,
{range, QBucket, {StartKey, EndTerm}},
{FoldHeadsFun, Acc},
false,
SnapPreFold,
false
);
_ ->
leveled_bookie:book_indexfold(Bookie,
{QBucket, StartKey},
{FoldKeysFun, Acc},
{Field,
StartTerm,
EndTerm},
{ReturnTerms,
TermRegex})
leveled_bookie:book_indexfold(
Bookie,
{QBucket, StartKey},
{FoldKeysFun, Acc},
{Field, StartTerm, EndTerm},
{ReturnTerms, TermRegex})
end;
Bucket /= false ->
% Equivalent to $bucket query, but without the StartKey
{bucket, B} = Bucket,
leveled_bookie:book_keylist(Bookie,
?RIAK_TAG, B,
{FoldKeysFun, Acc});
leveled_bookie:book_keylist(
Bookie, ?RIAK_TAG, B, {FoldKeysFun, Acc});
true ->
% All key query - don't constrain by bucket
leveled_bookie:book_keylist(Bookie,
?RIAK_TAG,
{FoldKeysFun, Acc})
leveled_bookie:book_keylist(
Bookie, ?RIAK_TAG, {FoldKeysFun, Acc})
end,

case {lists:member(async_fold, Opts), SnapPreFold} of
Expand Down Expand Up @@ -640,6 +647,39 @@ callback(Ref, UnexpectedCallback, State) ->
%% ===================================================================


%% @doc Build the fold-objects fun used for $key index queries.
%% Wraps FoldKeysFun so that, when ReadTombs is false, keys whose head
%% object is a tombstone (per riak_object:is_aae_object_deleted/2) are
%% skipped, and, when TermRegex is a compiled regex, only keys matching
%% the regex are passed on to FoldKeysFun.
-spec dollarkey_foldfun(
riak_kv_backend:fold_keys_fun(), boolean(), re:mp()|undefined)
-> riak_kv_backend:fold_objects_fun().
dollarkey_foldfun(FoldKeysFun, ReadTombs, TermRegex) ->
%% The regex test does not depend on per-key state, so choose the
%% key-level fun once, outside the returned closure.
KeyFilterFun =
case TermRegex of
undefined ->
FoldKeysFun;
_ ->
fun(Bucket, Key, InnerAcc) ->
case re:run(Key, TermRegex) of
nomatch ->
InnerAcc;
_Match ->
FoldKeysFun(Bucket, Key, InnerAcc)
end
end
end,
fun(Bucket, Key, HeadObj, Acc) ->
case ReadTombs of
true ->
KeyFilterFun(Bucket, Key, Acc);
false ->
%% Only pay the cost of decoding the object summary when
%% tombstones are to be excluded from the results.
MetaBin =
element(5, riak_object:summary_from_binary(HeadObj)),
case riak_object:is_aae_object_deleted(MetaBin, false) of
{true, undefined} ->
Acc;
_ ->
KeyFilterFun(Bucket, Key, Acc)
end
end
end.

-spec log_fragmentation(eheap_alloc|binary_alloc) -> ok.
log_fragmentation(Allocator) ->
{MB_BS, MB_CS, SB_BS, SB_CS} =
Expand Down
Loading

0 comments on commit 8922222

Please sign in to comment.