Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nextgenrepl to real-time replicate reaps (#6) #1879

Open
wants to merge 1 commit into
base: develop-3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1337,13 +1337,20 @@
{default, "q1_ttaaefs:block_rtq"}
]}.

%% @doc Enable this node zlib compress objects over the wire
%% @doc Enable this node to zlib compress objects over the wire
{mapping, "replrtq_compressonwire", "riak_kv.replrtq_compressonwire", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
{commented, enabled}
]}.

%% @doc Enable this node to replicate reap requests to other clusters
{mapping, "repl_reap", "riak_kv.repl_reap", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
{commented, enabled}
]}.

%% @doc Enable this node to act as a sink and consume from a src cluster
{mapping, "replrtq_enablesink", "riak_kv.replrtq_enablesink", [
{datatype, {flag, enabled, disabled}},
Expand Down
47 changes: 36 additions & 11 deletions src/riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ replrtq_reset_all_workercounts(WorkerC, PerPeerL) ->
{ok, riak_object:riak_object()} |
{ok, queue_empty} |
{ok, {deleted, vclock:vclock(), riak_object:riak_object()}} |
{ok, {reap, {riak_object:bucket(), riak_object:key(), vclock:vclock(), erlang:timestamp()}}}|
{error, timeout} |
{error, not_yet_implemented} |
{error, Err :: term()}.
Expand Down Expand Up @@ -223,10 +224,11 @@ fetch(QueueName, {?MODULE, [Node, _ClientId]}) ->
-spec push(riak_object:riak_object()|binary(),
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{ok, reap} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
push(RObjMaybeBin, IsDeleted, Opts, RiakClient) ->
RObj =
case riak_object:is_robject(RObjMaybeBin) of
% May get pushed a riak object, or a riak object as a binary, but
Expand All @@ -236,6 +238,25 @@ push(RObjMaybeBin, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
false ->
riak_object:nextgenrepl_decode(RObjMaybeBin)
end,
case RObj of
{reap, {B, K, TC, LMD}} ->
{repl_reap(B, K, TC), LMD};
RObj ->
repl_push(RObj, IsDeleted, Opts, RiakClient)
end.

-spec repl_reap(
riak_object:bucket(), riak_object:key(), vclock:vclock()) -> ok.
repl_reap(B, K, TC) ->
riak_kv_reaper:request_reap({{B, K}, TC, false}).

-spec repl_push(riak_object:riak_object()|binary(),
boolean(), list(), riak_client()) ->
{ok, erlang:timestamp()} |
{error, too_many_fails} |
{error, timeout} |
{error, {n_val_violation, N::integer()}}.
repl_push(RObj, IsDeleted, _Opts, {?MODULE, [Node, _ClientId]}) ->
Bucket = riak_object:bucket(RObj),
Key = riak_object:key(RObj),
Me = self(),
Expand Down Expand Up @@ -579,26 +600,30 @@ consistent_delete(Bucket, Key, Options, _Timeout, {?MODULE, [Node, _ClientId]})
end.


-spec reap(riak_object:bucket(), riak_object:key(), riak_client())
-> boolean().
-spec reap(
riak_object:bucket(), riak_object:key(), riak_client()) -> boolean().
reap(Bucket, Key, Client) ->
case normal_get(Bucket, Key, [deletedvclock], Client) of
{error, {deleted, TombstoneVClock}} ->
DeleteHash = riak_object:delete_hash(TombstoneVClock),
reap(Bucket, Key, DeleteHash, Client);
reap(Bucket, Key, TombstoneVClock, Client);
_Unexpected ->
false
end.

-spec reap(riak_object:bucket(), riak_object:key(), pos_integer(),
riak_client()) -> boolean().
reap(Bucket, Key, DeleteHash, {?MODULE, [Node, _ClientId]}) ->
-spec reap(
riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client())
-> boolean().
reap(Bucket, Key, TombClock, {?MODULE, [Node, _ClientId]}) ->
case node() of
Node ->
riak_kv_reaper:direct_reap({{Bucket, Key}, DeleteHash});
riak_kv_reaper:direct_reap({{Bucket, Key}, TombClock, true});
_ ->
riak_core_util:safe_rpc(Node, riak_kv_reaper, direct_reap,
[{{Bucket, Key}, DeleteHash}])
riak_core_util:safe_rpc(
Node,
riak_kv_reaper,
direct_reap,
[{{Bucket, Key}, TombClock, true}]
)
end.

%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock(), riak_client()) ->
Expand Down
12 changes: 7 additions & 5 deletions src/riak_kv_clusteraae_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ json_encode_results(find_keys, Result) ->
Keys = {struct, [{<<"results">>, [{struct, encode_find_key(Key, Int)} || {_Bucket, Key, Int} <- Result]}
]},
mochijson2:encode(Keys);
json_encode_results(find_tombs, Result) ->
json_encode_results(find_keys, Result);
json_encode_results(find_tombs, KeysNClocks) ->
encode_keys_and_clocks(KeysNClocks);
json_encode_results(reap_tombs, Count) ->
mochijson2:encode({struct, [{<<"dispatched_count">>, Count}]});
json_encode_results(erase_keys, Count) ->
Expand Down Expand Up @@ -616,7 +616,7 @@ pb_encode_results(merge_branch_nval, _QD, Branches) ->
level_two = L2
};
pb_encode_results(fetch_clocks_nval, _QD, KeysNClocks) ->
#rpbaaefoldkeyvalueresp{
#rpbaaefoldkeyvalueresp{
response_type = atom_to_binary(clock, unicode),
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(merge_tree_range, QD, Tree) ->
Expand Down Expand Up @@ -662,8 +662,10 @@ pb_encode_results(find_keys, _QD, Results) ->
end,
#rpbaaefoldkeycountresp{response_type = <<"find_keys">>,
keys_count = lists:map(KeyCountMap, Results)};
pb_encode_results(find_tombs, QD, Results) ->
pb_encode_results(find_keys, QD, Results);
pb_encode_results(find_tombs, _QD, KeysNClocks) ->
#rpbaaefoldkeyvalueresp{
response_type = atom_to_binary(clock, unicode),
keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)};
pb_encode_results(reap_tombs, _QD, Count) ->
#rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>,
keys_count =
Expand Down
7 changes: 7 additions & 0 deletions src/riak_kv_get_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ queue_fetch(timeout, StateData) ->
Msg = {ReqID, {ok, {deleted, ExpectedClock, Obj}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData};
{Bucket, Key, TombClock, {reap, LMD}} ->
% A reap request was queued - so there is no need to fetch
% A tombstone was queued - so there is no need to fetch
Msg = {ReqID, {ok, {reap, {Bucket, Key, TombClock, LMD}}}},
Pid ! Msg,
ok = riak_kv_stat:update(ngrfetch_prefetch),
{stop, normal, StateData}
end.

Expand Down
70 changes: 44 additions & 26 deletions src/riak_kv_pb_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,19 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin},
case Result of
{ok, queue_empty} ->
{reply, #rpbfetchresp{queue_empty = true}, State};
{ok, {reap, {B, K, TC, LMD}}} ->
EncObj =
riak_object:nextgenrepl_encode(
repl_v1, {reap, {B, K, TC, LMD}}, false),
CRC32 = erlang:crc32(EncObj),
Resp =
#rpbfetchresp{
queue_empty = false,
replencoded_object = EncObj,
crc_check = CRC32},
{reply,
encode_nextgenrepl_response(Encoding, Resp, {B, K, TC}),
State};
{ok, {deleted, TombClock, RObj}} ->
% Never bother compressing tombstones, they're practically empty
EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, false),
Expand All @@ -212,18 +225,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin},
replencoded_object = EncObj,
crc_check = CRC32,
deleted_vclock = pbify_rpbvc(TombClock)},
case Encoding of
internal ->
{reply, Resp, State};
internal_aaehash ->
BK = make_binarykey(RObj),
{SegID, SegHash} =
leveled_tictac:tictac_hash(BK, lists:sort(TombClock)),
{reply,
Resp#rpbfetchresp{segment_id = SegID,
segment_hash = SegHash},
State}
end;
{reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State};
{ok, RObj} ->
EncObj = riak_object:nextgenrepl_encode(repl_v1, RObj, ToCompress),
CRC32 = erlang:crc32(EncObj),
Expand All @@ -232,19 +234,7 @@ process(#rpbfetchreq{queuename = QueueName, encoding = EncodingBin},
deleted = false,
replencoded_object = EncObj,
crc_check = CRC32},
case Encoding of
internal ->
{reply, Resp, State};
internal_aaehash ->
BK = make_binarykey(RObj),
Clock = lists:sort(riak_object:vclock(RObj)),
{SegID, SegHash} =
leveled_tictac:tictac_hash(BK, Clock),
{reply,
Resp#rpbfetchresp{segment_id = SegID,
segment_hash = SegHash},
State}
end;
{reply, encode_nextgenrepl_response(Encoding, Resp, RObj), State};
{error, Reason} ->
{error, {format, Reason}, State}
end;
Expand Down Expand Up @@ -443,7 +433,35 @@ process_stream(_,_,State) ->
%% Internal functions
%% ===================================================================

