Skip to content

Commit

Permalink
chore: refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Jul 4, 2023
1 parent 6c9c803 commit 7db2903
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 202 deletions.
232 changes: 169 additions & 63 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::Errors;
use crate::{sampler::SampleType, tce_store::TceStore, DoubleEchoCommand, SubscriptionsView};
use crate::constant;
use crate::{sampler::SampleType, DoubleEchoCommand, SubscriptionsView};
use opentelemetry::trace::TraceContextExt;
use std::collections::HashSet;
use std::{
Expand All @@ -15,12 +15,8 @@ use topos_metrics::{
DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT, DOUBLE_ECHO_BUFFER_CAPACITY,
DOUBLE_ECHO_CURRENT_BUFFER_SIZE,
};
use topos_p2p::Client as NetworkClient;
use topos_p2p::PeerId;
use topos_tce_storage::{PendingCertificateId, StorageClient};
use tracing::{
debug, error, info, info_span, instrument, trace, warn, warn_span, Instrument, Span,
};
use tracing::{debug, error, info, info_span, instrument, trace, warn, warn_span, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Processing data associated to a Certificate candidate for delivery
Expand Down Expand Up @@ -52,21 +48,21 @@ pub struct DoubleEcho {

pub(crate) params: ReliableBroadcastParams,

// Current certificates being processed
cert_candidate: HashMap<CertificateId, (Certificate, DeliveryState)>,

// Span tracker for each certificate
span_tracker: HashMap<CertificateId, Span>,

// Delivery time for each certificate (for metrics)
delivery_time: HashMap<CertificateId, (time::SystemTime, time::Duration)>,
pub(crate) subscriptions: SubscriptionsView, // My subscriptions for echo, ready and delivery feedback
buffer: VecDeque<(bool, Certificate)>,
pub(crate) shutdown: mpsc::Receiver<oneshot::Sender<()>>,

pub(crate) subscriptions: SubscriptionsView, // My subscriptions for echo, ready and delivery feedback

local_peer_id: String,

// Buffer of messages to be processed once the certificate payload is received
buffered_messages: HashMap<CertificateId, Vec<DoubleEchoCommand>>,
max_buffer_size: usize,
}

impl DoubleEcho {
Expand All @@ -78,9 +74,6 @@ impl DoubleEcho {
command_receiver: mpsc::Receiver<DoubleEchoCommand>,
subscriptions_view_receiver: mpsc::Receiver<SubscriptionsView>,
event_sender: broadcast::Sender<ProtocolEvents>,
store: Box<dyn TceStore + Send>,
storage: StorageClient,
network_client: NetworkClient,
shutdown: mpsc::Receiver<oneshot::Sender<()>>,
local_peer_id: String,
pending_certificate_count: u64,
Expand All @@ -91,9 +84,6 @@ impl DoubleEcho {
command_receiver,
subscriptions_view_receiver,
event_sender,
store,
storage,
network_client,
cert_candidate: Default::default(),
span_tracker: Default::default(),
delivery_time: Default::default(),
Expand All @@ -107,6 +97,13 @@ impl DoubleEcho {
}
}

/// DoubleEcho main loop
/// - Listen for shutdown signal
/// - Read new messages from command_receiver
/// - If a new certificate is received, add it to the buffer
/// - If a new subscription view is received, update the subscriptions
/// - If a new Echo/Ready is received, update the state of the certificate or buffer
/// the message
pub(crate) async fn run(mut self) {
info!("DoubleEcho started");

Expand Down Expand Up @@ -175,7 +172,7 @@ impl DoubleEcho {
.send(ProtocolEvents::CertificateDelivered { certificate: cert });
}
#[cfg(not(feature = "direct"))]
self.handle_broadcast(cert, need_gossip);
self.broadcast(cert, need_gossip);

if let Some(messages) = self.buffered_messages.remove(&cert_id) {
for message in messages {
Expand Down Expand Up @@ -204,7 +201,7 @@ impl DoubleEcho {
};

let _enter = span.enter();
self.handle_echo(from_peer, &certificate_id);
self.consume_echo(from_peer, &certificate_id);
self.state_change_follow_up();
}
DoubleEchoCommand::Ready {
Expand All @@ -230,7 +227,7 @@ impl DoubleEcho {
};

let _enter = span.enter();
self.handle_ready(from_peer, &certificate_id);
self.consume_ready(from_peer, &certificate_id);

self.state_change_follow_up();
}
Expand Down Expand Up @@ -266,34 +263,163 @@ impl DoubleEcho {
};
}

pub(crate) fn handle_echo(&mut self, from_peer: PeerId, certificate_id: &CertificateId) {
pub(crate) fn handle_ready(
&mut self,
from_peer: PeerId,
certificate_id: CertificateId,
ctx: Span,
) {
let cert_delivered = self.delivered_certificates.get(&certificate_id).is_some();
if !cert_delivered {
if self.known_certificates.get(&certificate_id).is_some() {
let span = if let Some(root) = self.span_tracker.get(&certificate_id) {
info_span!(
parent: root,
"RECV Inbound Ready",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
} else {
info_span!(
"RECV Inbound Ready",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
};

let _enter = span.enter();
debug!(
"Handling DoubleEchoCommand::Ready from_peer: {} cert_id: {}",
&from_peer, &certificate_id
);

self.consume_ready(from_peer, &certificate_id);

self.state_change_follow_up();
drop(_enter);
// need to deliver the certificate
} else if self.delivered_certificates.get(&certificate_id).is_none() {
// need to buffer the Ready
self.buffered_messages
.entry(certificate_id)
.or_default()
.push(DoubleEchoCommand::Ready {
from_peer,
certificate_id,
ctx,
});
DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.inc();
}
}
}
pub(crate) fn handle_echo(
&mut self,
from_peer: PeerId,
certificate_id: CertificateId,
ctx: Span,
) {
let cert_delivered = self.delivered_certificates.get(&certificate_id).is_some();
if !cert_delivered {
if self.known_certificates.get(&certificate_id).is_some() {
let span = if let Some(root) = self.span_tracker.get(&certificate_id) {
info!("DEBUG::Receive ECHO with root");
info_span!(
parent: root,
"RECV Inbound Echo",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
} else {
info!("DEBUG::Receive ECHO without root");
info_span!(
"RECV Inbound Echo",
peer = self.local_peer_id,
certificate_id = certificate_id.to_string()
)
};

let _enter = span.enter();
debug!(
"Handling DoubleEchoCommand::Echo from_peer: {} cert_id: {}",
&from_peer, certificate_id
);
self.consume_echo(from_peer, &certificate_id);

self.state_change_follow_up();
drop(_enter);
// need to deliver the certificate
} else if self.delivered_certificates.get(&certificate_id).is_none() {
info!("DEBUG::Receive ECHO BUFFERING");
// need to buffer the Echo
self.buffered_messages
.entry(certificate_id)
.or_default()
.push(DoubleEchoCommand::Echo {
from_peer,
certificate_id,
ctx,
});
DOUBLE_ECHO_BUFFERED_MESSAGE_COUNT.inc();
}
}
}
pub(crate) fn consume_echo(&mut self, from_peer: PeerId, certificate_id: &CertificateId) {
if let Some((_certificate, state)) = self.cert_candidate.get_mut(certificate_id) {
Self::sample_consume_peer(&from_peer, state, SampleType::EchoSubscription);
}
}

pub(crate) fn handle_ready(&mut self, from_peer: PeerId, certificate_id: &CertificateId) {
pub(crate) fn consume_ready(&mut self, from_peer: PeerId, certificate_id: &CertificateId) {
if let Some((_certificate, state)) = self.cert_candidate.get_mut(certificate_id) {
Self::sample_consume_peer(&from_peer, state, SampleType::ReadySubscription);
}
}

#[cfg_attr(feature = "direct", allow(dead_code))]
pub(crate) fn handle_broadcast(&mut self, cert: Certificate, origin: bool) {
info!("🙌 Starting broadcasting the Certificate {}", &cert.id);

self.dispatch(cert, origin);
}

pub(crate) fn handle_deliver(&mut self, cert: Certificate) {
self.dispatch(cert, false)
pub(crate) fn handle_broadcast(&mut self, cert: Certificate, need_gossip: bool, ctx: Span) {
if !self.known_certificates.contains(&cert.id) {
let span = warn_span!(
"Broadcast",
peer_id = self.local_peer_id,
certificate_id = cert.id.to_string()
);
DOUBLE_ECHO_BROADCAST_CREATED.inc();
span.in_scope(|| {
warn!("Broadcast registered for {}", cert.id);
self.span_tracker.insert(cert.id, span.clone());
CERTIFICATE_RECEIVED.inc();

if need_gossip {
CERTIFICATE_RECEIVED_FROM_API.inc();
} else {
CERTIFICATE_RECEIVED_FROM_GOSSIP.inc();
}
});
span.add_link(ctx.context().span().span_context().clone());

self.known_certificates.insert(cert.id);
span.in_scope(|| {
info!("Certificate {} added to pending storage", cert.id);
debug!("DoubleEchoCommand::Broadcast certificate_id: {}", cert.id);
if self.buffer.len() < *constant::TOPOS_DOUBLE_ECHO_MAX_BUFFER_SIZE {
self.buffer.push_back((need_gossip, cert));
DOUBLE_ECHO_CURRENT_BUFFER_SIZE.inc();
} else {
DOUBLE_ECHO_BUFFER_CAPACITY.inc();
// Adding one to the pending_certificate_count because we
// can't buffer it right now
_ = self.pending_certificate_count.checked_add(1);
}
});
}
}

/// Called to process potentially new certificate:
/// - either submitted from API ( [tce_transport::TceCommands::Broadcast] command)
/// - or received through the gossip (first step of protocol exchange)
#[instrument(skip_all)]
pub(crate) fn dispatch(&mut self, cert: Certificate, origin: bool) {
pub(crate) fn broadcast(&mut self, cert: Certificate, origin: bool) {
info!("🙌 Starting broadcasting the Certificate {}", &cert.id);

if self.cert_pre_broadcast_check(&cert).is_err() {
error!("Failure on the pre-check for the Certificate {}", &cert.id);
self.event_sender
Expand All @@ -313,7 +439,7 @@ impl DoubleEcho {
return;
}

if self.store.cert_by_id(&cert.id).is_ok() {
if self.delivered_certificates.get(&cert.id).is_some() {
self.event_sender
.send(ProtocolEvents::AlreadyDelivered {
certificate_id: cert.id,
Expand Down Expand Up @@ -412,9 +538,10 @@ impl DoubleEcho {
debug!("StateChangeFollowUp called");
let mut state_modified = false;
let mut gen_evts = Vec::<ProtocolEvents>::new();
let mut delivered_certificates = Vec::<(Certificate, Span)>::new();

// For all current Cert on processing
for (certificate_id, (certificate, state_to_delivery)) in &mut self.cert_candidate {
for (_certificate_id, (certificate, state_to_delivery)) in &mut self.cert_candidate {
// Check whether we should send Ready
if !state_to_delivery.ready_sent
&& is_r_ready(
Expand All @@ -441,10 +568,7 @@ impl DoubleEcho {
state_to_delivery,
)
{
self.pending_delivery.insert(
*certificate_id,
(certificate.clone(), state_to_delivery.ctx.clone()),
);
delivered_certificates.push((certificate.clone(), state_to_delivery.ctx.clone()));
state_to_delivery.delivered = true;
state_modified = true;
}
Expand Down Expand Up @@ -482,40 +606,27 @@ impl DoubleEcho {
self.cert_candidate
.retain(|_, (_, state)| !state.delivered || !state.ready_sent);

let delivered_certificates = self
.pending_delivery
.iter()
.filter(|(_, (c, _))| self.cert_post_delivery_check(c).is_ok())
.map(|(c, s)| (*c, s.clone()))
.collect::<HashMap<_, _>>();

for (certificate_id, (certificate, ctx)) in delivered_certificates {
for (certificate, ctx) in delivered_certificates {
let span = info_span!(parent: &ctx, "Delivered");

span.in_scope(|| {
let mut d = time::Duration::from_millis(0);
if let Some((from, duration)) = self.delivery_time.get_mut(&certificate_id) {
if let Some((from, duration)) = self.delivery_time.get_mut(&certificate.id) {
*duration = from.elapsed().unwrap();
d = *duration;

info!("Certificate {} got delivered in {:?}", certificate_id, d);
info!("Certificate {} got delivered in {:?}", certificate.id, d);
}
self.pending_delivery.remove(&certificate_id);
self.cert_candidate.remove(&certificate_id);
self.span_tracker.remove(&certificate_id);
self.cert_candidate.remove(&certificate.id);
self.span_tracker.remove(&certificate.id);

debug!("📝 Accepted[{}]\t Delivery time: {:?}", &certificate_id, d);
debug!("📝 Accepted[{}]\t Delivery time: {:?}", &certificate.id, d);

DOUBLE_ECHO_BROADCAST_FINISHED.inc();
self.delivered_certificates.insert(certificate.id);
_ = self
.event_sender
.send(ProtocolEvents::CertificateDelivered {
certificate: certificate.clone(),
});

self.store
.add_cert_in_hist(&certificate.source_subnet_id, &certificate);
.send(ProtocolEvents::CertificateDelivered { certificate });
});
}
}
Expand All @@ -537,11 +648,6 @@ impl DoubleEcho {

Ok(())
}

/// Here comes test that is necessarily done after delivery
fn cert_post_delivery_check(&self, _cert: &Certificate) -> Result<(), ()> {
Ok(())
}
}

/// Predicate on whether we reached the threshold to deliver the Certificate
Expand Down
Loading

0 comments on commit 7db2903

Please sign in to comment.