Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faucet has a chain listener #2410

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,13 @@ Run a GraphQL service that exposes a faucet where users can claim tokens. This g
Default value: `8080`
* `--amount <AMOUNT>` — The number of tokens to send to each new chain
* `--limit-rate-until <LIMIT_RATE_UNTIL>` — The end timestamp: The faucet will rate-limit the token supply so it runs out of money no earlier than this
* `--listener-skip-process-inbox` — Do not create blocks automatically to receive incoming messages. Instead, wait for an explicit mutation `processInbox`
* `--listener-delay-before-ms <DELAY_BEFORE_MS>` — Wait before processing any notification (useful for testing)

Default value: `0`
* `--listener-delay-after-ms <DELAY_AFTER_MS>` — Wait after processing any notification (useful for rate limiting)

Default value: `0`



Expand Down
45 changes: 35 additions & 10 deletions linera-client/src/chain_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use linera_base::identifiers::ChainId;
use linera_core::client::ChainClient;
use linera_storage::Storage;

use crate::error::{self, Error};
use crate::{
chain_listener::ClientContext,
error::{self, Error},
};

pub type ClientMapInner<P, S> = BTreeMap<ChainId, ChainClient<P, S>>;
pub struct ChainClients<P, S>(pub Arc<Mutex<ClientMapInner<P, S>>>)
Expand All @@ -24,18 +27,10 @@ where
}
}

impl<P, S> Default for ChainClients<P, S>
where
S: Storage,
{
fn default() -> Self {
Self(Arc::new(Mutex::new(BTreeMap::new())))
}
}

impl<P, S> ChainClients<P, S>
where
S: Storage,
P: 'static,
{
async fn client(&self, chain_id: &ChainId) -> Option<ChainClient<P, S>> {
Some(self.0.lock().await.get(chain_id)?.clone())
Expand All @@ -54,4 +49,34 @@ where
pub async fn map_lock(&self) -> MutexGuard<ClientMapInner<P, S>> {
self.0.lock().await
}

pub async fn add_client(&self, client: ChainClient<P, S>) {
self.0.lock().await.insert(client.chain_id(), client);
}

pub async fn request_client(
&self,
chain_id: ChainId,
context: Arc<Mutex<impl ClientContext<ValidatorNodeProvider = P, Storage = S>>>,
) -> ChainClient<P, S> {
let mut guard = self.0.lock().await;
match guard.get(&chain_id) {
Some(client) => client.clone(),
None => {
let context = context.lock().await;
let client = context.make_chain_client(chain_id);
guard.insert(chain_id, client.clone());
client
}
}
}

pub async fn from_clients(chains: impl IntoIterator<Item = ChainClient<P, S>>) -> Self {
let chain_clients = Self(Default::default());
for chain_client in chains {
let mut map_guard = chain_clients.map_lock().await;
map_guard.insert(chain_client.chain_id(), chain_client);
}
chain_clients
}
}
48 changes: 32 additions & 16 deletions linera-client/src/chain_listener.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::btree_map, sync::Arc, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};

use async_trait::async_trait;
use futures::{
Expand Down Expand Up @@ -81,6 +81,14 @@ pub trait ClientContext {
&mut self,
client: &ChainClient<Self::ValidatorNodeProvider, Self::Storage>,
) -> Result<(), Error>;

fn clients(&self) -> Vec<ChainClient<Self::ValidatorNodeProvider, Self::Storage>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make_clients, to make it clearer that this isn't a (cheap) getter.

let mut clients = vec![];
for chain_id in &self.wallet().chain_ids() {
clients.push(self.make_chain_client(*chain_id));
}
clients
}
}

/// A `ChainListener` is a process that listens to notifications from validators and reacts
Expand All @@ -91,6 +99,7 @@ where
{
config: ChainListenerConfig,
clients: ChainClients<P, S>,
listening: Arc<Mutex<HashSet<ChainId>>>,
}

