diff --git a/src/common/duplication_common.cpp b/src/common/duplication_common.cpp index 4dc5323c65..8354e9a28f 100644 --- a/src/common/duplication_common.cpp +++ b/src/common/duplication_common.cpp @@ -29,6 +29,7 @@ #include "nlohmann/json_fwd.hpp" #include "utils/config_api.h" #include "utils/error_code.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/singleton.h" #include "utils/time_utils.h" @@ -39,6 +40,16 @@ DSN_DEFINE_uint32(replication, "send mutation log batch bytes size per rpc"); DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE); + +DSN_DEFINE_bool( + replication, + duplication_unsafe_allow_non_idempotent, + false, + "Turn on the switch so that the cluster can accept non-idempotent writes and forward these " + "writes via duplication. Note that this switch may cause data inconsistency between " + "clusters. So we say it is unsafe."); +DSN_TAG_VARIABLE(duplication_unsafe_allow_non_idempotent, FT_MUTABLE); + // While many clusters are duplicated to a target cluster, we have to add many cluster // ids to the `*.ini` file of the target cluster, and the target cluster might be restarted // very frequently. 
@@ -50,6 +61,7 @@ DSN_DEFINE_bool(replication, false, "Allow any other cluster id except myself to be ignored for duplication"); + namespace dsn { namespace replication { diff --git a/src/common/duplication_common.h b/src/common/duplication_common.h index fe0ec067b4..aca32dad21 100644 --- a/src/common/duplication_common.h +++ b/src/common/duplication_common.h @@ -26,11 +26,8 @@ #include "duplication_types.h" #include "runtime/rpc/rpc_holder.h" #include "utils/errors.h" -#include "utils/flags.h" #include "utils/fmt_utils.h" -DSN_DECLARE_uint32(duplicate_log_batch_bytes); - namespace dsn { namespace replication { diff --git a/src/replica/duplication/load_from_private_log.cpp b/src/replica/duplication/load_from_private_log.cpp index b0e501409f..2f4488c1b1 100644 --- a/src/replica/duplication/load_from_private_log.cpp +++ b/src/replica/duplication/load_from_private_log.cpp @@ -18,9 +18,11 @@ #include #include #include +#include #include -#include + +#include "absl/strings/string_view.h" #include "common/duplication_common.h" #include "duplication_types.h" #include "load_from_private_log.h" @@ -34,6 +36,7 @@ #include "utils/error_code.h" #include "utils/errors.h" #include "utils/fail_point.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/ports.h" @@ -58,6 +61,8 @@ METRIC_DEFINE_counter(replica, dsn::metric_unit::kMutations, "The number of mutations read from private log for dup"); +DSN_DECLARE_uint32(duplicate_log_batch_bytes); + namespace dsn { namespace replication { diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index efcd1eeac3..5c2c87144f 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -33,9 +33,12 @@ #include "utils/autoref_ptr.h" #include "utils/blob.h" #include "utils/error_code.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/ports.h" +DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent); + 
METRIC_DEFINE_gauge_int64(replica, dup_recent_lost_mutations, dsn::metric_unit::kMutations, @@ -192,7 +195,8 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree // ERR_OPERATION_DISABLED, but there could still be a mutation written // before the duplication was added. // To ignore means this write will be lost, which is acceptable under this rare case. - if (!task_spec::get(update.code)->rpc_request_is_write_idempotent) { + if (!task_spec::get(update.code)->rpc_request_is_write_idempotent && + !FLAGS_duplication_unsafe_allow_non_idempotent) { continue; } diff --git a/src/replica/mutation.h b/src/replica/mutation.h index cd99d957c4..fda3c3f93d 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -48,6 +48,7 @@ class binary_reader; class binary_writer; class blob; class gpid; + namespace utils { class latency_tracer; } // namespace utils diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 8f48438d1a..5b7acfcc45 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -83,7 +83,6 @@ DSN_DEFINE_bool(replication, true, "reject client write requests if disk status is space insufficient"); DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE); - DSN_DEFINE_int32(replication, prepare_timeout_ms_for_secondaries, 3000, @@ -106,6 +105,7 @@ DSN_DEFINE_uint64( DSN_DECLARE_int32(max_mutation_count_in_prepare_list); DSN_DECLARE_int32(staleness_for_commit); +DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent); namespace dsn { namespace replication { @@ -156,7 +156,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } - if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) { + if (is_duplication_master() && !spec->rpc_request_is_write_idempotent && + !FLAGS_duplication_unsafe_allow_non_idempotent) { // Ignore non-idempotent write, because duplication provides no guarantee of atomicity to // make this write produce 
the same result on multiple clusters. METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests); diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp index c2b1fe0943..584d26a0e7 100644 --- a/src/server/info_collector.cpp +++ b/src/server/info_collector.cpp @@ -209,9 +209,11 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s INIT_COUNTER(incr_qps); INIT_COUNTER(check_and_set_qps); INIT_COUNTER(check_and_mutate_qps); + INIT_COUNTER(dup_unsafe_received_non_idempotent_duplicate_request); INIT_COUNTER(scan_qps); INIT_COUNTER(duplicate_qps); INIT_COUNTER(dup_shipped_ops); + INIT_COUNTER(dup_retry_non_idempotent_duplicate_request); INIT_COUNTER(dup_failed_shipping_ops); INIT_COUNTER(dup_recent_mutation_loss_count); INIT_COUNTER(recent_read_cu); diff --git a/src/server/info_collector.h b/src/server/info_collector.h index fe168ad7f1..67e6ce65f8 100644 --- a/src/server/info_collector.h +++ b/src/server/info_collector.h @@ -66,10 +66,14 @@ class info_collector incr_qps->set(row_stats.incr_qps); check_and_set_qps->set(row_stats.check_and_set_qps); check_and_mutate_qps->set(row_stats.check_and_mutate_qps); + dup_unsafe_received_non_idempotent_duplicate_request->set( + row_stats.dup_unsafe_received_non_idempotent_duplicate_request); scan_qps->set(row_stats.scan_qps); duplicate_qps->set(row_stats.duplicate_qps); dup_shipped_ops->set(row_stats.dup_shipped_ops); dup_failed_shipping_ops->set(row_stats.dup_failed_shipping_ops); + dup_retry_non_idempotent_duplicate_request->set( + row_stats.dup_retry_non_idempotent_duplicate_request); dup_recent_mutation_loss_count->set(row_stats.dup_recent_mutation_loss_count); recent_read_cu->set(row_stats.recent_read_cu); recent_write_cu->set(row_stats.recent_write_cu); @@ -144,10 +148,12 @@ class info_collector ::dsn::perf_counter_wrapper incr_qps; ::dsn::perf_counter_wrapper check_and_set_qps; ::dsn::perf_counter_wrapper check_and_mutate_qps; + ::dsn::perf_counter_wrapper 
dup_unsafe_received_non_idempotent_duplicate_request; ::dsn::perf_counter_wrapper scan_qps; ::dsn::perf_counter_wrapper duplicate_qps; ::dsn::perf_counter_wrapper dup_shipped_ops; ::dsn::perf_counter_wrapper dup_failed_shipping_ops; + ::dsn::perf_counter_wrapper dup_retry_non_idempotent_duplicate_request; ::dsn::perf_counter_wrapper dup_recent_mutation_loss_count; ::dsn::perf_counter_wrapper recent_read_cu; ::dsn::perf_counter_wrapper recent_write_cu; diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 8b82557bb4..04ed3321f9 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -37,6 +37,7 @@ #include "duplication_internal_types.h" #include "pegasus/client.h" #include "pegasus_key_schema.h" +#include "pegasus_rpc_types.h" #include "rrdb/rrdb.code.definition.h" #include "rrdb/rrdb_types.h" #include "runtime/message_utils.h" @@ -62,6 +63,14 @@ METRIC_DEFINE_counter(replica, dsn::metric_unit::kRequests, "The number of failed DUPLICATE requests sent from client"); +METRIC_DEFINE_counter(replica, + dup_retry_non_idempotent_duplicate_request, + dsn::metric_unit::kRequests, + "The number of retried non-idempotent DUPLICATE requests sent from client"); + +DSN_DECLARE_uint32(duplicate_log_batch_bytes); +DSN_DECLARE_bool(duplication_unsafe_allow_non_idempotent); + namespace dsn { namespace replication { struct replica_base; @@ -111,6 +120,22 @@ using namespace dsn::literals::chrono_literals; dsn::from_blob_to_thrift(data, thrift_request); return pegasus_hash_key_hash(thrift_request.hash_key); } + if (tc == dsn::apps::RPC_RRDB_RRDB_INCR) { + dsn::apps::incr_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { + dsn::apps::check_and_set_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return 
pegasus_hash_key_hash(thrift_request.hash_key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + dsn::apps::check_and_mutate_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.hash_key); + } + LOG_FATAL("unexpected task code: {}", tc); __builtin_unreachable(); } @@ -215,6 +240,9 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash, // retry this rpc _inflights[hash].push_front(rpc); _env.schedule([hash, cb, this]() { send(hash, cb); }, 1_s); + + type_force_send_non_idempotent_if_need(rpc); + return; } if (_inflights[hash].empty()) {
@@ -231,6 +259,63 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
     }
 }
 
