diff --git a/CLI.md b/CLI.md index dc2b42455c4..73643486870 100644 --- a/CLI.md +++ b/CLI.md @@ -523,6 +523,13 @@ Run a GraphQL service that exposes a faucet where users can claim tokens. This g Default value: `8080` * `--amount ` — The number of tokens to send to each new chain * `--limit-rate-until ` — The end timestamp: The faucet will rate-limit the token supply so it runs out of money no earlier than this +* `--listener-skip-process-inbox` — Do not create blocks automatically to receive incoming messages. Instead, wait for an explicit mutation `processInbox` +* `--listener-delay-before-ms ` — Wait before processing any notification (useful for testing) + + Default value: `0` +* `--listener-delay-after-ms ` — Wait after processing any notification (useful for rate limiting) + + Default value: `0` diff --git a/linera-client/src/chain_clients.rs b/linera-client/src/chain_clients.rs index 5e65364552e..3605a57815a 100644 --- a/linera-client/src/chain_clients.rs +++ b/linera-client/src/chain_clients.rs @@ -8,7 +8,10 @@ use linera_base::identifiers::ChainId; use linera_core::client::ChainClient; use linera_storage::Storage; -use crate::error::{self, Error}; +use crate::{ + chain_listener::ClientContext, + error::{self, Error}, +}; pub type ClientMapInner = BTreeMap>; pub struct ChainClients(pub Arc>>) @@ -24,18 +27,10 @@ where } } -impl Default for ChainClients -where - S: Storage, -{ - fn default() -> Self { - Self(Arc::new(Mutex::new(BTreeMap::new()))) - } -} - impl ChainClients where S: Storage, + P: 'static, { async fn client(&self, chain_id: &ChainId) -> Option> { Some(self.0.lock().await.get(chain_id)?.clone()) @@ -54,4 +49,34 @@ where pub async fn map_lock(&self) -> MutexGuard> { self.0.lock().await } + + pub async fn add_client(&self, client: ChainClient) { + self.0.lock().await.insert(client.chain_id(), client); + } + + pub async fn request_client( + &self, + chain_id: ChainId, + context: Arc>>, + ) -> ChainClient { + let mut guard = self.0.lock().await; + match guard.get(&chain_id) { + Some(client) => client.clone(), + None => { + let context = context.lock().await; + let client = context.make_chain_client(chain_id); + guard.insert(chain_id, client.clone()); + client + } + } + } + + pub async fn from_clients(chains: impl IntoIterator>) -> Self { + let chain_clients = Self(Default::default()); + for chain_client in chains { + let mut map_guard = chain_clients.map_lock().await; + map_guard.insert(chain_client.chain_id(), chain_client); + } + chain_clients + } } diff --git a/linera-client/src/chain_listener.rs b/linera-client/src/chain_listener.rs index 6d0f02a0d78..ea5fc44b014 100644 --- a/linera-client/src/chain_listener.rs +++ b/linera-client/src/chain_listener.rs @@ -1,7 +1,7 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::btree_map, sync::Arc, time::Duration}; +use std::{collections::HashSet, sync::Arc, time::Duration}; use async_trait::async_trait; use futures::{ @@ -81,6 +81,14 @@ pub trait ClientContext { &mut self, client: &ChainClient, ) -> Result<(), Error>; + + fn clients(&self) -> Vec> { + let mut clients = vec![]; + for chain_id in &self.wallet().chain_ids() { + clients.push(self.make_chain_client(*chain_id)); + } + clients + } } /// A `ChainListener` is a process that listens to notifications from validators and reacts @@ -91,6 +99,7 @@ where { config: ChainListenerConfig, clients: ChainClients, + listening: Arc>>, } impl ChainListener @@ -101,7 +110,11 @@ where { /// Creates a new chain listener given client chains. pub fn new(config: ChainListenerConfig, clients: ChainClients) -> Self { - Self { config, clients } + Self { + config, + clients, + listening: Default::default(), + } } /// Runs the chain listener. @@ -117,6 +130,7 @@ where context.clone(), storage.clone(), self.config.clone(), + self.listening.clone(), ); } } @@ -128,13 +142,15 @@ where context: Arc>, storage: S, config: ChainListenerConfig, + listening: Arc>>, ) where C: ClientContext + Send + 'static, { let _handle = linera_base::task::spawn( async move { if let Err(err) = - Self::run_client_stream(chain_id, clients, context, storage, config).await + Self::run_client_stream(chain_id, clients, context, storage, config, listening) + .await { error!("Stream for chain {} failed: {}", chain_id, err); } @@ -150,24 +166,23 @@ where context: Arc>, storage: S, config: ChainListenerConfig, + listening: Arc>>, ) -> Result<(), Error> where C: ClientContext + Send + 'static, { - let client = { - let mut map_guard = clients.map_lock().await; - let context_guard = context.lock().await; - let btree_map::Entry::Vacant(entry) = map_guard.entry(chain_id) else { - // For every entry in the client map we are already listening to notifications, so - // there's nothing to do. This can happen if we download a child before the parent - // chain, and then process the OpenChain message in the parent. - return Ok(()); - }; - let client = context_guard.make_chain_client(chain_id); - entry.insert(client.clone()); - client - }; + let mut guard = listening.lock().await; + if guard.contains(&chain_id) { + // If we are already listening to notifications, there's nothing to do. + // This can happen if we download a child before the parent + // chain, and then process the OpenChain message in the parent. + return Ok(()); + } + // If the client is not present, we can request it. + let client = clients.request_client(chain_id, context.clone()).await; let (listener, _listen_handle, mut local_stream) = client.listen().await?; + guard.insert(chain_id); + drop(guard); client.synchronize_from_validators().await?; drop(linera_base::task::spawn(listener.in_current_span())); let mut timeout = storage.clock().current_time(); @@ -268,6 +283,7 @@ where context.clone(), storage.clone(), config.clone(), + listening.clone(), ); } } diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs index b4df0f15540..281f547a2f6 100644 --- a/linera-client/src/client_options.rs +++ b/linera-client/src/client_options.rs @@ -681,6 +681,10 @@ pub enum ClientCommand { /// no earlier than this. #[arg(long)] limit_rate_until: Option>, + + /// Configuration for the faucet chain listener. + #[command(flatten)] + config: ChainListenerConfig, }, /// Publish bytecode. diff --git a/linera-client/src/unit_tests/chain_listener.rs b/linera-client/src/unit_tests/chain_listener.rs index 2d0d6f873a5..649e1e52335 100644 --- a/linera-client/src/unit_tests/chain_listener.rs +++ b/linera-client/src/unit_tests/chain_listener.rs @@ -31,7 +31,7 @@ use linera_views::memory::MemoryStore; use rand::SeedableRng as _; use crate::{ - chain_listener::{self, ChainListener, ChainListenerConfig, ClientContext as _}, + chain_listener::{self, ChainClients, ChainListener, ChainListenerConfig, ClientContext as _}, config::{CommitteeConfig, GenesisConfig, ValidatorConfig}, wallet::{UserChain, Wallet}, Error, @@ -169,8 +169,9 @@ async fn test_chain_listener() -> anyhow::Result<()> { context .update_wallet_for_new_chain(chain_id0, Some(key_pair), clock.current_time()) .await?; + let chain_clients = ChainClients::from_clients(context.clients()).await; let context = Arc::new(Mutex::new(context)); - let listener = ChainListener::new(config, Default::default()); + let listener = ChainListener::new(config, chain_clients); listener.run(context, storage).await; // Transfer ownership of chain 0 to the chain listener and some other key. The listener will diff --git a/linera-service/src/faucet.rs b/linera-service/src/faucet.rs index f93c9eac5d1..e7d3429004d 100644 --- a/linera-service/src/faucet.rs +++ b/linera-service/src/faucet.rs @@ -13,8 +13,15 @@ use linera_base::{ identifiers::{ChainId, MessageId}, ownership::ChainOwnership, }; -use linera_client::{chain_listener::ClientContext, config::GenesisConfig}; -use linera_core::{client::ChainClient, data_types::ClientOutcome, node::ValidatorNodeProvider}; +use linera_client::{ + chain_clients::ChainClients, + chain_listener::{ChainListener, ChainListenerConfig, ClientContext}, + config::GenesisConfig, +}; +use linera_core::{ + data_types::ClientOutcome, + node::{ValidatorNode, ValidatorNodeProvider}, +}; use linera_execution::committee::ValidatorName; use linera_storage::{Clock as _, Storage}; use serde::Deserialize; @@ -33,7 +40,8 @@ where S: Storage, { genesis_config: Arc, - client: Arc>>, + clients: ChainClients, + chain_id: ChainId, } /// The root GraphQL mutation type. @@ -41,7 +49,8 @@ pub struct MutationRoot where S: Storage, { - client: Arc>>, + clients: ChainClients, + chain_id: ChainId, context: Arc>, amount: Amount, end_timestamp: Timestamp, @@ -84,7 +93,7 @@ where /// Returns the current committee's validators. async fn current_validators(&self) -> Result, Error> { - let client = self.client.lock().await; + let client = self.clients.try_client_lock(&self.chain_id).await?; let committee = client.local_committee().await?; Ok(committee .validators() @@ -119,7 +128,7 @@ where C: ClientContext + Send + 'static, { async fn do_claim(&self, public_key: PublicKey) -> Result { - let client = self.client.lock().await; + let client = self.clients.try_client_lock(&self.chain_id).await?; if self.start_timestamp < self.end_timestamp { let local_time = client.storage_client().clock().current_time(); @@ -149,7 +158,7 @@ where let result = client .open_chain(ownership, ApplicationPermissions::default(), self.amount) .await; - self.context.lock().await.update_wallet(&*client).await?; + self.context.lock().await.update_wallet(&client).await?; let (message_id, certificate) = match result? { ClientOutcome::Committed(result) => result, ClientOutcome::WaitForTimeout(timeout) => { @@ -189,9 +198,12 @@ pub struct FaucetService where S: Storage, { - client: Arc>>, + clients: ChainClients, + chain_id: ChainId, context: Arc>, genesis_config: Arc, + config: ChainListenerConfig, + storage: S, port: NonZeroU16, amount: Amount, end_timestamp: Timestamp, @@ -205,9 +217,12 @@ where { fn clone(&self) -> Self { Self { - client: self.client.clone(), + clients: self.clients.clone(), + chain_id: self.chain_id, context: self.context.clone(), genesis_config: self.genesis_config.clone(), + config: self.config.clone(), + storage: self.storage.clone(), port: self.port, amount: self.amount, end_timestamp: self.end_timestamp, @@ -220,25 +235,35 @@ where impl FaucetService where P: ValidatorNodeProvider + Send + Sync + Clone + 'static, + <

::Node as ValidatorNode>::NotificationStream: Send, S: Storage + Clone + Send + Sync + 'static, C: ClientContext + Send + 'static, { /// Creates a new instance of the faucet service. + #[allow(clippy::too_many_arguments)] pub async fn new( port: NonZeroU16, - client: ChainClient, + chain_id: ChainId, context: C, amount: Amount, end_timestamp: Timestamp, genesis_config: Arc, + config: ChainListenerConfig, + storage: S, ) -> anyhow::Result { + let clients = ChainClients::::from_clients(context.clients()).await; + let context = Arc::new(Mutex::new(context)); + let client = clients.try_client_lock(&chain_id).await?; let start_timestamp = client.storage_client().clock().current_time(); client.process_inbox().await?; let start_balance = client.local_balance().await?; Ok(Self { - client: Arc::new(Mutex::new(client)), - context: Arc::new(Mutex::new(context)), + clients, + chain_id, + context, genesis_config, + config, + storage, port, amount, end_timestamp, @@ -249,7 +274,8 @@ where pub fn schema(&self) -> Schema, MutationRoot, EmptySubscription> { let mutation_root = MutationRoot { - client: self.client.clone(), + clients: self.clients.clone(), + chain_id: self.chain_id, context: self.context.clone(), amount: self.amount, end_timestamp: self.end_timestamp, @@ -258,7 +284,8 @@ where }; let query_root = QueryRoot { genesis_config: self.genesis_config.clone(), - client: self.client.clone(), + clients: self.clients.clone(), + chain_id: self.chain_id, }; Schema::build(query_root, mutation_root, EmptySubscription).finish() } @@ -277,6 +304,10 @@ where info!("GraphiQL IDE: http://localhost:{}", port); + ChainListener::new(self.config.clone(), self.clients.clone()) + .run(self.context.clone(), self.storage.clone()) + .await; + axum::serve( tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?, app, diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index 6362464c9b5..f5084995bb7 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -791,7 +791,7 @@ impl Runnable for Job { Service { config, port } => { let default_chain = context.wallet().default_chain(); - let service = NodeService::new(config, port, default_chain, storage, context); + let service = NodeService::new(config, port, default_chain, storage, context).await; service.run().await?; } @@ -800,10 +800,10 @@ impl Runnable for Job { port, amount, limit_rate_until, + config, } => { let chain_id = chain_id.unwrap_or_else(|| context.default_chain()); info!("Starting faucet service using chain {}", chain_id); - let chain_client = context.make_chain_client(chain_id); let end_timestamp = limit_rate_until .map(|et| { let micros = u64::try_from(et.timestamp_micros()) @@ -814,11 +814,13 @@ impl Runnable for Job { let genesis_config = Arc::new(context.wallet().genesis_config().clone()); let faucet = FaucetService::new( port, - chain_client, + chain_id, context, amount, end_timestamp, genesis_config, + config, + storage, ) .await?; faucet.run().await?; diff --git a/linera-service/src/node_service.rs b/linera-service/src/node_service.rs index bc938f2fb42..e9e4cfca7ff 100644 --- a/linera-service/src/node_service.rs +++ b/linera-service/src/node_service.rs @@ -958,7 +958,7 @@ where C: ClientContext + Send + 'static, { /// Creates a new instance of the node service given a client chain and a port. - pub fn new( + pub async fn new( config: ChainListenerConfig, port: NonZeroU16, default_chain: Option, @@ -966,7 +966,7 @@ where context: C, ) -> Self { Self { - clients: ChainClients::default(), + clients: ChainClients::from_clients(context.clients()).await, config, port, default_chain, diff --git a/linera-service/src/schema_export.rs b/linera-service/src/schema_export.rs index f3f1a879d3b..e81b0913cc9 100644 --- a/linera-service/src/schema_export.rs +++ b/linera-service/src/schema_export.rs @@ -154,6 +154,10 @@ impl ClientConte async fn update_wallet(&mut self, _: &ChainClient) -> Result<(), Error> { Ok(()) } + + fn clients(&self) -> Vec> { + vec![] + } } #[tokio::main] @@ -176,7 +180,8 @@ async fn main() -> std::io::Result<()> { None, storage, context, - ); + ) + .await; let schema = service.schema().sdl(); print!("{}", schema); Ok(()) diff --git a/linera-service/src/unit_tests/faucet.rs b/linera-service/src/unit_tests/faucet.rs index 9ead17fd5e1..9fb27b062a2 100644 --- a/linera-service/src/unit_tests/faucet.rs +++ b/linera-service/src/unit_tests/faucet.rs @@ -12,7 +12,7 @@ use linera_base::{ data_types::{Amount, Timestamp}, identifiers::{ChainDescription, ChainId}, }; -use linera_client::{chain_listener, wallet::Wallet}; +use linera_client::{chain_clients::ChainClients, chain_listener, wallet::Wallet}; use linera_core::{ client::ChainClient, test_utils::{FaultType, MemoryStorageBuilder, NodeProvider, StorageBuilder as _, TestBuilder}, @@ -72,10 +72,13 @@ async fn test_faucet_rate_limiting() { .add_initial_chain(ChainDescription::Root(1), Amount::from_tokens(6)) .await .unwrap(); - let client = Arc::new(Mutex::new(client)); - let context = Arc::new(Mutex::new(ClientContext::default())); + let chain_id = client.chain_id(); + let context = ClientContext::default(); + let clients = ChainClients::from_clients(vec![client]).await; + let context = Arc::new(Mutex::new(context)); let root = MutationRoot { - client, + clients, + chain_id, context: context.clone(), amount: Amount::from_tokens(1), end_timestamp: Timestamp::from(6000), diff --git a/linera-service/tests/local_net_tests.rs b/linera-service/tests/local_net_tests.rs index dbc459dc004..1d46a914f7b 100644 --- a/linera-service/tests/local_net_tests.rs +++ b/linera-service/tests/local_net_tests.rs @@ -77,6 +77,23 @@ async fn test_end_to_end_reconfiguration(config: LocalNetConfig) -> Result<()> { let network = config.network; let (mut net, client) = config.instantiate().await?; + let faucet_client = net.make_client().await; + faucet_client.wallet_init(&[], FaucetOption::None).await?; + + let faucet_chain = client + .open_and_assign(&faucet_client, Amount::from_tokens(1_000u128)) + .await?; + + let mut faucet_service = faucet_client + .run_faucet(None, faucet_chain, Amount::from_tokens(2)) + .await?; + + faucet_service.ensure_is_running()?; + + let faucet = faucet_service.instance(); + + assert_eq!(faucet.current_validators().await?.len(), 4); + let client_2 = net.make_client().await; client_2.wallet_init(&[], FaucetOption::None).await?; let chain_1 = ChainId::root(0); @@ -125,17 +142,26 @@ async fn test_end_to_end_reconfiguration(config: LocalNetConfig) -> Result<()> { client.query_validators(None).await?; client.query_validators(Some(chain_1)).await?; + if matches!(network, Network::Grpc) { + assert_eq!(faucet.current_validators().await?.len(), 5); + } + // Add 6th validator client .set_validator(net.validator_name(5).unwrap(), LocalNet::proxy_port(5), 100) .await?; + if matches!(network, Network::Grpc) { + assert_eq!(faucet.current_validators().await?.len(), 6); + } // Remove 5th validator client .remove_validator(net.validator_name(4).unwrap()) .await?; net.remove_validator(4)?; - + if matches!(network, Network::Grpc) { + assert_eq!(faucet.current_validators().await?.len(), 5); + } client.query_validators(None).await?; client.query_validators(Some(chain_1)).await?; if let Some(service) = &node_service_2 {