From 72d8673734d3c512b7c7482c29a08d8d238411db Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Sat, 24 Jun 2023 14:07:33 +0200 Subject: [PATCH] feat: cleanup app_context Signed-off-by: Simon Paitrault --- Cargo.lock | 1 + Cargo.toml | 1 + crates/topos-tce/Cargo.toml | 1 + crates/topos-tce/src/app_context.rs | 432 +------------------ crates/topos-tce/src/app_context/api.rs | 144 +++++++ crates/topos-tce/src/app_context/message.rs | 36 ++ crates/topos-tce/src/app_context/network.rs | 122 ++++++ crates/topos-tce/src/app_context/protocol.rs | 156 +++++++ crates/topos-tce/src/config.rs | 25 ++ crates/topos-tce/src/lib.rs | 31 +- crates/topos-tce/src/metrics.rs | 21 + 11 files changed, 523 insertions(+), 447 deletions(-) create mode 100644 crates/topos-tce/src/app_context/api.rs create mode 100644 crates/topos-tce/src/app_context/message.rs create mode 100644 crates/topos-tce/src/app_context/network.rs create mode 100644 crates/topos-tce/src/app_context/protocol.rs create mode 100644 crates/topos-tce/src/config.rs create mode 100644 crates/topos-tce/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 8337549ca..925e6516f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7512,6 +7512,7 @@ dependencies = [ "cucumber", "futures", "hyper", + "lazy_static", "libp2p", "opentelemetry", "prometheus", diff --git a/Cargo.toml b/Cargo.toml index b2835b41a..e24bc7b98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ topos-crypto = { path = "./crates/topos-crypto", default-features = false } # Various utility crates clap = { version = "4.0", features = ["derive", "env", "string"] } +lazy_static = "1.4" rand = { version = "0.8", default-features = false } rand_core = { version = "0.6", default-features = false } rand_distr = { version = "0.4", default-features = false } diff --git a/crates/topos-tce/Cargo.toml b/crates/topos-tce/Cargo.toml index e4baf0d5c..84f661caf 100644 --- a/crates/topos-tce/Cargo.toml +++ b/crates/topos-tce/Cargo.toml @@ -10,6 +10,7 @@ async-trait.workspace = true bincode.workspace = true clap.workspace = true futures.workspace = true +lazy_static.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index dbd7a15d9..2a1eac5ee 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -3,31 +3,27 @@ //! Application logic glue //! use futures::{Stream, StreamExt}; -use opentelemetry::trace::{FutureExt as TraceFutureExt, TraceContextExt}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use tce_transport::{ProtocolEvents, TceCommands}; -use tokio::spawn; +use tce_transport::ProtocolEvents; use tokio::sync::{mpsc, oneshot}; -use topos_core::api::grpc::checkpoints::TargetStreamPosition; -use topos_core::uci::{Certificate, CertificateId, SubnetId}; -use topos_metrics::CERTIFICATE_DELIVERED; + +use topos_core::uci::CertificateId; use topos_p2p::{Client as NetworkClient, Event as NetEvent}; +use topos_tce_api::RuntimeClient as ApiClient; use topos_tce_api::RuntimeEvent as ApiEvent; -use topos_tce_api::{RuntimeClient as ApiClient, RuntimeError}; -use topos_tce_broadcast::DoubleEchoCommand; use topos_tce_broadcast::ReliableBroadcastClient; -use topos_tce_gatekeeper::{GatekeeperClient, GatekeeperError}; -use topos_tce_storage::errors::{InternalStorageError, StorageError}; +use topos_tce_gatekeeper::GatekeeperClient; use topos_tce_storage::events::StorageEvent; use topos_tce_storage::StorageClient; use topos_tce_synchronizer::{SynchronizerClient, SynchronizerEvent}; -use topos_telemetry::PropagationContext; -use tracing::{debug, error, info, info_span, trace, warn, Instrument}; -use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{error, info, warn}; use crate::events::Events; +mod api; +mod message; +mod network; +mod protocol; + /// Top-level transducer main app context & driver (alike) /// /// Implements <...Host> traits for network and Api, listens for protocol events in events @@ -127,378 +123,6 @@ impl AppContext { warn!("Exiting main TCE app processing loop") } - async fn on_api_event(&mut self, event: ApiEvent) { - match event { - ApiEvent::CertificateSubmitted { - certificate, - sender, - ctx, - } => { - let span = info_span!(parent: &ctx, "TCE Runtime"); - - _ = self - .tce_cli - .broadcast_new_certificate(*certificate, true) - .with_context(span.context()) - .instrument(span) - .await; - - _ = sender.send(Ok(())); - } - - ApiEvent::PeerListPushed { peers, sender } => { - let sampler = self.tce_cli.clone(); - let gatekeeper = self.gatekeeper.clone(); - let events = self.events.clone(); - let api = self.api_client.clone(); - - spawn(async move { - match gatekeeper.push_peer_list(peers).await { - Ok(peers) => { - info!("Gatekeeper has detected changes on the peer list, new sample in creation"); - if sampler.peer_changed(peers).await.is_err() { - _ = sender.send(Err(RuntimeError::UnableToPushPeerList)); - } else { - api.set_active_sample(true).await; - if events.send(Events::StableSample).await.is_err() { - error!("Unable to send StableSample event"); - } - _ = sender.send(Ok(())); - } - } - Err(GatekeeperError::NoUpdate) => { - _ = sender.send(Ok(())); - } - Err(_) => { - _ = sender.send(Err(RuntimeError::UnableToPushPeerList)); - } - } - }); - } - - ApiEvent::GetSourceHead { subnet_id, sender } => { - // Get source head certificate - let mut result = self - .pending_storage - .get_source_head(subnet_id) - .await - .map_err(|e| match e { - StorageError::InternalStorage(internal) => { - if let InternalStorageError::MissingHeadForSubnet(subnet_id) = internal - { - RuntimeError::UnknownSubnet(subnet_id) - } else { - RuntimeError::UnableToGetSourceHead(subnet_id, internal.to_string()) - } - } - e => RuntimeError::UnableToGetSourceHead(subnet_id, e.to_string()), - }); - - // TODO: Initial genesis certificate eventually will be fetched from the topos subnet - // Currently, for subnet starting from scratch there are no certificates in the database - // So for MissingHeadForSubnet error we will return some default dummy certificate - if let Err(RuntimeError::UnknownSubnet(subnet_id)) = result { - warn!("Returning dummy certificate as head certificate, to be fixed..."); - result = Ok(( - 0, - topos_core::uci::Certificate { - prev_id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID, - source_subnet_id: subnet_id, - state_root: Default::default(), - tx_root_hash: Default::default(), - target_subnets: vec![], - verifier: 0, - id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID, - proof: Default::default(), - signature: Default::default(), - }, - )); - }; - - _ = sender.send(result); - } - - ApiEvent::GetLastPendingCertificates { - mut subnet_ids, - sender, - } => { - let mut last_pending_certificates: HashMap> = - subnet_ids - .iter() - .map(|subnet_id| (*subnet_id, None)) - .collect(); - - if let Ok(pending_certificates) = - self.pending_storage.get_pending_certificates().await - { - // Iterate through pending certificates and determine last one for every subnet - // Last certificate in the subnet should be one with the highest index - for (pending_certificate_id, cert) in pending_certificates.into_iter().rev() { - if let Some(subnet_id) = subnet_ids.take(&cert.source_subnet_id) { - *last_pending_certificates.entry(subnet_id).or_insert(None) = - Some(cert); - } - if subnet_ids.is_empty() { - break; - } - } - } - - // Add None pending certificate for any other requested subnet_id - subnet_ids.iter().for_each(|subnet_id| { - last_pending_certificates.insert(*subnet_id, None); - }); - - _ = sender.send(Ok(last_pending_certificates)); - } - } - } - - async fn on_protocol_event(&mut self, evt: ProtocolEvents) { - match evt { - ProtocolEvents::StableSample => { - info!("Stable Sample detected"); - self.api_client.set_active_sample(true).await; - if self.events.send(Events::StableSample).await.is_err() { - error!("Unable to send StableSample event"); - } - } - - ProtocolEvents::Broadcast { certificate_id } => { - info!("Broadcasting certificate {}", certificate_id); - } - - ProtocolEvents::CertificateDelivered { certificate } => { - warn!("Certificate delivered {}", certificate.id); - CERTIFICATE_DELIVERED.inc(); - let storage = self.pending_storage.clone(); - let api_client = self.api_client.clone(); - - spawn(async move { - match storage.certificate_delivered(certificate.id).await { - Ok(positions) => { - let certificate_id = certificate.id; - api_client - .dispatch_certificate( - certificate, - positions - .targets - .into_iter() - .map(|(subnet_id, certificate_target_stream_position)| { - ( - subnet_id, - TargetStreamPosition { - target_subnet_id: - certificate_target_stream_position - .target_subnet_id, - source_subnet_id: - certificate_target_stream_position - .source_subnet_id, - position: certificate_target_stream_position - .position - .0, - certificate_id: Some(certificate_id), - }, - ) - }) - .collect::>(), - ) - .await; - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateNotFound(_), - )) => {} - Err(e) => { - error!("Pending storage error while delivering certificate: {e}"); - } - }; - }); - } - - ProtocolEvents::Gossip { cert, ctx } => { - let span = info_span!( - parent: &ctx, - "SEND Outbound Gossip", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "producer", - ); - let cert_id = cert.id; - - let data = NetworkMessage::from(TceCommands::OnGossip { - cert, - ctx: PropagationContext::inject(&span.context()), - }); - - if let Err(e) = self - .network_client - .publish::(topos_p2p::TOPOS_GOSSIP, data) - .await - { - error!("Unable to send Gossip due to error: {e}"); - } - } - - ProtocolEvents::Echo { - certificate_id, - ctx, - } => { - let span = info_span!( - parent: &ctx, - "SEND Outbound Echo", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "producer", - ); - let my_peer_id = self.network_client.local_peer_id; - // Send echo message - let data = NetworkMessage::from(TceCommands::OnEcho { - certificate_id, - ctx: PropagationContext::inject(&span.context()), - }); - - if let Err(e) = self - .network_client - .publish::(topos_p2p::TOPOS_ECHO, data) - .await - { - error!("Unable to send Echo due to error: {e}"); - } - } - - ProtocolEvents::Ready { - certificate_id, - ctx, - } => { - let span = info_span!( - parent: &ctx, - "SEND Outbound Ready", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "producer", - ); - let my_peer_id = self.network_client.local_peer_id; - let data = NetworkMessage::from(TceCommands::OnReady { - certificate_id, - ctx: PropagationContext::inject(&span.context()), - }); - - if let Err(e) = self - .network_client - .publish::(topos_p2p::TOPOS_READY, data) - .await - { - error!("Unable to send Ready due to error: {e}"); - } - } - - evt => { - debug!("Unhandled event: {:?}", evt); - } - } - } - - async fn on_net_event(&mut self, evt: NetEvent) { - trace!( - "on_net_event: peer: {} event {:?}", - &self.network_client.local_peer_id, - &evt - ); - - if let NetEvent::Gossip { from, data } = evt { - let msg: NetworkMessage = data.into(); - - if let NetworkMessage::Cmd(cmd) = msg { - match cmd { - TceCommands::OnGossip { cert, ctx } => { - let span = info_span!( - "RECV Outbound Gossip", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "consumer", - sender = from.to_string() - ); - let parent = ctx.extract(); - span.add_link(parent.span().span_context().clone()); - - let channel = self.tce_cli.get_double_echo_channel(); - - spawn(async move { - info!("Send certificate to be broadcast"); - if channel - .send(DoubleEchoCommand::Broadcast { - cert, - need_gossip: false, - ctx: span, - }) - .await - .is_err() - { - error!("Unable to send broadcast_new_certificate command, Receiver was dropped"); - } - }); - } - - TceCommands::OnEcho { - certificate_id, - ctx, - } => { - let span = info_span!( - "RECV Outbound Echo", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "consumer", - sender = from.to_string() - ); - let context = ctx.extract(); - span.add_link(context.span().span_context().clone()); - - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - if let Err(e) = channel - .send(DoubleEchoCommand::Echo { - from_peer: from, - certificate_id, - ctx: span.clone(), - }) - .with_context(span.context().clone()) - .instrument(span) - .await - { - error!("Unable to send Echo, {:?}", e); - } - }); - } - TceCommands::OnReady { - certificate_id, - ctx, - } => { - let span = info_span!( - "RECV Outbound Ready", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "consumer", - sender = from.to_string() - ); - let context = ctx.extract(); - span.add_link(context.span().span_context().clone()); - - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - if let Err(e) = channel - .send(DoubleEchoCommand::Ready { - from_peer: from, - certificate_id, - ctx: span.clone(), - }) - .with_context(context) - .instrument(span) - .await - { - error!("Unable to send Ready {:?}", e); - } - }); - } - _ => {} - } - } - } - } - pub async fn shutdown(&mut self) -> Result<(), Box> { info!("Shutting down the TCE client..."); self.api_client.shutdown().await?; @@ -511,37 +135,3 @@ impl AppContext { Ok(()) } } - -/// Definition of networking payload. -/// -/// We assume that only Commands will go through the network, -/// [Response] is used to allow reporting of logic errors to the caller. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[allow(clippy::large_enum_variant)] -enum NetworkMessage { - Cmd(TceCommands), - Bulk(Vec), - - NotReady(topos_p2p::NotReadyMessage), -} - -// deserializer -impl From> for NetworkMessage { - fn from(data: Vec) -> Self { - bincode::deserialize::(data.as_ref()).expect("msg deser") - } -} - -// serializer -impl From for Vec { - fn from(msg: NetworkMessage) -> Self { - bincode::serialize::(&msg).expect("msg ser") - } -} - -// transformer of protocol commands into network commands -impl From for NetworkMessage { - fn from(cmd: TceCommands) -> Self { - Self::Cmd(cmd) - } -} diff --git a/crates/topos-tce/src/app_context/api.rs b/crates/topos-tce/src/app_context/api.rs new file mode 100644 index 000000000..b2802bd48 --- /dev/null +++ b/crates/topos-tce/src/app_context/api.rs @@ -0,0 +1,144 @@ +use std::collections::HashMap; + +use crate::{events::Events, AppContext}; +use opentelemetry::trace::FutureExt; +use tokio::spawn; +use topos_core::uci::{Certificate, SubnetId}; +use topos_tce_api::{RuntimeError, RuntimeEvent as Event}; +use topos_tce_gatekeeper::GatekeeperError; +use topos_tce_storage::errors::{InternalStorageError, StorageError}; +use tracing::{error, info, info_span, warn, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +use crate::metrics::{CERTIFICATE_RECEIVED, CERTIFICATE_RECEIVED_FROM_API}; + +impl AppContext { + pub async fn on_api_event(&mut self, event: Event) { + match event { + Event::CertificateSubmitted { + certificate, + sender, + ctx, + } => { + let span = info_span!(parent: &ctx, "TCE Runtime"); + + _ = self + .tce_cli + .broadcast_new_certificate(*certificate, true) + .with_context(span.context()) + .instrument(span) + .await; + + CERTIFICATE_RECEIVED.inc(); + CERTIFICATE_RECEIVED_FROM_API.inc(); + _ = sender.send(Ok(())); + } + + Event::PeerListPushed { peers, sender } => { + let sampler = self.tce_cli.clone(); + let gatekeeper = self.gatekeeper.clone(); + let events = self.events.clone(); + let api = self.api_client.clone(); + + spawn(async move { + match gatekeeper.push_peer_list(peers).await { + Ok(peers) => { + info!("Gatekeeper has detected changes on the peer list, new sample in creation"); + if sampler.peer_changed(peers).await.is_err() { + _ = sender.send(Err(RuntimeError::UnableToPushPeerList)); + } else { + api.set_active_sample(true).await; + if events.send(Events::StableSample).await.is_err() { + error!("Unable to send StableSample event"); + } + _ = sender.send(Ok(())); + } + } + Err(GatekeeperError::NoUpdate) => { + _ = sender.send(Ok(())); + } + Err(_) => { + _ = sender.send(Err(RuntimeError::UnableToPushPeerList)); + } + } + }); + } + + Event::GetSourceHead { subnet_id, sender } => { + // Get source head certificate + let mut result = self + .pending_storage + .get_source_head(subnet_id) + .await + .map_err(|e| match e { + StorageError::InternalStorage(internal) => { + if let InternalStorageError::MissingHeadForSubnet(subnet_id) = internal + { + RuntimeError::UnknownSubnet(subnet_id) + } else { + RuntimeError::UnableToGetSourceHead(subnet_id, internal.to_string()) + } + } + e => RuntimeError::UnableToGetSourceHead(subnet_id, e.to_string()), + }); + + // TODO: Initial genesis certificate eventually will be fetched from the topos subnet + // Currently, for subnet starting from scratch there are no certificates in the database + // So for MissingHeadForSubnet error we will return some default dummy certificate + if let Err(RuntimeError::UnknownSubnet(subnet_id)) = result { + warn!("Returning dummy certificate as head certificate, to be fixed..."); + result = Ok(( + 0, + topos_core::uci::Certificate { + prev_id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID, + source_subnet_id: subnet_id, + state_root: Default::default(), + tx_root_hash: Default::default(), + target_subnets: vec![], + verifier: 0, + id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID, + proof: Default::default(), + signature: Default::default(), + }, + )); + }; + + _ = sender.send(result); + } + + Event::GetLastPendingCertificates { + mut subnet_ids, + sender, + } => { + let mut last_pending_certificates: HashMap> = + subnet_ids + .iter() + .map(|subnet_id| (*subnet_id, None)) + .collect(); + + if let Ok(pending_certificates) = + self.pending_storage.get_pending_certificates().await + { + // Iterate through pending certificates and determine last one for every subnet + // Last certificate in the subnet should be one with the highest index + for (pending_certificate_id, cert) in pending_certificates.into_iter().rev() { + if let Some(subnet_id) = subnet_ids.take(&cert.source_subnet_id) { + *last_pending_certificates.entry(subnet_id).or_insert(None) = + Some(cert); + } + if subnet_ids.is_empty() { + break; + } + } + } + + // Add None pending certificate for any other requested subnet_id + subnet_ids.iter().for_each(|subnet_id| { + last_pending_certificates.insert(*subnet_id, None); + }); + + _ = sender.send(Ok(last_pending_certificates)); + } + } + } +} diff --git a/crates/topos-tce/src/app_context/message.rs b/crates/topos-tce/src/app_context/message.rs new file mode 100644 index 000000000..eacabce3a --- /dev/null +++ b/crates/topos-tce/src/app_context/message.rs @@ -0,0 +1,36 @@ +use serde::{Deserialize, Serialize}; +use tce_transport::TceCommands; + +/// Definition of networking payload. +/// +/// We assume that only Commands will go through the network, +/// [Response] is used to allow reporting of logic errors to the caller. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[allow(clippy::large_enum_variant)] +pub(crate) enum NetworkMessage { + Cmd(TceCommands), + Bulk(Vec), + + NotReady(topos_p2p::NotReadyMessage), +} + +// deserializer +impl From> for NetworkMessage { + fn from(data: Vec) -> Self { + bincode::deserialize::(data.as_ref()).expect("msg deser") + } +} + +// serializer +impl From for Vec { + fn from(msg: NetworkMessage) -> Self { + bincode::serialize::(&msg).expect("msg ser") + } +} + +// transformer of protocol commands into network commands +impl From for NetworkMessage { + fn from(cmd: TceCommands) -> Self { + Self::Cmd(cmd) + } +} diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs new file mode 100644 index 000000000..8e23ac87d --- /dev/null +++ b/crates/topos-tce/src/app_context/network.rs @@ -0,0 +1,122 @@ +use crate::{ + app_context::message::NetworkMessage, + metrics::{CERTIFICATE_RECEIVED, CERTIFICATE_RECEIVED_FROM_GOSSIP}, + AppContext, +}; + +use opentelemetry::trace::{FutureExt, TraceContextExt}; +use tce_transport::TceCommands; +use tokio::spawn; +use topos_p2p::Event; +use topos_tce_broadcast::DoubleEchoCommand; +use tracing::{error, info, info_span, trace, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +impl AppContext { + pub async fn on_net_event(&mut self, evt: Event) { + trace!( + "on_net_event: peer: {} event {:?}", + &self.network_client.local_peer_id, + &evt + ); + + if let Event::Gossip { from, data } = evt { + let msg: NetworkMessage = data.into(); + + if let NetworkMessage::Cmd(cmd) = msg { + match cmd { + TceCommands::OnGossip { cert, ctx } => { + let span = info_span!( + "RECV Outbound Gossip", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "consumer", + sender = from.to_string() + ); + let parent = ctx.extract(); + span.add_link(parent.span().span_context().clone()); + + let channel = self.tce_cli.get_double_echo_channel(); + + spawn(async move { + info!("Send certificate to be broadcast"); + if channel + .send(DoubleEchoCommand::Broadcast { + cert, + need_gossip: false, + ctx: span, + }) + .await + .is_err() + { + error!("Unable to send broadcast_new_certificate command, Receiver was dropped"); + } else { + CERTIFICATE_RECEIVED.inc(); + CERTIFICATE_RECEIVED_FROM_GOSSIP.inc(); + } + }); + } + + TceCommands::OnEcho { + certificate_id, + ctx, + } => { + let span = info_span!( + "RECV Outbound Echo", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "consumer", + sender = from.to_string() + ); + let context = ctx.extract(); + span.add_link(context.span().span_context().clone()); + + let channel = self.tce_cli.get_double_echo_channel(); + spawn(async move { + if let Err(e) = channel + .send(DoubleEchoCommand::Echo { + from_peer: from, + certificate_id, + ctx: span.clone(), + }) + .with_context(span.context().clone()) + .instrument(span) + .await + { + error!("Unable to send Echo, {:?}", e); + } + }); + } + TceCommands::OnReady { + certificate_id, + ctx, + } => { + let span = info_span!( + "RECV Outbound Ready", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "consumer", + sender = from.to_string() + ); + let context = ctx.extract(); + span.add_link(context.span().span_context().clone()); + + let channel = self.tce_cli.get_double_echo_channel(); + spawn(async move { + if let Err(e) = channel + .send(DoubleEchoCommand::Ready { + from_peer: from, + certificate_id, + ctx: span.clone(), + }) + .with_context(context) + .instrument(span) + .await + { + error!("Unable to send Ready {:?}", e); + } + }); + } + _ => {} + } + } + } + } +} diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs new file mode 100644 index 000000000..1dfe72cab --- /dev/null +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -0,0 +1,156 @@ +use std::collections::HashMap; + +use tce_transport::{ProtocolEvents as Event, TceCommands}; +use tokio::spawn; +use topos_core::{api::grpc::checkpoints::TargetStreamPosition, uci::SubnetId}; +use topos_tce_storage::errors::{InternalStorageError, StorageError}; +use topos_telemetry::PropagationContext; +use tracing::{debug, error, info, info_span, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +use crate::{events::Events, metrics::CERTIFICATE_DELIVERED, AppContext}; + +use super::message::NetworkMessage; + +impl AppContext { + pub async fn on_protocol_event(&mut self, evt: Event) { + match evt { + Event::StableSample => { + info!("Stable Sample detected"); + self.api_client.set_active_sample(true).await; + if self.events.send(Events::StableSample).await.is_err() { + error!("Unable to send StableSample event"); + } + } + + Event::Broadcast { certificate_id } => { + info!("Broadcasting certificate {}", certificate_id); + } + + Event::CertificateDelivered { certificate } => { + warn!("Certificate delivered {}", certificate.id); + CERTIFICATE_DELIVERED.inc(); + let storage = self.pending_storage.clone(); + let api_client = self.api_client.clone(); + + spawn(async move { + match storage.certificate_delivered(certificate.id).await { + Ok(positions) => { + let certificate_id = certificate.id; + api_client + .dispatch_certificate( + certificate, + positions + .targets + .into_iter() + .map(|(subnet_id, certificate_target_stream_position)| { + ( + subnet_id, + TargetStreamPosition { + target_subnet_id: + certificate_target_stream_position + .target_subnet_id, + source_subnet_id: + certificate_target_stream_position + .source_subnet_id, + position: certificate_target_stream_position + .position + .0, + certificate_id: Some(certificate_id), + }, + ) + }) + .collect::>(), + ) + .await; + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateNotFound(_), + )) => {} + Err(e) => { + error!("Pending storage error while delivering certificate: {e}"); + } + }; + }); + } + + Event::Gossip { cert, ctx } => { + let span = info_span!( + parent: &ctx, + "SEND Outbound Gossip", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "producer", + ); + let cert_id = cert.id; + + let data = NetworkMessage::from(TceCommands::OnGossip { + cert, + ctx: PropagationContext::inject(&span.context()), + }); + + if let Err(e) = self + .network_client + .publish::(topos_p2p::TOPOS_GOSSIP, data) + .await + { + error!("Unable to send Gossip due to error: {e}"); + } + } + + Event::Echo { + certificate_id, + ctx, + } => { + let span = info_span!( + parent: &ctx, + "SEND Outbound Echo", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "producer", + ); + let my_peer_id = self.network_client.local_peer_id; + // Send echo message + let data = NetworkMessage::from(TceCommands::OnEcho { + certificate_id, + ctx: PropagationContext::inject(&span.context()), + }); + + if let Err(e) = self + .network_client + .publish::(topos_p2p::TOPOS_ECHO, data) + .await + { + error!("Unable to send Echo due to error: {e}"); + } + } + + Event::Ready { + certificate_id, + ctx, + } => { + let span = info_span!( + parent: &ctx, + "SEND Outbound Ready", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "producer", + ); + let my_peer_id = self.network_client.local_peer_id; + let data = NetworkMessage::from(TceCommands::OnReady { + certificate_id, + ctx: PropagationContext::inject(&span.context()), + }); + + if let Err(e) = self + .network_client + .publish::(topos_p2p::TOPOS_READY, data) + .await + { + error!("Unable to send Ready due to error: {e}"); + } + } + + evt => { + debug!("Unhandled event: {:?}", evt); + } + } + } +} diff --git a/crates/topos-tce/src/config.rs b/crates/topos-tce/src/config.rs new file mode 100644 index 000000000..b7a7086c9 --- /dev/null +++ b/crates/topos-tce/src/config.rs @@ -0,0 +1,25 @@ +use std::{net::SocketAddr, path::PathBuf, time::Duration}; +use tce_transport::ReliableBroadcastParams; +use topos_p2p::{Multiaddr, PeerId}; + +#[derive(Debug)] +pub struct TceConfiguration { + pub local_key_seed: Option>, + pub tce_params: ReliableBroadcastParams, + pub boot_peers: Vec<(PeerId, Multiaddr)>, + pub api_addr: SocketAddr, + pub graphql_api_addr: SocketAddr, + pub metrics_api_addr: SocketAddr, + pub tce_addr: String, + pub tce_local_port: u16, + pub storage: StorageConfiguration, + pub network_bootstrap_timeout: Duration, + pub minimum_cluster_size: usize, + pub version: &'static str, +} + +#[derive(Debug)] +pub enum StorageConfiguration { + RAM, + RocksDB(Option), +} diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index 2c7fd0ae2..1a2266c8b 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -1,43 +1,22 @@ mod app_context; use std::future::IntoFuture; -use std::net::SocketAddr; -use std::path::PathBuf; -use std::time::Duration; pub use app_context::AppContext; use opentelemetry::global; -use tce_transport::ReliableBroadcastParams; use tokio::{spawn, sync::mpsc, sync::oneshot}; use topos_p2p::utils::local_key_pair_from_slice; -use topos_p2p::{utils::local_key_pair, Multiaddr, PeerId}; +use topos_p2p::{utils::local_key_pair, Multiaddr}; use topos_tce_broadcast::{ReliableBroadcastClient, ReliableBroadcastConfig}; use topos_tce_storage::{Connection, RocksDBStorage}; use tracing::{debug, warn}; pub mod events; +mod metrics; -#[derive(Debug)] -pub struct TceConfiguration { - pub local_key_seed: Option>, - pub tce_params: ReliableBroadcastParams, - pub boot_peers: Vec<(PeerId, Multiaddr)>, - pub api_addr: SocketAddr, - pub graphql_api_addr: SocketAddr, - pub metrics_api_addr: SocketAddr, - pub tce_addr: String, - pub tce_local_port: u16, - pub storage: StorageConfiguration, - pub network_bootstrap_timeout: Duration, - pub minimum_cluster_size: usize, - pub version: &'static str, -} - -#[derive(Debug)] -pub enum StorageConfiguration { - RAM, - RocksDB(Option), -} +pub mod config; +pub use config::StorageConfiguration; +pub use config::TceConfiguration; pub async fn run( config: &TceConfiguration, diff --git a/crates/topos-tce/src/metrics.rs b/crates/topos-tce/src/metrics.rs new file mode 100644 index 000000000..267caa3d4 --- /dev/null +++ b/crates/topos-tce/src/metrics.rs @@ -0,0 +1,21 @@ +use prometheus::{self, IntCounter}; + +use lazy_static::lazy_static; +use prometheus::register_int_counter; + +lazy_static! { + pub static ref CERTIFICATE_RECEIVED: IntCounter = + register_int_counter!("certificate_received", "Number of certificate received.").unwrap(); + pub static ref CERTIFICATE_RECEIVED_FROM_GOSSIP: IntCounter = register_int_counter!( + "certificate_received_from_gossip", + "Number of certificate received from gossip." + ) + .unwrap(); + pub static ref CERTIFICATE_RECEIVED_FROM_API: IntCounter = register_int_counter!( + "certificate_received_from_api", + "Number of certificate received from api." + ) + .unwrap(); + pub static ref CERTIFICATE_DELIVERED: IntCounter = + register_int_counter!("certificate_delivered", "Number of certificate delivered.").unwrap(); +}