From 6658f78a43aca4c9009851775c7311a376160387 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Mon, 25 Mar 2024 08:20:50 +0100 Subject: [PATCH] feat: switch tce-lib action to spawn tasks Signed-off-by: Simon Paitrault --- crates/topos-tce/src/app_context/api.rs | 140 ++++++++++--------- crates/topos-tce/src/app_context/network.rs | 116 +++++++-------- crates/topos-tce/src/app_context/protocol.rs | 88 ++++++------ crates/topos-tce/src/tests/mod.rs | 18 ++- 4 files changed, 190 insertions(+), 172 deletions(-) diff --git a/crates/topos-tce/src/app_context/api.rs b/crates/topos-tce/src/app_context/api.rs index 965087e90..46371bf25 100644 --- a/crates/topos-tce/src/app_context/api.rs +++ b/crates/topos-tce/src/app_context/api.rs @@ -1,5 +1,6 @@ use crate::AppContext; use std::collections::HashMap; +use tokio::spawn; use topos_core::uci::{Certificate, SubnetId}; use topos_metrics::CERTIFICATE_DELIVERY_LATENCY; use topos_tce_api::RuntimeError; @@ -20,79 +21,82 @@ impl AppContext { self.delivery_latency .insert(certificate.id, CERTIFICATE_DELIVERY_LATENCY.start_timer()); - _ = match self - .validator_store - .insert_pending_certificate(&certificate) - .await - { - Ok(Some(pending_id)) => { - let certificate_id = certificate.id; - debug!( - "Certificate {} from subnet {} has been inserted into pending pool", - certificate_id, certificate.source_subnet_id - ); + let validator_store = self.validator_store.clone(); + let double_echo = self.tce_cli.get_double_echo_channel(); - if self - .tce_cli - .get_double_echo_channel() - .send(DoubleEchoCommand::Broadcast { - need_gossip: true, - cert: *certificate, - pending_id, - }) - .await - .is_err() - { - error!( - "Unable to send DoubleEchoCommand::Broadcast command to double \ - echo for {}", - certificate_id + spawn(async move { + _ = match validator_store + .insert_pending_certificate(&certificate) + .await + { + Ok(Some(pending_id)) => { + let certificate_id = certificate.id; + debug!( + "Certificate {} from subnet {} has been inserted into pending pool", + certificate_id, certificate.source_subnet_id ); - sender.send(Err(RuntimeError::CommunicationError( - "Unable to send DoubleEchoCommand::Broadcast command to double \ - echo" - .to_string(), - ))) - } else { - sender.send(Ok(PendingResult::InPending(pending_id))) + if double_echo + .send(DoubleEchoCommand::Broadcast { + need_gossip: true, + cert: *certificate, + pending_id, + }) + .await + .is_err() + { + error!( + "Unable to send DoubleEchoCommand::Broadcast command to \ + double echo for {}", + certificate_id + ); + + sender.send(Err(RuntimeError::CommunicationError( + "Unable to send DoubleEchoCommand::Broadcast command to \ + double echo" + .to_string(), + ))) + } else { + sender.send(Ok(PendingResult::InPending(pending_id))) + } } - } - Ok(None) => { - debug!( - "Certificate {} from subnet {} has been inserted into precedence pool \ - waiting for {}", - certificate.id, certificate.source_subnet_id, certificate.prev_id - ); - sender.send(Ok(PendingResult::AwaitPrecedence)) - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyPending, - )) => { - debug!( - "Certificate {} has already been added to the pending pool, skipping", - certificate.id - ); - sender.send(Ok(PendingResult::AlreadyPending)) - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyExists, - )) => { - debug!( - "Certificate {} has already been delivered, skipping", - certificate.id - ); - sender.send(Ok(PendingResult::AlreadyDelivered)) - } - Err(error) => { - error!( - "Unable to insert pending certificate {}: {}", - certificate.id, error - ); + Ok(None) => { + debug!( + "Certificate {} from subnet {} has been inserted into precedence \ + pool waiting for {}", + certificate.id, certificate.source_subnet_id, certificate.prev_id + ); + sender.send(Ok(PendingResult::AwaitPrecedence)) + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyPending, + )) => { + debug!( + "Certificate {} has already been added to the pending pool, \ + skipping", + certificate.id + ); + sender.send(Ok(PendingResult::AlreadyPending)) + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyExists, + )) => { + debug!( + "Certificate {} has already been delivered, skipping", + certificate.id + ); + sender.send(Ok(PendingResult::AlreadyDelivered)) + } + Err(error) => { + error!( + "Unable to insert pending certificate {}: {}", + certificate.id, error + ); - sender.send(Err(error.into())) - } - }; + sender.send(Err(error.into())) + } + }; + }); } ApiEvent::GetSourceHead { subnet_id, sender } => { diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index acfdb569b..376b4e7ec 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -37,69 +37,71 @@ impl AppContext { { entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer()); } - info!( - "Received certificate {} from GossipSub from {}", - cert.id, from - ); - - match self.validator_store.insert_pending_certificate(&cert).await { - Ok(Some(pending_id)) => { - let certificate_id = cert.id; - debug!( - "Certificate {} has been inserted into pending pool", - certificate_id - ); + let validator_store = self.validator_store.clone(); + let double_echo = self.tce_cli.get_double_echo_channel(); + spawn(async move { + info!( + "Received certificate {} from GossipSub from {}", + cert.id, from + ); - if self - .tce_cli - .get_double_echo_channel() - .send(DoubleEchoCommand::Broadcast { - need_gossip: false, - cert, - pending_id, - }) - .await - .is_err() - { - error!( - "Unable to send DoubleEchoCommand::Broadcast command \ - to double echo for {}", + match validator_store.insert_pending_certificate(&cert).await { + Ok(Some(pending_id)) => { + let certificate_id = cert.id; + debug!( + "Certificate {} has been inserted into pending pool", certificate_id ); + + if double_echo + .send(DoubleEchoCommand::Broadcast { + need_gossip: false, + cert, + pending_id, + }) + .await + .is_err() + { + error!( + "Unable to send DoubleEchoCommand::Broadcast \ + command to double echo for {}", + certificate_id + ); + } } - } - Ok(None) => { - debug!( - "Certificate {} from subnet {} has been inserted into \ - precedence pool waiting for {}", - cert.id, cert.source_subnet_id, cert.prev_id - ); - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyPending, - )) => { - debug!( - "Certificate {} has been already added to the pending \ - pool, skipping", - cert.id - ); - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyExists, - )) => { - debug!( - "Certificate {} has been already delivered, skipping", - cert.id - ); - } - Err(error) => { - error!( - "Unable to insert pending certificate {}: {}", - cert.id, error - ); + Ok(None) => { + debug!( + "Certificate {} from subnet {} has been inserted into \ + precedence pool waiting for {}", + cert.id, cert.source_subnet_id, cert.prev_id + ); + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyPending, + )) => { + debug!( + "Certificate {} has been already added to the pending \ + pool, skipping", + cert.id + ); + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyExists, + )) => { + debug!( + "Certificate {} has been already delivered, skipping", + cert.id + ); + } + Err(error) => { + error!( + "Unable to insert pending certificate {}: {}", + cert.id, error + ); + } } - } + }); } Err(e) => { error!("Failed to parse the received Certificate: {e}"); diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs index 278a13c0a..c516a7dc5 100644 --- a/crates/topos-tce/src/app_context/protocol.rs +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -1,3 +1,4 @@ +use tokio::spawn; use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready}; use topos_tce_broadcast::event::ProtocolEvents; use tracing::{error, info, warn}; @@ -14,20 +15,22 @@ impl AppContext { ProtocolEvents::Gossip { cert } => { let cert_id = cert.id; - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Gossip(Gossip { - certificate: Some(cert.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Gossip(Gossip { + certificate: Some(cert.into()), + })), + }; - info!("Sending Gossip for certificate {}", cert_id); - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_GOSSIP, request) - .await - { - error!("Unable to send Gossip: {e}"); - } + info!("Sending Gossip for certificate {}", cert_id); + if let Err(e) = network_client + .publish(topos_p2p::TOPOS_GOSSIP, request) + .await + { + error!("Unable to send Gossip: {e}"); + } + }); } ProtocolEvents::Echo { @@ -35,22 +38,21 @@ impl AppContext { signature, validator_id, } if self.is_validator => { - // Send echo message - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Echo(Echo { - certificate_id: Some(certificate_id.into()), - signature: Some(signature.into()), - validator_id: Some(validator_id.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + // Send echo message + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Echo(Echo { + certificate_id: Some(certificate_id.into()), + signature: Some(signature.into()), + validator_id: Some(validator_id.into()), + })), + }; - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_ECHO, request) - .await - { - error!("Unable to send Echo: {e}"); - } + if let Err(e) = network_client.publish(topos_p2p::TOPOS_ECHO, request).await { + error!("Unable to send Echo: {e}"); + } + }); } ProtocolEvents::Ready { @@ -58,21 +60,23 @@ impl AppContext { signature, validator_id, } if self.is_validator => { - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Ready(Ready { - certificate_id: Some(certificate_id.into()), - signature: Some(signature.into()), - validator_id: Some(validator_id.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Ready(Ready { + certificate_id: Some(certificate_id.into()), + signature: Some(signature.into()), + validator_id: Some(validator_id.into()), + })), + }; - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_READY, request) - .await - { - error!("Unable to send Ready: {e}"); - } + if let Err(e) = network_client + .publish(topos_p2p::TOPOS_READY, request) + .await + { + error!("Unable to send Ready: {e}"); + } + }); } ProtocolEvents::BroadcastFailed { certificate_id } => { warn!("Broadcast failed for certificate {certificate_id}") diff --git a/crates/topos-tce/src/tests/mod.rs b/crates/topos-tce/src/tests/mod.rs index 39ea06d8b..21249066d 100644 --- a/crates/topos-tce/src/tests/mod.rs +++ b/crates/topos-tce/src/tests/mod.rs @@ -1,6 +1,6 @@ use libp2p::PeerId; use rstest::{fixture, rstest}; -use std::{collections::HashSet, future::IntoFuture, sync::Arc}; +use std::{collections::HashSet, future::IntoFuture, sync::Arc, time::Duration}; use tokio_stream::Stream; use topos_tce_api::RuntimeEvent; use topos_tce_broadcast::event::ProtocolEvents; @@ -41,8 +41,8 @@ async fn non_validator_publish_gossip( .await; assert!(matches!( - p2p_receiver.try_recv(), - Ok(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip" + p2p_receiver.recv().await, + Some(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip" )); } @@ -64,7 +64,11 @@ async fn non_validator_do_not_publish_echo( }) .await; - assert!(p2p_receiver.try_recv().is_err(),); + assert!( + tokio::time::timeout(Duration::from_millis(10), p2p_receiver.recv()) + .await + .is_err() + ); } #[rstest] @@ -85,7 +89,11 @@ async fn non_validator_do_not_publish_ready( }) .await; - assert!(p2p_receiver.try_recv().is_err(),); + assert!( + tokio::time::timeout(Duration::from_millis(10), p2p_receiver.recv()) + .await + .is_err() + ); } #[fixture]