diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 5960f2e11..83959c1d0 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -108,10 +108,26 @@ impl DoubleEcho { } pub(crate) async fn run(mut self) { + // DoubleEcho main loop + // - On each tick, check if there is certificate in the buffer + // - If yes, stop the loop and broadcast them + // - Listen for shutdown signal + // - Read new messages from command_receiver + // - If a new certificate is received, add it to the buffer + // - If a new subscription view is received, update the subscriptions + // - If a new Echo/Ready is received, update the state of the certificate or buffer + // the message + info!("DoubleEcho started"); + let mut interval = tokio::time::interval(std::time::Duration::from_millis(100)); let shutdowned: Option> = loop { tokio::select! { + _ = interval.tick() => { + if self.buffer.len() > 0 { + break None; + } + }, shutdown = self.shutdown.recv() => { warn!("Double echo shutdown signal received {:?}", shutdown); break shutdown; @@ -311,121 +327,121 @@ impl DoubleEcho { }; #[cfg(not(feature = "direct"))] - let _has_subscriptions = self.subscriptions.is_some(); + let has_subscriptions = self.subscriptions.is_some(); #[cfg(feature = "direct")] - let _has_subscriptions = true; + let has_subscriptions = true; // Broadcast next certificate - // if has_subscriptions { - // // TODO: Remove the unused_variables attribute when the feature direct is removed - // #[allow(unused_variables)] - // if let Some((need_gossip, cert)) = self.buffer.pop_front() { - // DOUBLE_ECHO_CURRENT_BUFFER_SIZE.dec(); - // if let Some(ctx) = self.span_tracker.get(&cert.id) { - // let span = info_span!( - // parent: ctx, - // "DoubleEcho start dispatching", - // certificate_id = cert.id.to_string(), - // peer_id = self.local_peer_id, - // "otel.kind" = "producer" - // ); - // let _span = span.entered(); - // - // let cert_id = cert.id; - // #[cfg(feature = "direct")] - // { - // _ = self - // .event_sender - // .send(ProtocolEvents::CertificateDelivered { certificate: cert }); - // } - // #[cfg(not(feature = "direct"))] - // self.handle_broadcast(cert, need_gossip); - // - // if let Some(messages) = self.buffered_messages.remove(&cert_id) { - // for message in messages { - // DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.dec(); - // match message { - // DoubleEchoCommand::Echo { - // from_peer, - // certificate_id, - // .. - // } => { - // let span = if let Some(root) = - // self.span_tracker.get(&certificate_id) - // { - // info_span!( - // parent: root, - // "RECV Inbound Echo (Buffered)", - // peer = self.local_peer_id, - // certificate_id = certificate_id.to_string() - // ) - // } else { - // info_span!( - // "RECV Inbound Echo (Buffered)", - // peer = self.local_peer_id, - // certificate_id = certificate_id.to_string() - // ) - // }; - // - // let _enter = span.enter(); - // self.handle_echo(from_peer, &certificate_id); - // self.state_change_follow_up(); - // } - // DoubleEchoCommand::Ready { - // from_peer, - // certificate_id, - // .. - // } => { - // let span = if let Some(root) = - // self.span_tracker.get(&certificate_id) - // { - // info_span!( - // parent: root, - // "RECV Inbound Ready (Buffered)", - // peer = self.local_peer_id, - // certificate_id = certificate_id.to_string() - // ) - // } else { - // info_span!( - // "RECV Inbound Ready (Buffered)", - // peer = self.local_peer_id, - // certificate_id = certificate_id.to_string() - // ) - // }; - // - // let _enter = span.enter(); - // self.handle_ready(from_peer, &certificate_id); - // - // self.state_change_follow_up(); - // } - // _ => {} - // } - // } - // } - // } else { - // warn!( - // "No span found for certificate id: {} {:?}", - // cert.id, cert.id - // ); - // } - // - // // if self.pending_certificate_count > 0 { - // // if let Ok(Some((pending, certificate))) = self - // // .storage - // // .next_pending_certificate(Some(self.last_pending_certificate as usize)) - // // .await - // // { - // // _ = self.pending_certificate_count.checked_sub(1); - // // self.last_pending_certificate = pending; - // // self.buffer.push_back((false, certificate)); - // // DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc(); - // // } else { - // // info!("No more certificate to broadcast"); - // // } - // // } - // } - // } + if has_subscriptions { + // TODO: Remove the unused_variables attribute when the feature direct is removed + #[allow(unused_variables)] + if let Some((need_gossip, cert)) = self.buffer.pop_front() { + DOUBLE_ECHO_CURRENT_BUFFER_SIZE.dec(); + if let Some(ctx) = self.span_tracker.get(&cert.id) { + let span = info_span!( + parent: ctx, + "DoubleEcho start dispatching", + certificate_id = cert.id.to_string(), + peer_id = self.local_peer_id, + "otel.kind" = "producer" + ); + let _span = span.entered(); + + let cert_id = cert.id; + #[cfg(feature = "direct")] + { + _ = self + .event_sender + .send(ProtocolEvents::CertificateDelivered { certificate: cert }); + } + #[cfg(not(feature = "direct"))] + self.handle_broadcast(cert, need_gossip); + + if let Some(messages) = self.buffered_messages.remove(&cert_id) { + for message in messages { + DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.dec(); + match message { + DoubleEchoCommand::Echo { + from_peer, + certificate_id, + .. + } => { + let span = if let Some(root) = + self.span_tracker.get(&certificate_id) + { + info_span!( + parent: root, + "RECV Inbound Echo (Buffered)", + peer = self.local_peer_id, + certificate_id = certificate_id.to_string() + ) + } else { + info_span!( + "RECV Inbound Echo (Buffered)", + peer = self.local_peer_id, + certificate_id = certificate_id.to_string() + ) + }; + + let _enter = span.enter(); + self.handle_echo(from_peer, &certificate_id); + self.state_change_follow_up(); + } + DoubleEchoCommand::Ready { + from_peer, + certificate_id, + .. + } => { + let span = if let Some(root) = + self.span_tracker.get(&certificate_id) + { + info_span!( + parent: root, + "RECV Inbound Ready (Buffered)", + peer = self.local_peer_id, + certificate_id = certificate_id.to_string() + ) + } else { + info_span!( + "RECV Inbound Ready (Buffered)", + peer = self.local_peer_id, + certificate_id = certificate_id.to_string() + ) + }; + + let _enter = span.enter(); + self.handle_ready(from_peer, &certificate_id); + + self.state_change_follow_up(); + } + _ => {} + } + } + } + } else { + warn!( + "No span found for certificate id: {} {:?}", + cert.id, cert.id + ); + } + + // if self.pending_certificate_count > 0 { + // if let Ok(Some((pending, certificate))) = self + // .storage + // .next_pending_certificate(Some(self.last_pending_certificate as usize)) + // .await + // { + // _ = self.pending_certificate_count.checked_sub(1); + // self.last_pending_certificate = pending; + // self.buffer.push_back((false, certificate)); + // DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc(); + // } else { + // info!("No more certificate to broadcast"); + // } + // } + } + } }; if let Some(sender) = shutdowned { @@ -688,7 +704,7 @@ impl DoubleEcho { debug!("📝 Accepted[{}]\t Delivery time: {:?}", &certificate_id, d); DOUBLE_ECHO_BROADCAST_FINISHED.inc(); - + self.delivered_certificates.insert(certificate_id); _ = self .event_sender .send(ProtocolEvents::CertificateDelivered {