Skip to content

Commit

Permalink
chore: adding check and improve buffer size
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Jun 27, 2023
1 parent 430038e commit 952fc6b
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/topos-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ tracing = { workspace = true, features = ["attributes"] }

libp2p = { workspace = true, features = ["macros", "gossipsub", "tcp", "dns", "tokio", "request-response", "identify", "kad", "serde", "yamux"] }
void = "1"
lazy_static = "1"

topos-metrics = { path = "../topos-metrics/" }

Expand Down
1 change: 1 addition & 0 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ impl NetworkBehaviour for Behaviour {
},
_ => {}
};

Poll::Pending
}
}
20 changes: 19 additions & 1 deletion crates/topos-p2p/src/constant.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
use std::env;

use lazy_static::lazy_static;

// TODO: Investigate BUFFER SIZE
pub const EVENT_STREAM_BUFFER: usize = 2048;

lazy_static! {
pub static ref EVENT_STREAM_BUFFER: usize = env::var("TCE_EVENT_STREAM_BUFFER")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(2048 * 2);
pub static ref CAPACITY_EVENT_STREAM_BUFFER: usize = EVENT_STREAM_BUFFER
.checked_mul(10)
.map(|v| {
let r: usize = v.checked_div(100).unwrap_or(*EVENT_STREAM_BUFFER);
r
})
.unwrap_or(*EVENT_STREAM_BUFFER);
}

pub const COMMAND_STREAM_BUFFER: usize = 2048;
pub const TRANSMISSION_PROTOCOL: &str = "/tce-transmission/1";
pub const DISCOVERY_PROTOCOL: &str = "/tce-disco/1";
2 changes: 1 addition & 1 deletion crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<'a> NetworkBuilder<'a> {
// let noise_keys = noise::Keypair::<noise::>::new().into_authentic(&peer_key)?;

let (command_sender, command_receiver) = mpsc::channel(COMMAND_STREAM_BUFFER);
let (event_sender, event_receiver) = mpsc::channel(EVENT_STREAM_BUFFER);
let (event_sender, event_receiver) = mpsc::channel(*EVENT_STREAM_BUFFER);

let gossipsub = gossip::Behaviour::new(peer_key.clone());
let behaviour = Behaviour {
Expand Down
8 changes: 6 additions & 2 deletions crates/topos-p2p/src/runtime/handle_event/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use topos_metrics::{
use tracing::{error, info};

use crate::{
behaviour::gossip::Batch, event::GossipEvent, Event, Runtime, TOPOS_ECHO, TOPOS_GOSSIP,
TOPOS_READY,
behaviour::gossip::Batch, constant, event::GossipEvent, Event, Runtime, TOPOS_ECHO,
TOPOS_GOSSIP, TOPOS_READY,
};

use super::EventHandler;
Expand All @@ -20,6 +20,10 @@ impl EventHandler<GossipEvent> for Runtime {
topic,
} = event
{
if self.event_sender.capacity() >= *constant::CAPACITY_EVENT_STREAM_BUFFER {
tracing::error!("P2P Event sender is almost full, dropping event");
}

info!("Received message from {:?} on topic {:?}", source, topic);
match topic {
TOPOS_GOSSIP => {
Expand Down

0 comments on commit 952fc6b

Please sign in to comment.