Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: switch tce-lib action to spawn tasks #486

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 72 additions & 68 deletions crates/topos-tce/src/app_context/api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::AppContext;
use std::collections::HashMap;
use tokio::spawn;
use topos_core::uci::{Certificate, SubnetId};
use topos_metrics::CERTIFICATE_DELIVERY_LATENCY;
use topos_tce_api::RuntimeError;
Expand All @@ -20,79 +21,82 @@ impl AppContext {
self.delivery_latency
.insert(certificate.id, CERTIFICATE_DELIVERY_LATENCY.start_timer());

_ = match self
.validator_store
.insert_pending_certificate(&certificate)
.await
{
Ok(Some(pending_id)) => {
let certificate_id = certificate.id;
debug!(
"Certificate {} from subnet {} has been inserted into pending pool",
certificate_id, certificate.source_subnet_id
);
let validator_store = self.validator_store.clone();
let double_echo = self.tce_cli.get_double_echo_channel();

if self
.tce_cli
.get_double_echo_channel()
.send(DoubleEchoCommand::Broadcast {
need_gossip: true,
cert: *certificate,
pending_id,
})
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast command to double \
echo for {}",
certificate_id
spawn(async move {
_ = match validator_store
.insert_pending_certificate(&certificate)
.await
{
Ok(Some(pending_id)) => {
let certificate_id = certificate.id;
debug!(
"Certificate {} from subnet {} has been inserted into pending pool",
certificate_id, certificate.source_subnet_id
);

sender.send(Err(RuntimeError::CommunicationError(
"Unable to send DoubleEchoCommand::Broadcast command to double \
echo"
.to_string(),
)))
} else {
sender.send(Ok(PendingResult::InPending(pending_id)))
if double_echo
.send(DoubleEchoCommand::Broadcast {
need_gossip: true,
cert: *certificate,
pending_id,
})
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast command to \
double echo for {}",
certificate_id
);

sender.send(Err(RuntimeError::CommunicationError(
"Unable to send DoubleEchoCommand::Broadcast command to \
double echo"
.to_string(),
)))
} else {
sender.send(Ok(PendingResult::InPending(pending_id)))
}
}
}
Ok(None) => {
debug!(
"Certificate {} from subnet {} has been inserted into precedence pool \
waiting for {}",
certificate.id, certificate.source_subnet_id, certificate.prev_id
);
sender.send(Ok(PendingResult::AwaitPrecedence))
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyPending,
)) => {
debug!(
"Certificate {} has already been added to the pending pool, skipping",
certificate.id
);
sender.send(Ok(PendingResult::AlreadyPending))
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyExists,
)) => {
debug!(
"Certificate {} has already been delivered, skipping",
certificate.id
);
sender.send(Ok(PendingResult::AlreadyDelivered))
}
Err(error) => {
error!(
"Unable to insert pending certificate {}: {}",
certificate.id, error
);
Ok(None) => {
debug!(
"Certificate {} from subnet {} has been inserted into precedence \
pool waiting for {}",
certificate.id, certificate.source_subnet_id, certificate.prev_id
);
sender.send(Ok(PendingResult::AwaitPrecedence))
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyPending,
)) => {
debug!(
"Certificate {} has already been added to the pending pool, \
skipping",
certificate.id
);
sender.send(Ok(PendingResult::AlreadyPending))
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyExists,
)) => {
debug!(
"Certificate {} has already been delivered, skipping",
certificate.id
);
sender.send(Ok(PendingResult::AlreadyDelivered))
}
Err(error) => {
error!(
"Unable to insert pending certificate {}: {}",
certificate.id, error
);

sender.send(Err(error.into()))
}
};
sender.send(Err(error.into()))
}
};
});
}

ApiEvent::GetSourceHead { subnet_id, sender } => {
Expand Down
116 changes: 59 additions & 57 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,69 +37,71 @@ impl AppContext {
{
entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer());
}
info!(
"Received certificate {} from GossipSub from {}",
cert.id, from
);

match self.validator_store.insert_pending_certificate(&cert).await {
Ok(Some(pending_id)) => {
let certificate_id = cert.id;
debug!(
"Certificate {} has been inserted into pending pool",
certificate_id
);
let validator_store = self.validator_store.clone();
let double_echo = self.tce_cli.get_double_echo_channel();
spawn(async move {
info!(
"Received certificate {} from GossipSub from {}",
cert.id, from
);

if self
.tce_cli
.get_double_echo_channel()
.send(DoubleEchoCommand::Broadcast {
need_gossip: false,
cert,
pending_id,
})
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast command \
to double echo for {}",
match validator_store.insert_pending_certificate(&cert).await {
Ok(Some(pending_id)) => {
let certificate_id = cert.id;
debug!(
"Certificate {} has been inserted into pending pool",
certificate_id
);

if double_echo
.send(DoubleEchoCommand::Broadcast {
need_gossip: false,
cert,
pending_id,
})
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast \
command to double echo for {}",
certificate_id
);
}
}
}

Ok(None) => {
debug!(
"Certificate {} from subnet {} has been inserted into \
precedence pool waiting for {}",
cert.id, cert.source_subnet_id, cert.prev_id
);
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyPending,
)) => {
debug!(
"Certificate {} has been already added to the pending \
pool, skipping",
cert.id
);
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyExists,
)) => {
debug!(
"Certificate {} has been already delivered, skipping",
cert.id
);
}
Err(error) => {
error!(
"Unable to insert pending certificate {}: {}",
cert.id, error
);
Ok(None) => {
debug!(
"Certificate {} from subnet {} has been inserted into \
precedence pool waiting for {}",
cert.id, cert.source_subnet_id, cert.prev_id
);
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyPending,
)) => {
debug!(
"Certificate {} has been already added to the pending \
pool, skipping",
cert.id
);
}
Err(StorageError::InternalStorage(
InternalStorageError::CertificateAlreadyExists,
)) => {
debug!(
"Certificate {} has been already delivered, skipping",
cert.id
);
}
Err(error) => {
error!(
"Unable to insert pending certificate {}: {}",
cert.id, error
);
}
}
}
});
}
Err(e) => {
error!("Failed to parse the received Certificate: {e}");
Expand Down
Loading
Loading