node: refactor NodeSettings.asio_context (#2361)
battlmonstr authored Sep 18, 2024
1 parent 380b425 commit 7d8050e
Showing 23 changed files with 168 additions and 82 deletions.
2 changes: 1 addition & 1 deletion cmd/dev/backend_kv_server.cpp
@@ -141,7 +141,7 @@ std::shared_ptr<silkworm::sentry::api::SentryClient> make_sentry_client(
         // wrap remote client in a session client
         auto session_sentry_client = std::make_shared<silkworm::sentry::SessionSentryClient>(
             remote_sentry_client,
-            silkworm::sentry::eth::StatusDataProvider::to_factory_function(std::move(eth_status_data_provider)));
+            silkworm::sentry::eth::StatusDataProvider::to_factory_function(eth_status_data_provider));
         clients.push_back(session_sentry_client);
     }
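Dropping the `std::move` matters because each session client is pushed into a `clients` collection inside an enclosing block that appears to iterate, so the status-data provider plausibly has to stay usable after the first use; moving from it would hand later uses a moved-from object. A minimal, self-contained sketch of that hazard, using hypothetical simplified types rather than Silkworm's real StatusDataProvider API:

```cpp
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for the provider: holds shared state that the
// produced factory function captures.
struct StatusDataProvider {
    std::shared_ptr<int> state = std::make_shared<int>(42);
};

std::function<int()> to_factory_function(StatusDataProvider provider) {
    return [provider = std::move(provider)] { return *provider.state; };
}

int main() {
    StatusDataProvider provider;
    std::vector<std::function<int()>> factories;
    for (int i = 0; i < 2; ++i) {
        // With to_factory_function(std::move(provider)), the second iteration
        // would capture a hollowed-out provider whose state pointer is null;
        // invoking that factory would then dereference null.
        factories.push_back(to_factory_function(provider));  // copy: safe
    }
    return factories[0]() == factories[1]() ? 0 : 1;  // both share live state
}
```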
7 changes: 7 additions & 0 deletions cmd/dev/db_toolbox.cpp
@@ -571,6 +571,8 @@ void unwind(db::EnvConfig& config, BlockNum unwind_point, bool remove_blocks) {
     auto chain_config{db::read_chain_config(txn)};
     ensure(chain_config.has_value(), "Not an initialized Silkworm db or unknown/custom chain");
 
+    boost::asio::io_context io_context;
+
     NodeSettings settings{
         .data_directory = std::make_unique<DataDirectory>(),
         .chaindata_env_config = config,
@@ -580,8 +582,13 @@
         return std::make_unique<stagedsync::BodiesStage>(sync_context, *settings.chain_config, [] { return 0; });
     };
 
+    stagedsync::TimerFactory log_timer_factory = [&](std::function<bool()> callback) {
+        return std::make_shared<Timer>(io_context.get_executor(), settings.sync_loop_log_interval_seconds * 1000, std::move(callback));
+    };
+
     stagedsync::ExecutionPipeline stage_pipeline{
         &settings,
+        std::move(log_timer_factory),
         std::move(bodies_stage_factory),
     };
     const auto unwind_result{stage_pipeline.unwind(txn, unwind_point)};
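Instead of reading a shared io_context out of NodeSettings, the toolbox now owns its own io_context and hands the pipeline a TimerFactory bound to it. The factory's shape is visible in the call site above; the sketch below fills in a plausible Timer behind it. Only the `(executor, interval_ms, callback)` constructor shape is taken from this diff — the internals are assumptions, and Silkworm's actual Timer may differ:

```cpp
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <functional>
#include <memory>

// Plausible periodic timer matching the call sites in this commit: fires
// `callback` every `interval_ms` for as long as the callback returns true.
class Timer : public std::enable_shared_from_this<Timer> {
  public:
    Timer(boost::asio::any_io_executor executor, uint32_t interval_ms, std::function<bool()> callback)
        : timer_{std::move(executor)}, interval_ms_{interval_ms}, callback_{std::move(callback)} {}

    void start() { rearm(); }
    void reset() { rearm(); }  // restart the interval from "now"

  private:
    void rearm() {
        timer_.expires_after(std::chrono::milliseconds{interval_ms_});
        timer_.async_wait([self = shared_from_this()](const auto& ec) {
            if (!ec && self->callback_()) self->rearm();  // keep ticking while the callback says so
        });
    }

    boost::asio::steady_timer timer_;
    uint32_t interval_ms_;
    std::function<bool()> callback_;
};

// The factory erases everything except "give me a timer for this callback",
// which is all ExecutionPipeline needs for its log timer.
using TimerFactory = std::function<std::shared_ptr<Timer>(std::function<bool()>)>;
```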
3 changes: 2 additions & 1 deletion silkworm/capi/fork_validator.cpp
@@ -112,8 +112,9 @@ SILKWORM_EXPORT int silkworm_start_fork_validator(SilkwormHandle handle, MDBX_en
     silkworm::db::EnvUnmanaged unmanaged_env{mdbx_env};
     silkworm::db::RWAccess rw_access{unmanaged_env};
     handle->execution_engine = std::make_unique<silkworm::stagedsync::ExecutionEngine>(
-        handle->node_settings.asio_context,
+        /* executor = */ std::nullopt,  // ExecutionEngine manages an internal io_context
         handle->node_settings,
+        /* log_timer_factory = */ std::nullopt,
         make_bodies_stage_factory(*handle->node_settings.chain_config),
         rw_access);
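The C API path has no event loop of its own to lend, so it opts out of both new dependencies with `std::nullopt`: the engine then falls back to an internally owned context (see the owns-or-borrows sketch after the execution_engine.cpp hunks below), and periodic stage logging is simply disabled.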
3 changes: 0 additions & 3 deletions silkworm/node/common/node_settings.hpp
@@ -21,8 +21,6 @@
 #include <string>
 #include <vector>
 
-#include <boost/asio/io_context.hpp>
-
 #include <silkworm/core/chain/config.hpp>
 #include <silkworm/core/common/base.hpp>
 #include <silkworm/db/etl/collector_settings.hpp>
@@ -35,7 +33,6 @@ namespace silkworm {
 
 struct NodeSettings {
     ApplicationInfo build_info;                     // Application build info (human-readable)
-    boost::asio::io_context asio_context;           // Async context (e.g. for timers)
     std::unique_ptr<DataDirectory> data_directory;  // Pointer to data folder
     db::EnvConfig chaindata_env_config{};           // Chaindata db config
     uint64_t network_id{kMainnetConfig.chain_id};   // Network/Chain id
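This removal is the point of the whole commit: `boost::asio::io_context` is non-copyable and non-movable, so embedding one turned NodeSettings from plain configuration into an object that owned runtime machinery and pinned itself in memory. A sketch of the two shapes (fields simplified; only the io_context member is taken from the diff):

```cpp
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/io_context.hpp>
#include <cstdint>

// Before: settings own an io_context, so the struct is immovable and every
// consumer of "configuration" silently depends on a live event loop.
struct NodeSettingsBefore {
    boost::asio::io_context asio_context;  // non-copyable, non-movable
    uint64_t network_id{1};
};

// After: settings are plain data again; async components receive an
// executor (or a TimerFactory built on one) explicitly at the call site.
struct NodeSettingsAfter {
    uint64_t network_id{1};
};

void needs_async(const NodeSettingsAfter& /*settings*/, boost::asio::any_io_executor /*executor*/) {
    // schedule timers/work on the injected executor here
}
```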
2 changes: 1 addition & 1 deletion silkworm/node/execution/active_direct_service_test.cpp
@@ -52,7 +52,7 @@ struct ActiveDirectServiceTest : public TaskRunner {
           dba{tmp_chaindata.env()} {
         tmp_chaindata.add_genesis_data();
         tmp_chaindata.commit_txn();
-        mock_execution_engine = std::make_unique<NiceMock<MockExecutionEngine>>(context(), settings, dba);
+        mock_execution_engine = std::make_unique<NiceMock<MockExecutionEngine>>(executor(), settings, dba);
         direct_service = std::make_unique<ActiveDirectServiceForTest>(*mock_execution_engine, execution_context);
         execution_context_thread = std::thread{[this]() {
             direct_service->execution_loop();
2 changes: 1 addition & 1 deletion silkworm/node/execution/direct_service_test.cpp
@@ -44,7 +44,7 @@ struct DirectServiceTest : public TaskRunner {
           dba{tmp_chaindata.env()} {
         tmp_chaindata.add_genesis_data();
         tmp_chaindata.commit_txn();
-        mock_execution_engine = std::make_unique<MockExecutionEngine>(context(), settings, dba);
+        mock_execution_engine = std::make_unique<MockExecutionEngine>(executor(), settings, dba);
         direct_service = std::make_unique<DirectService>(*mock_execution_engine);
     }
 
3 changes: 2 additions & 1 deletion silkworm/node/execution/header_chain_plus_exec_test.cpp
@@ -91,8 +91,9 @@ TEST_CASE("Headers receiving and saving") {
 
     // creating the ExecutionEngine
     ExecutionEngineForTest exec_engine{
-        runner.context(),
+        runner.executor(),
         node_settings,
+        /* log_timer_factory = */ std::nullopt,
         std::move(bodies_stage_factory),
         db_access,
     };
51 changes: 20 additions & 31 deletions silkworm/node/node.cpp
@@ -63,10 +63,8 @@ class NodeImpl final {
     BlockNum last_pre_validated_block() const { return chain_sync_.last_pre_validated_block(); }
 
   private:
-    Task<void> start_execution_server();
-    Task<void> start_backend_kv_grpc_server();
-    Task<void> start_resource_usage_log();
-    Task<void> start_execution_log_timer();
+    Task<void> run_execution_server();
+    Task<void> run_backend_kv_grpc_server();
     Task<void> embedded_sentry_run_if_needed();
 
     Settings& settings_;
@@ -116,6 +114,17 @@ static chainsync::EngineRpcSettings make_sync_engine_rpc_settings(
     };
 }
 
+static stagedsync::TimerFactory make_log_timer_factory(
+    const boost::asio::any_io_executor& executor,
+    uint32_t sync_loop_log_interval_seconds) {
+    return [=](std::function<bool()> callback) {
+        return std::make_shared<Timer>(
+            executor,
+            sync_loop_log_interval_seconds * 1'000,
+            std::move(callback));
+    };
+}
+
 static stagedsync::BodiesStageFactory make_bodies_stage_factory(
     const ChainConfig& chain_config,
     const NodeImpl& node) {
@@ -145,8 +154,9 @@ NodeImpl::NodeImpl(
       chain_config_{*settings_.node_settings.chain_config},
       chaindata_env_{std::move(chaindata_env)},
       execution_engine_{
-          execution_context_,
+          execution_context_.get_executor(),
           settings_.node_settings,
+          make_log_timer_factory(context_pool.any_executor(), settings_.node_settings.sync_loop_log_interval_seconds),
          make_bodies_stage_factory(chain_config_, *this),
          db::RWAccess{chaindata_env_},
      },
@@ -196,14 +206,13 @@ Task<void> NodeImpl::run_tasks() {
     co_await wait_for_setup();
 
     co_await (
-        start_execution_server() &&
-        start_resource_usage_log() &&
-        start_execution_log_timer() &&
+        run_execution_server() &&
+        resource_usage_log_.run() &&
         chain_sync_.async_run() &&
-        start_backend_kv_grpc_server());
+        run_backend_kv_grpc_server());
 }
 
-Task<void> NodeImpl::start_execution_server() {
+Task<void> NodeImpl::run_execution_server() {
     // Thread running block execution requires custom stack size because of deep EVM call stacks
     if (settings_.execution_server_enabled) {
         co_await execution_server_.async_run(/*stack_size=*/kExecutionThreadStackSize);
@@ -212,7 +221,7 @@
     }
 }
 
-Task<void> NodeImpl::start_backend_kv_grpc_server() {
+Task<void> NodeImpl::run_backend_kv_grpc_server() {
     auto run = [this]() {
         backend_kv_rpc_server_->build_and_start();
         backend_kv_rpc_server_->join();
@@ -223,26 +232,6 @@
     co_await concurrency::async_thread(std::move(run), std::move(stop), "bekv-server");
 }
 
-Task<void> NodeImpl::start_resource_usage_log() {
-    return resource_usage_log_.run();
-}
-
-Task<void> NodeImpl::start_execution_log_timer() {
-    // Run Asio context in settings for execution timers // TODO(canepat) we need a better solution
-    auto& asio_context = settings_.node_settings.asio_context;
-    using asio_guard_type = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
-    auto asio_guard = std::make_unique<asio_guard_type>(asio_context.get_executor());
-
-    auto run = [&asio_context] {
-        log::set_thread_name("ctx-log-tmr");
-        log::Trace("Asio Timers", {"state", "started"});
-        asio_context.run();
-        log::Trace("Asio Timers", {"state", "stopped"});
-    };
-    auto stop = [&asio_guard] { asio_guard.reset(); };
-    co_await silkworm::concurrency::async_thread(std::move(run), std::move(stop), "ctx-log-tmr");
-}
-
 Task<void> NodeImpl::embedded_sentry_run_if_needed() {
     sentry::SentryClientFactory::SentryServerPtr server = std::get<1>(sentry_);
     if (server) {
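Two things happen here: the ad-hoc thread that ran the settings-owned io_context (work guard and all) disappears, its job absorbed by make_log_timer_factory, and the surviving start_* coroutines are renamed run_* to match what they actually do — run until shutdown, composed with operator&&. A self-contained sketch of that composition style, assuming (as the `co_await (a && b)` shape suggests) the operators come from Asio's experimental awaitable_operators; Silkworm's Task alias is replaced by plain boost::asio::awaitable:

```cpp
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <chrono>
#include <iostream>

using namespace boost::asio::experimental::awaitable_operators;

boost::asio::awaitable<void> run_service(const char* name, int ticks) {
    auto executor = co_await boost::asio::this_coro::executor;
    boost::asio::steady_timer timer{executor};
    for (int i = 0; i < ticks; ++i) {
        timer.expires_after(std::chrono::milliseconds{10});
        co_await timer.async_wait(boost::asio::use_awaitable);
        std::cout << name << " tick " << i << '\n';
    }
}

boost::asio::awaitable<void> run_all() {
    // Like run_tasks(): both services run concurrently; the group completes
    // when every branch completes, and an exception in one cancels the rest.
    co_await (run_service("execution", 3) && run_service("backend-kv", 3));
}

int main() {
    boost::asio::io_context ioc;
    boost::asio::co_spawn(ioc, run_all(), boost::asio::detached);
    ioc.run();
}
```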
18 changes: 14 additions & 4 deletions silkworm/node/stagedsync/execution_engine.cpp
@@ -30,16 +30,25 @@ using execution::api::ValidChain;
 using execution::api::VerificationResult;
 
 ExecutionEngine::ExecutionEngine(
-    asio::io_context& ctx,
+    std::optional<boost::asio::any_io_executor> executor,
     NodeSettings& ns,
+    std::optional<TimerFactory> log_timer_factory,
     BodiesStageFactory bodies_stage_factory,
     db::RWAccess dba)
-    : io_context_{ctx},
+    : context_pool_{executor ? std::unique_ptr<concurrency::ContextPool<>>{} : std::make_unique<concurrency::ContextPool<>>(concurrency::ContextPoolSettings{1, concurrency::WaitMode::kSleeping})},
+      executor_{executor ? std::move(*executor) : context_pool_->any_executor()},
       node_settings_{ns},
-      main_chain_{ctx, ns, std::move(bodies_stage_factory), std::move(dba)},
+      main_chain_{
+          executor_,
+          ns,
+          std::move(log_timer_factory),
+          std::move(bodies_stage_factory),
+          std::move(dba),
+      },
       block_cache_{kDefaultCacheSize} {}
 
 void ExecutionEngine::open() {  // needed to circumvent mdbx threading model limitations
+    if (context_pool_) context_pool_->start();
     main_chain_.open();
     last_finalized_block_ = main_chain_.last_finalized_head();
     last_fork_choice_ = main_chain_.last_chosen_head();
@@ -48,6 +57,7 @@ void ExecutionEngine::open() { // needed to circumvent mdbx threading model lim
 
 void ExecutionEngine::close() {
     main_chain_.close();
+    context_pool_.reset();
 }
 
 BlockNum ExecutionEngine::block_progress() const {
@@ -216,7 +226,7 @@ bool ExecutionEngine::notify_fork_choice_update(Hash head_block_hash,
 
     // notify the fork of the update - we need to block here to restore the invariant
     auto fork_choice_aw_future = (*f)->fork_choice(head_block_hash, finalized_block_hash, safe_block_hash);
-    std::future<bool> fork_choice_future = concurrency::spawn_future(io_context_, fork_choice_aw_future.get());
+    std::future<bool> fork_choice_future = concurrency::spawn_future(executor_, fork_choice_aw_future.get());
     bool updated = fork_choice_future.get();  // BLOCKING
     if (!updated) return false;
 
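The constructor now either borrows a caller-supplied executor or owns a single-threaded context pool of its own, started in open() and torn down in close() — which is what lets the C API pass std::nullopt above. A hedged sketch of that owns-or-borrows pattern, using plain Asio types in place of silkworm::concurrency::ContextPool:

```cpp
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <memory>
#include <optional>
#include <thread>

// Owns-or-borrows executor (simplified): if the caller supplies an executor
// we use it; otherwise we spin up, run, and later destroy our own context.
class Engine {
  public:
    explicit Engine(std::optional<boost::asio::any_io_executor> executor)
        : own_context_{executor ? nullptr : std::make_unique<OwnContext>()},
          executor_{executor ? std::move(*executor) : own_context_->ioc.get_executor()} {}

    void open() {
        if (own_context_) own_context_->start();  // only run what we own
    }
    void close() { own_context_.reset(); }  // borrowed executors are left alone

  private:
    struct OwnContext {
        boost::asio::io_context ioc;
        boost::asio::executor_work_guard<boost::asio::io_context::executor_type> guard{ioc.get_executor()};
        std::thread runner;
        void start() { runner = std::thread{[this] { ioc.run(); }}; }
        ~OwnContext() {
            guard.reset();  // allow run() to return once work drains
            if (runner.joinable()) runner.join();
        }
    };
    std::unique_ptr<OwnContext> own_context_;  // declared before executor_: init order matters
    boost::asio::any_io_executor executor_;
};
```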
18 changes: 11 additions & 7 deletions silkworm/node/stagedsync/execution_engine.hpp
@@ -18,29 +18,31 @@
 
 #include <atomic>
 #include <concepts>
+#include <memory>
+#include <optional>
 #include <set>
 #include <variant>
 #include <vector>
 
 #include <silkworm/infra/concurrency/task.hpp>
 
-#include <boost/asio/io_context.hpp>
+#include <boost/asio/any_io_executor.hpp>
 
 #include <silkworm/core/common/lru_cache.hpp>
 #include <silkworm/core/types/block.hpp>
 #include <silkworm/db/stage.hpp>
 #include <silkworm/db/stage_scheduler.hpp>
 #include <silkworm/execution/api/execution_engine.hpp>
+#include <silkworm/infra/concurrency/context_pool.hpp>
 #include <silkworm/node/stagedsync/execution_pipeline.hpp>
 
 #include "forks/extending_fork.hpp"
 #include "forks/main_chain.hpp"
 #include "stages/stage_bodies_factory.hpp"
+#include "timer_factory.hpp"
 
 namespace silkworm::stagedsync {
 
-namespace asio = boost::asio;
-
 /**
  * ExecutionEngine is the main component of the staged sync.
  * It is responsible for:
@@ -56,10 +58,11 @@
 class ExecutionEngine : public execution::api::ExecutionEngine, public Stoppable {
   public:
     ExecutionEngine(
-        asio::io_context&,
-        NodeSettings&,
+        std::optional<boost::asio::any_io_executor> executor,
+        NodeSettings& ns,
+        std::optional<TimerFactory> log_timer_factory,
         BodiesStageFactory bodies_stage_factory,
-        db::RWAccess);
+        db::RWAccess dba);
     ~ExecutionEngine() override = default;
 
     // needed to circumvent mdbx threading model limitations
@@ -108,7 +111,8 @@ class ExecutionEngine : public execution::api::ExecutionEngine, public Stoppable
     std::optional<ForkingPath> find_forking_point(const BlockHeader& header) const;
     void discard_all_forks();
 
-    asio::io_context& io_context_;
+    std::unique_ptr<concurrency::ContextPool<>> context_pool_;
+    boost::asio::any_io_executor executor_;
     NodeSettings& node_settings_;
 
     MainChain main_chain_;
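Header-wise, the refactor swaps a concrete `io_context&` for `boost::asio::any_io_executor`, a type-erased executor handle: the engine no longer cares what drives it. A small illustration of why that widens the set of valid callers (standard Asio only, nothing Silkworm-specific):

```cpp
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <iostream>

// any_io_executor accepts any Asio executor, so the same component can be
// driven by an io_context, a thread_pool, or a test scheduler.
void schedule_work(boost::asio::any_io_executor executor) {
    boost::asio::post(executor, [] { std::cout << "work ran\n"; });
}

int main() {
    boost::asio::io_context ioc;
    schedule_work(ioc.get_executor());  // io_context-backed

    boost::asio::thread_pool pool{1};
    schedule_work(pool.executor());     // thread_pool-backed
    pool.join();
    ioc.run();
}
```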
6 changes: 4 additions & 2 deletions silkworm/node/stagedsync/execution_engine_test.cpp
@@ -75,8 +75,9 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin
     db::RWAccess db_access{db_context.mdbx_env()};
 
     ExecutionEngineForTest exec_engine{
-        runner.context(),
+        runner.executor(),
         node_settings,
+        /* log_timer_factory = */ std::nullopt,
         make_bodies_stage_factory(*node_settings.chain_config),
         db_access,
     };
@@ -822,8 +823,9 @@ TEST_CASE("ExecutionEngine") {
     NodeSettings node_settings = node::test_util::make_node_settings_from_temp_chain_data(context);
     db::RWAccess db_access{context.env()};
     ExecutionEngineForTest exec_engine{
-        runner.context(),
+        runner.executor(),
         node_settings,
+        /* log_timer_factory = */ std::nullopt,
         make_bodies_stage_factory(*node_settings.chain_config),
         db_access,
     };
25 changes: 17 additions & 8 deletions silkworm/node/stagedsync/execution_pipeline.cpp
@@ -47,8 +47,10 @@ static const std::chrono::milliseconds kStageDurationThresholdForLog{0};
 
 ExecutionPipeline::ExecutionPipeline(
     silkworm::NodeSettings* node_settings,
+    std::optional<TimerFactory> log_timer_factory,
     BodiesStageFactory bodies_stage_factory)
     : node_settings_{node_settings},
+      log_timer_factory_{std::move(log_timer_factory)},
       bodies_stage_factory_{std::move(bodies_stage_factory)},
       sync_context_{std::make_unique<SyncContext>()} {
     load_stages();
@@ -206,7 +208,9 @@ Stage::Result ExecutionPipeline::forward(db::RWTxn& cycle_txn, BlockNum target_h
             break;
         }
 
-        log_timer->reset();  // Resets the interval for next log line from now
+        if (log_timer) {
+            log_timer->reset();  // Resets the interval for next log line from now
+        }
 
         // forward
         const auto stage_result = current_stage_->second->forward(cycle_txn);
@@ -273,7 +277,10 @@ Stage::Result ExecutionPipeline::unwind(db::RWTxn& cycle_txn, BlockNum unwind_po
         }
         ++current_stage_number_;
         current_stage_->second->set_log_prefix(get_log_prefix());
-        log_timer->reset();  // Resets the interval for next log line from now
+
+        if (log_timer) {
+            log_timer->reset();  // Resets the interval for next log line from now
+        }
 
         // Do unwind on current stage
         const auto stage_result = current_stage_->second->unwind(cycle_txn);
@@ -332,7 +339,10 @@ Stage::Result ExecutionPipeline::prune(db::RWTxn& cycle_txn) {
         ++current_stage_number_;
         current_stage_->second->set_log_prefix(get_log_prefix());
 
-        log_timer->reset();  // Resets the interval for next log line from now
+        if (log_timer) {
+            log_timer->reset();  // Resets the interval for next log line from now
+        }
 
         const auto stage_result{current_stage_->second->prune(cycle_txn)};
         if (stage_result != Stage::Result::kSuccess) {
             log::Error(get_log_prefix(), {"op", "Prune", "returned",
@@ -369,11 +379,10 @@ std::string ExecutionPipeline::get_log_prefix() const {
 }
 
 std::shared_ptr<Timer> ExecutionPipeline::make_log_timer() {
-    return std::make_shared<Timer>(
-        this->node_settings_->asio_context.get_executor(),
-        this->node_settings_->sync_loop_log_interval_seconds * 1'000,
-        [this]() { return log_timer_expired(); },
-        /*auto_start=*/true);
+    if (log_timer_factory_) {
+        return log_timer_factory_.value()([this]() { return log_timer_expired(); });
+    }
+    return {};
 }
 
 bool ExecutionPipeline::log_timer_expired() {
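make_log_timer now degrades gracefully: with no injected factory it returns an empty shared_ptr, and every reset() call site gains an `if (log_timer)` guard, so a caller such as silkworm_start_fork_validator can disable periodic stage logging outright. A tiny illustration of this empty-shared_ptr null-object convention (hypothetical simplified Timer, not Silkworm's):

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <optional>

struct Timer {  // hypothetical stand-in for silkworm's Timer
    void reset() { std::cout << "timer reset\n"; }
};
using TimerFactory = std::function<std::shared_ptr<Timer>()>;

std::shared_ptr<Timer> make_log_timer(const std::optional<TimerFactory>& factory) {
    if (factory) return (*factory)();
    return {};  // no factory injected -> no timer; callers must null-check
}

int main() {
    auto log_timer = make_log_timer(std::nullopt);
    if (log_timer) {  // mirrors the guards added throughout the pipeline
        log_timer->reset();
    } else {
        std::cout << "periodic logging disabled\n";
    }
}
```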