From 48abb556f3db201e0756f47c5cd1cf53b0fc72f6 Mon Sep 17 00:00:00 2001 From: lihzeng Date: Tue, 20 Aug 2024 02:15:02 -0700 Subject: [PATCH 1/3] Enhance test for stream asio 1. client send requests to wrong endpoint 2. client is closed after sending requests 3. server timeout after sending requests 4. server is closed after sending requests --- tests/unit/asio_service_stream_test.cxx | 392 ++++++++++++++++++------ 1 file changed, 291 insertions(+), 101 deletions(-) diff --git a/tests/unit/asio_service_stream_test.cxx b/tests/unit/asio_service_stream_test.cxx index fd2d6ab..b276a17 100644 --- a/tests/unit/asio_service_stream_test.cxx +++ b/tests/unit/asio_service_stream_test.cxx @@ -29,15 +29,58 @@ using namespace raft_functional_common; namespace asio_service_stream_test { const std::string TEST_MSG = "stream-test-msg-str"; + class stream_statistic { + public: + stream_statistic() + : resp_log_index_(0) + , msg_mismatch_(false) + , reqs_out_of_order_(false) + , next_log_index_(1) + { + std::string server_log_file_name = "./srv" + std::to_string(1) + ".log"; + server_logger_ = cs_new(server_log_file_name); + std::string client_log_file_name = "./srv" + std::to_string(2) + ".log"; + client_logger_ = cs_new(client_log_file_name); + } + + bool waiting_for_responses(int timeout_ms = 3000) { + TestSuite::_msg("wait for responses (up to %d ms)\n", timeout_ms); + ea_.wait_ms(timeout_ms); + SimpleLogger* ll = client_logger_->getLogger(); + _log_info(ll, "resp: %ld, message sent: %ld", next_log_index_.load(), num_messages_sent_); + return (next_log_index_ == num_messages_sent_ + 1); + } + + void wait_for_receiving_requests(int timeout_ms = 1000) { + TestSuite::_msg("wait for receiving requests (up to %d ms)\n", timeout_ms); + req_ea_.wait_ms(1000); + } + + // server + std::atomic resp_log_index_; + std::atomic msg_mismatch_; + std::atomic reqs_out_of_order_; + EventAwaiter req_ea_; + + // client + std::atomic next_log_index_; + size_t sent_log_index_ = 0; + size_t num_messages_sent_ = 0; + size_t error_req_count_ = 0; + EventAwaiter ea_; + + // log + ptr server_logger_; + ptr client_logger_; + }; + class stream_msg_handler : public nuraft::msg_handler { public: stream_msg_handler(context* ctx, const init_options& opt, - ptr log_wrapper) + ptr stream_stat) : msg_handler(ctx, opt) - , my_log_wrapper_(log_wrapper) - , streamed_log_index(0) - , msg_mismatch(false) + , stream_stat_(stream_stat) {} ptr process_req(req_msg& req, const req_ext_params& ext_params) { @@ -45,41 +88,119 @@ namespace asio_service_stream_test { msg_type::append_entries_response, id_, req.get_src()); - if (req.get_last_log_idx() == streamed_log_index) { - streamed_log_index++; - resp->accept(streamed_log_index.load()); + if (req.get_last_log_idx() == stream_stat_->resp_log_index_) { + stream_stat_->resp_log_index_++; + resp->accept(stream_stat_->resp_log_index_.load()); ptr buf = req.log_entries().at(0)->get_buf_ptr(); buf->pos(0); std::string buf_str = buf->get_str(); if (buf_str != TEST_MSG) { - SimpleLogger* ll = my_log_wrapper_->getLogger(); + SimpleLogger* ll = stream_stat_->server_logger_->getLogger(); _log_info(ll, "resp str: %s", buf_str.c_str()); - msg_mismatch.store(true); + stream_stat_->msg_mismatch_.store(true); } } else { - SimpleLogger* ll = my_log_wrapper_->getLogger(); + SimpleLogger* ll = stream_stat_->server_logger_->getLogger(); _log_info(ll, "req log index not match, req: %ld, current: %ld", - req.get_last_log_idx(), streamed_log_index.load()); + req.get_last_log_idx(), stream_stat_->resp_log_index_.load()); + stream_stat_->reqs_out_of_order_.store(true); } + + stream_stat_->req_ea_.invoke(); return resp; } - ptr my_log_wrapper_; - std::atomic streamed_log_index; - std::atomic msg_mismatch; + ptr stream_stat_; }; class stream_server { public: - stream_server(int id, int port) + stream_server(int id, + int port, + ptr& stream_stat) + : my_id_(id) + , port_(port) + , stream_stat_(stream_stat) + {} + + void stop_server() { + if (my_listener_) { + my_listener_->stop(); + my_listener_->shutdown(); + } + + if (asio_svc_) { + asio_svc_->stop(); + size_t count = 0; + while (asio_svc_->get_active_workers() && count < 500) { + // 10ms per tick. + timer_helper::sleep_ms(10); + count++; + } + } + + if (my_msg_handler_) { + my_msg_handler_->shutdown(); + } + } + + void init_server() { + ptr logger = stream_stat_->server_logger_; + // opts + asio_service::options asio_opt; + asio_opt.thread_pool_size_ = 2; + asio_opt.replicate_log_timestamp_ = false; + asio_opt.streaming_mode_ = true; + std::string endpoint = "localhost:"+std::to_string(port_); + asio_svc_ = cs_new(asio_opt, stream_stat_->server_logger_); + + // server + s_mgr_ = cs_new(my_id_, endpoint); + sm_ = cs_new( stream_stat_->server_logger_->getLogger() ); + ptr scheduler = asio_svc_; + ptr rpc_cli_factory = asio_svc_; + + my_listener_ = asio_svc_->create_rpc_listener(port_, logger); + + raft_params params; + context* ctx( new context( s_mgr_, sm_, my_listener_, logger, + rpc_cli_factory, scheduler, params ) ); + const raft_server::init_options& opt = raft_server::init_options(); + my_msg_handler_ = cs_new(ctx, opt, stream_stat_); + ptr handler = my_msg_handler_; + my_listener_->listen(handler); + } + + int my_id_; + int port_; + ptr s_mgr_; + ptr sm_; + ptr asio_svc_; + ptr my_listener_; + ptr my_msg_handler_; + ptr stream_stat_; + }; + + class stream_client { + public: + stream_client(int id, + int port, + ptr& stream_stat) : my_id_(id) , port_(port) - , next_log_index_(1) + , stream_stat_(stream_stat) { - init_server(); + asio_service::options asio_opt; + asio_opt.thread_pool_size_ = 2; + asio_opt.replicate_log_timestamp_ = false; + asio_opt.streaming_mode_ = true; + asio_svc_ = cs_new(asio_opt, stream_stat_->client_logger_); + + std::string endpoint = "localhost:"+std::to_string(port_); + client_ = asio_svc_->create_client(endpoint); } - void send_req(int count) { + void send_req(int count, int timeout_ms=0) { ptr msg = buffer::alloc(TEST_MSG.size() + 1); msg->put(TEST_MSG); @@ -88,54 +209,51 @@ namespace asio_service_stream_test { while (count > 0) { ptr req(cs_new( 1, msg_type::append_entries_request, 1, my_id_, - 1, sent_log_index_, 1)); + 1, stream_stat_->sent_log_index_, 1)); ptr log(cs_new(0, msg, log_val_type::app_log)); req->log_entries().push_back(log); rpc_handler h = (rpc_handler)std::bind( - &stream_server::handle_result, + &stream_client::handle_result, this, req, std::placeholders::_1, std::placeholders::_2); - my_client_->send(req, h); - sent_log_index_++; - pp.update(sent_log_index_); + client_->send(req, h, timeout_ms); + stream_stat_->sent_log_index_++; + pp.update(stream_stat_->sent_log_index_); count--; } pp.done(); - num_messages_sent_= sent_log_index_; + stream_stat_->num_messages_sent_= stream_stat_->sent_log_index_; } void handle_result(ptr& req, ptr& resp, ptr& err) { - if (resp->get_next_idx() == get_next_log_index()) { - next_log_index_++; + if (err) { + stream_stat_->error_req_count_++; + stream_stat_->next_log_index_++; + SimpleLogger* ll = stream_stat_->client_logger_->getLogger(); + _log_info(ll, "handle result err: %s", err->what()); } else { - SimpleLogger* ll = my_log_wrapper_->getLogger(); - _log_info(ll, "resp log index not match, resp: %ld, current: %ld", - resp->get_next_idx(), get_next_log_index()); - } - if (next_log_index_ == num_messages_sent_ + 1) { - ea.invoke(); + if (resp->get_next_idx() == stream_stat_->next_log_index_.load()) { + stream_stat_->next_log_index_++; + } else { + SimpleLogger* ll = stream_stat_->client_logger_->getLogger(); + _log_info(ll, "resp log index not match, resp: %ld, current: %ld", + resp->get_next_idx(), stream_stat_->next_log_index_.load()); + } } - } - - bool waiting_for_responses(int timeout_ms = 3000) { - TestSuite::_msg("wait for responses (up to %d ms)\n", timeout_ms); - ea.wait_ms(timeout_ms); - return (next_log_index_ == num_messages_sent_ + 1); - } - void stop_server() { - if (my_listener_) { - my_listener_->stop(); - my_listener_->shutdown(); + if (stream_stat_->next_log_index_ == stream_stat_->num_messages_sent_ + 1) { + stream_stat_->ea_.invoke(); } + } + void stop() { if (asio_svc_) { asio_svc_->stop(); size_t count = 0; @@ -147,78 +265,84 @@ namespace asio_service_stream_test { } } - ulong get_resp_log_index() { - return my_msg_handler_->streamed_log_index; - } - - bool is_msg_mismatch() { - return my_msg_handler_->msg_mismatch; - } - - ulong get_next_log_index() { - return next_log_index_; - } - - private: int my_id_; int port_; - std::atomic next_log_index_; - ulong sent_log_index_ = 0; + ptr stream_stat_; ptr asio_svc_; - ptr my_client_; - ptr my_listener_; - ptr my_log_wrapper_; - ptr my_log_; - ptr my_msg_handler_; - size_t num_messages_sent_ = 0; - EventAwaiter ea; + ptr client_; + }; - void init_server() { - std::string log_file_name = "./srv" + std::to_string(my_id_) + ".log"; - my_log_wrapper_ = cs_new(log_file_name); - my_log_ = my_log_wrapper_; + int stream_server_happy_path_test() { + reset_log_files(); + ptr stat_ptr = cs_new(); - // opts - asio_service::options asio_opt; - asio_opt.thread_pool_size_ = 2; - asio_opt.replicate_log_timestamp_ = false; - asio_opt.streaming_mode_ = true; - asio_svc_ = cs_new(asio_opt, my_log_); + stream_server s(1, 20010, stat_ptr); + s.init_server(); + // send request + int count = 1000; + stream_client client(2, 20010, stat_ptr); + client.send_req(count); - // client - std::string endpoint = "localhost:"+std::to_string(port_); - my_client_ = asio_svc_->create_client(endpoint); + // check req + CHK_TRUE(stat_ptr->waiting_for_responses()); + CHK_EQ(count, stat_ptr->resp_log_index_.load()); + CHK_EQ(count, stat_ptr->next_log_index_ - 1); + CHK_FALSE(stat_ptr->msg_mismatch_.load()); - // server - ptr s_mgr = cs_new(my_id_, endpoint); - ptr sm = cs_new( my_log_wrapper_->getLogger() ); - ptr scheduler = asio_svc_; - ptr rpc_cli_factory = asio_svc_; - my_listener_ = asio_svc_->create_rpc_listener(port_, my_log_); + // stop + client.stop(); + s.stop_server(); + TestSuite::sleep_sec(1, "shutting down"); + SimpleLogger::shutdown(); + return 0; + } - raft_params params; - context* ctx( new context( s_mgr, sm, my_listener_, my_log_, - rpc_cli_factory, scheduler, params ) ); - const raft_server::init_options& opt = raft_server::init_options(); - my_msg_handler_ = cs_new(ctx, opt, my_log_wrapper_); - ptr handler = my_msg_handler_; - my_listener_->listen(handler); - } - }; + int client_send_to_wrong_endpoint_test() { + reset_log_files(); + ptr stat_ptr = cs_new(); - int stream_server_happy_path_test() { + stream_server s(1, 20010, stat_ptr); + s.init_server(); + // send request + int count = 1000; + stream_client client(2, 20011, stat_ptr); + client.send_req(count); + + // check req if finish + CHK_TRUE(stat_ptr->waiting_for_responses()); + CHK_EQ(count, stat_ptr->error_req_count_); + CHK_EQ(count, stat_ptr->next_log_index_ - 1); + CHK_FALSE(stat_ptr->msg_mismatch_.load()); + + // stop + client.stop(); + s.stop_server(); + TestSuite::sleep_sec(1, "shutting down"); + SimpleLogger::shutdown(); + return 0; + } + + int client_close_after_sending_test() { reset_log_files(); + ptr stat_ptr = cs_new(); - stream_server s(1, 20010); + stream_server s(1, 20010, stat_ptr); + s.init_server(); // send request int count = 1000; - s.send_req(count); + { + stream_client client(2, 20010, stat_ptr); + stat_ptr->wait_for_receiving_requests(); + client.send_req(count); + client.stop(); + } - // check req - CHK_TRUE(s.waiting_for_responses()); - CHK_EQ(count, s.get_resp_log_index()); - CHK_EQ(count, s.get_next_log_index() - 1); - CHK_FALSE(s.is_msg_mismatch()); + // check req if finish + CHK_TRUE(stat_ptr->waiting_for_responses()); + CHK_TRUE(stat_ptr->error_req_count_ > 0); + CHK_EQ(count, stat_ptr->next_log_index_ - 1); + CHK_FALSE(stat_ptr->msg_mismatch_.load()); + CHK_FALSE(stat_ptr->reqs_out_of_order_.load()); // stop s.stop_server(); @@ -226,6 +350,64 @@ namespace asio_service_stream_test { SimpleLogger::shutdown(); return 0; } + + int server_timeout_test() { + reset_log_files(); + ptr stat_ptr = cs_new(); + + stream_server s(1, 20010, stat_ptr); + s.init_server(); + // send request + int count = 1000; + stream_client client(2, 20010, stat_ptr); + client.send_req(count, 2000); + stat_ptr->wait_for_receiving_requests(); + + // shutdown + s.stop_server(); + + // check req if finish + CHK_TRUE(stat_ptr->waiting_for_responses()); + CHK_TRUE(stat_ptr->error_req_count_ > 0); + CHK_EQ(count, stat_ptr->next_log_index_ - 1); + CHK_FALSE(stat_ptr->msg_mismatch_.load()); + CHK_FALSE(stat_ptr->reqs_out_of_order_.load()); + + client.stop(); + TestSuite::sleep_sec(1, "shutting down"); + SimpleLogger::shutdown(); + return 0; + } + + int server_close_after_sending_test() { + reset_log_files(); + ptr stat_ptr = cs_new(); + + stream_server* s = new stream_server(1, 20010, stat_ptr); + s->init_server(); + // send request + int count = 1000; + stream_client client(2, 20010, stat_ptr); + client.send_req(count); + stat_ptr->wait_for_receiving_requests(); + + // shutdown + s->stop_server(); + TestSuite::sleep_sec(1, "server shutting down"); + delete s; + + // check req if finish + CHK_TRUE(stat_ptr->waiting_for_responses()); + CHK_TRUE(stat_ptr->error_req_count_ > 0); + CHK_EQ(count, stat_ptr->next_log_index_ - 1); + CHK_FALSE(stat_ptr->msg_mismatch_.load()); + CHK_FALSE(stat_ptr->reqs_out_of_order_.load()); + + client.stop(); + TestSuite::sleep_sec(1, "client shutting down"); + SimpleLogger::shutdown(); + return 0; + } }; using namespace asio_service_stream_test; @@ -236,5 +418,13 @@ int main(int argc, char** argv) { ts.doTest("stream server happy path test", stream_server_happy_path_test); + ts.doTest("client send msg to wrong endpoint test", + client_send_to_wrong_endpoint_test); + ts.doTest("cient close after sending test", + client_close_after_sending_test); + ts.doTest("server timeout test", + server_timeout_test); + ts.doTest("server close after sending test", + server_close_after_sending_test); return 0; } From f34a650e1f804ede821d4586209c4631324f54c4 Mon Sep 17 00:00:00 2001 From: lihzeng Date: Wed, 21 Aug 2024 06:55:36 -0700 Subject: [PATCH 2/3] test enhance --- tests/unit/asio_service_stream_test.cxx | 707 ++++++++++++------------ 1 file changed, 361 insertions(+), 346 deletions(-) diff --git a/tests/unit/asio_service_stream_test.cxx b/tests/unit/asio_service_stream_test.cxx index b276a17..4c74db0 100644 --- a/tests/unit/asio_service_stream_test.cxx +++ b/tests/unit/asio_service_stream_test.cxx @@ -27,387 +27,402 @@ using namespace nuraft; using namespace raft_functional_common; namespace asio_service_stream_test { - const std::string TEST_MSG = "stream-test-msg-str"; +const std::string TEST_MSG = "stream-test-msg-str"; - class stream_statistic { - public: - stream_statistic() +class stream_statistic { +public: + stream_statistic() : resp_log_index_(0) , msg_mismatch_(false) , reqs_out_of_order_(false) , next_log_index_(1) - { - std::string server_log_file_name = "./srv" + std::to_string(1) + ".log"; - server_logger_ = cs_new(server_log_file_name); - std::string client_log_file_name = "./srv" + std::to_string(2) + ".log"; - client_logger_ = cs_new(client_log_file_name); - } + , sent_log_index_(0) + , num_messages_sent_(0) + , error_req_count_(0) + { + open_logs(); + } - bool waiting_for_responses(int timeout_ms = 3000) { - TestSuite::_msg("wait for responses (up to %d ms)\n", timeout_ms); - ea_.wait_ms(timeout_ms); - SimpleLogger* ll = client_logger_->getLogger(); - _log_info(ll, "resp: %ld, message sent: %ld", next_log_index_.load(), num_messages_sent_); - return (next_log_index_ == num_messages_sent_ + 1); - } + bool wait_for_responses(int timeout_ms = 3000) { + TestSuite::_msg("wait for responses (up to %d ms)\n", timeout_ms); + ea_.wait_ms(timeout_ms); + CHK_EQ(next_log_index_.load(), num_messages_sent_.load() + 1); + return 0; + } - void wait_for_receiving_requests(int timeout_ms = 1000) { - TestSuite::_msg("wait for receiving requests (up to %d ms)\n", timeout_ms); - req_ea_.wait_ms(1000); - } + void wait_for_receiving_requests(int timeout_ms = 1000) { + TestSuite::_msg("wait for receiving requests (up to %d ms)\n", timeout_ms); + req_ea_.wait_ms(timeout_ms); + } - // server - std::atomic resp_log_index_; - std::atomic msg_mismatch_; - std::atomic reqs_out_of_order_; - EventAwaiter req_ea_; - - // client - std::atomic next_log_index_; - size_t sent_log_index_ = 0; - size_t num_messages_sent_ = 0; - size_t error_req_count_ = 0; - EventAwaiter ea_; - - // log - ptr server_logger_; - ptr client_logger_; - }; - - class stream_msg_handler : public nuraft::msg_handler { - public: - stream_msg_handler(context* ctx, - const init_options& opt, - ptr stream_stat) - : msg_handler(ctx, opt) - , stream_stat_(stream_stat) - {} - - ptr process_req(req_msg& req, const req_ext_params& ext_params) { - ptr resp = cs_new(state_->get_term(), - msg_type::append_entries_response, - id_, - req.get_src()); - if (req.get_last_log_idx() == stream_stat_->resp_log_index_) { - stream_stat_->resp_log_index_++; - resp->accept(stream_stat_->resp_log_index_.load()); - ptr buf = req.log_entries().at(0)->get_buf_ptr(); - buf->pos(0); - std::string buf_str = buf->get_str(); - if (buf_str != TEST_MSG) { - SimpleLogger* ll = stream_stat_->server_logger_->getLogger(); - _log_info(ll, "resp str: %s", buf_str.c_str()); - stream_stat_->msg_mismatch_.store(true); - } - } else { - SimpleLogger* ll = stream_stat_->server_logger_->getLogger(); - _log_info(ll, "req log index not match, req: %ld, current: %ld", - req.get_last_log_idx(), stream_stat_->resp_log_index_.load()); - stream_stat_->reqs_out_of_order_.store(true); - } + void open_logs() { + std::string server_log_file_name = "./srv" + std::to_string(1) + ".log"; + server_logger_ = cs_new(server_log_file_name); + std::string client_log_file_name = "./srv" + std::to_string(2) + ".log"; + client_logger_ = cs_new(client_log_file_name); + } - stream_stat_->req_ea_.invoke(); - return resp; - } + void reset() { + // reset log + reset_log_files(); + open_logs(); + + // reset data + resp_log_index_.store(0); + msg_mismatch_.store(false); + reqs_out_of_order_.store(false); + next_log_index_.store(1); + + sent_log_index_ = 0; + num_messages_sent_.store(0); + error_req_count_.store(0); + req_ea_.reset(); + ea_.reset(); + } - ptr stream_stat_; - }; - - class stream_server { - public: - stream_server(int id, - int port, - ptr& stream_stat) - : my_id_(id) - , port_(port) - , stream_stat_(stream_stat) - {} + // server + std::atomic resp_log_index_; + std::atomic msg_mismatch_; + std::atomic reqs_out_of_order_; + + // when server receives requests, it will be invoked + EventAwaiter req_ea_; + + // client + std::atomic next_log_index_; + ulong sent_log_index_; + std::atomic num_messages_sent_; + std::atomic error_req_count_; + EventAwaiter ea_; + + // log + ptr server_logger_; + ptr client_logger_; +}; - void stop_server() { - if (my_listener_) { - my_listener_->stop(); - my_listener_->shutdown(); - } +static stream_statistic stream_stat; - if (asio_svc_) { - asio_svc_->stop(); - size_t count = 0; - while (asio_svc_->get_active_workers() && count < 500) { - // 10ms per tick. - timer_helper::sleep_ms(10); - count++; - } - } +class stream_msg_handler : public nuraft::msg_handler { +public: + stream_msg_handler(context* ctx, + const init_options& opt) + : msg_handler(ctx, opt) + {} - if (my_msg_handler_) { - my_msg_handler_->shutdown(); + ptr process_req(req_msg& req, const req_ext_params& ext_params) { + ptr resp = cs_new(state_->get_term(), + msg_type::append_entries_response, + id_, + req.get_src()); + SimpleLogger* ll = stream_stat.server_logger_->getLogger(); + _log_info(ll, "req log idx: %ld, current resp log idx: %ld", + req.get_last_log_idx(), stream_stat.resp_log_index_.load()); + if (req.get_last_log_idx() == stream_stat.resp_log_index_.load()) { + stream_stat.resp_log_index_++; + resp->accept(stream_stat.resp_log_index_.load()); + ptr buf = req.log_entries().at(0)->get_buf_ptr(); + buf->pos(0); + std::string buf_str = buf->get_str(); + if (buf_str != TEST_MSG) { + SimpleLogger* ll = stream_stat.server_logger_->getLogger(); + _log_info(ll, "resp str: %s", buf_str.c_str()); + stream_stat.msg_mismatch_.store(true); } + } else { + stream_stat.reqs_out_of_order_.store(true); } - void init_server() { - ptr logger = stream_stat_->server_logger_; - // opts - asio_service::options asio_opt; - asio_opt.thread_pool_size_ = 2; - asio_opt.replicate_log_timestamp_ = false; - asio_opt.streaming_mode_ = true; - std::string endpoint = "localhost:"+std::to_string(port_); - asio_svc_ = cs_new(asio_opt, stream_stat_->server_logger_); - - // server - s_mgr_ = cs_new(my_id_, endpoint); - sm_ = cs_new( stream_stat_->server_logger_->getLogger() ); - ptr scheduler = asio_svc_; - ptr rpc_cli_factory = asio_svc_; - - my_listener_ = asio_svc_->create_rpc_listener(port_, logger); - - raft_params params; - context* ctx( new context( s_mgr_, sm_, my_listener_, logger, - rpc_cli_factory, scheduler, params ) ); - const raft_server::init_options& opt = raft_server::init_options(); - my_msg_handler_ = cs_new(ctx, opt, stream_stat_); - ptr handler = my_msg_handler_; - my_listener_->listen(handler); - } - - int my_id_; - int port_; - ptr s_mgr_; - ptr sm_; - ptr asio_svc_; - ptr my_listener_; - ptr my_msg_handler_; - ptr stream_stat_; - }; - - class stream_client { - public: - stream_client(int id, - int port, - ptr& stream_stat) - : my_id_(id) - , port_(port) - , stream_stat_(stream_stat) - { - asio_service::options asio_opt; - asio_opt.thread_pool_size_ = 2; - asio_opt.replicate_log_timestamp_ = false; - asio_opt.streaming_mode_ = true; - asio_svc_ = cs_new(asio_opt, stream_stat_->client_logger_); - - std::string endpoint = "localhost:"+std::to_string(port_); - client_ = asio_svc_->create_client(endpoint); - } + stream_stat.req_ea_.invoke(); + return resp; + } +}; - void send_req(int count, int timeout_ms=0) { - ptr msg = buffer::alloc(TEST_MSG.size() + 1); - msg->put(TEST_MSG); - - TestSuite::Progress pp(count, "sending req"); - - while (count > 0) { - ptr req(cs_new( - 1, msg_type::append_entries_request, 1, my_id_, - 1, stream_stat_->sent_log_index_, 1)); - - ptr log(cs_new(0, msg, log_val_type::app_log)); - req->log_entries().push_back(log); - - rpc_handler h = (rpc_handler)std::bind( - &stream_client::handle_result, - this, - req, - std::placeholders::_1, - std::placeholders::_2); - client_->send(req, h, timeout_ms); - stream_stat_->sent_log_index_++; - pp.update(stream_stat_->sent_log_index_); - count--; - } - pp.done(); - stream_stat_->num_messages_sent_= stream_stat_->sent_log_index_; +class stream_server { +public: + stream_server(int id, + int port) + : my_id_(id) + , port_(port) + {} + + void stop_server() { + if (my_listener_) { + my_listener_->stop(); + my_listener_->shutdown(); } - void handle_result(ptr& req, - ptr& resp, - ptr& err) - { - if (err) { - stream_stat_->error_req_count_++; - stream_stat_->next_log_index_++; - SimpleLogger* ll = stream_stat_->client_logger_->getLogger(); - _log_info(ll, "handle result err: %s", err->what()); - } else { - if (resp->get_next_idx() == stream_stat_->next_log_index_.load()) { - stream_stat_->next_log_index_++; - } else { - SimpleLogger* ll = stream_stat_->client_logger_->getLogger(); - _log_info(ll, "resp log index not match, resp: %ld, current: %ld", - resp->get_next_idx(), stream_stat_->next_log_index_.load()); - } - } - - if (stream_stat_->next_log_index_ == stream_stat_->num_messages_sent_ + 1) { - stream_stat_->ea_.invoke(); + if (asio_svc_) { + asio_svc_->stop(); + size_t count = 0; + while (asio_svc_->get_active_workers() && count < 500) { + // 10ms per tick. + timer_helper::sleep_ms(10); + count++; } } - void stop() { - if (asio_svc_) { - asio_svc_->stop(); - size_t count = 0; - while (asio_svc_->get_active_workers() && count < 500) { - // 10ms per tick. - timer_helper::sleep_ms(10); - count++; - } - } + if (my_msg_handler_) { + my_msg_handler_->shutdown(); } + } - int my_id_; - int port_; - ptr stream_stat_; - ptr asio_svc_; - ptr client_; - }; - - int stream_server_happy_path_test() { - reset_log_files(); - ptr stat_ptr = cs_new(); + void init_server() { + ptr logger = stream_stat.server_logger_; + // opts + asio_service::options asio_opt; + asio_opt.thread_pool_size_ = 2; + asio_opt.replicate_log_timestamp_ = false; + asio_opt.streaming_mode_ = true; + std::string endpoint = "localhost:"+std::to_string(port_); + asio_svc_ = cs_new(asio_opt, stream_stat.server_logger_); - stream_server s(1, 20010, stat_ptr); - s.init_server(); - // send request - int count = 1000; - stream_client client(2, 20010, stat_ptr); - client.send_req(count); + // server + s_mgr_ = cs_new(my_id_, endpoint); + sm_ = cs_new( stream_stat.server_logger_->getLogger() ); + ptr scheduler = asio_svc_; + ptr rpc_cli_factory = asio_svc_; + + my_listener_ = asio_svc_->create_rpc_listener(port_, logger); + + raft_params params; + context* ctx( new context( s_mgr_, sm_, my_listener_, logger, + rpc_cli_factory, scheduler, params ) ); + const raft_server::init_options& opt = raft_server::init_options(); + my_msg_handler_ = cs_new(ctx, opt); + ptr handler = my_msg_handler_; + my_listener_->listen(handler); + } - // check req - CHK_TRUE(stat_ptr->waiting_for_responses()); - CHK_EQ(count, stat_ptr->resp_log_index_.load()); - CHK_EQ(count, stat_ptr->next_log_index_ - 1); - CHK_FALSE(stat_ptr->msg_mismatch_.load()); + int my_id_; + int port_; + ptr s_mgr_; + ptr sm_; + ptr asio_svc_; + ptr my_listener_; + ptr my_msg_handler_; +}; - // stop - client.stop(); - s.stop_server(); - TestSuite::sleep_sec(1, "shutting down"); - SimpleLogger::shutdown(); - return 0; +class stream_client { +public: + stream_client(int id, + int port) + : my_id_(id) + , port_(port) + { + asio_service::options asio_opt; + asio_opt.thread_pool_size_ = 2; + asio_opt.replicate_log_timestamp_ = false; + asio_opt.streaming_mode_ = true; + asio_svc_ = cs_new(asio_opt, stream_stat.client_logger_); + + std::string endpoint = "localhost:"+std::to_string(port_); + client_ = asio_svc_->create_client(endpoint); } - int client_send_to_wrong_endpoint_test() { - reset_log_files(); - ptr stat_ptr = cs_new(); + void send_req(int count, int timeout_ms=0) { + ptr msg = buffer::alloc(TEST_MSG.size() + 1); + msg->put(TEST_MSG); + + TestSuite::Progress pp(count, "sending req"); + + while (count > 0) { + ptr req(cs_new( + 1, msg_type::append_entries_request, 1, my_id_, + 1, stream_stat.sent_log_index_, 1)); + + ptr log(cs_new(0, msg, log_val_type::app_log)); + req->log_entries().push_back(log); + + rpc_handler h = (rpc_handler)std::bind( + &stream_client::handle_result, + this, + req, + std::placeholders::_1, + std::placeholders::_2); + client_->send(req, h, timeout_ms); + stream_stat.sent_log_index_++; + pp.update(stream_stat.sent_log_index_); + count--; + } + pp.done(); + stream_stat.num_messages_sent_.store(stream_stat.sent_log_index_); + } - stream_server s(1, 20010, stat_ptr); - s.init_server(); - // send request - int count = 1000; - stream_client client(2, 20011, stat_ptr); - client.send_req(count); - - // check req if finish - CHK_TRUE(stat_ptr->waiting_for_responses()); - CHK_EQ(count, stat_ptr->error_req_count_); - CHK_EQ(count, stat_ptr->next_log_index_ - 1); - CHK_FALSE(stat_ptr->msg_mismatch_.load()); + void handle_result(ptr& req, + ptr& resp, + ptr& err) + { + if (err) { + stream_stat.error_req_count_++; + stream_stat.next_log_index_++; + SimpleLogger* ll = stream_stat.client_logger_->getLogger(); + _log_info(ll, "handle result err: %s, error_count: %ld, next log idx: %ld", + err->what(), stream_stat.error_req_count_.load(), + stream_stat.next_log_index_.load()); + } else { + if (resp->get_next_idx() == stream_stat.next_log_index_.load()) { + stream_stat.next_log_index_++; + } else { + SimpleLogger* ll = stream_stat.client_logger_->getLogger(); + _log_info(ll, "resp log index not match, resp: %ld, current: %ld", + resp->get_next_idx(), stream_stat.next_log_index_.load()); + } + } - // stop - client.stop(); - s.stop_server(); - TestSuite::sleep_sec(1, "shutting down"); - SimpleLogger::shutdown(); - return 0; + if (stream_stat.next_log_index_ == stream_stat.num_messages_sent_ + 1) { + stream_stat.ea_.invoke(); + } } - int client_close_after_sending_test() { - reset_log_files(); - ptr stat_ptr = cs_new(); - - stream_server s(1, 20010, stat_ptr); - s.init_server(); - // send request - int count = 1000; - { - stream_client client(2, 20010, stat_ptr); - stat_ptr->wait_for_receiving_requests(); - client.send_req(count); - client.stop(); + void stop() { + if (asio_svc_) { + asio_svc_->stop(); + size_t count = 0; + while (asio_svc_->get_active_workers() && count < 500) { + // 10ms per tick. + timer_helper::sleep_ms(10); + count++; + } } - - // check req if finish - CHK_TRUE(stat_ptr->waiting_for_responses()); - CHK_TRUE(stat_ptr->error_req_count_ > 0); - CHK_EQ(count, stat_ptr->next_log_index_ - 1); - CHK_FALSE(stat_ptr->msg_mismatch_.load()); - CHK_FALSE(stat_ptr->reqs_out_of_order_.load()); - - // stop - s.stop_server(); - TestSuite::sleep_sec(1, "shutting down"); - SimpleLogger::shutdown(); - return 0; } - int server_timeout_test() { - reset_log_files(); - ptr stat_ptr = cs_new(); - - stream_server s(1, 20010, stat_ptr); - s.init_server(); - // send request - int count = 1000; - stream_client client(2, 20010, stat_ptr); - client.send_req(count, 2000); - stat_ptr->wait_for_receiving_requests(); - - // shutdown - s.stop_server(); + int my_id_; + int port_; + ptr asio_svc_; + ptr client_; +}; + +int stream_server_happy_path_test() { + stream_stat.reset(); + + stream_server s(1, 20010); + s.init_server(); + // send request + int count = 1000; + stream_client client(2, 20010); + client.send_req(count); + + // check req + CHK_Z(stream_stat.wait_for_responses()); + CHK_EQ(count, stream_stat.resp_log_index_.load()); + CHK_EQ(count, stream_stat.next_log_index_ - 1); + CHK_FALSE(stream_stat.msg_mismatch_.load()); + + // stop + client.stop(); + s.stop_server(); + TestSuite::sleep_sec(1, "shutting down"); + SimpleLogger::shutdown(); + return 0; +} + +int client_send_to_wrong_endpoint_test() { + stream_stat.reset(); + + stream_server s(1, 20010); + s.init_server(); + // send request + int count = 1000; + stream_client client(2, 20011); + client.send_req(count); + + // check req if finish + CHK_Z(stream_stat.wait_for_responses()); + CHK_EQ(count, stream_stat.error_req_count_); + CHK_EQ(count, stream_stat.next_log_index_ - 1); + CHK_FALSE(stream_stat.msg_mismatch_.load()); + + // stop + client.stop(); + s.stop_server(); + TestSuite::sleep_sec(1, "shutting down"); + SimpleLogger::shutdown(); + return 0; +} - // check req if finish - CHK_TRUE(stat_ptr->waiting_for_responses()); - CHK_TRUE(stat_ptr->error_req_count_ > 0); - CHK_EQ(count, stat_ptr->next_log_index_ - 1); - CHK_FALSE(stat_ptr->msg_mismatch_.load()); - CHK_FALSE(stat_ptr->reqs_out_of_order_.load()); +int client_close_after_sending_test() { + stream_stat.reset(); + stream_server s(1, 20010); + s.init_server(); + // send request + int count = 1000; + { + stream_client client(2, 20010); + stream_stat.wait_for_receiving_requests(); + client.send_req(count); client.stop(); - TestSuite::sleep_sec(1, "shutting down"); - SimpleLogger::shutdown(); - return 0; } - int server_close_after_sending_test() { - reset_log_files(); - ptr stat_ptr = cs_new(); + // check req if finish + CHK_Z(stream_stat.wait_for_responses()); + CHK_TRUE(stream_stat.error_req_count_.load() > 0); + CHK_EQ(count, stream_stat.next_log_index_.load() - 1); + CHK_FALSE(stream_stat.msg_mismatch_.load()); + CHK_FALSE(stream_stat.reqs_out_of_order_.load()); + + // stop + s.stop_server(); + TestSuite::sleep_sec(1, "shutting down"); + SimpleLogger::shutdown(); + return 0; +} - stream_server* s = new stream_server(1, 20010, stat_ptr); - s->init_server(); - // send request - int count = 1000; - stream_client client(2, 20010, stat_ptr); - client.send_req(count); - stat_ptr->wait_for_receiving_requests(); - - // shutdown - s->stop_server(); - TestSuite::sleep_sec(1, "server shutting down"); - delete s; - - // check req if finish - CHK_TRUE(stat_ptr->waiting_for_responses()); - CHK_TRUE(stat_ptr->error_req_count_ > 0); - CHK_EQ(count, stat_ptr->next_log_index_ - 1); - CHK_FALSE(stat_ptr->msg_mismatch_.load()); - CHK_FALSE(stat_ptr->reqs_out_of_order_.load()); +int server_timeout_test() { + stream_stat.reset(); + + stream_server s(1, 20010); + s.init_server(); + // send request + int count = 1000; + stream_client client(2, 20010); + client.send_req(count, 2000); + stream_stat.wait_for_receiving_requests(); + + // shutdown + s.stop_server(); + + // check req if finish + CHK_Z(stream_stat.wait_for_responses()); + CHK_TRUE(stream_stat.error_req_count_.load() > 0); + CHK_EQ(count, stream_stat.next_log_index_.load() - 1); + CHK_FALSE(stream_stat.msg_mismatch_.load()); + CHK_FALSE(stream_stat.reqs_out_of_order_.load()); + + client.stop(); + TestSuite::sleep_sec(1, "shutting down"); + SimpleLogger::shutdown(); + return 0; +} - client.stop(); - TestSuite::sleep_sec(1, "client shutting down"); - SimpleLogger::shutdown(); - return 0; - } +int server_close_after_sending_test() { + stream_stat.reset(); + + stream_server* s = new stream_server(1, 20010); + s->init_server(); + // send request + int count = 1000; + stream_client client(2, 20010); + client.send_req(count); + stream_stat.wait_for_receiving_requests(); + + // shutdown + s->stop_server(); + TestSuite::sleep_sec(1, "server shutting down"); + delete s; + + // check req if finish + CHK_Z(stream_stat.wait_for_responses()); + CHK_TRUE(stream_stat.error_req_count_.load() > 0); + CHK_EQ(count, stream_stat.next_log_index_.load() - 1); + CHK_FALSE(stream_stat.msg_mismatch_.load()); + CHK_FALSE(stream_stat.reqs_out_of_order_.load()); + + client.stop(); + TestSuite::sleep_sec(1, "client shutting down"); + SimpleLogger::shutdown(); + return 0; +} }; using namespace asio_service_stream_test; @@ -416,15 +431,15 @@ int main(int argc, char** argv) { TestSuite ts(argc, argv); ts.options.printTestMessage = true; - ts.doTest("stream server happy path test", - stream_server_happy_path_test); - ts.doTest("client send msg to wrong endpoint test", - client_send_to_wrong_endpoint_test); - ts.doTest("cient close after sending test", - client_close_after_sending_test); + // ts.doTest("stream server happy path test", + // stream_server_happy_path_test); + // ts.doTest("client send msg to wrong endpoint test", + // client_send_to_wrong_endpoint_test); + // ts.doTest("client close after sending test", + // client_close_after_sending_test); ts.doTest("server timeout test", server_timeout_test); - ts.doTest("server close after sending test", - server_close_after_sending_test); + // ts.doTest("server close after sending test", + // server_close_after_sending_test); return 0; } From 35b9f8df29204dd6b9b704651fed5f033e9b4973 Mon Sep 17 00:00:00 2001 From: lihzeng Date: Wed, 21 Aug 2024 18:47:34 -0700 Subject: [PATCH 3/3] test fix --- tests/unit/asio_service_stream_test.cxx | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/unit/asio_service_stream_test.cxx b/tests/unit/asio_service_stream_test.cxx index 4c74db0..a629cbc 100644 --- a/tests/unit/asio_service_stream_test.cxx +++ b/tests/unit/asio_service_stream_test.cxx @@ -43,7 +43,7 @@ class stream_statistic { open_logs(); } - bool wait_for_responses(int timeout_ms = 3000) { + int wait_for_responses(int timeout_ms = 3000) { TestSuite::_msg("wait for responses (up to %d ms)\n", timeout_ms); ea_.wait_ms(timeout_ms); CHK_EQ(next_log_index_.load(), num_messages_sent_.load() + 1); @@ -431,15 +431,15 @@ int main(int argc, char** argv) { TestSuite ts(argc, argv); ts.options.printTestMessage = true; - // ts.doTest("stream server happy path test", - // stream_server_happy_path_test); - // ts.doTest("client send msg to wrong endpoint test", - // client_send_to_wrong_endpoint_test); - // ts.doTest("client close after sending test", - // client_close_after_sending_test); + ts.doTest("stream server happy path test", + stream_server_happy_path_test); + ts.doTest("client send msg to wrong endpoint test", + client_send_to_wrong_endpoint_test); + ts.doTest("client close after sending test", + client_close_after_sending_test); ts.doTest("server timeout test", server_timeout_test); - // ts.doTest("server close after sending test", - // server_close_after_sending_test); + ts.doTest("server close after sending test", + server_close_after_sending_test); return 0; }