Skip to content

Commit

Permalink
chore: remove storage check
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Jun 29, 2023
1 parent 5390ac9 commit 0a46fc2
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::Errors;
use crate::{sampler::SampleType, tce_store::TceStore, DoubleEchoCommand, SubscriptionsView};
use opentelemetry::trace::TraceContextExt;
use std::collections::HashSet;
use std::{
collections::{HashMap, VecDeque},
time,
Expand Down Expand Up @@ -47,12 +48,13 @@ pub struct DoubleEcho {

pending_delivery: HashMap<CertificateId, (Certificate, Span)>,
span_tracker: HashMap<CertificateId, Span>,
all_known_certs: Vec<Certificate>,
delivery_time: HashMap<CertificateId, (time::SystemTime, time::Duration)>,
pub(crate) subscriptions: SubscriptionsView, // My subscriptions for echo, ready and delivery feedback
buffer: VecDeque<(bool, Certificate)>,
pub(crate) shutdown: mpsc::Receiver<oneshot::Sender<()>>,

known_certificates: HashSet<CertificateId>,

local_peer_id: String,

buffered_messages: HashMap<CertificateId, Vec<DoubleEchoCommand>>,
Expand Down Expand Up @@ -90,14 +92,14 @@ impl DoubleEcho {
cert_candidate: Default::default(),
pending_delivery: Default::default(),
span_tracker: Default::default(),
all_known_certs: Default::default(),
delivery_time: Default::default(),
subscriptions: SubscriptionsView::default(),
buffer: VecDeque::new(),
shutdown,
local_peer_id,
buffered_messages: Default::default(),
max_buffer_size,
known_certificates: Default::default(),
}
}

Expand Down Expand Up @@ -129,9 +131,12 @@ impl DoubleEcho {

DoubleEchoCommand::Broadcast { need_gossip, cert, ctx } => {
let checking_timer = STORAGE_PENDING_CERTIFICATE_EXISTANCE_LATENCY.start_timer();
if self.storage.get_certificate(cert.id).await.is_err() &&
self.storage.pending_certificate_exists(cert.id).await.is_err()
{
// NOTE: Comment in order to check performance during benchmark
// if self.storage.pending_certificate_exists(cert.id).await.is_err() &&
// self.storage.get_certificate(cert.id).await.is_err()
// {

if !self.known_certificates.contains(&cert.id) {
checking_timer.stop_and_record();
let span = warn_span!("Broadcast", peer_id = self.local_peer_id, certificate_id = cert.id.to_string());
DOUBLE_ECHO_BROADCAST_CREATED.inc();
Expand All @@ -156,6 +161,7 @@ impl DoubleEcho {
.await;
adding_timer.stop_and_record();

self.known_certificates.insert(cert.id);
span.in_scope(||{
info!("Certificate {} added to pending storage", cert.id);
debug!("DoubleEchoCommand::Broadcast certificate_id: {}", cert.id);
Expand Down Expand Up @@ -538,7 +544,6 @@ impl DoubleEcho {
return;
}
}
self.all_known_certs.push(cert.clone());
self.delivery_time
.insert(cert.id, (time::SystemTime::now(), Default::default()));

Expand Down

0 comments on commit 0a46fc2

Please sign in to comment.