diff --git a/crates/topos-config/src/tce/synchronization.rs b/crates/topos-config/src/tce/synchronization.rs index 222cd1030..b6ff4050d 100644 --- a/crates/topos-config/src/tce/synchronization.rs +++ b/crates/topos-config/src/tce/synchronization.rs @@ -23,7 +23,7 @@ impl Default for SynchronizationConfig { } impl SynchronizationConfig { - pub const INTERVAL_SECONDS: u64 = 10; + pub const INTERVAL_SECONDS: u64 = 60; pub const LIMIT_PER_SUBNET: usize = 100; const fn default_interval_seconds() -> u64 { diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index eaf6991ef..6bf3d341b 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA use super::HealthStatus; -const MAX_BATCH_SIZE: usize = 10; +const MAX_BATCH_SIZE: usize = 1024 * 100; pub struct Behaviour { batch_size: usize, @@ -76,7 +76,7 @@ impl Behaviour { .unwrap_or(Ok(MAX_BATCH_SIZE)) .unwrap(); let gossipsub = gossipsub::ConfigBuilder::default() - .max_transmit_size(2 * 1024 * 1024) + .max_transmit_size(20 * 1024 * 1024) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(|msg_id| { // Content based id diff --git a/crates/topos-p2p/src/constants.rs b/crates/topos-p2p/src/constants.rs index ad5057e20..3b16983e2 100644 --- a/crates/topos-p2p/src/constants.rs +++ b/crates/topos-p2p/src/constants.rs @@ -12,9 +12,9 @@ lazy_static! { pub static ref EVENT_STREAM_BUFFER: usize = env::var("TCE_EVENT_STREAM_BUFFER") .ok() .and_then(|v| v.parse::().ok()) - .unwrap_or(2048 * 2); + .unwrap_or(1024 * 20); pub static ref CAPACITY_EVENT_STREAM_BUFFER: usize = EVENT_STREAM_BUFFER - .checked_mul(10) + .checked_mul(1_000) .map(|v| { let r: usize = v.checked_div(100).unwrap_or(*EVENT_STREAM_BUFFER); r @@ -23,7 +23,7 @@ lazy_static! { pub static ref COMMAND_STREAM_BUFFER_SIZE: usize = env::var("TCE_COMMAND_STREAM_BUFFER_SIZE") .ok() .and_then(|v| v.parse::().ok()) - .unwrap_or(2048); + .unwrap_or(1024 * 20); } pub const DISCOVERY_PROTOCOL: &str = "/tce-disco/1"; diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index 5039698bb..e0e93f48d 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -68,7 +68,8 @@ impl SubnetRuntimeProxy { address: {}, ", &config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address ); - let (command_sender, mut command_rcv) = mpsc::channel::(256); + let (command_sender, mut command_rcv) = + mpsc::channel::(1024 * 20); let ws_runtime_endpoint = config.ws_endpoint.clone(); let http_runtime_endpoint = config.http_endpoint.clone(); let subnet_contract_address = Arc::new(config.subnet_contract_address.clone()); diff --git a/crates/topos-tce-api/src/grpc/mod.rs b/crates/topos-tce-api/src/grpc/mod.rs index ae6f132e4..2463d3ced 100644 --- a/crates/topos-tce-api/src/grpc/mod.rs +++ b/crates/topos-tce-api/src/grpc/mod.rs @@ -29,7 +29,7 @@ pub(crate) mod console; #[cfg(test)] mod tests; -const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 100; +const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 1024 * 20; pub(crate) mod builder; pub(crate) mod messaging; @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService { .map(move |message| Self::parse_stream(message, stream_id)) .boxed(); - let (command_sender, command_receiver) = mpsc::channel(2048); + let (command_sender, command_receiver) = mpsc::channel(1024 * 20); let (outbound_stream, rx) = mpsc::channel::, OutboundMessage), Status>>( DEFAULT_CHANNEL_STREAM_CAPACITY, diff --git a/crates/topos-tce-api/src/lib.rs b/crates/topos-tce-api/src/lib.rs index 41808c13a..7d1158d52 100644 --- a/crates/topos-tce-api/src/lib.rs +++ b/crates/topos-tce-api/src/lib.rs @@ -9,10 +9,10 @@ mod tests; pub(crate) mod constants { /// Constant size of every channel in the crate - pub(crate) const CHANNEL_SIZE: usize = 2048; + pub(crate) const CHANNEL_SIZE: usize = 1024 * 20; /// Constant size of every transient stream channel in the crate - pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024; + pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024 * 20; } pub use runtime::{ error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent, diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 3b9e0274f..b998e77bc 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -49,13 +49,14 @@ pub struct DoubleEcho { /// List of approved validators through smart contract and/or genesis pub validators: HashSet, pub validator_store: Arc, + pub known_signatures: HashSet, pub broadcast_sender: broadcast::Sender, pub task_manager_cancellation: CancellationToken, } impl DoubleEcho { - pub const MAX_BUFFER_SIZE: usize = 2048; + pub const MAX_BUFFER_SIZE: usize = 1024 * 20; #[allow(clippy::too_many_arguments)] pub fn new( @@ -85,6 +86,7 @@ impl DoubleEcho { }, shutdown, validator_store, + known_signatures: HashSet::new(), broadcast_sender, task_manager_cancellation: CancellationToken::new(), } @@ -150,6 +152,12 @@ impl DoubleEcho { continue; } + if self.known_signatures.contains(&signature) { + debug!("ECHO message signature already known: {}", signature); + self.handle_echo(certificate_id, validator_id, signature).await; + continue; + } + let mut payload = Vec::new(); payload.extend_from_slice(certificate_id.as_array()); payload.extend_from_slice(validator_id.as_bytes()); @@ -159,6 +167,8 @@ impl DoubleEcho { continue; } + self.known_signatures.insert(signature); + self.handle_echo(certificate_id, validator_id, signature).await }, DoubleEchoCommand::Ready { certificate_id, validator_id, signature } => { @@ -168,6 +178,13 @@ impl DoubleEcho { continue; } + if self.known_signatures.contains(&signature) { + debug!("READY message signature already known: {}", signature); + self.handle_ready(certificate_id, validator_id, signature).await; + continue; + } + + let mut payload = Vec::new(); payload.extend_from_slice(certificate_id.as_array()); payload.extend_from_slice(validator_id.as_bytes()); @@ -177,6 +194,8 @@ impl DoubleEcho { continue; } + self.known_signatures.insert(signature); + self.handle_ready(certificate_id, validator_id, signature).await }, } diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index d25e3525d..83931e7ca 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -107,31 +107,35 @@ impl TaskManager { } pub async fn run(mut self, shutdown_receiver: CancellationToken) { - let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut pending_certificate_interval = tokio::time::interval(Duration::from_micros(500)); + let mut message_interval = tokio::time::interval(Duration::from_millis(100)); loop { tokio::select! { biased; - _ = interval.tick() => { + _ = pending_certificate_interval.tick() => { self.next_pending_certificate(); } - Some(msg) = self.message_receiver.recv() => { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(msg); - }; - } - DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { - trace!("Received broadcast message for certificate {} ", cert.id); - self.create_task(cert, need_gossip, pending_id) + _ = message_interval.tick() => { + if let Some(msg) = self.message_receiver.recv().await { + match msg { + DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context.sink.send(msg).await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(msg); + }; + } + DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + trace!("Received broadcast message for certificate {} ", cert.id); + + self.create_task(cert, need_gossip, pending_id) + } } } }