From 594ccc972e445ef66a95e1bdf7576f5b52328c45 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Mon, 3 Jul 2023 11:39:51 +0200 Subject: [PATCH] chore: update double echo Signed-off-by: Simon Paitrault --- .../src/double_echo/mod.rs | 254 +++++++++--------- 1 file changed, 127 insertions(+), 127 deletions(-) diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 2e71a9cef..5960f2e11 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -36,7 +36,7 @@ pub struct DeliveryState { pub struct DoubleEcho { pending_certificate_count: u64, - last_pending_certificate: PendingCertificateId, + // _last_pending_certificate: PendingCertificateId, pub(crate) params: ReliableBroadcastParams, command_receiver: mpsc::Receiver, subscriptions_view_receiver: mpsc::Receiver, @@ -56,6 +56,7 @@ pub struct DoubleEcho { pub(crate) shutdown: mpsc::Receiver>, known_certificates: HashSet, + delivered_certificates: HashSet, local_peer_id: String, @@ -77,13 +78,13 @@ impl DoubleEcho { network_client: NetworkClient, shutdown: mpsc::Receiver>, local_peer_id: String, - last_pending_certificate: PendingCertificateId, + _last_pending_certificate: PendingCertificateId, pending_certificate_count: u64, max_buffer_size: usize, ) -> Self { Self { pending_certificate_count, - last_pending_certificate, + // last_pending_certificate, params, command_receiver, subscriptions_view_receiver, @@ -102,6 +103,7 @@ impl DoubleEcho { buffered_messages: Default::default(), max_buffer_size, known_certificates: Default::default(), + delivered_certificates: Default::default(), } } @@ -156,11 +158,11 @@ impl DoubleEcho { }); span.add_link(ctx.context().span().span_context().clone()); let adding_timer = STORAGE_ADDING_PENDING_CERTIFICATE_LATENCY.start_timer(); - let maybe_pending = self - .storage - .add_pending_certificate(cert.clone()) - .instrument(span.clone()) - .await; + // let maybe_pending = self + // .storage + // .add_pending_certificate(cert.clone()) + // .instrument(span.clone()) + // .await; adding_timer.stop_and_record(); self.known_certificates.insert(cert.id); @@ -170,9 +172,9 @@ impl DoubleEcho { if self.buffer.len() < self.max_buffer_size { self.buffer.push_back((need_gossip, cert)); DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc(); - if let Ok(pending) = maybe_pending { - self.last_pending_certificate = pending; - } + // if let Ok(pending) = maybe_pending { + // self.last_pending_certificate = pending; + // } } else { DOUBLE_ECHO_BUFFER_CAPACITY.inc(); // Adding one to the pending_certificate_count because we @@ -201,12 +203,9 @@ impl DoubleEcho { match command { DoubleEchoCommand::Echo { from_peer, certificate_id, ctx } => { async { - let cert_delivered = self.store.cert_by_id(&certificate_id).is_ok(); + let cert_delivered = self.delivered_certificates.get(&certificate_id).is_some(); if !cert_delivered { - if self.storage - .pending_certificate_exists(certificate_id) - .await - .is_ok() + if self.known_certificates.get(&certificate_id).is_some() { let span = if let Some(root) = self.span_tracker.get(&certificate_id) { info!("DEBUG::Receive ECHO with root"); @@ -312,121 +311,121 @@ impl DoubleEcho { }; #[cfg(not(feature = "direct"))] - let has_subscriptions = self.subscriptions.is_some(); + let _has_subscriptions = self.subscriptions.is_some(); #[cfg(feature = "direct")] - let has_subscriptions = true; + let _has_subscriptions = true; // Broadcast next certificate - if has_subscriptions { - // TODO: Remove the unused_variables attribute when the feature direct is removed - #[allow(unused_variables)] - if let Some((need_gossip, cert)) = self.buffer.pop_front() { - DOUBLE_ECHO_CURRENT_BUFFER_SIZE.dec(); - if let Some(ctx) = self.span_tracker.get(&cert.id) { - let span = info_span!( - parent: ctx, - "DoubleEcho start dispatching", - certificate_id = cert.id.to_string(), - peer_id = self.local_peer_id, - "otel.kind" = "producer" - ); - let _span = span.entered(); - - let cert_id = cert.id; - #[cfg(feature = "direct")] - { - _ = self - .event_sender - .send(ProtocolEvents::CertificateDelivered { certificate: cert }); - } - #[cfg(not(feature = "direct"))] - self.handle_broadcast(cert, need_gossip); - - if let Some(messages) = self.buffered_messages.remove(&cert_id) { - for message in messages { - DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.dec(); - match message { - DoubleEchoCommand::Echo { - from_peer, - certificate_id, - .. - } => { - let span = if let Some(root) = - self.span_tracker.get(&certificate_id) - { - info_span!( - parent: root, - "RECV Inbound Echo (Buffered)", - peer = self.local_peer_id, - certificate_id = certificate_id.to_string() - ) - } else { - info_span!( - "RECV Inbound Echo (Buffered)", - peer = self.local_peer_id, - certificate_id = certificate_id.to_string() - ) - }; - - let _enter = span.enter(); - self.handle_echo(from_peer, &certificate_id); - self.state_change_follow_up(); - } - DoubleEchoCommand::Ready { - from_peer, - certificate_id, - .. - } => { - let span = if let Some(root) = - self.span_tracker.get(&certificate_id) - { - info_span!( - parent: root, - "RECV Inbound Ready (Buffered)", - peer = self.local_peer_id, - certificate_id = certificate_id.to_string() - ) - } else { - info_span!( - "RECV Inbound Ready (Buffered)", - peer = self.local_peer_id, - certificate_id = certificate_id.to_string() - ) - }; - - let _enter = span.enter(); - self.handle_ready(from_peer, &certificate_id); - - self.state_change_follow_up(); - } - _ => {} - } - } - } - } else { - warn!( - "No span found for certificate id: {} {:?}", - cert.id, cert.id - ); - } - - // if self.pending_certificate_count > 0 { - // if let Ok(Some((pending, certificate))) = self - // .storage - // .next_pending_certificate(Some(self.last_pending_certificate as usize)) - // .await - // { - // _ = self.pending_certificate_count.checked_sub(1); - // self.last_pending_certificate = pending; - // self.buffer.push_back((false, certificate)); - // DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc(); - // } else { - // info!("No more certificate to broadcast"); - // } - // } - } - } + // if has_subscriptions { + // // TODO: Remove the unused_variables attribute when the feature direct is removed + // #[allow(unused_variables)] + // if let Some((need_gossip, cert)) = self.buffer.pop_front() { + // DOUBLE_ECHO_CURRENT_BUFFER_SIZE.dec(); + // if let Some(ctx) = self.span_tracker.get(&cert.id) { + // let span = info_span!( + // parent: ctx, + // "DoubleEcho start dispatching", + // certificate_id = cert.id.to_string(), + // peer_id = self.local_peer_id, + // "otel.kind" = "producer" + // ); + // let _span = span.entered(); + // + // let cert_id = cert.id; + // #[cfg(feature = "direct")] + // { + // _ = self + // .event_sender + // .send(ProtocolEvents::CertificateDelivered { certificate: cert }); + // } + // #[cfg(not(feature = "direct"))] + // self.handle_broadcast(cert, need_gossip); + // + // if let Some(messages) = self.buffered_messages.remove(&cert_id) { + // for message in messages { + // DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.dec(); + // match message { + // DoubleEchoCommand::Echo { + // from_peer, + // certificate_id, + // .. + // } => { + // let span = if let Some(root) = + // self.span_tracker.get(&certificate_id) + // { + // info_span!( + // parent: root, + // "RECV Inbound Echo (Buffered)", + // peer = self.local_peer_id, + // certificate_id = certificate_id.to_string() + // ) + // } else { + // info_span!( + // "RECV Inbound Echo (Buffered)", + // peer = self.local_peer_id, + // certificate_id = certificate_id.to_string() + // ) + // }; + // + // let _enter = span.enter(); + // self.handle_echo(from_peer, &certificate_id); + // self.state_change_follow_up(); + // } + // DoubleEchoCommand::Ready { + // from_peer, + // certificate_id, + // .. + // } => { + // let span = if let Some(root) = + // self.span_tracker.get(&certificate_id) + // { + // info_span!( + // parent: root, + // "RECV Inbound Ready (Buffered)", + // peer = self.local_peer_id, + // certificate_id = certificate_id.to_string() + // ) + // } else { + // info_span!( + // "RECV Inbound Ready (Buffered)", + // peer = self.local_peer_id, + // certificate_id = certificate_id.to_string() + // ) + // }; + // + // let _enter = span.enter(); + // self.handle_ready(from_peer, &certificate_id); + // + // self.state_change_follow_up(); + // } + // _ => {} + // } + // } + // } + // } else { + // warn!( + // "No span found for certificate id: {} {:?}", + // cert.id, cert.id + // ); + // } + // + // // if self.pending_certificate_count > 0 { + // // if let Ok(Some((pending, certificate))) = self + // // .storage + // // .next_pending_certificate(Some(self.last_pending_certificate as usize)) + // // .await + // // { + // // _ = self.pending_certificate_count.checked_sub(1); + // // self.last_pending_certificate = pending; + // // self.buffer.push_back((false, certificate)); + // // DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc(); + // // } else { + // // info!("No more certificate to broadcast"); + // // } + // // } + // } + // } }; if let Some(sender) = shutdowned { @@ -460,6 +459,7 @@ impl DoubleEcho { } #[cfg_attr(feature = "direct", allow(dead_code))] + #[allow(dead_code)] pub(crate) fn handle_broadcast(&mut self, cert: Certificate, origin: bool) { info!("🙌 Starting broadcasting the Certificate {}", &cert.id);