From d768b4e36b17be229fdfef50e76c428794860596 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Tue, 11 Jul 2023 17:33:41 +0200 Subject: [PATCH] feat: cleanup app_context (#252) --- .github/workflows/quality.yml | 2 +- crates/topos-tce/src/app_context.rs | 406 +------------------ crates/topos-tce/src/app_context/api.rs | 142 +++++++ crates/topos-tce/src/app_context/network.rs | 116 ++++++ crates/topos-tce/src/app_context/protocol.rs | 164 ++++++++ crates/topos-tce/src/config.rs | 30 ++ crates/topos-tce/src/lib.rs | 70 +--- crates/topos-tce/src/messages.rs | 36 ++ crates/topos/src/components/tce/mod.rs | 2 +- crates/topos/src/components/tce/parser.rs | 2 +- 10 files changed, 507 insertions(+), 463 deletions(-) create mode 100644 crates/topos-tce/src/app_context/api.rs create mode 100644 crates/topos-tce/src/app_context/network.rs create mode 100644 crates/topos-tce/src/app_context/protocol.rs create mode 100644 crates/topos-tce/src/config.rs create mode 100644 crates/topos-tce/src/messages.rs diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index e39e18f21..fec032809 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -93,7 +93,7 @@ jobs: msrv: name: Check - MSRV - runs-on: ubuntu-latest + runs-on: ubuntu-latest-16-core steps: - name: Checkout uses: actions/checkout@v3 diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index e1df61de5..a3f9f65a5 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -1,32 +1,25 @@ -#![allow(unused_variables)] //! //! Application logic glue //! use futures::{Stream, StreamExt}; -use opentelemetry::trace::{FutureExt as TraceFutureExt, TraceContextExt}; -use std::collections::HashMap; -use tce_transport::{ProtocolEvents, TceCommands}; -use tokio::spawn; +use tce_transport::ProtocolEvents; use tokio::sync::{mpsc, oneshot}; -use topos_core::api::grpc::checkpoints::TargetStreamPosition; -use topos_core::uci::{Certificate, CertificateId, SubnetId}; -use topos_metrics::CERTIFICATE_DELIVERED_TOTAL; +use topos_core::uci::CertificateId; use topos_p2p::{Client as NetworkClient, Event as NetEvent}; +use topos_tce_api::RuntimeClient as ApiClient; use topos_tce_api::RuntimeEvent as ApiEvent; -use topos_tce_api::{RuntimeClient as ApiClient, RuntimeError}; -use topos_tce_broadcast::DoubleEchoCommand; use topos_tce_broadcast::ReliableBroadcastClient; -use topos_tce_gatekeeper::{GatekeeperClient, GatekeeperError}; -use topos_tce_storage::errors::{InternalStorageError, StorageError}; +use topos_tce_gatekeeper::GatekeeperClient; use topos_tce_storage::events::StorageEvent; use topos_tce_storage::StorageClient; use topos_tce_synchronizer::{SynchronizerClient, SynchronizerEvent}; -use topos_telemetry::PropagationContext; -use tracing::{debug, error, info, info_span, trace, warn, Instrument}; -use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::{error, info, warn}; use crate::events::Events; -use crate::NetworkMessage; + +mod api; +mod network; +mod protocol; /// Top-level transducer main app context & driver (alike) /// @@ -127,387 +120,6 @@ impl AppContext { warn!("Exiting main TCE app processing loop") } - async fn on_api_event(&mut self, event: ApiEvent) { - match event { - ApiEvent::CertificateSubmitted { - certificate, - sender, - ctx, - } => { - let span = info_span!(parent: &ctx, "TCE Runtime"); - - _ = self - .tce_cli - .broadcast_new_certificate(*certificate, true) - .with_context(span.context()) - .instrument(span) - .await; - - _ = sender.send(Ok(())); - } - - ApiEvent::PeerListPushed { peers, sender } => { - let sampler = self.tce_cli.clone(); - let gatekeeper = self.gatekeeper.clone(); - let events = self.events.clone(); - let api = self.api_client.clone(); - - spawn(async move { - match gatekeeper.push_peer_list(peers).await { - Ok(peers) => { - info!("Gatekeeper has detected changes on the peer list, new sample in creation"); - if sampler.peer_changed(peers).await.is_err() { - _ = sender.send(Err(RuntimeError::UnableToPushPeerList)); - } else { - api.set_active_sample(true).await; - if events.send(Events::StableSample).await.is_err() { - error!("Unable to send StableSample event"); - } - _ = sender.send(Ok(())); - } - } - Err(GatekeeperError::NoUpdate) => { - _ = sender.send(Ok(())); - } - Err(_) => { - _ = sender.send(Err(RuntimeError::UnableToPushPeerList)); - } - } - }); - } - - ApiEvent::GetSourceHead { subnet_id, sender } => { - // Get source head certificate - let mut result = self - .pending_storage - .get_source_head(subnet_id) - .await - .map_err(|e| match e { - StorageError::InternalStorage(internal) => { - if let InternalStorageError::MissingHeadForSubnet(subnet_id) = internal - { - RuntimeError::UnknownSubnet(subnet_id) - } else { - RuntimeError::UnableToGetSourceHead(subnet_id, internal.to_string()) - } - } - e => RuntimeError::UnableToGetSourceHead(subnet_id, e.to_string()), - }); - - // TODO: Initial genesis certificate eventually will be fetched from the topos subnet - // Currently, for subnet starting from scratch there are no certificates in the database - // So for MissingHeadForSubnet error we will return some default dummy certificate - if let Err(RuntimeError::UnknownSubnet(subnet_id)) = result { - warn!("Returning dummy certificate as head certificate, to be fixed..."); - result = Ok(( - 0, - topos_core::uci::Certificate { - prev_id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID, - source_subnet_id: subnet_id, - state_root: Default::default(), - tx_root_hash: Default::default(), - target_subnets: vec![], - verifier: 0, - id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID, - proof: Default::default(), - signature: Default::default(), - }, - )); - }; - - _ = sender.send(result); - } - - ApiEvent::GetLastPendingCertificates { - mut subnet_ids, - sender, - } => { - let mut last_pending_certificates: HashMap> = - subnet_ids - .iter() - .map(|subnet_id| (*subnet_id, None)) - .collect(); - - if let Ok(pending_certificates) = - self.pending_storage.get_pending_certificates().await - { - // Iterate through pending certificates and determine last one for every subnet - // Last certificate in the subnet should be one with the highest index - for (pending_certificate_id, cert) in pending_certificates.into_iter().rev() { - if let Some(subnet_id) = subnet_ids.take(&cert.source_subnet_id) { - *last_pending_certificates.entry(subnet_id).or_insert(None) = - Some(cert); - } - if subnet_ids.is_empty() { - break; - } - } - } - - // Add None pending certificate for any other requested subnet_id - subnet_ids.iter().for_each(|subnet_id| { - last_pending_certificates.insert(*subnet_id, None); - }); - - _ = sender.send(Ok(last_pending_certificates)); - } - } - } - - async fn on_protocol_event(&mut self, evt: ProtocolEvents) { - match evt { - ProtocolEvents::StableSample => { - info!("Stable Sample detected"); - self.api_client.set_active_sample(true).await; - if self.events.send(Events::StableSample).await.is_err() { - error!("Unable to send StableSample event"); - } - } - - ProtocolEvents::Broadcast { certificate_id } => { - info!("Broadcasting certificate {}", certificate_id); - } - - ProtocolEvents::CertificateDelivered { certificate } => { - warn!("Certificate delivered {}", certificate.id); - CERTIFICATE_DELIVERED_TOTAL.inc(); - let storage = self.pending_storage.clone(); - let api_client = self.api_client.clone(); - - let certificate_id = certificate.id; - spawn(async move { - match storage - .certificate_delivered(certificate_id, Some(certificate.clone())) - .await - { - Ok(positions) => { - api_client - .dispatch_certificate( - certificate, - positions - .targets - .into_iter() - .map(|(subnet_id, certificate_target_stream_position)| { - ( - subnet_id, - TargetStreamPosition { - target_subnet_id: - certificate_target_stream_position - .target_subnet_id, - source_subnet_id: - certificate_target_stream_position - .source_subnet_id, - position: certificate_target_stream_position - .position - .0, - certificate_id: Some(certificate_id), - }, - ) - }) - .collect::>(), - ) - .await; - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateNotFound(_), - )) => { - error!( - "Certificate {} not found in pending storage", - certificate_id - ); - } - Err(e) => { - error!("Pending storage error while delivering certificate: {e}"); - } - }; - }); - } - - ProtocolEvents::Gossip { cert, ctx } => { - let span = info_span!( - parent: &ctx, - "SEND Outbound Gossip", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "producer", - ); - let cert_id = cert.id; - - let data = NetworkMessage::from(TceCommands::OnGossip { - cert, - ctx: PropagationContext::inject(&span.context()), - }); - - info!("Sending Gossip for certificate {}", cert_id); - if let Err(e) = self - .network_client - .publish::(topos_p2p::TOPOS_GOSSIP, data) - .await - { - error!("Unable to send Gossip due to error: {e}"); - } - } - - ProtocolEvents::Echo { - certificate_id, - ctx, - } => { - let span = info_span!( - parent: &ctx, - "SEND Outbound Echo", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "producer", - ); - let my_peer_id = self.network_client.local_peer_id; - // Send echo message - let data = NetworkMessage::from(TceCommands::OnEcho { - certificate_id, - ctx: PropagationContext::inject(&span.context()), - }); - - if let Err(e) = self - .network_client - .publish::(topos_p2p::TOPOS_ECHO, data) - .await - { - error!("Unable to send Echo due to error: {e}"); - } - } - - ProtocolEvents::Ready { - certificate_id, - ctx, - } => { - let span = info_span!( - parent: &ctx, - "SEND Outbound Ready", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "producer", - ); - let my_peer_id = self.network_client.local_peer_id; - let data = NetworkMessage::from(TceCommands::OnReady { - certificate_id, - ctx: PropagationContext::inject(&span.context()), - }); - - if let Err(e) = self - .network_client - .publish::(topos_p2p::TOPOS_READY, data) - .await - { - error!("Unable to send Ready due to error: {e}"); - } - } - - evt => { - debug!("Unhandled event: {:?}", evt); - } - } - } - - async fn on_net_event(&mut self, evt: NetEvent) { - trace!( - "on_net_event: peer: {} event {:?}", - &self.network_client.local_peer_id, - &evt - ); - - if let NetEvent::Gossip { from, data } = evt { - let msg: NetworkMessage = data.into(); - - if let NetworkMessage::Cmd(cmd) = msg { - match cmd { - TceCommands::OnGossip { cert, ctx } => { - let span = info_span!( - "RECV Outbound Gossip", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "consumer", - sender = from.to_string() - ); - let parent = ctx.extract(); - span.add_link(parent.span().span_context().clone()); - - let channel = self.tce_cli.get_double_echo_channel(); - - spawn(async move { - info!("Send certificate to be broadcast"); - if channel - .send(DoubleEchoCommand::Broadcast { - cert, - need_gossip: false, - ctx: span, - }) - .await - .is_err() - { - error!("Unable to send broadcast_new_certificate command, Receiver was dropped"); - } - }); - } - - TceCommands::OnEcho { - certificate_id, - ctx, - } => { - let span = info_span!( - "RECV Outbound Echo", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "consumer", - sender = from.to_string() - ); - let context = ctx.extract(); - span.add_link(context.span().span_context().clone()); - - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - if let Err(e) = channel - .send(DoubleEchoCommand::Echo { - from_peer: from, - certificate_id, - ctx: span.clone(), - }) - .with_context(span.context().clone()) - .instrument(span) - .await - { - error!("Unable to send Echo, {:?}", e); - } - }); - } - TceCommands::OnReady { - certificate_id, - ctx, - } => { - let span = info_span!( - "RECV Outbound Ready", - peer_id = self.network_client.local_peer_id.to_string(), - "otel.kind" = "consumer", - sender = from.to_string() - ); - let context = ctx.extract(); - span.add_link(context.span().span_context().clone()); - - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - if let Err(e) = channel - .send(DoubleEchoCommand::Ready { - from_peer: from, - certificate_id, - ctx: span.clone(), - }) - .with_context(context) - .instrument(span) - .await - { - error!("Unable to send Ready {:?}", e); - } - }); - } - _ => {} - } - } - } - } - pub async fn shutdown(&mut self) -> Result<(), Box> { info!("Shutting down the TCE client..."); self.api_client.shutdown().await?; diff --git a/crates/topos-tce/src/app_context/api.rs b/crates/topos-tce/src/app_context/api.rs new file mode 100644 index 000000000..084c7ae12 --- /dev/null +++ b/crates/topos-tce/src/app_context/api.rs @@ -0,0 +1,142 @@ +use opentelemetry::trace::FutureExt as TraceFutureExt; +use std::collections::HashMap; +use tokio::spawn; +use topos_core::uci::{Certificate, SubnetId}; +use topos_tce_api::RuntimeError; +use topos_tce_api::RuntimeEvent as ApiEvent; +use topos_tce_gatekeeper::GatekeeperError; +use topos_tce_storage::errors::{InternalStorageError, StorageError}; +use tracing::{error, info, info_span, warn, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +use crate::events::Events; +use crate::AppContext; + +impl AppContext { + pub async fn on_api_event(&mut self, event: ApiEvent) { + match event { + ApiEvent::CertificateSubmitted { + certificate, + sender, + ctx, + } => { + let span = info_span!(parent: &ctx, "TCE Runtime"); + + _ = self + .tce_cli + .broadcast_new_certificate(*certificate, true) + .with_context(span.context()) + .instrument(span) + .await; + + _ = sender.send(Ok(())); + } + + ApiEvent::PeerListPushed { peers, sender } => { + let sampler = self.tce_cli.clone(); + let gatekeeper = self.gatekeeper.clone(); + let events = self.events.clone(); + let api = self.api_client.clone(); + + spawn(async move { + match gatekeeper.push_peer_list(peers).await { + Ok(peers) => { + info!("Gatekeeper has detected changes on the peer list, new sample in creation"); + if sampler.peer_changed(peers).await.is_err() { + _ = sender.send(Err(RuntimeError::UnableToPushPeerList)); + } else { + api.set_active_sample(true).await; + if events.send(Events::StableSample).await.is_err() { + error!("Unable to send StableSample event"); + } + _ = sender.send(Ok(())); + } + } + Err(GatekeeperError::NoUpdate) => { + _ = sender.send(Ok(())); + } + Err(_) => { + _ = sender.send(Err(RuntimeError::UnableToPushPeerList)); + } + } + }); + } + + ApiEvent::GetSourceHead { subnet_id, sender } => { + // Get source head certificate + let mut result = self + .pending_storage + .get_source_head(subnet_id) + .await + .map_err(|e| match e { + StorageError::InternalStorage(internal) => { + if let InternalStorageError::MissingHeadForSubnet(subnet_id) = internal + { + RuntimeError::UnknownSubnet(subnet_id) + } else { + RuntimeError::UnableToGetSourceHead(subnet_id, internal.to_string()) + } + } + e => RuntimeError::UnableToGetSourceHead(subnet_id, e.to_string()), + }); + + // TODO: Initial genesis certificate eventually will be fetched from the topos subnet + // Currently, for subnet starting from scratch there are no certificates in the database + // So for MissingHeadForSubnet error we will return some default dummy certificate + if let Err(RuntimeError::UnknownSubnet(subnet_id)) = result { + warn!("Returning dummy certificate as head certificate, to be fixed..."); + result = Ok(( + 0, + topos_core::uci::Certificate { + prev_id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID, + source_subnet_id: subnet_id, + state_root: Default::default(), + tx_root_hash: Default::default(), + target_subnets: vec![], + verifier: 0, + id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID, + proof: Default::default(), + signature: Default::default(), + }, + )); + }; + + _ = sender.send(result); + } + + ApiEvent::GetLastPendingCertificates { + mut subnet_ids, + sender, + } => { + let mut last_pending_certificates: HashMap> = + subnet_ids + .iter() + .map(|subnet_id| (*subnet_id, None)) + .collect(); + + if let Ok(pending_certificates) = + self.pending_storage.get_pending_certificates().await + { + // Iterate through pending certificates and determine last one for every subnet + // Last certificate in the subnet should be one with the highest index + for (_pending_certificate_id, cert) in pending_certificates.into_iter().rev() { + if let Some(subnet_id) = subnet_ids.take(&cert.source_subnet_id) { + *last_pending_certificates.entry(subnet_id).or_insert(None) = + Some(cert); + } + if subnet_ids.is_empty() { + break; + } + } + } + + // Add None pending certificate for any other requested subnet_id + subnet_ids.iter().for_each(|subnet_id| { + last_pending_certificates.insert(*subnet_id, None); + }); + + _ = sender.send(Ok(last_pending_certificates)); + } + } + } +} diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs new file mode 100644 index 000000000..2fdc72211 --- /dev/null +++ b/crates/topos-tce/src/app_context/network.rs @@ -0,0 +1,116 @@ +use opentelemetry::trace::{FutureExt as TraceFutureExt, TraceContextExt}; +use tce_transport::TceCommands; +use tokio::spawn; +use topos_p2p::Event as NetEvent; +use topos_tce_broadcast::DoubleEchoCommand; +use tracing::{error, info, info_span, trace, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +use crate::messages::NetworkMessage; +use crate::AppContext; + +impl AppContext { + pub async fn on_net_event(&mut self, evt: NetEvent) { + trace!( + "on_net_event: peer: {} event {:?}", + &self.network_client.local_peer_id, + &evt + ); + + if let NetEvent::Gossip { from, data } = evt { + let msg: NetworkMessage = data.into(); + + if let NetworkMessage::Cmd(cmd) = msg { + match cmd { + TceCommands::OnGossip { cert, ctx } => { + let span = info_span!( + "RECV Outbound Gossip", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "consumer", + sender = from.to_string() + ); + let parent = ctx.extract(); + span.add_link(parent.span().span_context().clone()); + + let channel = self.tce_cli.get_double_echo_channel(); + + spawn(async move { + info!("Send certificate to be broadcast"); + if channel + .send(DoubleEchoCommand::Broadcast { + cert, + need_gossip: false, + ctx: span, + }) + .await + .is_err() + { + error!("Unable to send broadcast_new_certificate command, Receiver was dropped"); + } + }); + } + + TceCommands::OnEcho { + certificate_id, + ctx, + } => { + let span = info_span!( + "RECV Outbound Echo", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "consumer", + sender = from.to_string() + ); + let context = ctx.extract(); + span.add_link(context.span().span_context().clone()); + + let channel = self.tce_cli.get_double_echo_channel(); + spawn(async move { + if let Err(e) = channel + .send(DoubleEchoCommand::Echo { + from_peer: from, + certificate_id, + ctx: span.clone(), + }) + .with_context(span.context().clone()) + .instrument(span) + .await + { + error!("Unable to send Echo, {:?}", e); + } + }); + } + TceCommands::OnReady { + certificate_id, + ctx, + } => { + let span = info_span!( + "RECV Outbound Ready", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "consumer", + sender = from.to_string() + ); + let context = ctx.extract(); + span.add_link(context.span().span_context().clone()); + + let channel = self.tce_cli.get_double_echo_channel(); + spawn(async move { + if let Err(e) = channel + .send(DoubleEchoCommand::Ready { + from_peer: from, + certificate_id, + ctx: span.clone(), + }) + .with_context(context) + .instrument(span) + .await + { + error!("Unable to send Ready {:?}", e); + } + }); + } + _ => {} + } + } + } + } +} diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs new file mode 100644 index 000000000..85dcb9621 --- /dev/null +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -0,0 +1,164 @@ +use std::collections::HashMap; +use tce_transport::{ProtocolEvents, TceCommands}; +use tokio::spawn; +use topos_core::api::grpc::checkpoints::TargetStreamPosition; +use topos_core::uci::SubnetId; +use topos_metrics::CERTIFICATE_DELIVERED_TOTAL; +use topos_tce_storage::errors::{InternalStorageError, StorageError}; +use topos_telemetry::PropagationContext; +use tracing::{debug, error, info, info_span, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +use crate::events::Events; +use crate::messages::NetworkMessage; +use crate::AppContext; + +impl AppContext { + pub async fn on_protocol_event(&mut self, evt: ProtocolEvents) { + match evt { + ProtocolEvents::StableSample => { + info!("Stable Sample detected"); + self.api_client.set_active_sample(true).await; + if self.events.send(Events::StableSample).await.is_err() { + error!("Unable to send StableSample event"); + } + } + + ProtocolEvents::Broadcast { certificate_id } => { + info!("Broadcasting certificate {}", certificate_id); + } + + ProtocolEvents::CertificateDelivered { certificate } => { + warn!("Certificate delivered {}", certificate.id); + CERTIFICATE_DELIVERED_TOTAL.inc(); + let storage = self.pending_storage.clone(); + let api_client = self.api_client.clone(); + + let certificate_id = certificate.id; + spawn(async move { + match storage + .certificate_delivered(certificate_id, Some(certificate.clone())) + .await + { + Ok(positions) => { + api_client + .dispatch_certificate( + certificate, + positions + .targets + .into_iter() + .map(|(subnet_id, certificate_target_stream_position)| { + ( + subnet_id, + TargetStreamPosition { + target_subnet_id: + certificate_target_stream_position + .target_subnet_id, + source_subnet_id: + certificate_target_stream_position + .source_subnet_id, + position: certificate_target_stream_position + .position + .0, + certificate_id: Some(certificate_id), + }, + ) + }) + .collect::>(), + ) + .await; + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateNotFound(_), + )) => { + error!( + "Certificate {} not found in pending storage", + certificate_id + ); + } + Err(e) => { + error!("Pending storage error while delivering certificate: {e}"); + } + }; + }); + } + + ProtocolEvents::Gossip { cert, ctx } => { + let span = info_span!( + parent: &ctx, + "SEND Outbound Gossip", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "producer", + ); + let cert_id = cert.id; + + let data = NetworkMessage::from(TceCommands::OnGossip { + cert, + ctx: PropagationContext::inject(&span.context()), + }); + + info!("Sending Gossip for certificate {}", cert_id); + if let Err(e) = self + .network_client + .publish::(topos_p2p::TOPOS_GOSSIP, data) + .await + { + error!("Unable to send Gossip due to error: {e}"); + } + } + + ProtocolEvents::Echo { + certificate_id, + ctx, + } => { + let span = info_span!( + parent: &ctx, + "SEND Outbound Echo", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "producer", + ); + // Send echo message + let data = NetworkMessage::from(TceCommands::OnEcho { + certificate_id, + ctx: PropagationContext::inject(&span.context()), + }); + + if let Err(e) = self + .network_client + .publish::(topos_p2p::TOPOS_ECHO, data) + .await + { + error!("Unable to send Echo due to error: {e}"); + } + } + + ProtocolEvents::Ready { + certificate_id, + ctx, + } => { + let span = info_span!( + parent: &ctx, + "SEND Outbound Ready", + peer_id = self.network_client.local_peer_id.to_string(), + "otel.kind" = "producer", + ); + let data = NetworkMessage::from(TceCommands::OnReady { + certificate_id, + ctx: PropagationContext::inject(&span.context()), + }); + + if let Err(e) = self + .network_client + .publish::(topos_p2p::TOPOS_READY, data) + .await + { + error!("Unable to send Ready due to error: {e}"); + } + } + + evt => { + debug!("Unhandled event: {:?}", evt); + } + } + } +} diff --git a/crates/topos-tce/src/config.rs b/crates/topos-tce/src/config.rs new file mode 100644 index 000000000..735f3dee8 --- /dev/null +++ b/crates/topos-tce/src/config.rs @@ -0,0 +1,30 @@ +use std::net::SocketAddr; +use std::path::PathBuf; +use std::time::Duration; + +use tce_transport::ReliableBroadcastParams; +use topos_p2p::{Multiaddr, PeerId}; + +pub use crate::AppContext; + +#[derive(Debug)] +pub struct TceConfiguration { + pub local_key_seed: Option>, + pub tce_params: ReliableBroadcastParams, + pub boot_peers: Vec<(PeerId, Multiaddr)>, + pub api_addr: SocketAddr, + pub graphql_api_addr: SocketAddr, + pub metrics_api_addr: SocketAddr, + pub tce_addr: String, + pub tce_local_port: u16, + pub storage: StorageConfiguration, + pub network_bootstrap_timeout: Duration, + pub minimum_cluster_size: usize, + pub version: &'static str, +} + +#[derive(Debug)] +pub enum StorageConfiguration { + RAM, + RocksDB(Option), +} diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index f2ede934b..1156cc25f 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -1,44 +1,22 @@ -mod app_context; - use std::future::IntoFuture; -use std::net::SocketAddr; -use std::path::PathBuf; -use std::time::Duration; -pub use app_context::AppContext; +use config::TceConfiguration; use opentelemetry::global; -use serde::{Deserialize, Serialize}; -use tce_transport::{ReliableBroadcastParams, TceCommands}; use tokio::{spawn, sync::mpsc, sync::oneshot}; use topos_p2p::utils::local_key_pair_from_slice; -use topos_p2p::{utils::local_key_pair, Multiaddr, PeerId}; +use topos_p2p::{utils::local_key_pair, Multiaddr}; use topos_tce_broadcast::{ReliableBroadcastClient, ReliableBroadcastConfig}; use topos_tce_storage::{Connection, RocksDBStorage}; use tracing::{debug, warn}; +mod app_context; +pub mod config; pub mod events; +pub mod messages; -#[derive(Debug)] -pub struct TceConfiguration { - pub local_key_seed: Option>, - pub tce_params: ReliableBroadcastParams, - pub boot_peers: Vec<(PeerId, Multiaddr)>, - pub api_addr: SocketAddr, - pub graphql_api_addr: SocketAddr, - pub metrics_api_addr: SocketAddr, - pub tce_addr: String, - pub tce_local_port: u16, - pub storage: StorageConfiguration, - pub network_bootstrap_timeout: Duration, - pub minimum_cluster_size: usize, - pub version: &'static str, -} +pub use app_context::AppContext; -#[derive(Debug)] -pub enum StorageConfiguration { - RAM, - RocksDB(Option), -} +use crate::config::StorageConfiguration; pub async fn run( config: &TceConfiguration, @@ -156,37 +134,3 @@ pub async fn run( global::shutdown_tracer_provider(); Ok(()) } - -/// Definition of networking payload. -/// -/// We assume that only Commands will go through the network, -/// [Response] is used to allow reporting of logic errors to the caller. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[allow(clippy::large_enum_variant)] -pub enum NetworkMessage { - Cmd(TceCommands), - Bulk(Vec), - - NotReady(topos_p2p::NotReadyMessage), -} - -// deserializer -impl From> for NetworkMessage { - fn from(data: Vec) -> Self { - bincode::deserialize::(data.as_ref()).expect("msg deser") - } -} - -// serializer -impl From for Vec { - fn from(msg: NetworkMessage) -> Self { - bincode::serialize::(&msg).expect("msg ser") - } -} - -// transformer of protocol commands into network commands -impl From for NetworkMessage { - fn from(cmd: TceCommands) -> Self { - Self::Cmd(cmd) - } -} diff --git a/crates/topos-tce/src/messages.rs b/crates/topos-tce/src/messages.rs new file mode 100644 index 000000000..6f00c2a8c --- /dev/null +++ b/crates/topos-tce/src/messages.rs @@ -0,0 +1,36 @@ +use serde::{Deserialize, Serialize}; +use tce_transport::TceCommands; + +/// Definition of networking payload. +/// +/// We assume that only Commands will go through the network, +/// [Response] is used to allow reporting of logic errors to the caller. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[allow(clippy::large_enum_variant)] +pub enum NetworkMessage { + Cmd(TceCommands), + Bulk(Vec), + + NotReady(topos_p2p::NotReadyMessage), +} + +// deserializer +impl From> for NetworkMessage { + fn from(data: Vec) -> Self { + bincode::deserialize::(data.as_ref()).expect("msg deser") + } +} + +// serializer +impl From for Vec { + fn from(msg: NetworkMessage) -> Self { + bincode::serialize::(&msg).expect("msg ser") + } +} + +// transformer of protocol commands into network commands +impl From for NetworkMessage { + fn from(cmd: TceCommands) -> Self { + Self::Cmd(cmd) + } +} diff --git a/crates/topos/src/components/tce/mod.rs b/crates/topos/src/components/tce/mod.rs index 76e9b7aff..8fa4de106 100644 --- a/crates/topos/src/components/tce/mod.rs +++ b/crates/topos/src/components/tce/mod.rs @@ -15,7 +15,7 @@ use topos_core::api::grpc::tce::v1::{ api_service_client::ApiServiceClient, console_service_client::ConsoleServiceClient, }; use topos_p2p::config::NetworkConfig; -use topos_tce::{StorageConfiguration, TceConfiguration}; +use topos_tce::config::{StorageConfiguration, TceConfiguration}; use tower::Service; use tracing::{debug, error, info, warn}; diff --git a/crates/topos/src/components/tce/parser.rs b/crates/topos/src/components/tce/parser.rs index e9bccb604..05ecaefd5 100644 --- a/crates/topos/src/components/tce/parser.rs +++ b/crates/topos/src/components/tce/parser.rs @@ -15,7 +15,7 @@ use tokio::{ use tonic::transport::Channel; use topos_core::api::grpc::tce::v1::console_service_client::ConsoleServiceClient; use topos_p2p::PeerId; -use topos_tce::{StorageConfiguration, TceConfiguration}; +use topos_tce::config::{StorageConfiguration, TceConfiguration}; use tower::Service; use tracing::{debug, error, info, trace};