Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite the light client JSON-RPC server #1685

Merged
merged 41 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
87ba63d
Don't spawn sub-tasks anymore
tomaka Feb 12, 2024
031c156
Prepare for refactor
tomaka Feb 12, 2024
bd0327a
Prepare match block for requests handling
tomaka Feb 12, 2024
b21bdc5
Some functions copy-pasting
tomaka Feb 12, 2024
be4d81d
Restore transactions watching
tomaka Feb 12, 2024
991f7ea
`chain_getBlockHash`
tomaka Feb 12, 2024
b8a0ba9
`chain_getBlock`
tomaka Feb 12, 2024
be7a8ca
Clean up modules
tomaka Feb 12, 2024
04ae220
Move chain_head requests to main task
tomaka Feb 12, 2024
2cf0afb
Misc work
tomaka Feb 12, 2024
f7a7bc7
Misc work
tomaka Feb 12, 2024
6ee5790
WIP
tomaka Feb 12, 2024
0bcd89d
WIP
tomaka Feb 12, 2024
55e89d4
WIP
tomaka Feb 14, 2024
f590c38
WIP
tomaka Feb 14, 2024
bdde360
WIP
tomaka Feb 14, 2024
141976d
It compiles
tomaka Feb 15, 2024
0ed0687
WIP
tomaka Feb 15, 2024
c605698
WIP
tomaka Feb 15, 2024
71ab0be
WIP
tomaka Feb 15, 2024
b8abd33
WIP
tomaka Mar 4, 2024
f477239
It compiles
tomaka Mar 11, 2024
cd10e65
WIP
tomaka Mar 11, 2024
fe13fbe
Merge branch 'main' into light-json-rpc-clean
tomaka Mar 11, 2024
1b49c71
Implement best block correctly
tomaka Mar 11, 2024
1b1572b
WIP
tomaka Mar 11, 2024
f283dfe
Finish the runtime call related functions
tomaka Mar 11, 2024
93837a3
Finish chain_getHeader
tomaka Mar 11, 2024
edfad79
Don't freeze on `chain_getFinalizedHead`
tomaka Mar 11, 2024
b43691b
Restore state_getKeysPaged full functionalities
tomaka Mar 11, 2024
594e0df
Merge branch 'main' into light-json-rpc-clean
tomaka Mar 11, 2024
7f93c59
Fix bad merge
tomaka Mar 11, 2024
24de471
WIP
tomaka Mar 11, 2024
e050173
Restore legacy API warning
tomaka Mar 11, 2024
d9e49ba
Notify whether best block has changed when finalizing
tomaka Mar 11, 2024
2684768
Mention rewrite in CHANGELOG
tomaka Mar 11, 2024
d3b8ad0
Docfix
tomaka Mar 11, 2024
3dcb631
Misc changes and shrink_to_fit
tomaka Mar 11, 2024
9af91d3
Put the legacy subscription notified blocks in cache
tomaka Mar 11, 2024
6a03892
Add tons of comments
tomaka Mar 11, 2024
ce48af5
DRY for chainHead_call finished path
tomaka Mar 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 13 additions & 30 deletions lib/src/chain/async_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,40 +852,21 @@ where
///
/// # Panic
///
/// Panics if `node_to_finalize` or `new_best_block` aren't valid nodes.
/// Panics if `new_best_block` is not a descendant of `node_to_finalize`.
/// Panics if `node_to_finalize` isn't a valid node.
/// Panics if the current input best block is not a descendant of `node_to_finalize`.
///
pub fn input_finalize(&mut self, node_to_finalize: NodeIndex, new_best_block: NodeIndex) {
pub fn input_finalize(&mut self, node_to_finalize: NodeIndex) {
// Make sure that `new_best_block` is a descendant of `node_to_finalize`,
// otherwise the state of the tree will be corrupted.
// This is checked with an `assert!` rather than a `debug_assert!`, as this constraint
// is part of the public API of this method.
assert!(self
.non_finalized_blocks
.is_ancestor(node_to_finalize, new_best_block));
.input_best_block_index
.map_or(false, |current_input_best| self
.non_finalized_blocks
.is_ancestor(node_to_finalize, current_input_best)));

self.input_finalized_index = Some(node_to_finalize);
self.input_best_block_index = Some(new_best_block);

// If necessary, update the weight of the block.
match &mut self
.non_finalized_blocks
.get_mut(new_best_block)
.unwrap()
.input_best_block_weight
{
w if *w == self.input_best_block_next_weight - 1 => {}
w => {
*w = self.input_best_block_next_weight;
self.input_best_block_next_weight += 1;
}
}

// Minor sanity checks.
debug_assert!(self
.non_finalized_blocks
.iter_unordered()
.all(|(_, b)| b.input_best_block_weight < self.input_best_block_next_weight));
}