impl<P, S> ChainListener<P, S>
Expand All @@ -101,7 +110,11 @@ where
{
/// Creates a new chain listener given client chains.
pub fn new(config: ChainListenerConfig, clients: ChainClients<P, S>) -> Self {
Self { config, clients }
Self {
config,
clients,
listening: Default::default(),
}
}

/// Runs the chain listener.
Expand All @@ -117,6 +130,7 @@ where
context.clone(),
storage.clone(),
self.config.clone(),
self.listening.clone(),
);
}
}
Expand All @@ -128,13 +142,15 @@ where
context: Arc<Mutex<C>>,
storage: S,
config: ChainListenerConfig,
listening: Arc<Mutex<HashSet<ChainId>>>,
) where
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
let _handle = linera_base::task::spawn(
async move {
if let Err(err) =
Self::run_client_stream(chain_id, clients, context, storage, config).await
Self::run_client_stream(chain_id, clients, context, storage, config, listening)
.await
{
error!("Stream for chain {} failed: {}", chain_id, err);
}
Expand All @@ -150,24 +166,23 @@ where
context: Arc<Mutex<C>>,
storage: S,
config: ChainListenerConfig,
listening: Arc<Mutex<HashSet<ChainId>>>,
) -> Result<(), Error>
where
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
let client = {
let mut map_guard = clients.map_lock().await;
let context_guard = context.lock().await;
let btree_map::Entry::Vacant(entry) = map_guard.entry(chain_id) else {
// For every entry in the client map we are already listening to notifications, so
// there's nothing to do. This can happen if we download a child before the parent
// chain, and then process the OpenChain message in the parent.
return Ok(());
};
let client = context_guard.make_chain_client(chain_id);
entry.insert(client.clone());
client
};
let mut guard = listening.lock().await;
if guard.contains(&chain_id) {
// If we are already listening to notifications, there's nothing to do.
// This can happen if we download a child before the parent
// chain, and then process the OpenChain message in the parent.
return Ok(());
}
// If the client is not present, we can request it.
let client = clients.request_client(chain_id, context.clone()).await;
let (listener, _listen_handle, mut local_stream) = client.listen().await?;
guard.insert(chain_id);
drop(guard);
client.synchronize_from_validators().await?;
drop(linera_base::task::spawn(listener.in_current_span()));
let mut timeout = storage.clock().current_time();
Expand Down Expand Up @@ -268,6 +283,7 @@ where
context.clone(),
storage.clone(),
config.clone(),
listening.clone(),
);
}
}
Expand Down
4 changes: 4 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,10 @@ pub enum ClientCommand {
/// no earlier than this.
#[arg(long)]
limit_rate_until: Option<DateTime<Utc>>,

/// Configuration for the faucet chain listener.
#[command(flatten)]
config: ChainListenerConfig,
},

/// Publish bytecode.
Expand Down
5 changes: 3 additions & 2 deletions linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use linera_views::memory::MemoryStore;
use rand::SeedableRng as _;

