Skip to content

Commit

Permalink
chore: update double echo
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Jul 3, 2023
1 parent 967b128 commit 594ccc9
Showing 1 changed file with 127 additions and 127 deletions.
254 changes: 127 additions & 127 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct DeliveryState {

pub struct DoubleEcho {
pending_certificate_count: u64,
last_pending_certificate: PendingCertificateId,
// _last_pending_certificate: PendingCertificateId,
pub(crate) params: ReliableBroadcastParams,
command_receiver: mpsc::Receiver<DoubleEchoCommand>,
subscriptions_view_receiver: mpsc::Receiver<SubscriptionsView>,
Expand All @@ -56,6 +56,7 @@ pub struct DoubleEcho {
pub(crate) shutdown: mpsc::Receiver<oneshot::Sender<()>>,

known_certificates: HashSet<CertificateId>,
delivered_certificates: HashSet<CertificateId>,

local_peer_id: String,

Expand All @@ -77,13 +78,13 @@ impl DoubleEcho {
network_client: NetworkClient,
shutdown: mpsc::Receiver<oneshot::Sender<()>>,
local_peer_id: String,
last_pending_certificate: PendingCertificateId,
_last_pending_certificate: PendingCertificateId,
pending_certificate_count: u64,
max_buffer_size: usize,
) -> Self {
Self {
pending_certificate_count,
last_pending_certificate,
// last_pending_certificate,
params,
command_receiver,
subscriptions_view_receiver,
Expand All @@ -102,6 +103,7 @@ impl DoubleEcho {
buffered_messages: Default::default(),
max_buffer_size,
known_certificates: Default::default(),
delivered_certificates: Default::default(),
}
}

Expand Down Expand Up @@ -156,11 +158,11 @@ impl DoubleEcho {
});
span.add_link(ctx.context().span().span_context().clone());
let adding_timer = STORAGE_ADDING_PENDING_CERTIFICATE_LATENCY.start_timer();
let maybe_pending = self
.storage
.add_pending_certificate(cert.clone())
.instrument(span.clone())
.await;
// let maybe_pending = self
// .storage
// .add_pending_certificate(cert.clone())
// .instrument(span.clone())
// .await;
adding_timer.stop_and_record();

self.known_certificates.insert(cert.id);
Expand All @@ -170,9 +172,9 @@ impl DoubleEcho {
if self.buffer.len() < self.max_buffer_size {
self.buffer.push_back((need_gossip, cert));
DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc();
if let Ok(pending) = maybe_pending {
self.last_pending_certificate = pending;
}
// if let Ok(pending) = maybe_pending {
// self.last_pending_certificate = pending;
// }
} else {
DOUBLE_ECHO_BUFFER_CAPACITY.inc();
// Adding one to the pending_certificate_count because we
Expand Down Expand Up @@ -201,12 +203,9 @@ impl DoubleEcho {
match command {
DoubleEchoCommand::Echo { from_peer, certificate_id, ctx } => {
async {
let cert_delivered = self.store.cert_by_id(&certificate_id).is_ok();
let cert_delivered = self.delivered_certificates.get(&certificate_id).is_some();
if !cert_delivered {
if self.storage
.pending_certificate_exists(certificate_id)
.await
.is_ok()
if self.known_certificates.get(&certificate_id).is_some()
{
let span = if let Some(root) = self.span_tracker.get(&certificate_id) {
info!("DEBUG::Receive ECHO with root");
Expand Down Expand Up @@ -312,121 +311,121 @@ impl DoubleEcho {
};

#[cfg(not(feature = "direct"))]
let has_subscriptions = self.subscriptions.is_some();
let _has_subscriptions = self.subscriptions.is_some();

#[cfg(feature = "direct")]
let has_subscriptions = true;
let _has_subscriptions = true;

// Broadcast next certificate
if has_subscriptions {
// TODO: Remove the unused_variables attribute when the feature direct is removed
#[allow(unused_variables)]
if let Some((need_gossip, cert)) = self.buffer.pop_front() {
DOUBLE_ECHO_CURRENT_BUFFER_SIZE.dec();
if let Some(ctx) = self.span_tracker.get(&cert.id) {
let span = info_span!(
parent: ctx,
"DoubleEcho start dispatching",
certificate_id = cert.id.to_string(),
peer_id = self.local_peer_id,
"otel.kind" = "producer"
);
let _span = span.entered();

let cert_id = cert.id;
#[cfg(feature = "direct")]
{
_ = self
.event_sender
.send(ProtocolEvents::CertificateDelivered { certificate: cert });
}
#[cfg(not(feature = "direct"))]
self.handle_broadcast(cert, need_gossip);

if let Some(messages) = self.buffered_messages.remove(&cert_id) {
for message in messages {
DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.dec();
match message {
DoubleEchoCommand::Echo {
from_peer,
certificate_id,
..
} => {
let span = if let Some(root) =
self.span_tracker.get(&certificate_id)
{
info_span!(
parent: root,
"RECV Inbound Echo (Buffered)",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
} else {
info_span!(
"RECV Inbound Echo (Buffered)",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
};

let _enter = span.enter();
self.handle_echo(from_peer, &certificate_id);
self.state_change_follow_up();
}
DoubleEchoCommand::Ready {
from_peer,
certificate_id,
..
} => {
let span = if let Some(root) =
self.span_tracker.get(&certificate_id)
{
info_span!(
parent: root,
"RECV Inbound Ready (Buffered)",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
} else {
info_span!(
"RECV Inbound Ready (Buffered)",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
};

let _enter = span.enter();
self.handle_ready(from_peer, &certificate_id);

self.state_change_follow_up();
}
_ => {}
}
}
}
} else {
warn!(
"No span found for certificate id: {} {:?}",
cert.id, cert.id
);
}

// if self.pending_certificate_count > 0 {
// if let Ok(Some((pending, certificate))) = self
// .storage
// .next_pending_certificate(Some(self.last_pending_certificate as usize))
// .await
// {
// _ = self.pending_certificate_count.checked_sub(1);
// self.last_pending_certificate = pending;
// self.buffer.push_back((false, certificate));
// DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc();
// } else {
// info!("No more certificate to broadcast");
// }
// }
}
}
// if has_subscriptions {
// // TODO: Remove the unused_variables attribute when the feature direct is removed
// #[allow(unused_variables)]
// if let Some((need_gossip, cert)) = self.buffer.pop_front() {
// DOUBLE_ECHO_CURRENT_BUFFER_SIZE.dec();
// if let Some(ctx) = self.span_tracker.get(&cert.id) {
// let span = info_span!(
// parent: ctx,
// "DoubleEcho start dispatching",
// certificate_id = cert.id.to_string(),
// peer_id = self.local_peer_id,
// "otel.kind" = "producer"
// );
// let _span = span.entered();
//
// let cert_id = cert.id;
// #[cfg(feature = "direct")]
// {
// _ = self
// .event_sender
// .send(ProtocolEvents::CertificateDelivered { certificate: cert });
// }
// #[cfg(not(feature = "direct"))]
// self.handle_broadcast(cert, need_gossip);
//
// if let Some(messages) = self.buffered_messages.remove(&cert_id) {
// for message in messages {
// DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.dec();
// match message {
// DoubleEchoCommand::Echo {
// from_peer,
// certificate_id,
// ..
// } => {
// let span = if let Some(root) =
// self.span_tracker.get(&certificate_id)
// {
// info_span!(
// parent: root,
// "RECV Inbound Echo (Buffered)",
// peer = self.local_peer_id,
// certificate_id = certificate_id.to_string()
// )
// } else {
// info_span!(
// "RECV Inbound Echo (Buffered)",
// peer = self.local_peer_id,
// certificate_id = certificate_id.to_string()
// )
// };
//
// let _enter = span.enter();
// self.handle_echo(from_peer, &certificate_id);
// self.state_change_follow_up();
// }
// DoubleEchoCommand::Ready {
// from_peer,
// certificate_id,
// ..
// } => {
// let span = if let Some(root) =
// self.span_tracker.get(&certificate_id)
// {
// info_span!(
// parent: root,
// "RECV Inbound Ready (Buffered)",
// peer = self.local_peer_id,
// certificate_id = certificate_id.to_string()
// )
// } else {
// info_span!(
// "RECV Inbound Ready (Buffered)",
// peer = self.local_peer_id,
// certificate_id = certificate_id.to_string()
// )
// };
//
// let _enter = span.enter();
// self.handle_ready(from_peer, &certificate_id);
//
// self.state_change_follow_up();
// }
// _ => {}
// }
// }
// }
// } else {
// warn!(
// "No span found for certificate id: {} {:?}",
// cert.id, cert.id
// );
// }
//
// // if self.pending_certificate_count > 0 {
// // if let Ok(Some((pending, certificate))) = self
// // .storage
// // .next_pending_certificate(Some(self.last_pending_certificate as usize))
// // .await
// // {
// // _ = self.pending_certificate_count.checked_sub(1);
// // self.last_pending_certificate = pending;
// // self.buffer.push_back((false, certificate));
// // DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc();
// // } else {
// // info!("No more certificate to broadcast");
// // }
// // }
// }
// }
};

if let Some(sender) = shutdowned {
Expand Down Expand Up @@ -460,6 +459,7 @@ impl DoubleEcho {
}

#[cfg_attr(feature = "direct", allow(dead_code))]
#[allow(dead_code)]
pub(crate) fn handle_broadcast(&mut self, cert: Certificate, origin: bool) {
info!("🙌 Starting broadcasting the Certificate {}", &cert.id);

Expand Down

0 comments on commit 594ccc9

Please sign in to comment.