Skip to content

Commit

Permalink
chore: update double echo
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Jul 3, 2023
1 parent 594ccc9 commit dfadab6
Showing 1 changed file with 128 additions and 112 deletions.
240 changes: 128 additions & 112 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,26 @@ impl DoubleEcho {
}

pub(crate) async fn run(mut self) {
// DoubleEcho main loop
// - On each tick, check if there is certificate in the buffer
// - If yes, stop the loop and broadcast them
// - Listen for shutdown signal
// - Read new messages from command_receiver
// - If a new certificate is received, add it to the buffer
// - If a new subscription view is received, update the subscriptions
// - If a new Echo/Ready is received, update the state of the certificate or buffer
// the message

info!("DoubleEcho started");

let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
let shutdowned: Option<oneshot::Sender<()>> = loop {
tokio::select! {
_ = interval.tick() => {
if self.buffer.len() > 0 {
break None;
}
},
shutdown = self.shutdown.recv() => {
warn!("Double echo shutdown signal received {:?}", shutdown);
break shutdown;
Expand Down Expand Up @@ -311,121 +327,121 @@ impl DoubleEcho {
};

#[cfg(not(feature = "direct"))]
let _has_subscriptions = self.subscriptions.is_some();
let has_subscriptions = self.subscriptions.is_some();

#[cfg(feature = "direct")]
let _has_subscriptions = true;
let has_subscriptions = true;

// Broadcast next certificate
// if has_subscriptions {
// // TODO: Remove the unused_variables attribute when the feature direct is removed
// #[allow(unused_variables)]
// if let Some((need_gossip, cert)) = self.buffer.pop_front() {
// DOUBLE_ECHO_CURRENT_BUFFER_SIZE.dec();
// if let Some(ctx) = self.span_tracker.get(&cert.id) {
// let span = info_span!(
// parent: ctx,
// "DoubleEcho start dispatching",
// certificate_id = cert.id.to_string(),
// peer_id = self.local_peer_id,
// "otel.kind" = "producer"
// );
// let _span = span.entered();
//
// let cert_id = cert.id;
// #[cfg(feature = "direct")]
// {
// _ = self
// .event_sender
// .send(ProtocolEvents::CertificateDelivered { certificate: cert });
// }
// #[cfg(not(feature = "direct"))]
// self.handle_broadcast(cert, need_gossip);
//
// if let Some(messages) = self.buffered_messages.remove(&cert_id) {
// for message in messages {
// DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.dec();
// match message {
// DoubleEchoCommand::Echo {
// from_peer,
// certificate_id,
// ..
// } => {
// let span = if let Some(root) =
// self.span_tracker.get(&certificate_id)
// {
// info_span!(
// parent: root,
// "RECV Inbound Echo (Buffered)",
// peer = self.local_peer_id,
// certificate_id = certificate_id.to_string()
// )
// } else {
// info_span!(
// "RECV Inbound Echo (Buffered)",
// peer = self.local_peer_id,
// certificate_id = certificate_id.to_string()
// )
// };
//
// let _enter = span.enter();
// self.handle_echo(from_peer, &certificate_id);
// self.state_change_follow_up();
// }
// DoubleEchoCommand::Ready {
// from_peer,
// certificate_id,
// ..
// } => {
// let span = if let Some(root) =
// self.span_tracker.get(&certificate_id)
// {
// info_span!(
// parent: root,
// "RECV Inbound Ready (Buffered)",
// peer = self.local_peer_id,
// certificate_id = certificate_id.to_string()
// )
// } else {
// info_span!(
// "RECV Inbound Ready (Buffered)",
// peer = self.local_peer_id,
// certificate_id = certificate_id.to_string()
// )
// };
//
// let _enter = span.enter();
// self.handle_ready(from_peer, &certificate_id);
//
// self.state_change_follow_up();
// }
// _ => {}
// }
// }
// }
// } else {
// warn!(
// "No span found for certificate id: {} {:?}",
// cert.id, cert.id
// );
// }
//
// // if self.pending_certificate_count > 0 {
// // if let Ok(Some((pending, certificate))) = self
// // .storage
// // .next_pending_certificate(Some(self.last_pending_certificate as usize))
// // .await
// // {
// // _ = self.pending_certificate_count.checked_sub(1);
// // self.last_pending_certificate = pending;
// // self.buffer.push_back((false, certificate));
// // DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc();
// // } else {
// // info!("No more certificate to broadcast");
// // }
// // }
// }
// }
if has_subscriptions {
// TODO: Remove the unused_variables attribute when the feature direct is removed
#[allow(unused_variables)]
if let Some((need_gossip, cert)) = self.buffer.pop_front() {
DOUBLE_ECHO_CURRENT_BUFFER_SIZE.dec();
if let Some(ctx) = self.span_tracker.get(&cert.id) {
let span = info_span!(
parent: ctx,
"DoubleEcho start dispatching",
certificate_id = cert.id.to_string(),
peer_id = self.local_peer_id,
"otel.kind" = "producer"
);
let _span = span.entered();

let cert_id = cert.id;
#[cfg(feature = "direct")]
{
_ = self
.event_sender
.send(ProtocolEvents::CertificateDelivered { certificate: cert });
}
#[cfg(not(feature = "direct"))]
self.handle_broadcast(cert, need_gossip);

if let Some(messages) = self.buffered_messages.remove(&cert_id) {
for message in messages {
DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.dec();
match message {
DoubleEchoCommand::Echo {
from_peer,
certificate_id,
..
} => {
let span = if let Some(root) =
self.span_tracker.get(&certificate_id)
{
info_span!(
parent: root,
"RECV Inbound Echo (Buffered)",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
} else {
info_span!(
"RECV Inbound Echo (Buffered)",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
};

let _enter = span.enter();
self.handle_echo(from_peer, &certificate_id);
self.state_change_follow_up();
}
DoubleEchoCommand::Ready {
from_peer,
certificate_id,
..
} => {
let span = if let Some(root) =
self.span_tracker.get(&certificate_id)
{
info_span!(
parent: root,
"RECV Inbound Ready (Buffered)",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
} else {
info_span!(
"RECV Inbound Ready (Buffered)",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
};

let _enter = span.enter();
self.handle_ready(from_peer, &certificate_id);

self.state_change_follow_up();
}
_ => {}
}
}
}
} else {
warn!(
"No span found for certificate id: {} {:?}",
cert.id, cert.id
);
}

// if self.pending_certificate_count > 0 {
// if let Ok(Some((pending, certificate))) = self
// .storage
// .next_pending_certificate(Some(self.last_pending_certificate as usize))
// .await
// {
// _ = self.pending_certificate_count.checked_sub(1);
// self.last_pending_certificate = pending;
// self.buffer.push_back((false, certificate));
// DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc();
// } else {
// info!("No more certificate to broadcast");
// }
// }
}
}
};

if let Some(sender) = shutdowned {
Expand Down Expand Up @@ -688,7 +704,7 @@ impl DoubleEcho {
debug!("📝 Accepted[{}]\t Delivery time: {:?}", &certificate_id, d);

DOUBLE_ECHO_BROADCAST_FINISHED.inc();

self.delivered_certificates.insert(certificate_id);
_ = self
.event_sender
.send(ProtocolEvents::CertificateDelivered {
Expand Down

0 comments on commit dfadab6

Please sign in to comment.