From 57d260ec9d9ae508f7d428ad59625440b8025c45 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 21:07:43 -0300 Subject: [PATCH] feat: add lru cache for delivered certificates and known signatures --- Cargo.lock | 7 +- Cargo.toml | 1 + crates/topos-tce-broadcast/Cargo.toml | 1 + .../src/double_echo/mod.rs | 73 +++++++------------ .../src/task_manager/mod.rs | 20 ++++- 5 files changed, 51 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e56c9525..2b3b1b650 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3381,7 +3381,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -4508,9 +4508,9 @@ dependencies = [ [[package]] name = "lru" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" dependencies = [ "hashbrown 0.14.3", ] @@ -8220,6 +8220,7 @@ dependencies = [ "futures", "hex", "lazy_static", + "lru", "rand", "rand_core", "rstest", diff --git a/Cargo.toml b/Cargo.toml index f758d9b67..dbd8ef2ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ lazy_static = "1" rand = { version = "0.8", default-features = false } rand_core = { version = "0.6", default-features = false } rand_distr = { version = "0.4", default-features = false } +lru = "0.12.3" # Async & Tokio related async-stream = { version = "0.3", default-features = false } diff --git a/crates/topos-tce-broadcast/Cargo.toml b/crates/topos-tce-broadcast/Cargo.toml index 616cfc1f7..97f8a6f65 100644 --- a/crates/topos-tce-broadcast/Cargo.toml +++ b/crates/topos-tce-broadcast/Cargo.toml @@ -24,6 +24,7 @@ topos-config = { path = "../topos-config/" } topos-metrics = { path = "../topos-metrics/" } topos-tce-storage = { path = "../topos-tce-storage/" } topos-crypto = { path = "../topos-crypto" } +lru.workspace = true [dev-dependencies] criterion = { version = "0.5.1", features = ["async_futures", "async_tokio"] } diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index b998e77bc..fe1941e5c 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -15,17 +15,18 @@ use crate::event::ProtocolEvents; use crate::{DoubleEchoCommand, SubscriptionsView}; +use lru::LruCache; use std::collections::HashSet; +use std::num::NonZeroUsize; use std::sync::Arc; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_util::sync::CancellationToken; use topos_config::tce::broadcast::ReliableBroadcastParams; use topos_core::{types::ValidatorId, uci::CertificateId}; use topos_crypto::messages::{MessageSigner, Signature}; -use topos_tce_storage::store::ReadStore; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; pub mod broadcast_state; @@ -49,7 +50,7 @@ pub struct DoubleEcho { /// List of approved validators through smart contract and/or genesis pub validators: HashSet, pub validator_store: Arc, - pub known_signatures: HashSet, + pub known_signatures: LruCache, pub broadcast_sender: broadcast::Sender, pub task_manager_cancellation: CancellationToken, @@ -57,6 +58,7 @@ pub struct DoubleEcho { impl DoubleEcho { pub const MAX_BUFFER_SIZE: usize = 1024 * 20; + pub const KNOWN_SIGNATURES_CACHE_SIZE: usize = 15 * 10_000; #[allow(clippy::too_many_arguments)] pub fn new( @@ -86,7 +88,9 @@ impl DoubleEcho { }, shutdown, validator_store, - known_signatures: HashSet::new(), + known_signatures: LruCache::new( + NonZeroUsize::new(Self::KNOWN_SIGNATURES_CACHE_SIZE).unwrap(), + ), broadcast_sender, task_manager_cancellation: CancellationToken::new(), } @@ -167,7 +171,7 @@ impl DoubleEcho { continue; } - self.known_signatures.insert(signature); + self.known_signatures.push(signature, ()); self.handle_echo(certificate_id, validator_id, signature).await }, @@ -184,7 +188,6 @@ impl DoubleEcho { continue; } - let mut payload = Vec::new(); payload.extend_from_slice(certificate_id.as_array()); payload.extend_from_slice(validator_id.as_bytes()); @@ -194,7 +197,7 @@ impl DoubleEcho { continue; } - self.known_signatures.insert(signature); + self.known_signatures.push(signature, ()); self.handle_ready(certificate_id, validator_id, signature).await }, @@ -230,26 +233,14 @@ impl DoubleEcho { validator_id: ValidatorId, signature: Signature, ) { - match self.validator_store.get_certificate(&certificate_id) { - Err(storage_error) => error!( - "Unable to get the Certificate {} due to {:?}", - &certificate_id, storage_error - ), - Ok(Some(_)) => debug!( - "Certificate {} already delivered, ignoring echo", - &certificate_id - ), - Ok(None) => { - let _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Echo { - validator_id, - certificate_id, - signature, - }) - .await; - } - } + let _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Echo { + validator_id, + certificate_id, + signature, + }) + .await; } pub async fn handle_ready( @@ -258,25 +249,13 @@ impl DoubleEcho { validator_id: ValidatorId, signature: Signature, ) { - match self.validator_store.get_certificate(&certificate_id) { - Err(storage_error) => error!( - "Unable to get the Certificate {} due to {:?}", - &certificate_id, storage_error - ), - Ok(Some(_)) => debug!( - "Certificate {} already delivered, ignoring echo", - &certificate_id - ), - Ok(None) => { - let _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Ready { - validator_id, - certificate_id, - signature, - }) - .await; - } - } + let _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Ready { + validator_id, + certificate_id, + signature, + }) + .await; } } diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index cb285b9b5..9118ec1f0 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -2,8 +2,10 @@ use crate::event::ProtocolEvents; use futures::stream::FuturesUnordered; use futures::Future; use futures::StreamExt; +use lru::LruCache; use std::collections::HashMap; use std::future::IntoFuture; +use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -52,11 +54,14 @@ pub struct TaskManager { pub thresholds: ReliableBroadcastParams, pub validator_id: ValidatorId, pub validator_store: Arc, + pub delivered_certificates: LruCache, pub broadcast_sender: broadcast::Sender, pub latest_pending_id: PendingCertificateId, } impl TaskManager { + pub const DELIVERED_CERTIFICATES_CACHE_SIZE: usize = 20_000; + #[allow(clippy::too_many_arguments)] pub fn new( message_receiver: mpsc::Receiver, @@ -79,6 +84,9 @@ impl TaskManager { message_signer, thresholds, validator_store, + delivered_certificates: LruCache::new( + NonZeroUsize::new(Self::DELIVERED_CERTIFICATES_CACHE_SIZE).unwrap(), + ), broadcast_sender, latest_pending_id: 0, } @@ -120,6 +128,11 @@ impl TaskManager { Some(msg) = self.message_receiver.recv() => { match msg { DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if self.delivered_certificates.contains(&certificate_id) { + trace!("Received message for certificate {} that has already been delivered", certificate_id); + continue; + } + if let Some(task_context) = self.tasks.get(&certificate_id) { _ = task_context.sink.send(msg).await; } else { @@ -130,6 +143,11 @@ impl TaskManager { }; } DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + if self.delivered_certificates.contains(&cert.id) { + trace!("Received message for certificate {} that has already been delivered", cert.id); + continue; + } + trace!("Received broadcast message for certificate {} ", cert.id); self.create_task(cert, need_gossip, pending_id) @@ -139,10 +157,10 @@ impl TaskManager { Some((certificate_id, status)) = self.running_tasks.next() => { if let TaskStatus::Success = status { + self.delivered_certificates.put(certificate_id, ()); trace!("Task for certificate {} finished successfully", certificate_id); self.tasks.remove(&certificate_id); DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); - } else { error!("Task for certificate {} finished unsuccessfully", certificate_id); }