Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to force send non-idempotent write when doing duplication #1908

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e60dc0a
support_no_idempotent_dup
Feb 18, 2024
31a4271
format and IWYU
Feb 18, 2024
61b446f
add two matric and type key if dup non-idempotent write retried
Feb 26, 2024
0d1152d
deal with conflict
Feb 26, 2024
0db1a56
pull
Feb 27, 2024
cef197a
small fix
Feb 27, 2024
a4f510a
PASS IWYU
Feb 27, 2024
538090b
format code
Feb 27, 2024
c5e39ca
Update src/common/duplication_common.cpp
ninsmiracle Mar 4, 2024
11c013f
Merge branch 'apache:master' into support_no_idempotent_dup
ninsmiracle Mar 4, 2024
b521752
fix by github comment
Mar 4, 2024
0e19a01
format code
Mar 4, 2024
e827c78
pass IWYU
Mar 6, 2024
bb56598
pass IWYU2
Mar 7, 2024
3d0694f
Update src/server/pegasus_mutation_duplicator.cpp
ninsmiracle Mar 13, 2024
7833e8c
fix by comment and format
Mar 13, 2024
95953ac
Merge branch 'support_no_idempotent_dup' of github.com:ninsmiracle/in…
Mar 13, 2024
d8e26b1
make comment in code more clearly
Mar 14, 2024
3831f7a
make function name better
Mar 14, 2024
01fa7af
fix type_force_send_non_idempotent_if_need
Mar 14, 2024
c400ef7
Update src/common/duplication_common.cpp
ninsmiracle Mar 14, 2024
8dc378f
move DSN_DECLARE in duplication common
Mar 14, 2024
fcaabfe
move dup_unsafe_received_non_idempotent_duplicate_request discription…
Mar 14, 2024
cc853e7
pass IWYU
Mar 14, 2024
5d35724
Update src/server/pegasus_mutation_duplicator.cpp
ninsmiracle Mar 14, 2024
76faaa7
Update src/server/pegasus_mutation_duplicator.cpp
ninsmiracle Mar 14, 2024
2f90f2e
Update src/server/pegasus_mutation_duplicator.cpp
ninsmiracle Mar 14, 2024
884aabe
Merge branch 'master' into support_no_idempotent_dup
ninsmiracle Aug 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/common/duplication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "nlohmann/json_fwd.hpp"
#include "utils/config_api.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/singleton.h"
#include "utils/time_utils.h"
Expand All @@ -39,6 +40,16 @@ DSN_DEFINE_uint32(replication,
"send mutation log batch bytes size per rpc");
DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);


DSN_DEFINE_bool(
replication,
duplication_unsafe_allow_non_idempotent,
false,
"Turn on the switch so that the cluster can accept non-idempotent writes and forward these "
"writes via duplication. Note that this switch may cause data inconsistency between "
"clusters. So we say it is unsafe.");
DSN_TAG_VARIABLE(duplication_unsafe_allow_non_idempotent, FT_MUTABLE);

// While many clusters are duplicated to a target cluster, we have to add many cluster
// ids to the `*.ini` file of the target cluster, and the target cluster might be restarted
// very frequently.
Expand All @@ -50,6 +61,7 @@ DSN_DEFINE_bool(replication,
false,
"Allow any other cluster id except myself to be ignored for duplication");


