Nhse develop d30upd #15

Merged: 10 commits, Feb 13, 2024
20 changes: 18 additions & 2 deletions priv/riak_kv.schema
@@ -1337,13 +1337,20 @@
{default, "q1_ttaaefs:block_rtq"}
]}.

-%% @doc Enable this node zlib compress objects over the wire
+%% @doc Enable this node to zlib compress objects over the wire
{mapping, "replrtq_compressonwire", "riak_kv.replrtq_compressonwire", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
{commented, enabled}
]}.

%% @doc Enable this node to replicate reap requests to other clusters
{mapping, "repl_reap", "riak_kv.repl_reap", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
{commented, enabled}
]}.
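
%% For illustration only (not part of the schema change): given the flag
%% mapping above, reap replication would be switched on in riak.conf as
%%   repl_reap = enabled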

%% @doc Enable this node to act as a sink and consume from a src cluster
{mapping, "replrtq_enablesink", "riak_kv.replrtq_enablesink", [
{datatype, {flag, enabled, disabled}},
@@ -1478,7 +1485,7 @@
%% @doc Choose to read repair to primary vnodes only
%% When fallback vnodes are elected, then read repair will by default repair
%% any missing data from the vnode - i.e. every GET while the fallback is in
-%% play will lead to a PUT to add the rewuested object to the fallback vnode,
+%% play will lead to a PUT to add the requested object to the fallback vnode,
%% as the fallback by default starts empty.
%% If the expectation is that failed vnodes are replaced quickly, as would be
%% possible in a Cloud scenario, this may not be desirable. Read repair to
@@ -1508,4 +1515,13 @@
{mapping, "handoff_deletes", "riak_kv.handoff_deletes", [
{datatype, {flag, enabled, disabled}},
{default, disabled}
]}.

%% @doc For $key index queries, should the keys of tombstones be returned.
%% This config only makes a difference with the leveled backend; it is
%% ignored by other backends. Disable to change the default behaviour and
%% stop returning the keys of tombstones in $key queries.
{mapping, "dollarkey_readtombs", "riak_kv.dollarkey_readtombs", [
{datatype, {flag, enabled, disabled}},
{default, enabled}
]}.
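
%% For illustration only (not part of the schema change): to stop $key
%% queries returning the keys of tombstones on the leveled backend,
%% riak.conf would set
%%   dollarkey_readtombs = disabled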
47 changes: 36 additions & 11 deletions src/riak_client.erl
@@ -196,6 +196,7 @@ replrtq_reset_all_workercounts(WorkerC, PerPeerL) ->
{ok, riak_object:riak_object()} |
{ok, queue_empty} |
{ok, {deleted, vclock:vclock(), riak_object:riak_object()}} |
{ok, {reap, {riak_object:bucket(), riak_object:key(), vclock:vclock(), erlang:timestamp()}}} |
{error, timeout} |
{error, not_yet_implemented} |
{error, Err :: term()}.
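
A sketch of how a caller might handle the widened fetch return type, including the new reap tuple; the function name is illustrative, and the result shapes are taken only from the spec above:

%% Sketch only - not part of this diff
describe_fetch(QueueName, Client) ->
    case riak_client:fetch(QueueName, Client) of
        {ok, queue_empty} ->
            empty;
        {ok, {reap, {B, K, _TombClock, _LMD}}} ->
            %% New in this PR: a replicated reap request
            {reap_request, B, K};
        {ok, {deleted, _TombClock, Obj}} ->
            {tombstone, riak_object:bucket(Obj), riak_object:key(Obj)};
        {ok, Obj} ->
            {object, riak_object:bucket(Obj), riak_object:key(Obj)};
        {error, Reason} ->
            {error, Reason}
    end.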
@@ -223,10 +224,11 @@ fetch(QueueName, {?MODULE, [Node, _ClientId]}) ->
-spec push(riak_object:riak_object()|binary(),
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{ok, reap} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
-push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
+push(RObjMaybeBin, IsDeleted, Opts, RiakClient) ->
RObj =
case riak_object:is_robject(RObjMaybeBin) of
% May get pushed a riak object, or a riak object as a binary, but
@@ -236,6 +238,25 @@
false ->
riak_object:nextgenrepl_decode(RObjMaybeBin)
end,
case RObj of
{reap, {B, K, TC, LMD}} ->
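%% repl_reap/3 returns ok, so this clause evaluates to {ok, LMD}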
{repl_reap(B, K, TC), LMD};
RObj ->
repl_push(RObj, IsDeleted, Opts, RiakClient)
end.

-spec repl_reap(
riak_object:bucket(), riak_object:key(), vclock:vclock()) -> ok.
repl_reap(B, K, TC) ->
riak_kv_reaper:request_reap({{B, K}, TC, false}).

-spec repl_push(riak_object:riak_object()|binary(),
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
repl_push(RObj, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
Bucket = riak_object:bucket(RObj),
Key = riak_object:key(RObj),
Me = self(),
@@ -579,26 +600,30 @@ consistent_delete(Bucket, Key, Options, _Timeout, {?MODULE, [Node, _ClientId]})
end.


--spec reap(riak_object:bucket(), riak_object:key(), riak_client())
--> boolean().
+-spec reap(
+riak_object:bucket(), riak_object:key(), riak_client()) -> boolean().
reap(Bucket, Key, Client) ->
case normal_get(Bucket, Key, [deletedvclock], Client) of
{error, {deleted, TombstoneVClock}} ->
-DeleteHash = riak_object:delete_hash(TombstoneVClock),
-reap(Bucket, Key, DeleteHash, Client);
+reap(Bucket, Key, TombstoneVClock, Client);
_Unexpected ->
false
end.

--spec reap(riak_object:bucket(), riak_object:key(), pos_integer(),
-riak_client()) -> boolean().
-reap(Bucket, Key, DeleteHash, {?MODULE, [Node, _ClientId]}) ->
+-spec reap(
+riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client())
+-> boolean().
+reap(Bucket, Key, TombClock, {?MODULE, [Node, _ClientId]}) ->
case node() of
Node ->
-riak_kv_reaper:direct_reap({{Bucket, Key}, DeleteHash});
+riak_kv_reaper:direct_reap({{Bucket, Key}, TombClock, true});
_ ->
-riak_core_util:safe_rpc(Node, riak_kv_reaper, direct_reap,
-[{{Bucket, Key}, DeleteHash}])
+riak_core_util:safe_rpc(
+Node,
+riak_kv_reaper,
+direct_reap,
+[{{Bucket, Key}, TombClock, true}]
+)
end.
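
A sketch of driving a reap through this API (assuming reap/3 is exported and that riak:local_client/0 supplies the client, as elsewhere in riak_kv):

%% Sketch only - not part of this diff
reap_tombstone(Bucket, Key) ->
    {ok, Client} = riak:local_client(),
    %% reap/3 fetches the tombstone's vclock, then calls reap/4 with it
    riak_client:reap(Bucket, Key, Client).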

%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client()) ->
12 changes: 7 additions & 5 deletions src/riak_kv_clusteraae_fsm.erl
@@ -589,8 +589,8 @@ json_encode_results(find_keys, Result) ->
Keys = {struct, [{<<"results">>, [{struct, encode_find_key(Key, Int)} || {_Bucket, Key, Int} <- Result]}
]},
mochijson2:encode(Keys);
-json_encode_results(find_tombs, Result) ->
-json_encode_results(find_keys, Result);
+json_encode_results(find_tombs, KeysNClocks) ->
+encode_keys_and_clocks(KeysNClocks);
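%% Note (illustrative, not part of this diff): find_tombs results are now
%% encoded as bucket/key/clock triples, matching fetch_clocks output,
%% rather than the key/count form used for find_keys.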
json_encode_results(reap_tombs, Count) ->
mochijson2:encode({struct, [{<<"dispatched_count">>, Count}]});
json_encode_results(erase_keys, Count) ->
@@ -620,7 +620,7 @@ pb_encode_results(merge_branch_nval, _QD, Branches) ->
level_two = L2
};
pb_encode_results(fetch_clocks_nval, _QD, KeysNClocks) ->
-#rpbaaefoldkeyvalueresp{
+#rpbaaefoldkeyvalueresp{
response_type = atom_to_binary(clock, unicode),
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(merge_tree_range, QD, Tree) ->
@@ -666,8 +666,10 @@ pb_encode_results(find_keys, _QD, Results) ->
end,
#rpbaaefoldkeycountresp{response_type = <<"find_keys">>,
keys_count = lists:map(KeyCountMap, Results)};
-pb_encode_results(find_tombs, QD, Results) ->
-pb_encode_results(find_keys, QD, Results);
+pb_encode_results(find_tombs, _QD, KeysNClocks) ->
+#rpbaaefoldkeyvalueresp{
+response_type = atom_to_binary(clock, unicode),
+keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(reap_tombs, _QD, Count) ->
#rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>,
keys_count =
7 changes: 7 additions & 0 deletions src/riak_kv_get_fsm.erl
@@ -238,6 +238,13 @@ queue_fetch(timeout, StateData) ->
Msg = {ReqID, {ok, {deleted, ExpectedClock, Obj}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData};
{Bucket, Key, TombClock, {reap, LMD}} ->
% A reap request was queued - so there is no need to fetch the object
Msg = {ReqID, {ok, {reap, {Bucket, Key, TombClock, LMD}}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData}
end.

108 changes: 74 additions & 34 deletions src/riak_kv_leveled_backend.erl
@@ -321,13 +321,14 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{bookie=Bookie}) ->
if
Index /= false ->
{index, QBucket, Q} = Index,
-?KV_INDEX_Q{filter_field=Field,
-start_key=StartKey0,
-start_term=StartTerm,
-end_term=EndTerm,
-return_terms=ReturnTerms,
-start_inclusive=StartInc,
-term_regex=TermRegex} = riak_index:upgrade_query(Q),
+?KV_INDEX_Q{
+filter_field=Field,
+start_key=StartKey0,
+start_term=StartTerm,
+end_term=EndTerm,
+return_terms=ReturnTerms,
+start_inclusive=StartInc,
+term_regex=TermRegex} = riak_index:upgrade_query(Q),

StartKey =
case StartInc of
@@ -339,44 +340,50 @@
% If this is a $key index query, the start key is assumed
% to mean the start of the range, and so we want to use
% this start key inclusively (and so don't advance it to
-% the next_key.
+% the next_key).

case Field of
<<"$bucket">> ->
-leveled_bookie:book_keylist(Bookie,
-?RIAK_TAG,
-QBucket,
-{StartKey, null},
-{FoldKeysFun, Acc},
-TermRegex);
+leveled_bookie:book_keylist(
+Bookie,
+?RIAK_TAG,
+QBucket,
+{StartKey, null},
+{FoldKeysFun, Acc},
+TermRegex);
<<"$key">> ->
-leveled_bookie:book_keylist(Bookie,
-?RIAK_TAG,
-QBucket,
-{StartKey, EndTerm},
-{FoldKeysFun, Acc},
-TermRegex);
+ReadTombs =
+application:get_env(
+riak_kv, dollarkey_readtombs, true),
+FoldHeadsFun =
+dollarkey_foldfun(
+FoldKeysFun, ReadTombs, TermRegex),
+leveled_bookie:book_headfold(
+Bookie,
+?RIAK_TAG,
+{range, QBucket, {StartKey, EndTerm}},
+{FoldHeadsFun, Acc},
+false,
+SnapPreFold,
+false
+);
_ ->
-leveled_bookie:book_indexfold(Bookie,
-{QBucket, StartKey},
-{FoldKeysFun, Acc},
-{Field,
-StartTerm,
-EndTerm},
-{ReturnTerms,
-TermRegex})
+leveled_bookie:book_indexfold(
+Bookie,
+{QBucket, StartKey},
+{FoldKeysFun, Acc},
+{Field, StartTerm, EndTerm},
+{ReturnTerms, TermRegex})
end;
Bucket /= false ->
% Equivalent to $bucket query, but without the StartKey
{bucket, B} = Bucket,
-leveled_bookie:book_keylist(Bookie,
-?RIAK_TAG, B,
-{FoldKeysFun, Acc});
+leveled_bookie:book_keylist(
+Bookie, ?RIAK_TAG, B, {FoldKeysFun, Acc});
true ->
% All key query - don't constrain by bucket
-leveled_bookie:book_keylist(Bookie,
-?RIAK_TAG,
-{FoldKeysFun, Acc})
+leveled_bookie:book_keylist(
+Bookie, ?RIAK_TAG, {FoldKeysFun, Acc})
end,

case {lists:member(async_fold, Opts), SnapPreFold} of
Expand Down Expand Up @@ -640,6 +647,39 @@ callback(Ref, UnexpectedCallback, State) ->
%% ===================================================================


-spec dollarkey_foldfun(
riak_kv_backend:fold_keys_fun(), boolean(), re:mp()|undefined)
-> riak_kv_backend:fold_objects_fun().
dollarkey_foldfun(FoldKeysFun, ReadTombs, TermRegex) ->
FilteredFoldKeysFun =
fun(B, K, Acc) ->
case TermRegex of
undefined ->
FoldKeysFun(B, K, Acc);
TermRegex ->
case re:run(K, TermRegex) of
nomatch ->
Acc;
_ ->
FoldKeysFun(B, K, Acc)
end
end
end,
fun(B, K, HeadObj, KeyAcc) ->
case ReadTombs of
true ->
FilteredFoldKeysFun(B, K, KeyAcc);
false ->
MetaBin = element(5, riak_object:summary_from_binary(HeadObj)),
case riak_object:is_aae_object_deleted(MetaBin, false) of
{true, undefined} ->
KeyAcc;
_ ->
FilteredFoldKeysFun(B, K, KeyAcc)
end
end
end.
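
%% Note (not part of this diff): ReadTombs here is driven by the new
%% riak_kv.dollarkey_readtombs flag (see priv/riak_kv.schema above); when
%% the flag is disabled, $key folds skip the keys of tombstones.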

-spec log_fragmentation(eheap_alloc|binary_alloc) -> ok.
log_fragmentation(Allocator) ->
{MB_BS, MB_CS, SB_BS, SB_CS} =