-spec make_binarykey(riak_object:riak_object()) -> binary().
-spec encode_nextgenrepl_response(
intenal|internal_aaehash,
#rpbfetchresp{},
riak_object:riak_object()|
{riak_object:bucket(), riak_object:key(), vclock:vclock()})
-> #rpbfetchresp{}.
encode_nextgenrepl_response(Encoding, Resp, RObj) ->
case Encoding of
internal ->
Resp;
internal_aaehash ->
{SegID, SegHash} =
case RObj of
{B, K, TC} ->
BK = make_binarykey({B, K}),
leveled_tictac:tictac_hash(BK, lists:sort(TC));
RObj ->
BK = make_binarykey(RObj),
leveled_tictac:tictac_hash(
BK, lists:sort(riak_object:vclock(RObj)))
end,
Resp#rpbfetchresp{segment_id = SegID, segment_hash = SegHash}
end.

-spec make_binarykey(
riak_object:riak_object()|{riak_object:bucket(), riak_object:key()})
-> binary().
make_binarykey({B, K}) ->
make_binarykey(B, K);
make_binarykey(RObj) ->
make_binarykey(riak_object:bucket(RObj), riak_object:key(RObj)).

Expand Down
28 changes: 25 additions & 3 deletions src/riak_kv_reaper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@
redo/0]).

