Skip to content

Commit

Permalink
feat(p2p/tce): add gossipsub message_id to tce logs
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Mar 29, 2024
1 parent b8cd730 commit 40b7fc3
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 177 deletions.
75 changes: 13 additions & 62 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,25 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::{
collections::{HashMap, VecDeque},
env,
task::Poll,
time::Duration,
};
use std::{collections::HashMap, task::Poll};

use libp2p::gossipsub::MessageId;
use libp2p::swarm::{ConnectionClosed, FromSwarm};
use libp2p::PeerId;
use libp2p::{
gossipsub::{self, IdentTopic, Message, MessageAuthenticity},
identity::Keypair,
swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm},
};
use prost::Message as ProstMessage;
use topos_core::api::grpc::tce::v1::Batch;
use topos_metrics::P2P_GOSSIP_BATCH_SIZE;
use tracing::{debug, error, warn};
use tracing::{debug, trace, warn};

use crate::error::P2PError;
use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};

use super::HealthStatus;

const MAX_BATCH_SIZE: usize = 10;

pub struct Behaviour {
batch_size: usize,
gossipsub: gossipsub::Behaviour,
pending: HashMap<&'static str, VecDeque<Vec<u8>>>,
tick: tokio::time::Interval,
/// List of connected peers per topic.
connected_peer: HashMap<&'static str, HashSet<PeerId>>,
/// The health status of the gossip behaviour
Expand All @@ -43,18 +31,16 @@ impl Behaviour {
&mut self,
topic: &'static str,
message: Vec<u8>,
) -> Result<usize, &'static str> {
) -> Result<MessageId, P2PError> {
match topic {
TOPOS_GOSSIP => {
if let Ok(msg_id) = self.gossipsub.publish(IdentTopic::new(topic), message) {
debug!("Published on topos_gossip: {:?}", msg_id);
}
TOPOS_GOSSIP | TOPOS_ECHO | TOPOS_READY => {
let msg_id = self.gossipsub.publish(IdentTopic::new(topic), message)?;
trace!("Published on topos_gossip: {:?}", msg_id);

Ok(msg_id)
}
TOPOS_ECHO | TOPOS_READY => self.pending.entry(topic).or_default().push_back(message),
_ => return Err("Invalid topic"),
_ => Err(P2PError::InvalidGossipTopic(topic)),
}

Ok(0)
}

pub fn subscribe(&mut self) -> Result<(), P2PError> {
Expand All @@ -71,10 +57,6 @@ impl Behaviour {
}

pub async fn new(peer_key: Keypair) -> Self {
let batch_size = env::var("TOPOS_GOSSIP_BATCH_SIZE")
.map(|v| v.parse::<usize>())
.unwrap_or(Ok(MAX_BATCH_SIZE))
.unwrap();
let gossipsub = gossipsub::ConfigBuilder::default()
.max_transmit_size(2 * 1024 * 1024)
.validation_mode(gossipsub::ValidationMode::Strict)
Expand All @@ -99,21 +81,7 @@ impl Behaviour {
.unwrap();

Self {
batch_size,
gossipsub,
pending: [
(TOPOS_ECHO, VecDeque::new()),
(TOPOS_READY, VecDeque::new()),
]
.into_iter()
.collect(),
tick: tokio::time::interval(Duration::from_millis(
env::var("TOPOS_GOSSIP_INTERVAL")
.map(|v| v.parse::<u64>())
.unwrap_or(Ok(100))
.unwrap(),
)),

connected_peer: Default::default(),
health_status: Default::default(),
}
Expand Down Expand Up @@ -191,26 +159,6 @@ impl NetworkBehaviour for Behaviour {
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if self.tick.poll_tick(cx).is_ready() {
// Publish batch
for (topic, queue) in self.pending.iter_mut() {
if !queue.is_empty() {
let num_of_message = queue.len().min(self.batch_size);
let batch = Batch {
messages: queue.drain(0..num_of_message).collect(),
};

debug!("Publishing {} {}", batch.messages.len(), topic);
let msg = batch.encode_to_vec();
P2P_GOSSIP_BATCH_SIZE.observe(batch.messages.len() as f64);
match self.gossipsub.publish(IdentTopic::new(*topic), msg) {
Ok(message_id) => debug!("Published {} {}", topic, message_id),
Err(error) => error!("Failed to publish {}: {}", topic, error),
}
}
}
}

match self.gossipsub.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
Expand All @@ -231,6 +179,7 @@ impl NetworkBehaviour for Behaviour {
topic: TOPOS_GOSSIP,
message: data,
source,
id: message_id,
},
)))
}
Expand All @@ -240,6 +189,7 @@ impl NetworkBehaviour for Behaviour {
topic: TOPOS_ECHO,
message: data,
source,
id: message_id,
},
)))
}
Expand All @@ -249,6 +199,7 @@ impl NetworkBehaviour for Behaviour {
topic: TOPOS_READY,
message: data,
source,
id: message_id,
},
)))
}
Expand Down
25 changes: 14 additions & 11 deletions crates/topos-p2p/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::future::BoxFuture;
use futures::TryFutureExt;
use libp2p::PeerId;
use tokio::sync::{
mpsc::{self, error::SendError},
Expand Down Expand Up @@ -39,21 +39,24 @@ impl NetworkClient {
.await
}

pub fn publish<T: std::fmt::Debug + prost::Message + 'static>(
pub async fn publish<T: std::fmt::Debug + prost::Message + 'static>(
&self,
topic: &'static str,
message: T,
) -> BoxFuture<'static, Result<(), SendError<Command>>> {
) -> Result<String, P2PError> {
let network = self.sender.clone();
let (sender, receiver) = oneshot::channel();

network
.send(Command::Gossip {
topic,
data: message.encode_to_vec(),
sender,
})
.map_err(CommandExecutionError::from)
.await?;

Box::pin(async move {
network
.send(Command::Gossip {
topic,
data: message.encode_to_vec(),
})
.await
})
receiver.await?.map(|id| id.to_string())
}