use crate::{
chain_listener::{self, ChainListener, ChainListenerConfig, ClientContext as _},
chain_listener::{self, ChainClients, ChainListener, ChainListenerConfig, ClientContext as _},
config::{CommitteeConfig, GenesisConfig, ValidatorConfig},
wallet::{UserChain, Wallet},
Error,
Expand Down Expand Up @@ -169,8 +169,9 @@ async fn test_chain_listener() -> anyhow::Result<()> {
context
.update_wallet_for_new_chain(chain_id0, Some(key_pair), clock.current_time())
.await?;
let chain_clients = ChainClients::from_clients(context.clients()).await;
let context = Arc::new(Mutex::new(context));
let listener = ChainListener::new(config, Default::default());
let listener = ChainListener::new(config, chain_clients);
listener.run(context, storage).await;

// Transfer ownership of chain 0 to the chain listener and some other key. The listener will
Expand Down
59 changes: 45 additions & 14 deletions linera-service/src/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@ use linera_base::{
identifiers::{ChainId, MessageId},
ownership::ChainOwnership,
};
use linera_client::{chain_listener::ClientContext, config::GenesisConfig};
use linera_core::{client::ChainClient, data_types::ClientOutcome, node::ValidatorNodeProvider};
use linera_client::{
chain_clients::ChainClients,
chain_listener::{ChainListener, ChainListenerConfig, ClientContext},
config::GenesisConfig,
};
use linera_core::{
data_types::ClientOutcome,
node::{ValidatorNode, ValidatorNodeProvider},
};
use linera_execution::committee::ValidatorName;
use linera_storage::{Clock as _, Storage};
use serde::Deserialize;
Expand All @@ -33,15 +40,17 @@ where
S: Storage,
{
genesis_config: Arc<GenesisConfig>,
client: Arc<Mutex<ChainClient<P, S>>>,
clients: ChainClients<P, S>,
chain_id: ChainId,
}

/// The root GraphQL mutation type.
pub struct MutationRoot<P, S, C>
where
S: Storage,
{
client: Arc<Mutex<ChainClient<P, S>>>,
clients: ChainClients<P, S>,
chain_id: ChainId,
context: Arc<Mutex<C>>,
amount: Amount,
end_timestamp: Timestamp,
Expand Down Expand Up @@ -84,7 +93,7 @@ where

/// Returns the current committee's validators.
async fn current_validators(&self) -> Result<Vec<Validator>, Error> {
let client = self.client.lock().await;
let client = self.clients.try_client_lock(&self.chain_id).await?;
let committee = client.local_committee().await?;
Ok(committee
.validators()
Expand Down Expand Up @@ -119,7 +128,7 @@ where
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
async fn do_claim(&self, public_key: PublicKey) -> Result<ClaimOutcome, Error> {
let client = self.client.lock().await;
let client = self.clients.try_client_lock(&self.chain_id).await?;

if self.start_timestamp < self.end_timestamp {
let local_time = client.storage_client().clock().current_time();
Expand Down Expand Up @@ -149,7 +158,7 @@ where
let result = client
.open_chain(ownership, ApplicationPermissions::default(), self.amount)
.await;
self.context.lock().await.update_wallet(&*client).await?;
self.context.lock().await.update_wallet(&client).await?;
let (message_id, certificate) = match result? {
ClientOutcome::Committed(result) => result,
ClientOutcome::WaitForTimeout(timeout) => {
Expand Down Expand Up @@ -189,9 +198,12 @@ pub struct FaucetService<P, S, C>
where
S: Storage,
{
client: Arc<Mutex<ChainClient<P, S>>>,
clients: ChainClients<P, S>,
chain_id: ChainId,
context: Arc<Mutex<C>>,
genesis_config: Arc<GenesisConfig>,
config: ChainListenerConfig,
storage: S,
port: NonZeroU16,
amount: Amount,
end_timestamp: Timestamp,
Expand All @@ -205,9 +217,12 @@ where
{
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
clients: self.clients.clone(),
chain_id: self.chain_id,
context: self.context.clone(),
genesis_config: self.genesis_config.clone(),
config: self.config.clone(),
storage: self.storage.clone(),
port: self.port,
amount: self.amount,
end_timestamp: self.end_timestamp,
Expand All @@ -220,25 +235,35 @@ where
impl<P, S, C> FaucetService<P, S, C>
where
P: ValidatorNodeProvider + Send + Sync + Clone + 'static,
<<P as ValidatorNodeProvider>::Node as ValidatorNode>::NotificationStream: Send,
S: Storage + Clone + Send + Sync + 'static,
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
{
/// Creates a new instance of the faucet service.
#[allow(clippy::too_many_arguments)]
pub async fn new(
port: NonZeroU16,
client: ChainClient<P, S>,
chain_id: ChainId,
context: C,
amount: Amount,
end_timestamp: Timestamp,
genesis_config: Arc<GenesisConfig>,
config: ChainListenerConfig,
storage: S,
) -> anyhow::Result<Self> {
let clients = ChainClients::<P, S>::from_clients(context.clients()).await;
let context = Arc::new(Mutex::new(context));
let client = clients.try_client_lock(&chain_id).await?;
let start_timestamp = client.storage_client().clock().current_time();
client.process_inbox().await?;
let start_balance = client.local_balance().await?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
context: Arc::new(Mutex::new(context)),
clients,
chain_id,
context,
genesis_config,
config,
storage,
port,
amount,
end_timestamp,
Expand All @@ -249,7 +274,8 @@ where

pub fn schema(&self) -> Schema<QueryRoot<P, S>, MutationRoot<P, S, C>, EmptySubscription> {
let mutation_root = MutationRoot {
client: self.client.clone(),
clients: self.clients.clone(),
chain_id: self.chain_id,
context: self.context.clone(),
amount: self.amount,
end_timestamp: self.end_timestamp,
Expand All @@ -258,7 +284,8 @@ where
};
let query_root = QueryRoot {
genesis_config: self.genesis_config.clone(),
client: self.client.clone(),
clients: self.clients.clone(),
chain_id: self.chain_id,
};
Schema::build(query_root, mutation_root, EmptySubscription).finish()
}
Expand All @@ -277,6 +304,10 @@ where

info!("GraphiQL IDE: http://localhost:{}", port);

ChainListener::new(self.config.clone(), self.clients.clone())
.run(self.context.clone(), self.storage.clone())
.await;

axum::serve(
tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?,
app,
Expand Down
Loading
Loading