Skip to content

Commit

Permalink
feat: cleanup app_context (#252)
Browse files Browse the repository at this point in the history
  • Loading branch information
Freyskeyd authored and gruberb committed Jul 13, 2023
1 parent 38d17d6 commit d768b4e
Show file tree
Hide file tree
Showing 10 changed files with 507 additions and 463 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/quality.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ jobs:

msrv:
name: Check - MSRV
runs-on: ubuntu-latest
runs-on: ubuntu-latest-16-core
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
406 changes: 9 additions & 397 deletions crates/topos-tce/src/app_context.rs

Large diffs are not rendered by default.

142 changes: 142 additions & 0 deletions crates/topos-tce/src/app_context/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use opentelemetry::trace::FutureExt as TraceFutureExt;
use std::collections::HashMap;
use tokio::spawn;
use topos_core::uci::{Certificate, SubnetId};
use topos_tce_api::RuntimeError;
use topos_tce_api::RuntimeEvent as ApiEvent;
use topos_tce_gatekeeper::GatekeeperError;
use topos_tce_storage::errors::{InternalStorageError, StorageError};
use tracing::{error, info, info_span, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::events::Events;
use crate::AppContext;

impl AppContext {
pub async fn on_api_event(&mut self, event: ApiEvent) {
match event {
ApiEvent::CertificateSubmitted {
certificate,
sender,
ctx,
} => {
let span = info_span!(parent: &ctx, "TCE Runtime");

_ = self
.tce_cli
.broadcast_new_certificate(*certificate, true)
.with_context(span.context())
.instrument(span)
.await;

_ = sender.send(Ok(()));
}

ApiEvent::PeerListPushed { peers, sender } => {
let sampler = self.tce_cli.clone();
let gatekeeper = self.gatekeeper.clone();
let events = self.events.clone();
let api = self.api_client.clone();

spawn(async move {
match gatekeeper.push_peer_list(peers).await {
Ok(peers) => {
info!("Gatekeeper has detected changes on the peer list, new sample in creation");
if sampler.peer_changed(peers).await.is_err() {
_ = sender.send(Err(RuntimeError::UnableToPushPeerList));
} else {
api.set_active_sample(true).await;
if events.send(Events::StableSample).await.is_err() {
error!("Unable to send StableSample event");
}
_ = sender.send(Ok(()));
}
}
Err(GatekeeperError::NoUpdate) => {
_ = sender.send(Ok(()));
}
Err(_) => {
_ = sender.send(Err(RuntimeError::UnableToPushPeerList));
}
}
});
}

ApiEvent::GetSourceHead { subnet_id, sender } => {
// Get source head certificate
let mut result = self
.pending_storage
.get_source_head(subnet_id)
.await
.map_err(|e| match e {
StorageError::InternalStorage(internal) => {
if let InternalStorageError::MissingHeadForSubnet(subnet_id) = internal
{
RuntimeError::UnknownSubnet(subnet_id)
} else {
RuntimeError::UnableToGetSourceHead(subnet_id, internal.to_string())
}
}
e => RuntimeError::UnableToGetSourceHead(subnet_id, e.to_string()),
});

// TODO: Initial genesis certificate eventually will be fetched from the topos subnet
// Currently, for subnet starting from scratch there are no certificates in the database
// So for MissingHeadForSubnet error we will return some default dummy certificate
if let Err(RuntimeError::UnknownSubnet(subnet_id)) = result {
warn!("Returning dummy certificate as head certificate, to be fixed...");
result = Ok((
0,
topos_core::uci::Certificate {
prev_id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID,
source_subnet_id: subnet_id,
state_root: Default::default(),
tx_root_hash: Default::default(),
target_subnets: vec![],
verifier: 0,
id: AppContext::DUMMY_INITIAL_CERTIFICATE_ID,
proof: Default::default(),
signature: Default::default(),
},
));
};

_ = sender.send(result);
}

ApiEvent::GetLastPendingCertificates {
mut subnet_ids,
sender,
} => {
let mut last_pending_certificates: HashMap<SubnetId, Option<Certificate>> =
subnet_ids
.iter()
.map(|subnet_id| (*subnet_id, None))
.collect();

if let Ok(pending_certificates) =
self.pending_storage.get_pending_certificates().await
{
// Iterate through pending certificates and determine last one for every subnet
// Last certificate in the subnet should be one with the highest index
for (_pending_certificate_id, cert) in pending_certificates.into_iter().rev() {
if let Some(subnet_id) = subnet_ids.take(&cert.source_subnet_id) {
*last_pending_certificates.entry(subnet_id).or_insert(None) =
Some(cert);
}
if subnet_ids.is_empty() {
break;
}
}
}

// Add None pending certificate for any other requested subnet_id
subnet_ids.iter().for_each(|subnet_id| {
last_pending_certificates.insert(*subnet_id, None);
});

_ = sender.send(Ok(last_pending_certificates));
}
}
}
}
116 changes: 116 additions & 0 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use opentelemetry::trace::{FutureExt as TraceFutureExt, TraceContextExt};
use tce_transport::TceCommands;
use tokio::spawn;
use topos_p2p::Event as NetEvent;
use topos_tce_broadcast::DoubleEchoCommand;
use tracing::{error, info, info_span, trace, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::messages::NetworkMessage;
use crate::AppContext;

impl AppContext {
pub async fn on_net_event(&mut self, evt: NetEvent) {
trace!(
"on_net_event: peer: {} event {:?}",
&self.network_client.local_peer_id,
&evt
);

if let NetEvent::Gossip { from, data } = evt {
let msg: NetworkMessage = data.into();

if let NetworkMessage::Cmd(cmd) = msg {
match cmd {
TceCommands::OnGossip { cert, ctx } => {
let span = info_span!(
"RECV Outbound Gossip",
peer_id = self.network_client.local_peer_id.to_string(),
"otel.kind" = "consumer",
sender = from.to_string()
);
let parent = ctx.extract();
span.add_link(parent.span().span_context().clone());

let channel = self.tce_cli.get_double_echo_channel();

spawn(async move {
info!("Send certificate to be broadcast");
if channel
.send(DoubleEchoCommand::Broadcast {
cert,
need_gossip: false,
ctx: span,
})
.await
.is_err()
{
error!("Unable to send broadcast_new_certificate command, Receiver was dropped");
}
});
}

TceCommands::OnEcho {
certificate_id,
ctx,
} => {
let span = info_span!(
"RECV Outbound Echo",
peer_id = self.network_client.local_peer_id.to_string(),
"otel.kind" = "consumer",
sender = from.to_string()
);
let context = ctx.extract();
span.add_link(context.span().span_context().clone());

let channel = self.tce_cli.get_double_echo_channel();
spawn(async move {
if let Err(e) = channel
.send(DoubleEchoCommand::Echo {
from_peer: from,
certificate_id,
ctx: span.clone(),
})
.with_context(span.context().clone())
.instrument(span)
.await
{
error!("Unable to send Echo, {:?}", e);
}
});
}
TceCommands::OnReady {
certificate_id,
ctx,
} => {
let span = info_span!(
"RECV Outbound Ready",
peer_id = self.network_client.local_peer_id.to_string(),
"otel.kind" = "consumer",
sender = from.to_string()
);
let context = ctx.extract();
span.add_link(context.span().span_context().clone());

let channel = self.tce_cli.get_double_echo_channel();
spawn(async move {
if let Err(e) = channel
.send(DoubleEchoCommand::Ready {
from_peer: from,
certificate_id,
ctx: span.clone(),
})
.with_context(context)
.instrument(span)
.await
{
error!("Unable to send Ready {:?}", e);
}
});
}
_ => {}
}
}
}
}
}
Loading

0 comments on commit d768b4e

Please sign in to comment.