-type reap_reference() ::
{{riak_object:bucket(), riak_object:key()}, non_neg_integer()}.
{{riak_object:bucket(), riak_object:key()}, vclock:vclock(), boolean()}.
%% the reap reference is {Bucket, Key, Clock (of tombstone), Forward}. The
%% Forward boolean() indicates if this reap should be replicated if
%% riak_kv.repl_reap is true. When a reap is received via replication
%% Forward should be set to false, to prevent reaps from perpetually
%% circulating
-type job_id() :: pos_integer().

-export_type([reap_reference/0, job_id/0]).
Expand Down Expand Up @@ -149,7 +154,7 @@ get_limits() ->
%% we will not redo - redo is only to handle the failure related to unavailable
%% primaries
-spec action(reap_reference(), boolean()) -> boolean().
action({{Bucket, Key}, DeleteHash}, Redo) ->
action({{Bucket, Key}, TombClock, ToRepl}, Redo) ->
BucketProps = riak_core_bucket:get_bucket(Bucket),
DocIdx = riak_core_util:chash_key({Bucket, Key}, BucketProps),
{n_val, N} = lists:keyfind(n_val, 1, BucketProps),
Expand All @@ -160,7 +165,11 @@ action({{Bucket, Key}, DeleteHash}, Redo) ->
PL0 = lists:map(fun({Target, primary}) -> Target end, PrefList),
case check_all_mailboxes(PL0) of
ok ->
riak_kv_vnode:reap(PL0, {Bucket, Key}, DeleteHash),
riak_kv_vnode:reap(
PL0,
{Bucket, Key},
riak_object:delete_hash(TombClock)),
maybe_repl_reap(Bucket, Key, TombClock, ToRepl),
timer:sleep(TombPause),
true;
soft_loaded ->
Expand All @@ -171,6 +180,7 @@ action({{Bucket, Key}, DeleteHash}, Redo) ->
if Redo -> false; true -> true end
end.