namespace dsn {
namespace replication {

Expand Down
3 changes: 0 additions & 3 deletions src/common/duplication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@
#include "duplication_types.h"
#include "runtime/rpc/rpc_holder.h"
#include "utils/errors.h"
#include "utils/flags.h"
#include "utils/fmt_utils.h"

DSN_DECLARE_uint32(duplicate_log_batch_bytes);

namespace dsn {
namespace replication {

Expand Down
7 changes: 6 additions & 1 deletion src/replica/duplication/load_from_private_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
#include <iterator>
#include <map>
#include <string>
#include <string_view>
#include <utility>

#include <string_view>

#include "absl/strings/string_view.h"
#include "common/duplication_common.h"
#include "duplication_types.h"
#include "load_from_private_log.h"
Expand All @@ -34,6 +36,7 @@
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"

Expand All @@ -58,6 +61,8 @@ METRIC_DEFINE_counter(replica,
dsn::metric_unit::kMutations,
"The number of mutations read from private log for dup");

DSN_DECLARE_uint32(duplicate_log_batch_bytes);

namespace dsn {
namespace replication {

Expand Down
6 changes: 5 additions & 1 deletion src/replica/duplication/mutation_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"

DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent);

METRIC_DEFINE_gauge_int64(replica,
dup_recent_lost_mutations,
dsn::metric_unit::kMutations,
Expand Down Expand Up @@ -192,7 +195,8 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree
// ERR_OPERATION_DISABLED, but there could still be a mutation written
// before the duplication was added.
// To ignore means this write will be lost, which is acceptable under this rare case.
if (!task_spec::get(update.code)->rpc_request_is_write_idempotent) {
if (!task_spec::get(update.code)->rpc_request_is_write_idempotent &&
!FLAGS_duplication_unsafe_allow_non_idempotent) {
continue;
}

Expand Down
1 change: 1 addition & 0 deletions src/replica/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class binary_reader;
class binary_writer;
class blob;
class gpid;

namespace utils {
class latency_tracer;
} // namespace utils
Expand Down
5 changes: 3 additions & 2 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ DSN_DEFINE_bool(replication,
true,
"reject client write requests if disk status is space insufficient");
DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE);

DSN_DEFINE_int32(replication,
prepare_timeout_ms_for_secondaries,
3000,
Expand All @@ -106,6 +105,7 @@ DSN_DEFINE_uint64(

DSN_DECLARE_int32(max_mutation_count_in_prepare_list);
DSN_DECLARE_int32(staleness_for_commit);
DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent);

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -156,7 +156,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) {
if (is_duplication_master() && !spec->rpc_request_is_write_idempotent &&
!FLAGS_duplication_unsafe_allow_non_idempotent) {
// Ignore non-idempotent write, because duplication provides no guarantee of atomicity to
// make this write produce the same result on multiple clusters.
METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests);
Expand Down
2 changes: 2 additions & 0 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s
INIT_COUNTER(incr_qps);
INIT_COUNTER(check_and_set_qps);
INIT_COUNTER(check_and_mutate_qps);
INIT_COUNTER(dup_unsafe_received_non_idempotent_duplicate_request);
INIT_COUNTER(scan_qps);
INIT_COUNTER(duplicate_qps);
INIT_COUNTER(dup_shipped_ops);
INIT_COUNTER(dup_retry_non_idempotent_duplicate_request);
INIT_COUNTER(dup_failed_shipping_ops);
INIT_COUNTER(dup_recent_mutation_loss_count);
INIT_COUNTER(recent_read_cu);
Expand Down
6 changes: 6 additions & 0 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ class info_collector
incr_qps->set(row_stats.incr_qps);
check_and_set_qps->set(row_stats.check_and_set_qps);
check_and_mutate_qps->set(row_stats.check_and_mutate_qps);
dup_unsafe_received_non_idempotent_duplicate_request->set(
row_stats.dup_unsafe_received_non_idempotent_duplicate_request);
scan_qps->set(row_stats.scan_qps);
duplicate_qps->set(row_stats.duplicate_qps);
dup_shipped_ops->set(row_stats.dup_shipped_ops);
dup_failed_shipping_ops->set(row_stats.dup_failed_shipping_ops);
dup_retry_non_idempotent_duplicate_request->set(
row_stats.dup_retry_non_idempotent_duplicate_request);
dup_recent_mutation_loss_count->set(row_stats.dup_recent_mutation_loss_count);
recent_read_cu->set(row_stats.recent_read_cu);
recent_write_cu->set(row_stats.recent_write_cu);
Expand Down Expand Up @@ -144,10 +148,12 @@ class info_collector
::dsn::perf_counter_wrapper incr_qps;
::dsn::perf_counter_wrapper check_and_set_qps;
::dsn::perf_counter_wrapper check_and_mutate_qps;
::dsn::perf_counter_wrapper dup_unsafe_received_non_idempotent_duplicate_request;
::dsn::perf_counter_wrapper scan_qps;
::dsn::perf_counter_wrapper duplicate_qps;
::dsn::perf_counter_wrapper dup_shipped_ops;
::dsn::perf_counter_wrapper dup_failed_shipping_ops;
::dsn::perf_counter_wrapper dup_retry_non_idempotent_duplicate_request;
::dsn::perf_counter_wrapper dup_recent_mutation_loss_count;
::dsn::perf_counter_wrapper recent_read_cu;
::dsn::perf_counter_wrapper recent_write_cu;
Expand Down
77 changes: 77 additions & 0 deletions src/server/pegasus_mutation_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "duplication_internal_types.h"
#include "pegasus/client.h"
#include "pegasus_key_schema.h"
#include "pegasus_rpc_types.h"
#include "rrdb/rrdb.code.definition.h"
#include "rrdb/rrdb_types.h"
#include "runtime/message_utils.h"
Expand All @@ -62,6 +63,14 @@ METRIC_DEFINE_counter(replica,
dsn::metric_unit::kRequests,
"The number of failed DUPLICATE requests sent from client");

METRIC_DEFINE_counter(replica,
dup_retry_non_idempotent_duplicate_request,
dsn::metric_unit::kRequests,
"The number of retried non-idempotent DUPLICATE requests sent from client");

DSN_DECLARE_uint32(duplicate_log_batch_bytes);
DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent);

namespace dsn {
namespace replication {
struct replica_base;
Expand Down Expand Up @@ -111,6 +120,22 @@ using namespace dsn::literals::chrono_literals;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}
if (tc == dsn::apps::RPC_RRDB_RRDB_INCR) {
dsn::apps::incr_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.key);
}
if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
dsn::apps::check_and_set_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}
if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
dsn::apps::check_and_mutate_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}

LOG_FATAL("unexpected task code: {}", tc);
__builtin_unreachable();
}
Expand Down Expand Up @@ -215,6 +240,9 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
// retry this rpc
_inflights[hash].push_front(rpc);
_env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the non-idempotent request be retried twice here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes,it will. So I record specific raw_key into log.


type_force_send_non_idempotent_if_need(rpc);

return;
}
if (_inflights[hash].empty()) {
Expand All @@ -231,6 +259,55 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
}
}

void pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplicate_rpc &rpc)
{
if (!FLAGS_duplication_unsafe_allow_non_idempotent) {
return;
}

// there maybe more than one mutation in one dup rpc
for (auto entry : rpc.request().entries) {
// not a non idempotent request
if (!_non_idempotent_codes.count(entry.task_code)) {
continue;
}

METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request);
dsn::message_ex *write = dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);