async fn send_command_with_receiver<
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-p2p/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Display;

use libp2p::PeerId;
use libp2p::{gossipsub::MessageId, PeerId};
use tokio::sync::oneshot;

use crate::{behaviour::grpc::connection::OutboundConnection, error::P2PError};
Expand All @@ -15,6 +15,7 @@ pub enum Command {
Gossip {
topic: &'static str,
data: Vec<u8>,
sender: oneshot::Sender<Result<MessageId, P2PError>>,
},

/// Ask for the creation of a new proxy connection for a gRPC query.
Expand Down
13 changes: 11 additions & 2 deletions crates/topos-p2p/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::io;

use libp2p::{
gossipsub::SubscriptionError, kad::NoKnownPeers, noise::Error as NoiseError,
request_response::OutboundFailure, TransportError,
gossipsub::{PublishError, SubscriptionError},
kad::NoKnownPeers,
noise::Error as NoiseError,
request_response::OutboundFailure,
TransportError,
};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -49,6 +52,12 @@ pub enum P2PError {

#[error("Gossip topics subscription failed")]
GossipTopicSubscriptionFailure,

#[error("Gossipsub publish failure: {0}")]
GossipsubPublishFailure(#[from] PublishError),

#[error("Invalid gossipsub topics: {0}")]
InvalidGossipTopic(&'static str),
}

#[derive(Error, Debug)]
Expand Down
9 changes: 7 additions & 2 deletions crates/topos-p2p/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use libp2p::{identify, kad, PeerId};
use libp2p::{gossipsub::MessageId, identify, kad, PeerId};

use crate::behaviour::{grpc, HealthStatus};

Expand All @@ -10,6 +10,7 @@ pub enum GossipEvent {
source: Option<PeerId>,
topic: &'static str,
message: Vec<u8>,
id: MessageId,
},
}

Expand Down Expand Up @@ -50,7 +51,11 @@ impl From<void::Void> for ComposedEvent {
#[derive(Debug)]
pub enum Event {
/// An event emitted when a gossip message is received
Gossip { from: PeerId, data: Vec<u8> },
Gossip {
from: PeerId,
data: Vec<u8>,
id: String,
},
/// An event emitted when the p2p layer becomes healthy
Healthy,
/// An event emitted when the p2p layer becomes unhealthy
Expand Down
11 changes: 8 additions & 3 deletions crates/topos-p2p/src/runtime/handle_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{

use rand::{thread_rng, Rng};
use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL;
use tracing::{debug, error, warn};
use tracing::{error, trace, warn};

impl Runtime {
pub(crate) async fn handle_command(&mut self, command: Command) {
Expand Down Expand Up @@ -64,12 +64,17 @@ impl Runtime {
Command::Gossip {
topic,
data: message,
sender,
} => match self.swarm.behaviour_mut().gossipsub.publish(topic, message) {
Ok(message_id) => {
debug!("Published message to {topic}");
trace!("Published message to {topic}");
P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc();
_ = sender.send(Ok(message_id));
}
Err(err) => {
error!("Failed to publish message to {topic}: {err}");
_ = sender.send(Err(err));
}
Err(err) => error!("Failed to publish message to {topic}: {err}"),
},
}
}
Expand Down
64 changes: 20 additions & 44 deletions crates/topos-p2p/src/runtime/handle_event/gossipsub.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use topos_metrics::{
P2P_EVENT_STREAM_CAPACITY_TOTAL, P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL,
P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL, P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL,
P2P_MESSAGE_RECEIVED_ON_READY_TOTAL,
P2P_EVENT_STREAM_CAPACITY_TOTAL, P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL,
P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL, P2P_MESSAGE_RECEIVED_ON_READY_TOTAL,
};
use tracing::{debug, error};

use crate::{constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};
use prost::Message;
use topos_core::api::grpc::tce::v1::Batch;

use super::{EventHandler, EventResult};

Expand All @@ -18,57 +15,36 @@ impl EventHandler<GossipEvent> for Runtime {
source: Some(source),
message,
topic,
id,
} = event
{
if self.event_sender.capacity() < *constants::CAPACITY_EVENT_STREAM_BUFFER {
P2P_EVENT_STREAM_CAPACITY_TOTAL.inc();
}

debug!("Received message from {:?} on topic {:?}", source, topic);
match topic {
TOPOS_GOSSIP => {
P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL.inc();

if let Err(e) = self
.event_sender
.send(Event::Gossip {
from: source,
data: message,
})
.await
{
error!("Failed to send gossip event to runtime: {:?}", e);
}
}
TOPOS_ECHO | TOPOS_READY => {
if topic == TOPOS_ECHO {
P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc();
} else {
P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc();
}
if let Ok(Batch { messages }) = Batch::decode(&message[..]) {
for message in messages {
if let Err(e) = self
.event_sender
.send(Event::Gossip {
from: source,
data: message,
})
.await
{
error!("Failed to send gossip {} event to runtime: {:?}", topic, e);
}
}
} else {
P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL
.with_label_values(&[topic])
.inc();
}
}
match topic {
TOPOS_GOSSIP => P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL.inc(),
TOPOS_ECHO => P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc(),
TOPOS_READY => P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc(),
_ => {
error!("Received message on unknown topic {:?}", topic);
return Ok(());
}
}

if let Err(e) = self
.event_sender
.send(Event::Gossip {
from: source,
data: message,
id: id.to_string(),
})
.await
{
error!("Failed to send gossip event to runtime: {:?}", e);
}
}

Ok(())
Expand Down
Loading

0 comments on commit 40b7fc3

Please sign in to comment.