-spec redo() -> boolean().
redo() -> true.

Expand All @@ -180,6 +190,18 @@ redo() -> true.

-type preflist_entry() :: {non_neg_integer(), node()}.

-spec maybe_repl_reap(
riak_object:bucket(), riak_object:key(), vclock:vclock(), boolean()) -> ok.
maybe_repl_reap(Bucket, Key, TombClock, ToReap) ->
case application:get_env(riak_kv, repl_reap, false) and ToReap of
true ->
riak_kv_replrtq_src:replrtq_reap(
Bucket, Key, TombClock, os:timestamp());
false ->
ok
end.


%% Protect against overloading the system when not reaping should any
%% mailbox be in soft overload state
-spec check_all_mailboxes(list(preflist_entry())) -> ok|soft_loaded.
Expand Down
30 changes: 19 additions & 11 deletions src/riak_kv_replrtq_snk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@
% Modified time by bucket - second, minute, hour, day, longer}

-type reply_tuple() ::
{queue_empty, non_neg_integer()} |
{tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()} |
{object, non_neg_integer(), non_neg_integer(), non_neg_integer()} |
{error, any(), any()}.
{queue_empty, non_neg_integer()}|
{tomb, non_neg_integer(), non_neg_integer(), non_neg_integer()}|
{object, non_neg_integer(), non_neg_integer(), non_neg_integer()}|
{error, any(), any()}.

-export_type([peer_info/0, queue_name/0]).

Expand Down Expand Up @@ -712,9 +712,9 @@ repl_fetcher(WorkItem) ->
SWFetched = os:timestamp(),
{ok, LMD} = riak_client:push(RObj, false, [], LocalClient),
SWPushed = os:timestamp(),
ModSplit = timer:now_diff(SWPushed, LMD),
FetchSplit = timer:now_diff(SWFetched, SW),
PushSplit = timer:now_diff(SWPushed, SWFetched),
ModSplit = timer:now_diff(SWPushed, LMD),
ok = riak_kv_stat:update(ngrrepl_object),
done_work(WorkItem, true,
{object, FetchSplit, PushSplit, ModSplit});
Expand All @@ -739,9 +739,16 @@ repl_fetcher(WorkItem) ->
done_work(UpdWorkItem, false, {error, error, remote_error})
end
catch
Type:Exception ->
lager:warning("Snk worker failed at Peer ~w due to ~w error ~w",
[Peer, Type, Exception]),
Type:Exception:Stk ->
lager:warning(
"Snk worker failed at Peer ~w due to ~w error ~w",
[Peer, Type, Exception]),
case app_helper:get_env(riak_kv, log_snk_stacktrace, false) of
true ->
lager:warning("Snk worker failed due to ~p", [Stk]);
_ ->
ok
end,
RemoteFun(close),
UpdWorkItem0 = setelement(3, WorkItem, RenewClientFun()),
ok = riak_kv_stat:update(ngrrepl_error),
Expand Down Expand Up @@ -786,8 +793,9 @@ add_success({{success, Success}, F, FT, PT, RT, MT}) ->
add_failure({S, {failure, Failure}, FT, PT, RT, MT}) ->
{S, {failure, Failure + 1}, FT, PT, RT, MT}.

-spec add_repltime(queue_stats(),
{integer(), integer(), integer()}) -> queue_stats().
-spec add_repltime(
queue_stats(), {non_neg_integer(), non_neg_integer(), non_neg_integer()})
-> queue_stats().
add_repltime({S,
F,
{replfetch_time, FT}, {replpush_time, PT}, {replmod_time, RT},
Expand All @@ -799,7 +807,7 @@ add_repltime({S,
{replmod_time, RT + RT0},
MT}.

-spec add_modtime(queue_stats(), integer()) -> queue_stats().
-spec add_modtime(queue_stats(), non_neg_integer()) -> queue_stats().
add_modtime({S, F, FT, PT, RT, MT}, ModTime) ->
E = mod_split_element(ModTime div 1000) + 1,
C = element(E, MT),
Expand Down
Loading
Loading