if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
incr_rpc raw_rpc(write);

LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing "
"duplication, key is '{}'",
raw_rpc.request().key);
continue;
}

if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
check_and_set_rpc raw_rpc(write);

LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried "
"when doing duplication, hash key '{}', check sort key '{}', set sort "
"key '{}'",
raw_rpc.request().hash_key,
raw_rpc.request().check_sort_key,
raw_rpc.request().set_sort_key);
continue;
}

if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
check_and_mutate_rpc raw_rpc(write);

LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been "
"retried when doing duplication, hash key is '{}', sort key is '{}'.",
raw_rpc.request().hash_key,
raw_rpc.request().check_sort_key);
continue;
}
}
}

void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb)
{
_total_shipped_size = 0;
Expand Down
13 changes: 13 additions & 0 deletions src/server/pegasus_mutation_duplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,26 @@
#include <stdint.h>
#include <deque>
#include <map>
#include <set>
#include <string>

#include "absl/strings/string_view.h"
#include "replica/duplication/mutation_duplicator.h"
#include "rrdb/rrdb.client.h"
#include "rrdb/rrdb.code.definition.h"
#include "runtime/pipeline.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_tracker.h"
#include "utils/chrono_literals.h"

#include <string_view>
#include "utils/metrics.h"
#include "utils/zlocks.h"

namespace dsn {
class blob;
class error_code;

namespace replication {
struct replica_base;
} // namespace replication
Expand Down Expand Up @@ -73,6 +78,8 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator

void on_duplicate_reply(uint64_t hash, callback, duplicate_rpc, dsn::error_code err);

void type_force_send_non_idempotent_if_need(duplicate_rpc &rpc);

private:
friend class pegasus_mutation_duplicator_test;

Expand All @@ -89,8 +96,14 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator

size_t _total_shipped_size{0};

const std::set<dsn::task_code> _non_idempotent_codes = {
dsn::apps::RPC_RRDB_RRDB_INCR,
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET,
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE};

METRIC_VAR_DECLARE_counter(dup_shipped_successful_requests);
METRIC_VAR_DECLARE_counter(dup_shipped_failed_requests);
METRIC_VAR_DECLARE_counter(dup_retry_non_idempotent_duplicate_request);
};

// Decodes the binary `request_data` into write request in thrift struct, and
Expand Down
44 changes: 44 additions & 0 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ METRIC_DEFINE_counter(replica,
dsn::metric_unit::kRequests,
"The number of DUPLICATE requests");

METRIC_DEFINE_counter(replica,
dup_unsafe_received_non_idempotent_duplicate_request,
dsn::metric_unit::kRequests,
"receive non-idempotent request from master cluster via duplication when "
"FLAG_duplication_unsafe_allow_non_idempotent set as true."
"This metric greater than zero means that there is already the possibility "
"of inconsistency between clusters.");

METRIC_DEFINE_percentile_int64(replica,
dup_time_lag_ms,
dsn::metric_unit::kMilliSeconds,
Expand Down Expand Up @@ -168,6 +176,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
METRIC_VAR_INIT_replica(check_and_set_latency_ns),
METRIC_VAR_INIT_replica(check_and_mutate_latency_ns),
METRIC_VAR_INIT_replica(dup_requests),
METRIC_VAR_INIT_replica(dup_unsafe_received_non_idempotent_duplicate_request),
METRIC_VAR_INIT_replica(dup_time_lag_ms),
METRIC_VAR_INIT_replica(dup_lagging_writes),
_put_batch_size(0),
Expand Down Expand Up @@ -415,6 +424,41 @@ int pegasus_write_service::duplicate(int64_t decree,
}
continue;
}

// Parse non-idempotent writes via duplication
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {

METRIC_VAR_INCREMENT(dup_unsafe_received_non_idempotent_duplicate_request);

if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
incr_rpc rpc(write);
resp.__set_error(_impl->incr(ctx.decree, rpc.request(), rpc.response()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments to describe what's you aim here.

if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
check_and_set_rpc rpc(write);
resp.__set_error(_impl->check_and_set(ctx.decree, rpc.request(), rpc.response()));
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
check_and_mutate_rpc rpc(write);
resp.__set_error(
_impl->check_and_mutate(ctx.decree, rpc.request(), rpc.response()));
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
}

resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code));
return empty_put(ctx.decree);
Expand Down
Loading
Loading