diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index c7e868892..157964484 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -1,6 +1,7 @@ use crate::Errors; use crate::{sampler::SampleType, tce_store::TceStore, DoubleEchoCommand, SubscriptionsView}; use opentelemetry::trace::TraceContextExt; +use std::collections::HashSet; use std::{ collections::{HashMap, VecDeque}, time, @@ -47,12 +48,13 @@ pub struct DoubleEcho { pending_delivery: HashMap, span_tracker: HashMap, - all_known_certs: Vec, delivery_time: HashMap, pub(crate) subscriptions: SubscriptionsView, // My subscriptions for echo, ready and delivery feedback buffer: VecDeque<(bool, Certificate)>, pub(crate) shutdown: mpsc::Receiver>, + known_certificates: HashSet, + local_peer_id: String, buffered_messages: HashMap>, @@ -90,7 +92,6 @@ impl DoubleEcho { cert_candidate: Default::default(), pending_delivery: Default::default(), span_tracker: Default::default(), - all_known_certs: Default::default(), delivery_time: Default::default(), subscriptions: SubscriptionsView::default(), buffer: VecDeque::new(), @@ -98,6 +99,7 @@ impl DoubleEcho { local_peer_id, buffered_messages: Default::default(), max_buffer_size, + known_certificates: Default::default(), } } @@ -129,9 +131,12 @@ impl DoubleEcho { DoubleEchoCommand::Broadcast { need_gossip, cert, ctx } => { let checking_timer = STORAGE_PENDING_CERTIFICATE_EXISTANCE_LATENCY.start_timer(); - if self.storage.get_certificate(cert.id).await.is_err() && - self.storage.pending_certificate_exists(cert.id).await.is_err() - { + // NOTE: Comment in order to check performance during benchmark + // if self.storage.pending_certificate_exists(cert.id).await.is_err() && + // self.storage.get_certificate(cert.id).await.is_err() + // { + + if !self.known_certificates.contains(&cert.id) { checking_timer.stop_and_record(); let span = warn_span!("Broadcast", peer_id = self.local_peer_id, certificate_id = cert.id.to_string()); DOUBLE_ECHO_BROADCAST_CREATED.inc(); @@ -156,6 +161,7 @@ impl DoubleEcho { .await; adding_timer.stop_and_record(); + self.known_certificates.insert(cert.id); span.in_scope(||{ info!("Certificate {} added to pending storage", cert.id); debug!("DoubleEchoCommand::Broadcast certificate_id: {}", cert.id); @@ -538,7 +544,6 @@ impl DoubleEcho { return; } } - self.all_known_certs.push(cert.clone()); self.delivery_time .insert(cert.id, (time::SystemTime::now(), Default::default()));