Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify the double echo state change #254

Merged
merged 2 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 41 additions & 48 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
#[derive(Clone)]
pub struct DeliveryState {
pub subscriptions: SubscriptionsView,
pub ready_sent: bool,
pub delivered: bool,
ctx: Span,
}

Expand Down Expand Up @@ -395,7 +397,6 @@ impl DoubleEcho {
match sample_type {
SampleType::EchoSubscription => state.subscriptions.echo.remove(from_peer),
SampleType::ReadySubscription => state.subscriptions.ready.remove(from_peer),
SampleType::DeliverySubscription => state.subscriptions.delivery.remove(from_peer),
_ => false,
};
}
Expand All @@ -409,7 +410,6 @@ impl DoubleEcho {
pub(crate) fn handle_ready(&mut self, from_peer: PeerId, certificate_id: &CertificateId) {
if let Some((_certificate, state)) = self.cert_candidate.get_mut(certificate_id) {
Self::sample_consume_peer(&from_peer, state, SampleType::ReadySubscription);
Self::sample_consume_peer(&from_peer, state, SampleType::DeliverySubscription);
}
}

Expand Down Expand Up @@ -465,9 +465,8 @@ impl DoubleEcho {
.unwrap_or_else(Span::current);

if origin {
warn!("📣 Gossipping the Certificate {}", &cert.id);
warn!("📣 Gossiping the Certificate {}", &cert.id);
let _ = self.event_sender.send(ProtocolEvents::Gossip {
// peers: gossip_peers, // considered as the G-set for erdos-renyi
cert: cert.clone(),
ctx: span,
});
Expand Down Expand Up @@ -509,7 +508,6 @@ impl DoubleEcho {
.unwrap_or_else(Span::current);

let _ = self.event_sender.send(ProtocolEvents::Echo {
// peers: echo_peers,
certificate_id: cert.id,
ctx,
});
Expand All @@ -521,17 +519,13 @@ impl DoubleEcho {
certificate_id: &CertificateId,
) -> Option<DeliveryState> {
let subscriptions = self.subscriptions.clone();
// check inbound sets are not empty

if subscriptions.echo.is_empty()
|| subscriptions.ready.is_empty()
|| subscriptions.delivery.is_empty()
{
// Check whether inbound sets are empty
if subscriptions.echo.is_empty() || subscriptions.ready.is_empty() {
error!(
"One Subscription sample is empty: Echo({}), Ready({}), Delivery({})",
"One Subscription sample is empty: Echo({}), Ready({})",
subscriptions.echo.is_empty(),
subscriptions.ready.is_empty(),
subscriptions.delivery.is_empty()
);
None
} else {
Expand All @@ -541,49 +535,53 @@ impl DoubleEcho {
.cloned()
.unwrap_or_else(Span::current);

Some(DeliveryState { subscriptions, ctx })
Some(DeliveryState {
subscriptions,
ready_sent: false,
delivered: false,
ctx,
})
}
}

pub(crate) fn state_change_follow_up(&mut self) {
debug!("StateChangeFollowUp called");
let mut state_modified = false;
let mut gen_evts = Vec::<ProtocolEvents>::new();

// For all current Cert on processing
for (certificate_id, (certificate, state_to_delivery)) in &mut self.cert_candidate {
if !state_to_delivery.subscriptions.ready.is_empty()
&& (is_e_ready(
&self.params,
self.subscriptions.network_size,
state_to_delivery,
) || is_r_ready(
// Check whether we should send Ready
if !state_to_delivery.ready_sent
&& is_r_ready(
&self.params,
self.subscriptions.network_size,
state_to_delivery,
))
)
{
// Fanout the Ready messages to my subscribers
gen_evts.push(ProtocolEvents::Ready {
certificate_id: certificate.id,
ctx: state_to_delivery.ctx.clone(),
});
state_to_delivery.subscriptions.ready.clear();
state_to_delivery.subscriptions.echo.clear();

state_to_delivery.ready_sent = true;
state_modified = true;
}

if !state_to_delivery.subscriptions.delivery.is_empty()
// Check whether we should deliver
if !state_to_delivery.delivered
&& is_ok_to_deliver(
&self.params,
self.subscriptions.network_size,
state_to_delivery,
)
{
state_to_delivery.subscriptions.delivery.clear();
self.pending_delivery.insert(
*certificate_id,
(certificate.clone(), state_to_delivery.ctx.clone()),
);
state_to_delivery.delivered = true;
state_modified = true;
}

Expand All @@ -602,28 +600,23 @@ impl DoubleEcho {
let delivery_missing = self
.subscriptions
.network_size
.checked_sub(state_to_delivery.subscriptions.delivery.len())
.checked_sub(state_to_delivery.subscriptions.ready.len())
.map(|consumed| self.params.delivery_threshold.saturating_sub(consumed))
.unwrap_or(0);

debug!("Waiting for {echo_missing} Echo from the E-Sample");
trace!("Echo Sample: {:?}", state_to_delivery.subscriptions.echo);

debug!("Waiting for {ready_missing} Ready from the R-Sample");
trace!("Ready Sample: {:?}", state_to_delivery.subscriptions.ready);

debug!("Waiting for {delivery_missing} Ready from the D-Sample");
trace!(
"Delivery Sample: {:?}",
state_to_delivery.subscriptions.delivery
debug!(
"Waiting for {ready_missing}-R and {delivery_missing}-D Ready from the R-Sample"
);
trace!("Ready Sample: {:?}", state_to_delivery.subscriptions.ready);
}

if state_modified {
// Keep the candidates only if not delivered, or not (E|R)-Ready yet
self.cert_candidate.retain(|c, (_, state)| {
!self.pending_delivery.contains_key(c) || !state.subscriptions.ready.is_empty()
});
self.cert_candidate
.retain(|_, (_, state)| !state.delivered || !state.ready_sent);

let delivered_certificates = self
.pending_delivery
Expand Down Expand Up @@ -685,36 +678,36 @@ impl DoubleEcho {
}
}

// state checkers
/// Predicate on whether we reached the threshold to deliver the Certificate
fn is_ok_to_deliver(
params: &ReliableBroadcastParams,
network_size: usize,
state: &DeliveryState,
) -> bool {
match network_size.checked_sub(state.subscriptions.delivery.len()) {
// If reached the delivery threshold, I can deliver
match network_size.checked_sub(state.subscriptions.ready.len()) {
Some(consumed) => consumed >= params.delivery_threshold,
None => false,
}
}

fn is_e_ready(
/// Predicate on whether we reached the threshold to send our Ready for this Certificate
fn is_r_ready(
params: &ReliableBroadcastParams,
network_size: usize,
state: &DeliveryState,
) -> bool {
match network_size.checked_sub(state.subscriptions.echo.len()) {
// Compute the threshold
let reached_echo_threshold = match network_size.checked_sub(state.subscriptions.echo.len()) {
Some(consumed) => consumed >= params.echo_threshold,
None => false,
}
}
};

fn is_r_ready(
params: &ReliableBroadcastParams,
network_size: usize,
state: &DeliveryState,
) -> bool {
match network_size.checked_sub(state.subscriptions.ready.len()) {
let reached_ready_threshold = match network_size.checked_sub(state.subscriptions.ready.len()) {
Some(consumed) => consumed >= params.ready_threshold,
None => false,
}
};

// If reached any of the Echo or Ready thresholds, I send the Ready
reached_echo_threshold || reached_ready_threshold
}
1 change: 0 additions & 1 deletion crates/topos-tce-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ impl ReliableBroadcastClient {
.send(SubscriptionsView {
echo: set.clone(),
ready: set.clone(),
delivery: set.clone(),
network_size: set.len(),
})
.await
Expand Down
27 changes: 12 additions & 15 deletions crates/topos-tce-broadcast/src/sampler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
use std::collections::HashSet;
use topos_p2p::PeerId;

// These are all the same in the new, deterministic view (Subscriptions == Subscribers)
/// Categorize what we expect from which peer for the broadcast
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub enum SampleType {
/// Inbound: FROM external peer TO me
/// Message from those I am following
/// Listen Echo from this Sample
EchoSubscription,
/// Listen Ready from this Sample
ReadySubscription,
DeliverySubscription,

/// Outbound: FROM me TO external peer
/// Message to my followers
/// Send Echo to this Sample
EchoSubscriber,
/// Send Ready to this Sample
ReadySubscriber,
}

// Need to separate echo and ready (echo removes it from echo set, ready removes it from ready and delivery set)
// TODO: HashSet is all participants, once I receive echo | ready | delivery, I remove it to get to the threshold
// Maybe structure for keeping track of different counters
/// Stateful network view with whom we broadcast the Certificate
/// The Echo and the Ready sets are initially equal to the whole network
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct SubscriptionsView {
// All have the same peers (whole network) initially
/// Set of Peer from which we listen Echo
pub echo: HashSet<PeerId>,
/// Set of Peer from which we listen Ready
pub ready: HashSet<PeerId>,
pub delivery: HashSet<PeerId>,
/// Size of the network
pub network_size: usize,
}

Expand All @@ -34,15 +32,14 @@ impl SubscriptionsView {
}

pub fn is_none(&self) -> bool {
self.echo.is_empty() && self.ready.is_empty() && self.delivery.is_empty()
self.echo.is_empty() && self.ready.is_empty()
}

/// Current view of subscriptions of the node, which is the whole network
/// Current view of subscriptions of the node, which is initially the whole network
pub fn get_subscriptions(&self) -> Vec<PeerId> {
self.echo
.iter()
.chain(self.ready.iter())
.chain(self.delivery.iter())
.cloned()
.collect::<HashSet<_>>()
.into_iter()
Expand Down
7 changes: 1 addition & 6 deletions crates/topos-tce-broadcast/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ fn create_context(params: TceParams) -> (DoubleEcho, Context) {
// Subscriptions
double_echo.subscriptions.echo = peers.clone();
double_echo.subscriptions.ready = peers.clone();
double_echo.subscriptions.delivery = peers.clone();
double_echo.subscriptions.network_size = params.nb_peers;

(
Expand Down Expand Up @@ -145,7 +144,7 @@ fn reach_ready_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) {
fn reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) {
let selected = double_echo
.subscriptions
.delivery
.ready
.iter()
.take(double_echo.params.delivery_threshold)
.cloned()
Expand Down Expand Up @@ -304,10 +303,6 @@ async fn process_after_delivery_until_sending_ready(#[case] params: TceParams) {
));

// Trigger Delivery upon reaching the Delivery threshold

// NOTE: Samples are done with replacement
// Reaching the D-threshold may bring the R-threshold reached
// If the overlap between R and D is greater than R-threshold
reach_delivery_threshold(&mut double_echo, &dummy_cert);
double_echo.state_change_follow_up();

Expand Down
Loading