/// Tries to update the output blocks to follow the input.
Expand Down Expand Up @@ -924,6 +905,7 @@ where

let mut pruned_blocks = Vec::new();
let mut pruned_finalized = None;
let mut best_output_block_updated = false;

for pruned in self.non_finalized_blocks.prune_ancestors(new_finalized) {
debug_assert_ne!(Some(pruned.index), self.input_finalized_index);
Expand All @@ -935,6 +917,7 @@ where
.map_or(false, |b| b == pruned.index)
{
self.output_best_block_index = None;
best_output_block_updated = true;
}

// Update `self.finalized_block_weight`.
Expand Down Expand Up @@ -1005,6 +988,7 @@ where
// Input best can be updated to the block being iterated.
current_runtime_service_best_block_weight = block.input_best_block_weight;
self.output_best_block_index = Some(node_index);
best_output_block_updated = true;

// Continue looping, as there might be another block with an even
// higher weight.
Expand All @@ -1024,7 +1008,7 @@ where
user_data: pruned_finalized.user_data.user_data,
former_finalized_async_op_user_data,
pruned_blocks,
best_block_index: self.output_best_block_index,
best_output_block_updated,
});
}
}
Expand Down Expand Up @@ -1202,9 +1186,8 @@ pub enum OutputUpdate<TBl, TAsync> {
/// User data associated to the `async` operation of the previous finalized block.
former_finalized_async_op_user_data: TAsync,

/// Index of the best block after the finalization. `None` if the best block is the block
/// that has just been finalized.
best_block_index: Option<NodeIndex>,
/// `true` if the finalization has updated the best output block.
best_output_block_updated: bool,

/// Blocks that were a descendant of the former finalized block but not of the new
/// finalized block. These blocks are no longer part of the data structure.
Expand Down
1 change: 1 addition & 0 deletions light-base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async-channel = { version = "2.2.0", default-features = false }
async-lock = { version = "3.0.0", default-features = false }
base64 = { version = "0.22.0", default-features = false, features = ["alloc"] }
blake2-rfc = { version = "0.2.18", default-features = false }
bs58 = { version = "0.5.0", default-features = false, features = ["alloc"] }
derive_more = "0.99.17"
either = { version = "1.9.0", default-features = false }
event-listener = { version = "5.0.0", default-features = false }
Expand Down
100 changes: 41 additions & 59 deletions light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ use crate::{
};

use alloc::{
borrow::Cow,
boxed::Box,
format,
string::{String, ToString as _},
sync::Arc,
};
use core::num::NonZeroU32;
use smoldot::{chain_spec, json_rpc::service};
use core::{num::NonZeroU32, pin::Pin};
use futures_lite::StreamExt as _;

/// Configuration for [`service()`].
pub struct Config<TPlat: PlatformRef> {
Expand Down Expand Up @@ -81,6 +83,7 @@ pub struct Config<TPlat: PlatformRef> {
///
/// This parameter is necessary in order to prevent users from using up too much memory within
/// the client.
// TODO: unused at the moment
pub max_parallel_requests: NonZeroU32,
}

Expand All @@ -93,22 +96,20 @@ pub struct Config<TPlat: PlatformRef> {
pub fn service<TPlat: PlatformRef>(config: Config<TPlat>) -> (Frontend<TPlat>, ServicePrototype) {
let log_target = format!("json-rpc-{}", config.log_name);

let (requests_processing_task, requests_responses_io) =
service::client_main_task(service::Config {
max_active_subscriptions: config.max_subscriptions,
max_pending_requests: config.max_pending_requests,
});
let (requests_tx, requests_rx) = async_channel::bounded(32); // TODO: capacity?
let (responses_tx, responses_rx) = async_channel::bounded(16); // TODO: capacity?

let frontend = Frontend {
platform: config.platform,
log_target: log_target.clone(),
requests_responses_io: Arc::new(requests_responses_io),
responses_rx: Arc::new(async_lock::Mutex::new(Box::pin(responses_rx))),
requests_tx,
};

let prototype = ServicePrototype {
log_target,
requests_processing_task,
max_parallel_requests: config.max_parallel_requests,
requests_rx,
responses_tx,
};

(frontend, prototype)
Expand All @@ -125,10 +126,12 @@ pub struct Frontend<TPlat> {
/// See [`Config::platform`].
platform: TPlat,

/// Sending requests and receiving responses.
///
/// Connected to the [`background`].
requests_responses_io: Arc<service::SerializedRequestsIo>,
/// How to send requests to the background task.
requests_tx: async_channel::Sender<String>,

/// How to receive responses coming from the background task.
// TODO: we use an Arc so that it's clonable, but that's questionnable
responses_rx: Arc<async_lock::Mutex<Pin<Box<async_channel::Receiver<String>>>>>,

/// Target to use when emitting logs.
log_target: String,
Expand All @@ -145,10 +148,7 @@ impl<TPlat: PlatformRef> Frontend<TPlat> {
crate::util::truncated_str(json_rpc_request.chars().filter(|c| !c.is_control()), 250)
.to_string();

match self
.requests_responses_io
.try_send_request(json_rpc_request)
{
match self.requests_tx.try_send(json_rpc_request) {
Ok(()) => {
log!(
&self.platform,
Expand All @@ -159,16 +159,9 @@ impl<TPlat: PlatformRef> Frontend<TPlat> {
);
Ok(())
}
Err(service::TrySendRequestError {
cause: service::TrySendRequestErrorCause::TooManyPendingRequests,
request,
}) => Err(HandleRpcError::TooManyPendingRequests {
json_rpc_request: request,
Err(err) => Err(HandleRpcError::TooManyPendingRequests {
json_rpc_request: err.into_inner(),
}),
Err(service::TrySendRequestError {
cause: service::TrySendRequestErrorCause::ClientMainTaskDestroyed,
..
}) => unreachable!(),
}
}

Expand All @@ -177,9 +170,9 @@ impl<TPlat: PlatformRef> Frontend<TPlat> {
/// If this function is called multiple times in parallel, the order in which the calls are
/// responded to is unspecified.
pub async fn next_json_rpc_response(&self) -> String {
let message = match self.requests_responses_io.wait_next_response().await {
Ok(m) => m,
Err(service::WaitNextResponseError::ClientMainTaskDestroyed) => unreachable!(),
let message = match self.responses_rx.lock().await.next().await {
Some(m) => m,
None => unreachable!(),
};

log!(
Expand All @@ -197,20 +190,16 @@ impl<TPlat: PlatformRef> Frontend<TPlat> {

/// Prototype for a JSON-RPC service. Must be initialized using [`ServicePrototype::start`].
pub struct ServicePrototype {
/// Task processing the requests.
///
/// Later sent to the [`background`].
requests_processing_task: service::ClientMainTask,

/// Target to use when emitting logs.
log_target: String,

/// Value obtained through [`Config::max_parallel_requests`].
max_parallel_requests: NonZeroU32,
requests_rx: async_channel::Receiver<String>,

responses_tx: async_channel::Sender<String>,
}

/// Configuration for a JSON-RPC service.
pub struct StartConfig<'a, TPlat: PlatformRef> {
pub struct StartConfig<TPlat: PlatformRef> {
/// Access to the platform's capabilities.
// TODO: redundant with Config above?
pub platform: TPlat,
Expand All @@ -228,8 +217,14 @@ pub struct StartConfig<'a, TPlat: PlatformRef> {
/// Service that provides a ready-to-be-called runtime for the current best block.
pub runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,

/// Specification of the chain.
pub chain_spec: &'a chain_spec::ChainSpec,
/// Name of the chain, as found in the chain specification.
pub chain_name: String,
/// Type of chain, as found in the chain specification.
pub chain_ty: String,
/// JSON-encoded properties of the chain, as found in the chain specification.
pub chain_properties_json: String,
/// Whether the chain is a live network. Found in the chain specification.
pub chain_is_live: bool,

/// Value to return when the `system_name` RPC is called. Should be set to the name of the
/// final executable.
Expand All @@ -240,33 +235,20 @@ pub struct StartConfig<'a, TPlat: PlatformRef> {
pub system_version: String,

/// Hash of the genesis block of the chain.
///
/// > **Note**: This can be derived from a [`chain_spec::ChainSpec`]. While the
/// > [`ServicePrototype::start`] function could in theory use the
/// > [`StartConfig::chain_spec`] parameter to derive this value, doing so is quite
/// > expensive. We prefer to require this value from the upper layer instead, as
/// > it is most likely needed anyway.
pub genesis_block_hash: [u8; 32],

/// Hash of the storage trie root of the genesis block of the chain.
///
/// > **Note**: This can be derived from a [`chain_spec::ChainSpec`]. While the
/// > [`ServicePrototype::start`] function could in theory use the
/// > [`StartConfig::chain_spec`] parameter to derive this value, doing so is quite
/// > expensive. We prefer to require this value from the upper layer instead, as
/// > it is most likely needed anyway.
pub genesis_block_state_root: [u8; 32],
}

impl ServicePrototype {
/// Consumes this prototype and starts the service through [`PlatformRef::spawn_task`].
pub fn start<TPlat: PlatformRef>(self, config: StartConfig<'_, TPlat>) {
background::start(
self.log_target.clone(),
config,
self.requests_processing_task,
self.max_parallel_requests,
)
pub fn start<TPlat: PlatformRef>(self, config: StartConfig<TPlat>) {
let platform = config.platform.clone();
platform.spawn_task(
Cow::Owned(self.log_target.clone()),
background::run(self.log_target, config, self.requests_rx, self.responses_tx),
);
}
}

Expand Down
Loading
Loading