diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index f15b55049..644df898e 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -24,6 +24,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; #[derive(Clone)] pub struct DeliveryState { pub subscriptions: SubscriptionsView, + pub ready_sent: bool, + pub delivered: bool, ctx: Span, } @@ -406,7 +408,6 @@ impl DoubleEcho { match sample_type { SampleType::EchoSubscription => state.subscriptions.echo.remove(from_peer), SampleType::ReadySubscription => state.subscriptions.ready.remove(from_peer), - SampleType::DeliverySubscription => state.subscriptions.delivery.remove(from_peer), _ => false, }; } @@ -420,7 +421,6 @@ impl DoubleEcho { pub(crate) fn handle_ready(&mut self, from_peer: PeerId, certificate_id: &CertificateId) { if let Some((_certificate, state)) = self.cert_candidate.get_mut(certificate_id) { Self::sample_consume_peer(&from_peer, state, SampleType::ReadySubscription); - Self::sample_consume_peer(&from_peer, state, SampleType::DeliverySubscription); } } @@ -476,9 +476,8 @@ impl DoubleEcho { .unwrap_or_else(Span::current); if origin { - warn!("📣 Gossipping the Certificate {}", &cert.id); + warn!("📣 Gossiping the Certificate {}", &cert.id); let _ = self.event_sender.send(ProtocolEvents::Gossip { - // peers: gossip_peers, // considered as the G-set for erdos-renyi cert: cert.clone(), ctx: span, }); @@ -520,7 +519,6 @@ impl DoubleEcho { .unwrap_or_else(Span::current); let _ = self.event_sender.send(ProtocolEvents::Echo { - // peers: echo_peers, certificate_id: cert.id, ctx, }); @@ -532,17 +530,13 @@ impl DoubleEcho { certificate_id: &CertificateId, ) -> Option { let subscriptions = self.subscriptions.clone(); - // check inbound sets are not empty - if subscriptions.echo.is_empty() - || subscriptions.ready.is_empty() - || subscriptions.delivery.is_empty() - { + // Check whether inbound sets are empty + if subscriptions.echo.is_empty() || subscriptions.ready.is_empty() { error!( - "One Subscription sample is empty: Echo({}), Ready({}), Delivery({})", + "One Subscription sample is empty: Echo({}), Ready({})", subscriptions.echo.is_empty(), subscriptions.ready.is_empty(), - subscriptions.delivery.is_empty() ); None } else { @@ -552,7 +546,12 @@ impl DoubleEcho { .cloned() .unwrap_or_else(Span::current); - Some(DeliveryState { subscriptions, ctx }) + Some(DeliveryState { + subscriptions, + ready_sent: false, + delivered: false, + ctx, + }) } } @@ -560,41 +559,40 @@ impl DoubleEcho { debug!("StateChangeFollowUp called"); let mut state_modified = false; let mut gen_evts = Vec::::new(); + // For all current Cert on processing for (certificate_id, (certificate, state_to_delivery)) in &mut self.cert_candidate { - if !state_to_delivery.subscriptions.ready.is_empty() - && (is_e_ready( - &self.params, - self.subscriptions.network_size, - state_to_delivery, - ) || is_r_ready( + // Check whether we should send Ready + if !state_to_delivery.ready_sent + && is_r_ready( &self.params, self.subscriptions.network_size, state_to_delivery, - )) + ) { // Fanout the Ready messages to my subscribers gen_evts.push(ProtocolEvents::Ready { certificate_id: certificate.id, ctx: state_to_delivery.ctx.clone(), }); - state_to_delivery.subscriptions.ready.clear(); - state_to_delivery.subscriptions.echo.clear(); + + state_to_delivery.ready_sent = true; state_modified = true; } - if !state_to_delivery.subscriptions.delivery.is_empty() + // Check whether we should deliver + if !state_to_delivery.delivered && is_ok_to_deliver( &self.params, self.subscriptions.network_size, state_to_delivery, ) { - state_to_delivery.subscriptions.delivery.clear(); self.pending_delivery.insert( *certificate_id, (certificate.clone(), state_to_delivery.ctx.clone()), ); + state_to_delivery.delivered = true; state_modified = true; } @@ -613,28 +611,23 @@ impl DoubleEcho { let delivery_missing = self .subscriptions .network_size - .checked_sub(state_to_delivery.subscriptions.delivery.len()) + .checked_sub(state_to_delivery.subscriptions.ready.len()) .map(|consumed| self.params.delivery_threshold.saturating_sub(consumed)) .unwrap_or(0); debug!("Waiting for {echo_missing} Echo from the E-Sample"); trace!("Echo Sample: {:?}", state_to_delivery.subscriptions.echo); - debug!("Waiting for {ready_missing} Ready from the R-Sample"); - trace!("Ready Sample: {:?}", state_to_delivery.subscriptions.ready); - - debug!("Waiting for {delivery_missing} Ready from the D-Sample"); - trace!( - "Delivery Sample: {:?}", - state_to_delivery.subscriptions.delivery + debug!( + "Waiting for {ready_missing}-R and {delivery_missing}-D Ready from the R-Sample" ); + trace!("Ready Sample: {:?}", state_to_delivery.subscriptions.ready); } if state_modified { // Keep the candidates only if not delivered, or not (E|R)-Ready yet - self.cert_candidate.retain(|c, (_, state)| { - !self.pending_delivery.contains_key(c) || !state.subscriptions.ready.is_empty() - }); + self.cert_candidate + .retain(|_, (_, state)| !state.delivered || !state.ready_sent); let delivered_certificates = self .pending_delivery @@ -696,36 +689,36 @@ impl DoubleEcho { } } -// state checkers +/// Predicate on whether we reached the threshold to deliver the Certificate fn is_ok_to_deliver( params: &ReliableBroadcastParams, network_size: usize, state: &DeliveryState, ) -> bool { - match network_size.checked_sub(state.subscriptions.delivery.len()) { + // If reached the delivery threshold, I can deliver + match network_size.checked_sub(state.subscriptions.ready.len()) { Some(consumed) => consumed >= params.delivery_threshold, None => false, } } -fn is_e_ready( +/// Predicate on whether we reached the threshold to send our Ready for this Certificate +fn is_r_ready( params: &ReliableBroadcastParams, network_size: usize, state: &DeliveryState, ) -> bool { - match network_size.checked_sub(state.subscriptions.echo.len()) { + // Compute the threshold + let reached_echo_threshold = match network_size.checked_sub(state.subscriptions.echo.len()) { Some(consumed) => consumed >= params.echo_threshold, None => false, - } -} + }; -fn is_r_ready( - params: &ReliableBroadcastParams, - network_size: usize, - state: &DeliveryState, -) -> bool { - match network_size.checked_sub(state.subscriptions.ready.len()) { + let reached_ready_threshold = match network_size.checked_sub(state.subscriptions.ready.len()) { Some(consumed) => consumed >= params.ready_threshold, None => false, - } + }; + + // If reached any of the Echo or Ready thresholds, I send the Ready + reached_echo_threshold || reached_ready_threshold } diff --git a/crates/topos-tce-broadcast/src/lib.rs b/crates/topos-tce-broadcast/src/lib.rs index 6b94a55d3..4d05a907b 100644 --- a/crates/topos-tce-broadcast/src/lib.rs +++ b/crates/topos-tce-broadcast/src/lib.rs @@ -176,7 +176,6 @@ impl ReliableBroadcastClient { .send(SubscriptionsView { echo: set.clone(), ready: set.clone(), - delivery: set.clone(), network_size: set.len(), }) .await diff --git a/crates/topos-tce-broadcast/src/sampler/mod.rs b/crates/topos-tce-broadcast/src/sampler/mod.rs index 960edabca..c7bf40a51 100644 --- a/crates/topos-tce-broadcast/src/sampler/mod.rs +++ b/crates/topos-tce-broadcast/src/sampler/mod.rs @@ -1,30 +1,28 @@ use std::collections::HashSet; use topos_p2p::PeerId; -// These are all the same in the new, deterministic view (Subscriptions == Subscribers) +/// Categorize what we expect from which peer for the broadcast #[derive(PartialEq, Eq, Hash, Debug, Clone)] pub enum SampleType { - /// Inbound: FROM external peer TO me - /// Message from those I am following + /// Listen Echo from this Sample EchoSubscription, + /// Listen Ready from this Sample ReadySubscription, - DeliverySubscription, - - /// Outbound: FROM me TO external peer - /// Message to my followers + /// Send Echo to this Sample EchoSubscriber, + /// Send Ready to this Sample ReadySubscriber, } -// Need to separate echo and ready (echo removes it from echo set, ready removes it from ready and delivery set) -// TODO: HashSet is all participants, once I receive echo | ready | delivery, I remove it to get to the threshold -// Maybe structure for keeping track of different counters +/// Stateful network view with whom we broadcast the Certificate +/// The Echo and the Ready sets are initially equal to the whole network #[derive(Debug, Clone, Eq, PartialEq, Default)] pub struct SubscriptionsView { - // All have the same peers (whole network) initially + /// Set of Peer from which we listen Echo pub echo: HashSet, + /// Set of Peer from which we listen Ready pub ready: HashSet, - pub delivery: HashSet, + /// Size of the network pub network_size: usize, } @@ -34,15 +32,14 @@ impl SubscriptionsView { } pub fn is_none(&self) -> bool { - self.echo.is_empty() && self.ready.is_empty() && self.delivery.is_empty() + self.echo.is_empty() && self.ready.is_empty() } - /// Current view of subscriptions of the node, which is the whole network + /// Current view of subscriptions of the node, which is initially the whole network pub fn get_subscriptions(&self) -> Vec { self.echo .iter() .chain(self.ready.iter()) - .chain(self.delivery.iter()) .cloned() .collect::>() .into_iter() diff --git a/crates/topos-tce-broadcast/src/tests/mod.rs b/crates/topos-tce-broadcast/src/tests/mod.rs index 52eaffa1c..8cbee5f2f 100644 --- a/crates/topos-tce-broadcast/src/tests/mod.rs +++ b/crates/topos-tce-broadcast/src/tests/mod.rs @@ -99,7 +99,6 @@ fn create_context(params: TceParams) -> (DoubleEcho, Context) { // Subscriptions double_echo.subscriptions.echo = peers.clone(); double_echo.subscriptions.ready = peers.clone(); - double_echo.subscriptions.delivery = peers.clone(); double_echo.subscriptions.network_size = params.nb_peers; ( @@ -145,7 +144,7 @@ fn reach_ready_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { fn reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) { let selected = double_echo .subscriptions - .delivery + .ready .iter() .take(double_echo.params.delivery_threshold) .cloned() @@ -304,10 +303,6 @@ async fn process_after_delivery_until_sending_ready(#[case] params: TceParams) { )); // Trigger Delivery upon reaching the Delivery threshold - - // NOTE: Samples are done with replacement - // Reaching the D-threshold may bring the R-threshold reached - // If the overlap between R and D is greater than R-threshold reach_delivery_threshold(&mut double_echo, &dummy_cert); double_echo.state_change_follow_up();