Skip to content

Commit

Permalink
fix: channel size, timing of parsing messages, local signature hashset
Browse files Browse the repository at this point in the history
  • Loading branch information
gruberb committed Apr 24, 2024
1 parent e602a6a commit 19da5d9
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 29 deletions.
2 changes: 1 addition & 1 deletion crates/topos-config/src/tce/synchronization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Default for SynchronizationConfig {
}

impl SynchronizationConfig {
pub const INTERVAL_SECONDS: u64 = 10;
pub const INTERVAL_SECONDS: u64 = 60;
pub const LIMIT_PER_SUBNET: usize = 100;

const fn default_interval_seconds() -> u64 {
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA

use super::HealthStatus;

const MAX_BATCH_SIZE: usize = 10;
const MAX_BATCH_SIZE: usize = 1024 * 100;

pub struct Behaviour {
batch_size: usize,
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Behaviour {
.unwrap_or(Ok(MAX_BATCH_SIZE))
.unwrap();
let gossipsub = gossipsub::ConfigBuilder::default()
.max_transmit_size(2 * 1024 * 1024)
.max_transmit_size(20 * 1024 * 1024)
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(|msg_id| {
// Content based id
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-p2p/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ lazy_static! {
pub static ref EVENT_STREAM_BUFFER: usize = env::var("TCE_EVENT_STREAM_BUFFER")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(2048 * 2);
.unwrap_or(1024 * 20);
pub static ref CAPACITY_EVENT_STREAM_BUFFER: usize = EVENT_STREAM_BUFFER
.checked_mul(10)
.checked_mul(1_000)
.map(|v| {
let r: usize = v.checked_div(100).unwrap_or(*EVENT_STREAM_BUFFER);
r
Expand All @@ -23,7 +23,7 @@ lazy_static! {
pub static ref COMMAND_STREAM_BUFFER_SIZE: usize = env::var("TCE_COMMAND_STREAM_BUFFER_SIZE")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(2048);
.unwrap_or(1024 * 20);
}

pub const DISCOVERY_PROTOCOL: &str = "/tce-disco/1";
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-sequencer-subnet-runtime/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl SubnetRuntimeProxy {
address: {}, ",
&config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address
);
let (command_sender, mut command_rcv) = mpsc::channel::<SubnetRuntimeProxyCommand>(256);
let (command_sender, mut command_rcv) =
mpsc::channel::<SubnetRuntimeProxyCommand>(1024 * 20);
let ws_runtime_endpoint = config.ws_endpoint.clone();
let http_runtime_endpoint = config.http_endpoint.clone();
let subnet_contract_address = Arc::new(config.subnet_contract_address.clone());
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(crate) mod console;
#[cfg(test)]
mod tests;

const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 100;
const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 1024 * 20;

pub(crate) mod builder;
pub(crate) mod messaging;
Expand Down Expand Up @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService {
.map(move |message| Self::parse_stream(message, stream_id))
.boxed();

let (command_sender, command_receiver) = mpsc::channel(2048);
let (command_sender, command_receiver) = mpsc::channel(1024 * 20);

let (outbound_stream, rx) = mpsc::channel::<Result<(Option<Uuid>, OutboundMessage), Status>>(
DEFAULT_CHANNEL_STREAM_CAPACITY,
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ mod tests;

pub(crate) mod constants {
/// Constant size of every channel in the crate
pub(crate) const CHANNEL_SIZE: usize = 2048;
pub(crate) const CHANNEL_SIZE: usize = 1024 * 20;

/// Constant size of every transient stream channel in the crate
pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024;
pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024 * 20;
}
pub use runtime::{
error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent,
Expand Down
21 changes: 20 additions & 1 deletion crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ pub struct DoubleEcho {
/// List of approved validators through smart contract and/or genesis
pub validators: HashSet<ValidatorId>,
pub validator_store: Arc<ValidatorStore>,
pub known_signatures: HashSet<Signature>,
pub broadcast_sender: broadcast::Sender<CertificateDeliveredWithPositions>,

pub task_manager_cancellation: CancellationToken,
}

impl DoubleEcho {
pub const MAX_BUFFER_SIZE: usize = 2048;
pub const MAX_BUFFER_SIZE: usize = 1024 * 20;

#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -85,6 +86,7 @@ impl DoubleEcho {
},
shutdown,
validator_store,
known_signatures: HashSet::new(),
broadcast_sender,
task_manager_cancellation: CancellationToken::new(),
}
Expand Down Expand Up @@ -150,6 +152,12 @@ impl DoubleEcho {
continue;
}

if self.known_signatures.contains(&signature) {
debug!("ECHO message signature already known: {}", signature);
self.handle_echo(certificate_id, validator_id, signature).await;
continue;
}

let mut payload = Vec::new();
payload.extend_from_slice(certificate_id.as_array());
payload.extend_from_slice(validator_id.as_bytes());
Expand All @@ -159,6 +167,8 @@ impl DoubleEcho {
continue;
}

self.known_signatures.insert(signature);

self.handle_echo(certificate_id, validator_id, signature).await
},
DoubleEchoCommand::Ready { certificate_id, validator_id, signature } => {
Expand All @@ -168,6 +178,13 @@ impl DoubleEcho {
continue;
}

if self.known_signatures.contains(&signature) {
debug!("READY message signature already known: {}", signature);
self.handle_ready(certificate_id, validator_id, signature).await;
continue;
}


let mut payload = Vec::new();
payload.extend_from_slice(certificate_id.as_array());
payload.extend_from_slice(validator_id.as_bytes());
Expand All @@ -177,6 +194,8 @@ impl DoubleEcho {
continue;
}

self.known_signatures.insert(signature);

self.handle_ready(certificate_id, validator_id, signature).await
},
}
Expand Down
38 changes: 21 additions & 17 deletions crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,31 +107,35 @@ impl TaskManager {
}

pub async fn run(mut self, shutdown_receiver: CancellationToken) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut pending_certificate_interval = tokio::time::interval(Duration::from_micros(500));
let mut message_interval = tokio::time::interval(Duration::from_millis(100));

loop {
tokio::select! {
biased;

_ = interval.tick() => {
_ = pending_certificate_interval.tick() => {
self.next_pending_certificate();
}
Some(msg) = self.message_receiver.recv() => {
match msg {
DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => {
if let Some(task_context) = self.tasks.get(&certificate_id) {
_ = task_context.sink.send(msg).await;
} else {
self.buffered_messages
.entry(certificate_id)
.or_default()
.push(msg);
};
}
DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => {
trace!("Received broadcast message for certificate {} ", cert.id);

self.create_task(cert, need_gossip, pending_id)
_ = message_interval.tick() => {
if let Some(msg) = self.message_receiver.recv().await {
match msg {
DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => {
if let Some(task_context) = self.tasks.get(&certificate_id) {
_ = task_context.sink.send(msg).await;
} else {
self.buffered_messages
.entry(certificate_id)
.or_default()
.push(msg);
};
}
DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => {
trace!("Received broadcast message for certificate {} ", cert.id);

self.create_task(cert, need_gossip, pending_id)
}
}
}
}
Expand Down

0 comments on commit 19da5d9

Please sign in to comment.