Skip to content

Commit

Permalink
feat: cleanup app_context
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Jul 4, 2023
1 parent 4e911bb commit 72d8673
Show file tree
Hide file tree
Showing 11 changed files with 523 additions and 447 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ topos-crypto = { path = "./crates/topos-crypto", default-features = false }

# Various utility crates
clap = { version = "4.0", features = ["derive", "env", "string"] }
lazy_static = "1.4"
rand = { version = "0.8", default-features = false }
rand_core = { version = "0.6", default-features = false }
rand_distr = { version = "0.4", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ async-trait.workspace = true
bincode.workspace = true
clap.workspace = true
futures.workspace = true
lazy_static.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
Expand Down
432 changes: 11 additions & 421 deletions crates/topos-tce/src/app_context.rs

Large diffs are not rendered by default.

144 changes: 144 additions & 0 deletions crates/topos-tce/src/app_context/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use std::collections::HashMap;

use crate::{events::Events, AppContext};
use opentelemetry::trace::FutureExt;
use tokio::spawn;
use topos_core::uci::{Certificate, SubnetId};
use topos_tce_api::{RuntimeError, RuntimeEvent as Event};
use topos_tce_gatekeeper::GatekeeperError;
use topos_tce_storage::errors::{InternalStorageError, StorageError};
use tracing::{error, info, info_span, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::metrics::{CERTIFICATE_RECEIVED, CERTIFICATE_RECEIVED_FROM_API};

impl AppContext {
pub async fn on_api_event(&mut self, event: Event) {
match event {
Event::CertificateSubmitted {
certificate,
sender,
ctx,
} => {
let span = info_span!(parent: &ctx, "TCE Runtime");

_ = self
.tce_cli
.broadcast_new_certificate(*certificate, true)
.with_context(span.context())
.instrument(span)
.await;

CERTIFICATE_RECEIVED.inc();
CERTIFICATE_RECEIVED_FROM_API.inc();
_ = sender.send(Ok(()));
}

Event::PeerListPushed { peers, sender } => {
let sampler = self.tce_cli.clone();
let gatekeeper = self.gatekeeper.clone();
let events = self.events.clone();
let api = self.api_client.clone();

spawn(async move {
match gatekeeper.push_peer_list(peers).await {
Ok(peers) => {
info!("Gatekeeper has detected changes on the peer list, new sample in creation");
if sampler.peer_changed(peers).await.is_err() {
_ = sender.send(Err(RuntimeError::UnableToPushPeerList));
} else {
api.set_active_sample(true).await;
if events.send(Events::StableSample).await.is_err() {
error!("Unable to send StableSample event");
}
_ = sender.send(Ok(()));
}
}
Err(GatekeeperError::NoUpdate) => {
_ = sender.send(Ok(()));
}
Err(_) => {
_ = sender.send(Err(RuntimeError::UnableToPushPeerList));
}
}
});
}

Event::GetSourceHead { subnet_id, sender } => {
// Get source head certificate
let mut result = self
.pending_storage
.get_source_head(subnet_id)
.await
.map_err(|e| match e {
StorageError::InternalStorage(internal) => {
if let InternalStorageError::MissingHeadForSubnet(subnet_id) = internal
{
RuntimeError::UnknownSubnet(subnet_id)
} else {
RuntimeError::UnableToGetSourceHead(subnet_id, internal.to_string())
}
}
e => RuntimeError::UnableToGetSourceHead(subnet_id, e.to_string()),
});

// TODO: Initial genesis certificate eventually will be fetched from the topos subnet
// Currently, for subnet starting from scratch there are no certificates in the database
// So for MissingHeadForSubnet error we will return some default dummy certificate
if let Err(RuntimeError::UnknownSubnet(subnet_id)) = result {
warn!("Returning dummy certificate as head certificate, to be fixed...");
result = Ok((
0,
topos_core::uci::Certificate {
prev_id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID,
source_subnet_id: subnet_id,
state_root: Default::default(),
tx_root_hash: Default::default(),
target_subnets: vec![],
verifier: 0,
id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID,
proof: Default::default(),
signature: Default::default(),
},
));
};

_ = sender.send(result);
}

Event::GetLastPendingCertificates {
mut subnet_ids,
sender,
} => {
let mut last_pending_certificates: HashMap<SubnetId, Option<Certificate>> =
subnet_ids
.iter()
.map(|subnet_id| (*subnet_id, None))
.collect();

if let Ok(pending_certificates) =
self.pending_storage.get_pending_certificates().await
{
// Iterate through pending certificates and determine last one for every subnet
// Last certificate in the subnet should be one with the highest index
for (pending_certificate_id, cert) in pending_certificates.into_iter().rev() {
if let Some(subnet_id) = subnet_ids.take(&cert.source_subnet_id) {
*last_pending_certificates.entry(subnet_id).or_insert(None) =
Some(cert);
}
if subnet_ids.is_empty() {
break;
}
}
}

// Add None pending certificate for any other requested subnet_id
subnet_ids.iter().for_each(|subnet_id| {
last_pending_certificates.insert(*subnet_id, None);
});

_ = sender.send(Ok(last_pending_certificates));
}
}
}
}
36 changes: 36 additions & 0 deletions crates/topos-tce/src/app_context/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use serde::{Deserialize, Serialize};
use tce_transport::TceCommands;

/// Definition of networking payload.
///
/// We assume that only Commands will go through the network,
/// [Response] is used to allow reporting of logic errors to the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum NetworkMessage {
Cmd(TceCommands),
Bulk(Vec<TceCommands>),

NotReady(topos_p2p::NotReadyMessage),
}

// deserializer
impl From<Vec<u8>> for NetworkMessage {
fn from(data: Vec<u8>) -> Self {
bincode::deserialize::<NetworkMessage>(data.as_ref()).expect("msg deser")
}
}

// serializer
impl From<NetworkMessage> for Vec<u8> {
fn from(msg: NetworkMessage) -> Self {
bincode::serialize::<NetworkMessage>(&msg).expect("msg ser")
}
}

// transformer of protocol commands into network commands
impl From<TceCommands> for NetworkMessage {
fn from(cmd: TceCommands) -> Self {
Self::Cmd(cmd)
}
}
122 changes: 122 additions & 0 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use crate::{
app_context::message::NetworkMessage,
metrics::{CERTIFICATE_RECEIVED, CERTIFICATE_RECEIVED_FROM_GOSSIP},
AppContext,
};

use opentelemetry::trace::{FutureExt, TraceContextExt};
use tce_transport::TceCommands;
use tokio::spawn;
use topos_p2p::Event;
use topos_tce_broadcast::DoubleEchoCommand;
use tracing::{error, info, info_span, trace, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

impl AppContext {
pub async fn on_net_event(&mut self, evt: Event) {
trace!(
"on_net_event: peer: {} event {:?}",
&self.network_client.local_peer_id,
&evt
);

if let Event::Gossip { from, data } = evt {
let msg: NetworkMessage = data.into();

if let NetworkMessage::Cmd(cmd) = msg {
match cmd {
TceCommands::OnGossip { cert, ctx } => {
let span = info_span!(
"RECV Outbound Gossip",
peer_id = self.network_client.local_peer_id.to_string(),
"otel.kind" = "consumer",
sender = from.to_string()
);
let parent = ctx.extract();
span.add_link(parent.span().span_context().clone());

let channel = self.tce_cli.get_double_echo_channel();

spawn(async move {
info!("Send certificate to be broadcast");
if channel
.send(DoubleEchoCommand::Broadcast {
cert,
need_gossip: false,
ctx: span,
})
.await
.is_err()
{
error!("Unable to send broadcast_new_certificate command, Receiver was dropped");
} else {
CERTIFICATE_RECEIVED.inc();
CERTIFICATE_RECEIVED_FROM_GOSSIP.inc();
}
});
}

TceCommands::OnEcho {
certificate_id,
ctx,
} => {
let span = info_span!(
"RECV Outbound Echo",
peer_id = self.network_client.local_peer_id.to_string(),
"otel.kind" = "consumer",
sender = from.to_string()
);
let context = ctx.extract();
span.add_link(context.span().span_context().clone());

let channel = self.tce_cli.get_double_echo_channel();
spawn(async move {
if let Err(e) = channel
.send(DoubleEchoCommand::Echo {
from_peer: from,
certificate_id,
ctx: span.clone(),
})
.with_context(span.context().clone())
.instrument(span)
.await
{
error!("Unable to send Echo, {:?}", e);
}
});
}
TceCommands::OnReady {
certificate_id,
ctx,
} => {
let span = info_span!(
"RECV Outbound Ready",
peer_id = self.network_client.local_peer_id.to_string(),
"otel.kind" = "consumer",
sender = from.to_string()
);
let context = ctx.extract();
span.add_link(context.span().span_context().clone());

let channel = self.tce_cli.get_double_echo_channel();
spawn(async move {
if let Err(e) = channel
.send(DoubleEchoCommand::Ready {
from_peer: from,
certificate_id,
ctx: span.clone(),
})
.with_context(context)
.instrument(span)
.await
{
error!("Unable to send Ready {:?}", e);
}
});
}
_ => {}
}
}
}
}
}
Loading

0 comments on commit 72d8673

Please sign in to comment.