Skip to content

Commit

Permalink
refactor: simplify the state change followup
Browse files Browse the repository at this point in the history
  • Loading branch information
hadjiszs committed Jun 27, 2023
1 parent dc37d0b commit 50ef877
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 67 deletions.
89 changes: 41 additions & 48 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
#[derive(Clone)]
pub struct DeliveryState {
pub subscriptions: SubscriptionsView,
pub ready_sent: bool,
pub delivered: bool,
ctx: Span,
}

Expand Down Expand Up @@ -395,7 +397,6 @@ impl DoubleEcho {
match sample_type {
SampleType::EchoSubscription => state.subscriptions.echo.remove(from_peer),
SampleType::ReadySubscription => state.subscriptions.ready.remove(from_peer),
SampleType::DeliverySubscription => state.subscriptions.delivery.remove(from_peer),
_ => false,
};
}
Expand All @@ -409,7 +410,6 @@ impl DoubleEcho {
pub(crate) fn handle_ready(&mut self, from_peer: PeerId, certificate_id: &CertificateId) {
if let Some((_certificate, state)) = self.cert_candidate.get_mut(certificate_id) {
Self::sample_consume_peer(&from_peer, state, SampleType::ReadySubscription);
Self::sample_consume_peer(&from_peer, state, SampleType::DeliverySubscription);
}
}

Expand Down Expand Up @@ -465,9 +465,8 @@ impl DoubleEcho {
.unwrap_or_else(Span::current);

if origin {
warn!("📣 Gossipping the Certificate {}", &cert.id);
warn!("📣 Gossiping the Certificate {}", &cert.id);
let _ = self.event_sender.send(ProtocolEvents::Gossip {
// peers: gossip_peers, // considered as the G-set for erdos-renyi
cert: cert.clone(),
ctx: span,
});
Expand Down Expand Up @@ -509,7 +508,6 @@ impl DoubleEcho {
.unwrap_or_else(Span::current);

let _ = self.event_sender.send(ProtocolEvents::Echo {
// peers: echo_peers,
certificate_id: cert.id,
ctx,
});
Expand All @@ -521,17 +519,13 @@ impl DoubleEcho {
certificate_id: &CertificateId,
) -> Option<DeliveryState> {
let subscriptions = self.subscriptions.clone();
// check inbound sets are not empty

if subscriptions.echo.is_empty()
|| subscriptions.ready.is_empty()
|| subscriptions.delivery.is_empty()
{
// Check whether inbound sets are empty
if subscriptions.echo.is_empty() || subscriptions.ready.is_empty() {
error!(
"One Subscription sample is empty: Echo({}), Ready({}), Delivery({})",
"One Subscription sample is empty: Echo({}), Ready({})",
subscriptions.echo.is_empty(),
subscriptions.ready.is_empty(),
subscriptions.delivery.is_empty()
);
None
} else {
Expand All @@ -541,49 +535,53 @@ impl DoubleEcho {
.cloned()
.unwrap_or_else(Span::current);

Some(DeliveryState { subscriptions, ctx })
Some(DeliveryState {
subscriptions,
ready_sent: false,
delivered: false,
ctx,
})
}
}

pub(crate) fn state_change_follow_up(&mut self) {
debug!("StateChangeFollowUp called");
let mut state_modified = false;
let mut gen_evts = Vec::<ProtocolEvents>::new();

// For all current Cert on processing
for (certificate_id, (certificate, state_to_delivery)) in &mut self.cert_candidate {
if !state_to_delivery.subscriptions.ready.is_empty()
&& (is_e_ready(
&self.params,
self.subscriptions.network_size,
state_to_delivery,
) || is_r_ready(
// Check whether we should send Ready
if !state_to_delivery.ready_sent
&& is_r_ready(
&self.params,
self.subscriptions.network_size,
state_to_delivery,
))
)
{
// Fanout the Ready messages to my subscribers
gen_evts.push(ProtocolEvents::Ready {
certificate_id: certificate.id,
ctx: state_to_delivery.ctx.clone(),
});
state_to_delivery.subscriptions.ready.clear();
state_to_delivery.subscriptions.echo.clear();

state_to_delivery.ready_sent = true;
state_modified = true;
}

if !state_to_delivery.subscriptions.delivery.is_empty()
// Check whether we should deliver
if !state_to_delivery.delivered
&& is_ok_to_deliver(
&self.params,
self.subscriptions.network_size,
state_to_delivery,
)
{
state_to_delivery.subscriptions.delivery.clear();
self.pending_delivery.insert(
*certificate_id,
(certificate.clone(), state_to_delivery.ctx.clone()),
);
state_to_delivery.delivered = true;
state_modified = true;
}

Expand All @@ -602,28 +600,23 @@ impl DoubleEcho {
let delivery_missing = self
.subscriptions
.network_size
.checked_sub(state_to_delivery.subscriptions.delivery.len())
.checked_sub(state_to_delivery.subscriptions.ready.len())
.map(|consumed| self.params.delivery_threshold.saturating_sub(consumed))
.unwrap_or(0);

debug!("Waiting for {echo_missing} Echo from the E-Sample");
trace!("Echo Sample: {:?}", state_to_delivery.subscriptions.echo);

debug!("Waiting for {ready_missing} Ready from the R-Sample");
trace!("Ready Sample: {:?}", state_to_delivery.subscriptions.ready);

debug!("Waiting for {delivery_missing} Ready from the D-Sample");
trace!(
"Delivery Sample: {:?}",
state_to_delivery.subscriptions.delivery
debug!(
"Waiting for {ready_missing}-R and {delivery_missing}-D Ready from the R-Sample"
);
trace!("Ready Sample: {:?}", state_to_delivery.subscriptions.ready);
}

if state_modified {
// Keep the candidates only if not delivered, or not (E|R)-Ready yet
self.cert_candidate.retain(|c, (_, state)| {
!self.pending_delivery.contains_key(c) || !state.subscriptions.ready.is_empty()
});
self.cert_candidate
.retain(|_, (_, state)| !state.delivered || !state.ready_sent);

let delivered_certificates = self
.pending_delivery
Expand Down Expand Up @@ -685,36 +678,36 @@ impl DoubleEcho {
}
}

// state checkers
/// Predicate on whether we reached the threshold to deliver the Certificate
fn is_ok_to_deliver(
params: &ReliableBroadcastParams,
network_size: usize,
state: &DeliveryState,
) -> bool {
match network_size.checked_sub(state.subscriptions.delivery.len()) {
// If reached the delivery threshold, I can deliver
match network_size.checked_sub(state.subscriptions.ready.len()) {
Some(consumed) => consumed >= params.delivery_threshold,
None => false,
}
}

fn is_e_ready(
/// Predicate on whether we reached the threshold to send our Ready for this Certificate
fn is_r_ready(
params: &ReliableBroadcastParams,
network_size: usize,
state: &DeliveryState,
) -> bool {
match network_size.checked_sub(state.subscriptions.echo.len()) {
// Compute the threshold
let reached_echo_threshold = match network_size.checked_sub(state.subscriptions.echo.len()) {
Some(consumed) => consumed >= params.echo_threshold,
None => false,
}
}
};

fn is_r_ready(
params: &ReliableBroadcastParams,
network_size: usize,
state: &DeliveryState,
) -> bool {
match network_size.checked_sub(state.subscriptions.ready.len()) {
let reached_ready_threshold = match network_size.checked_sub(state.subscriptions.ready.len()) {
Some(consumed) => consumed >= params.ready_threshold,
None => false,
}
};

// If reached any of the Echo or Ready thresholds, I send the Ready
reached_echo_threshold || reached_ready_threshold
}
1 change: 0 additions & 1 deletion crates/topos-tce-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ impl ReliableBroadcastClient {
.send(SubscriptionsView {
echo: set.clone(),
ready: set.clone(),
delivery: set.clone(),
network_size: set.len(),
})
.await
Expand Down
17 changes: 5 additions & 12 deletions crates/topos-tce-broadcast/src/sampler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@ use topos_p2p::PeerId;
// These are all the same in the new, deterministic view (Subscriptions == Subscribers)
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
pub enum SampleType {
/// Inbound: FROM external peer TO me
/// Message from those I am following
/// Listen Echo from this Sample
EchoSubscription,
/// Listen Ready from this Sample
ReadySubscription,
DeliverySubscription,

/// Outbound: FROM me TO external peer
/// Message to my followers
/// Send Echo to this Sample
EchoSubscriber,
/// Send Ready to this Sample
ReadySubscriber,
}

// Need to separate echo and ready (echo removes it from echo set, ready removes it from ready and delivery set)
// TODO: HashSet is all participants, once I receive echo | ready | delivery, I remove it to get to the threshold
// Maybe structure for keeping track of different counters
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct SubscriptionsView {
// All have the same peers (whole network) initially
pub echo: HashSet<PeerId>,
pub ready: HashSet<PeerId>,
pub delivery: HashSet<PeerId>,
pub network_size: usize,
}

Expand All @@ -34,15 +28,14 @@ impl SubscriptionsView {
}

pub fn is_none(&self) -> bool {
self.echo.is_empty() && self.ready.is_empty() && self.delivery.is_empty()
self.echo.is_empty() && self.ready.is_empty()
}

/// Current view of subscriptions of the node, which is the whole network
pub fn get_subscriptions(&self) -> Vec<PeerId> {
self.echo
.iter()
.chain(self.ready.iter())
.chain(self.delivery.iter())
.cloned()
.collect::<HashSet<_>>()
.into_iter()
Expand Down
7 changes: 1 addition & 6 deletions crates/topos-tce-broadcast/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ fn create_context(params: TceParams) -> (DoubleEcho, Context) {
// Subscriptions
double_echo.subscriptions.echo = peers.clone();
double_echo.subscriptions.ready = peers.clone();
double_echo.subscriptions.delivery = peers.clone();
double_echo.subscriptions.network_size = params.nb_peers;

(
Expand Down Expand Up @@ -145,7 +144,7 @@ fn reach_ready_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) {
fn reach_delivery_threshold(double_echo: &mut DoubleEcho, cert: &Certificate) {
let selected = double_echo
.subscriptions
.delivery
.ready
.iter()
.take(double_echo.params.delivery_threshold)
.cloned()
Expand Down Expand Up @@ -304,10 +303,6 @@ async fn process_after_delivery_until_sending_ready(#[case] params: TceParams) {
));

// Trigger Delivery upon reaching the Delivery threshold

// NOTE: Samples are done with replacement
// Reaching the D-threshold may bring the R-threshold reached
// If the overlap between R and D is greater than R-threshold
reach_delivery_threshold(&mut double_echo, &dummy_cert);
double_echo.state_change_follow_up();

Expand Down

0 comments on commit 50ef877

Please sign in to comment.