+// Records the non-idempotent writes carried by a DUPLICATE rpc that is being retried.
+//
+// Does nothing unless FLAGS_duplication_unsafe_allow_non_idempotent is turned on. For each
+// entry whose task code is in `_non_idempotent_codes`, the retry counter is incremented and
+// the raw message is decoded only to log the affected keys at debug level. Despite its name,
+// this function does not send anything.
+void pegasus_mutation_duplicator::type_force_send_non_idempotent_if_need(duplicate_rpc &rpc)
+{
+    if (!FLAGS_duplication_unsafe_allow_non_idempotent) {
+        return;
+    }
+
+    // A dup rpc may carry several mutations; bind each entry by const reference so that the
+    // entry is not copied on every iteration.
+    for (const auto &entry : rpc.request().entries) {
+        // Idempotent writes are neither counted nor logged here.
+        if (!_non_idempotent_codes.count(entry.task_code)) {
+            continue;
+        }
+
+        METRIC_VAR_INCREMENT(dup_retry_non_idempotent_duplicate_request);
+        // Decode the raw message so that the keys of the write can be logged below.
+        dsn::message_ex *write = dsn::from_blob_to_received_msg(entry.task_code, entry.raw_message);
+
+        if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
+            incr_rpc raw_rpc(write);
+
+            LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_INCR has been retried when doing "
+                      "duplication, key is '{}'",
+                      raw_rpc.request().key);
+            continue;
+        }
+
+        if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
+            check_and_set_rpc raw_rpc(write);
+
+            LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_SET has been retried "
+                      "when doing duplication, hash key '{}', check sort key '{}', set sort "
+                      "key '{}'",
+                      raw_rpc.request().hash_key,
+                      raw_rpc.request().check_sort_key,
+                      raw_rpc.request().set_sort_key);
+            continue;
+        }
+
+        if (entry.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
+            check_and_mutate_rpc raw_rpc(write);
+
+            LOG_DEBUG("Non-indempotent write RPC_RRDB_RRDB_CHECK_AND_MUTATE has been "
+                      "retried when doing duplication, hash key is '{}', sort key is '{}'.",
+                      raw_rpc.request().hash_key,
+                      raw_rpc.request().check_sort_key);
+            continue;
+        }
+    }
+}
+
 void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb)
 {
     _total_shipped_size = 0;
diff --git a/src/server/pegasus_mutation_duplicator.h b/src/server/pegasus_mutation_duplicator.h index 0ac19c68df..bc8790d811 100644 --- a/src/server/pegasus_mutation_duplicator.h +++ b/src/server/pegasus_mutation_duplicator.h @@ -23,14 +23,18 @@ #include #include #include +#include #include +#include "absl/strings/string_view.h" #include "replica/duplication/mutation_duplicator.h" #include "rrdb/rrdb.client.h" +#include "rrdb/rrdb.code.definition.h" #include "runtime/pipeline.h" #include "runtime/task/task_code.h" #include "runtime/task/task_tracker.h" #include "utils/chrono_literals.h" + #include #include "utils/metrics.h" #include "utils/zlocks.h" @@ -38,6 +42,7 @@ namespace dsn { class blob; class error_code; + namespace replication { struct replica_base; } // namespace replication @@ -73,6 +78,8 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator void on_duplicate_reply(uint64_t hash, callback, duplicate_rpc, dsn::error_code err); + void type_force_send_non_idempotent_if_need(duplicate_rpc &rpc); + private: friend class pegasus_mutation_duplicator_test; @@ -89,8 +96,14 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator size_t _total_shipped_size{0}; + const std::set _non_idempotent_codes = { + dsn::apps::RPC_RRDB_RRDB_INCR, + dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET, + dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE}; + METRIC_VAR_DECLARE_counter(dup_shipped_successful_requests); METRIC_VAR_DECLARE_counter(dup_shipped_failed_requests); + METRIC_VAR_DECLARE_counter(dup_retry_non_idempotent_duplicate_request); }; // Decodes
the binary `request_data` into write request in thrift struct, and diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 2fb5733265..b46d1a7eca 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -121,6 +121,14 @@ METRIC_DEFINE_counter(replica, dsn::metric_unit::kRequests, "The number of DUPLICATE requests"); +METRIC_DEFINE_counter(replica, + dup_unsafe_received_non_idempotent_duplicate_request, + dsn::metric_unit::kRequests, + "receive non-idempotent request from master cluster via duplication when " + "FLAG_duplication_unsafe_allow_non_idempotent set as true." + "This metric greater than zero means that there is already the possibility " + "of inconsistency between clusters."); + METRIC_DEFINE_percentile_int64(replica, dup_time_lag_ms, dsn::metric_unit::kMilliSeconds, @@ -168,6 +176,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) METRIC_VAR_INIT_replica(check_and_set_latency_ns), METRIC_VAR_INIT_replica(check_and_mutate_latency_ns), METRIC_VAR_INIT_replica(dup_requests), + METRIC_VAR_INIT_replica(dup_unsafe_received_non_idempotent_duplicate_request), METRIC_VAR_INIT_replica(dup_time_lag_ms), METRIC_VAR_INIT_replica(dup_lagging_writes), _put_batch_size(0), @@ -415,6 +424,41 @@ int pegasus_write_service::duplicate(int64_t decree, } continue; } + + // Parse non-idempotent writes via duplication + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR || + request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || + request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + + METRIC_VAR_INCREMENT(dup_unsafe_received_non_idempotent_duplicate_request); + + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { + incr_rpc rpc(write); + resp.__set_error(_impl->incr(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + if (request.task_code == 
dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { + check_and_set_rpc rpc(write); + resp.__set_error(_impl->check_and_set(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + check_and_mutate_rpc rpc(write); + resp.__set_error( + _impl->check_and_mutate(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + } + resp.__set_error(rocksdb::Status::kInvalidArgument); resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code)); return empty_put(ctx.decree); diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index f430b48b0c..9c68601cf5 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -222,6 +222,7 @@ class pegasus_write_service : dsn::replication::replica_base METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns); METRIC_VAR_DECLARE_counter(dup_requests); + METRIC_VAR_DECLARE_counter(dup_unsafe_received_non_idempotent_duplicate_request); METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms); METRIC_VAR_DECLARE_counter(dup_lagging_writes); diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp b/src/server/test/pegasus_mutation_duplicator_test.cpp index f4e6d871db..b03cb361c3 100644 --- a/src/server/test/pegasus_mutation_duplicator_test.cpp +++ b/src/server/test/pegasus_mutation_duplicator_test.cpp @@ -30,7 +30,6 @@ #include "backup_types.h" #include "base/pegasus_rpc_types.h" -#include "common/duplication_common.h" #include "common/gpid.h" #include "common/replication.codes.h" #include "duplication_internal_types.h" @@ -45,6 +44,9 @@ #include "runtime/rpc/rpc_message.h" #include "utils/blob.h" #include "utils/error_code.h" +#include "utils/flags.h" + +DSN_DECLARE_uint32(duplicate_log_batch_bytes); namespace pegasus { namespace server { 
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 912fae9172..a5fa9f65e8 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -1277,9 +1277,13 @@ struct row_data incr_qps += row.incr_qps; check_and_set_qps += row.check_and_set_qps; check_and_mutate_qps += row.check_and_mutate_qps; + dup_unsafe_received_non_idempotent_duplicate_request += + row.dup_unsafe_received_non_idempotent_duplicate_request; scan_qps += row.scan_qps; duplicate_qps += row.duplicate_qps; dup_shipped_ops += row.dup_shipped_ops; + dup_retry_non_idempotent_duplicate_request += + row.dup_retry_non_idempotent_duplicate_request; dup_failed_shipping_ops += row.dup_failed_shipping_ops; dup_recent_mutation_loss_count += row.dup_recent_mutation_loss_count; recent_read_cu += row.recent_read_cu; @@ -1344,9 +1348,11 @@ struct row_data double incr_qps = 0; double check_and_set_qps = 0; double check_and_mutate_qps = 0; + double dup_unsafe_received_non_idempotent_duplicate_request = 0; double scan_qps = 0; double duplicate_qps = 0; double dup_shipped_ops = 0; + double dup_retry_non_idempotent_duplicate_request = 0; double dup_failed_shipping_ops = 0; double dup_recent_mutation_loss_count = 0; double recent_read_cu = 0; @@ -1669,12 +1675,16 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name, row.check_and_set_qps += value; else if (counter_name == "check_and_mutate_qps") row.check_and_mutate_qps += value; + else if (counter_name == "dup_unsafe_received_non_idempotent_duplicate_request") + row.dup_unsafe_received_non_idempotent_duplicate_request += value; else if (counter_name == "scan_qps") row.scan_qps += value; else if (counter_name == "duplicate_qps") row.duplicate_qps += value; else if (counter_name == "dup_shipped_ops") row.dup_shipped_ops += value; + else if (counter_name == "dup_retry_non_idempotent_duplicate_request") + row.dup_retry_non_idempotent_duplicate_request += value; else if (counter_name == "dup_failed_shipping_ops") 
row.dup_failed_shipping_ops += value; else if (counter_name == "dup_recent_mutation_loss_count") diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp index b75e27f56a..429d90cff5 100644 --- a/src/shell/commands/table_management.cpp +++ b/src/shell/commands/table_management.cpp @@ -529,6 +529,9 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args) sum.incr_qps += row.incr_qps; sum.check_and_set_qps += row.check_and_set_qps; sum.check_and_mutate_qps += row.check_and_mutate_qps; + sum.dup_unsafe_received_non_idempotent_duplicate_request += + row.dup_unsafe_received_non_idempotent_duplicate_request; + sum.scan_qps += row.scan_qps; sum.recent_read_cu += row.recent_read_cu; sum.recent_write_cu += row.recent_write_cu; @@ -625,6 +628,7 @@ bool app_stat(command_executor *e, shell_context *sc, arguments args) tp.append_data(row.incr_qps); tp.append_data(row.check_and_set_qps); tp.append_data(row.check_and_mutate_qps); + tp.append_data(row.dup_unsafe_received_non_idempotent_duplicate_request); tp.append_data(row.scan_qps); tp.append_data(row.recent_read_cu); tp.append_data(row.recent_write_cu);