From fbca8677e385a16b1384e0b6e6c3696120145e31 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Wed, 20 Sep 2023 11:48:53 -0700 Subject: [PATCH 01/18] add debug for metrics --- crates/types/src/consensus.rs | 1 + crates/types/src/traits/metrics.rs | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index a29c840056..0f9a825b87 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -73,6 +73,7 @@ pub struct Consensus> { } /// The metrics being collected for the consensus algorithm +#[derive(Debug)] pub struct ConsensusMetrics { /// The current view pub current_view: Box, diff --git a/crates/types/src/traits/metrics.rs b/crates/types/src/traits/metrics.rs index 0253e129d5..59b62920f8 100644 --- a/crates/types/src/traits/metrics.rs +++ b/crates/types/src/traits/metrics.rs @@ -6,6 +6,8 @@ //! - [`Histogram`]: stores multiple float values based for a graph (example usage: CPU %) //! - [`Label`]: Stores the last string (example usage: current version, network online/offline) +use std::fmt::Debug; + /// The metrics type. pub trait Metrics: Send + Sync { /// Create a [`Counter`] with an optional `unit_label`. @@ -76,12 +78,12 @@ impl Label for NoMetrics { } /// An ever-incrementing counter -pub trait Counter: Send + Sync { +pub trait Counter: Send + Sync + Debug { /// Add a value to the counter fn add(&self, amount: usize); } /// A gauge that stores the latest value. -pub trait Gauge: Send + Sync { +pub trait Gauge: Send + Sync + Debug { /// Set the gauge value fn set(&self, amount: usize); @@ -90,7 +92,7 @@ pub trait Gauge: Send + Sync { } /// A histogram which will record a series of points. -pub trait Histogram: Send + Sync { +pub trait Histogram: Send + Sync + Debug { /// Add a point to this histogram. fn add_point(&self, point: f64); } From 1fd54e50bd02480862f42e4acb26a9127f773781 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Wed, 20 Sep 2023 11:48:53 -0700 Subject: [PATCH 02/18] add debug for metrics --- crates/types/src/consensus.rs | 1 + crates/types/src/traits/metrics.rs | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index a29c840056..0f9a825b87 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -73,6 +73,7 @@ pub struct Consensus> { } /// The metrics being collected for the consensus algorithm +#[derive(Debug)] pub struct ConsensusMetrics { /// The current view pub current_view: Box, diff --git a/crates/types/src/traits/metrics.rs b/crates/types/src/traits/metrics.rs index 0253e129d5..59b62920f8 100644 --- a/crates/types/src/traits/metrics.rs +++ b/crates/types/src/traits/metrics.rs @@ -6,6 +6,8 @@ //! - [`Histogram`]: stores multiple float values based for a graph (example usage: CPU %) //! - [`Label`]: Stores the last string (example usage: current version, network online/offline) +use std::fmt::Debug; + /// The metrics type. pub trait Metrics: Send + Sync { /// Create a [`Counter`] with an optional `unit_label`. @@ -76,12 +78,12 @@ impl Label for NoMetrics { } /// An ever-incrementing counter -pub trait Counter: Send + Sync { +pub trait Counter: Send + Sync + Debug { /// Add a value to the counter fn add(&self, amount: usize); } /// A gauge that stores the latest value. -pub trait Gauge: Send + Sync { +pub trait Gauge: Send + Sync + Debug { /// Set the gauge value fn set(&self, amount: usize); @@ -90,7 +92,7 @@ pub trait Gauge: Send + Sync { } /// A histogram which will record a series of points. -pub trait Histogram: Send + Sync { +pub trait Histogram: Send + Sync + Debug { /// Add a point to this histogram. fn add_point(&self, point: f64); } From 2f64ee972fba60bfb2fd010bb943a81df9317320 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Wed, 20 Sep 2023 12:20:55 -0700 Subject: [PATCH 03/18] add derive(Debug) for TestMetrics --- crates/types/src/traits/metrics.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/types/src/traits/metrics.rs b/crates/types/src/traits/metrics.rs index 59b62920f8..4d73e5f67a 100644 --- a/crates/types/src/traits/metrics.rs +++ b/crates/types/src/traits/metrics.rs @@ -111,6 +111,7 @@ mod test { sync::{Arc, Mutex}, }; + #[derive(Debug)] struct TestMetrics { prefix: String, values: Arc>, From 417ab81e2e4522485c3944ddef9f68ab0441029e Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Fri, 22 Sep 2023 15:01:52 -0700 Subject: [PATCH 04/18] Add clone() to NetworkingMetrics --- Cargo.lock | 9 ++++++++- crates/hotshot/Cargo.toml | 1 + crates/hotshot/src/traits/networking.rs | 2 +- .../src/traits/networking/libp2p_network.rs | 9 ++++++--- crates/types/Cargo.toml | 1 + crates/types/src/traits/metrics.rs | 15 ++++++++++----- 6 files changed, 27 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e18e2095ed..4a4c2f5394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1986,6 +1986,11 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbfc4744c1b8f2a09adc0e55242f60b1af195d88596bd8700be74418c056c555" +[[package]] +name = "dyn-clone" +version = "1.0.14" +source = "git+https://github.com/dtolnay/dyn-clone?tag=1.0.14#cee99471c46f9f512640aa03c680a547ac72c22c" + [[package]] name = "ed25519" version = "2.2.2" @@ -2588,6 +2593,7 @@ dependencies = [ "custom_debug", "dashmap", "derivative", + "dyn-clone 1.0.14", "either", "embed-doc-image", "espresso-systems-common 0.4.1", @@ -2816,6 +2822,7 @@ dependencies = [ "derivative", "digest 0.10.7", "displaydoc", + "dyn-clone 1.0.14", "either", "espresso-systems-common 0.4.1", "ethereum-types", @@ -3329,7 +3336,7 @@ dependencies = [ "derivative", "displaydoc", "downcast-rs", - "dyn-clone", + "dyn-clone 1.0.13", "hashbrown 0.13.2", "itertools 0.10.5", "jf-utils", diff --git a/crates/hotshot/Cargo.toml b/crates/hotshot/Cargo.toml index 671519ec84..34f9a9b63e 100644 --- a/crates/hotshot/Cargo.toml +++ b/crates/hotshot/Cargo.toml @@ -112,6 +112,7 @@ sha3 = "^0.10" snafu = { workspace = true } surf-disco = { workspace = true } time = { workspace = true } +dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.14" } tracing = { workspace = true } typenum = { workspace = true } diff --git a/crates/hotshot/src/traits/networking.rs b/crates/hotshot/src/traits/networking.rs index e48e71d29b..017fae77ec 100644 --- a/crates/hotshot/src/traits/networking.rs +++ b/crates/hotshot/src/traits/networking.rs @@ -14,10 +14,10 @@ pub use hotshot_types::traits::network::{ ChannelSendSnafu, CouldNotDeliverSnafu, FailedToDeserializeSnafu, FailedToSerializeSnafu, NetworkError, NetworkReliability, NoSuchNodeSnafu, ShutDownSnafu, }; - use hotshot_types::traits::metrics::{Counter, Gauge, Metrics}; /// Contains the metrics that we're interested in from the networking interfaces +#[derive(Clone)] pub struct NetworkingMetrics { #[allow(dead_code)] /// A [`Gauge`] which tracks how many peers are connected diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index da8784e0a7..1cea7633c1 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -1,7 +1,6 @@ //! Libp2p based/production networking implementation //! This module provides a libp2p based networking implementation where each node in the //! network forms a tcp or udp connection to a subset of other nodes in the network - use super::NetworkingMetrics; use crate::NodeImplementation; use async_compatibility_layer::{ @@ -320,7 +319,7 @@ impl Libp2pNetwork { let (node_lookup_send, node_lookup_recv) = unbounded(); let (cache_gc_shutdown_send, cache_gc_shutdown_recv) = unbounded::<()>(); - let result = Libp2pNetwork { + let mut result = Libp2pNetwork { inner: Arc::new(Libp2pNetworkInner { handle: network_handle, broadcast_recv, @@ -404,13 +403,14 @@ impl Libp2pNetwork { } /// Initiates connection to the outside world - fn spawn_connect(&self, id: usize) { + fn spawn_connect(&mut self, id: usize) { let pk = self.inner.pk.clone(); let bootstrap_ref = self.inner.bootstrap_addrs.clone(); let num_bootstrap = self.inner.bootstrap_addrs_len; let handle = self.inner.handle.clone(); let is_bootstrapped = self.inner.is_bootstrapped.clone(); let node_type = self.inner.handle.config().node_type; + let metrics_connected_peers = self.inner.metrics.connected_peers.clone(); async_spawn({ let is_ready = self.inner.is_ready.clone(); async move { @@ -439,6 +439,9 @@ impl Libp2pNetwork { .await .unwrap(); + let connected_num = handle.num_connected().await?; + metrics_connected_peers.set(connected_num); + while !is_bootstrapped.load(Ordering::Relaxed) { async_sleep(Duration::from_secs(1)).await; } diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 742882cce2..9f65d06657 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -49,6 +49,7 @@ time = { workspace = true } tracing = { workspace = true } ethereum-types = { workspace = true } typenum = { workspace = true } +dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.14" } [dev-dependencies] serde_json = "1.0.107" diff --git a/crates/types/src/traits/metrics.rs b/crates/types/src/traits/metrics.rs index 4d73e5f67a..78aaef7817 100644 --- a/crates/types/src/traits/metrics.rs +++ b/crates/types/src/traits/metrics.rs @@ -7,6 +7,7 @@ //! - [`Label`]: Stores the last string (example usage: current version, network online/offline) use std::fmt::Debug; +use dyn_clone::DynClone; /// The metrics type. pub trait Metrics: Send + Sync { @@ -78,12 +79,12 @@ impl Label for NoMetrics { } /// An ever-incrementing counter -pub trait Counter: Send + Sync + Debug { +pub trait Counter: Send + Sync + Debug + DynClone { /// Add a value to the counter fn add(&self, amount: usize); } /// A gauge that stores the latest value. -pub trait Gauge: Send + Sync + Debug { +pub trait Gauge: Send + Sync + Debug + DynClone { /// Set the gauge value fn set(&self, amount: usize); @@ -92,16 +93,20 @@ pub trait Gauge: Send + Sync + Debug { } /// A histogram which will record a series of points. -pub trait Histogram: Send + Sync + Debug { +pub trait Histogram: Send + Sync + Debug + DynClone { /// Add a point to this histogram. fn add_point(&self, point: f64); } /// A label that stores the last string value. -pub trait Label: Send + Sync { +pub trait Label: Send + Sync + DynClone { /// Set the label value fn set(&self, value: String); } +dyn_clone::clone_trait_object!(Gauge); +dyn_clone::clone_trait_object!(Counter); +dyn_clone::clone_trait_object!(Histogram); +dyn_clone::clone_trait_object!(Label); #[cfg(test)] mod test { @@ -111,7 +116,7 @@ mod test { sync::{Arc, Mutex}, }; - #[derive(Debug)] + #[derive(Debug, Clone)] struct TestMetrics { prefix: String, values: Arc>, From d5c07866631c2983b9bbde1c949aea8ef091a675 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Fri, 22 Sep 2023 16:48:54 -0700 Subject: [PATCH 05/18] fix lint --- crates/hotshot/src/traits/networking.rs | 3 +-- crates/hotshot/src/traits/networking/libp2p_network.rs | 1 - crates/types/src/traits/metrics.rs | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/hotshot/src/traits/networking.rs b/crates/hotshot/src/traits/networking.rs index 017fae77ec..e1ac026546 100644 --- a/crates/hotshot/src/traits/networking.rs +++ b/crates/hotshot/src/traits/networking.rs @@ -9,12 +9,11 @@ pub mod libp2p_network; pub mod memory_network; pub mod web_server_libp2p_fallback; pub mod web_server_network; - +use hotshot_types::traits::metrics::{Counter, Gauge, Metrics}; pub use hotshot_types::traits::network::{ ChannelSendSnafu, CouldNotDeliverSnafu, FailedToDeserializeSnafu, FailedToSerializeSnafu, NetworkError, NetworkReliability, NoSuchNodeSnafu, ShutDownSnafu, }; -use hotshot_types::traits::metrics::{Counter, Gauge, Metrics}; /// Contains the metrics that we're interested in from the networking interfaces #[derive(Clone)] diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 1cea7633c1..ac949a331f 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -441,7 +441,6 @@ impl Libp2pNetwork { let connected_num = handle.num_connected().await?; metrics_connected_peers.set(connected_num); - while !is_bootstrapped.load(Ordering::Relaxed) { async_sleep(Duration::from_secs(1)).await; } diff --git a/crates/types/src/traits/metrics.rs b/crates/types/src/traits/metrics.rs index 78aaef7817..e326fca9bc 100644 --- a/crates/types/src/traits/metrics.rs +++ b/crates/types/src/traits/metrics.rs @@ -6,8 +6,8 @@ //! - [`Histogram`]: stores multiple float values based for a graph (example usage: CPU %) //! - [`Label`]: Stores the last string (example usage: current version, network online/offline) -use std::fmt::Debug; use dyn_clone::DynClone; +use std::fmt::Debug; /// The metrics type. pub trait Metrics: Send + Sync { From c1fd4ddda8e5fc7cf0a0e0e2ca9d5384e754410e Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 25 Sep 2023 15:07:57 -0700 Subject: [PATCH 06/18] add Debug to NetworkingMetrics --- crates/hotshot/src/traits/networking.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/hotshot/src/traits/networking.rs b/crates/hotshot/src/traits/networking.rs index e1ac026546..3f197d6f94 100644 --- a/crates/hotshot/src/traits/networking.rs +++ b/crates/hotshot/src/traits/networking.rs @@ -9,6 +9,7 @@ pub mod libp2p_network; pub mod memory_network; pub mod web_server_libp2p_fallback; pub mod web_server_network; +use custom_debug::Debug; use hotshot_types::traits::metrics::{Counter, Gauge, Metrics}; pub use hotshot_types::traits::network::{ ChannelSendSnafu, CouldNotDeliverSnafu, FailedToDeserializeSnafu, FailedToSerializeSnafu, @@ -16,7 +17,7 @@ pub use hotshot_types::traits::network::{ }; /// Contains the metrics that we're interested in from the networking interfaces -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct NetworkingMetrics { #[allow(dead_code)] /// A [`Gauge`] which tracks how many peers are connected From 5fb921802e4274518febe540ad6c73292c781709 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 25 Sep 2023 17:35:17 -0700 Subject: [PATCH 07/18] Add Debug for MemoryNetowkrInner --- crates/hotshot/src/traits/networking/libp2p_network.rs | 5 +++-- crates/hotshot/src/traits/networking/memory_network.rs | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index ac949a331f..7654490e2a 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -77,6 +77,7 @@ impl Debug for Libp2pNetwork { pub type PeerInfoVec = Arc, Multiaddr)>>>; /// The underlying state of the libp2p network +#[derive(Debug)] struct Libp2pNetworkInner { /// this node's public key pk: K, @@ -410,7 +411,7 @@ impl Libp2pNetwork { let handle = self.inner.handle.clone(); let is_bootstrapped = self.inner.is_bootstrapped.clone(); let node_type = self.inner.handle.config().node_type; - let metrics_connected_peers = self.inner.metrics.connected_peers.clone(); + let metrics_connected_peers = self.inner.clone(); async_spawn({ let is_ready = self.inner.is_ready.clone(); async move { @@ -440,7 +441,7 @@ impl Libp2pNetwork { .unwrap(); let connected_num = handle.num_connected().await?; - metrics_connected_peers.set(connected_num); + metrics_connected_peers.metrics.connected_peers.set(connected_num); while !is_bootstrapped.load(Ordering::Relaxed) { async_sleep(Duration::from_secs(1)).await; } diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 51f7bbbdf7..4add5dd222 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -87,6 +87,7 @@ enum Combo { } /// Internal state for a `MemoryNetwork` instance +#[derive(Debug)] struct MemoryNetworkInner { /// Input for broadcast messages broadcast_input: RwLock>>>, From b44750336a9c9784a5ae4468084944996701f5bb Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 25 Sep 2023 17:36:52 -0700 Subject: [PATCH 08/18] fmt check --- crates/hotshot/src/traits/networking/libp2p_network.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 7654490e2a..c010f8a8da 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -441,7 +441,10 @@ impl Libp2pNetwork { .unwrap(); let connected_num = handle.num_connected().await?; - metrics_connected_peers.metrics.connected_peers.set(connected_num); + metrics_connected_peers + .metrics + .connected_peers + .set(connected_num); while !is_bootstrapped.load(Ordering::Relaxed) { async_sleep(Duration::from_secs(1)).await; } From 480ac7ad53d8c23e4fb8f4d441d5dc94fa6408b9 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Thu, 28 Sep 2023 00:57:55 -0700 Subject: [PATCH 09/18] basic metrics completed for networking metrics --- crates/hotshot/examples/infra/modDA.rs | 5 +- crates/hotshot/src/traits.rs | 1 + crates/hotshot/src/traits/networking.rs | 147 ++++++++++++++++-- .../src/traits/networking/libp2p_network.rs | 12 +- .../src/traits/networking/memory_network.rs | 11 +- crates/types/src/traits/metrics.rs | 3 +- 6 files changed, 155 insertions(+), 24 deletions(-) diff --git a/crates/hotshot/examples/infra/modDA.rs b/crates/hotshot/examples/infra/modDA.rs index ae0f0e40f5..f888d575ab 100644 --- a/crates/hotshot/examples/infra/modDA.rs +++ b/crates/hotshot/examples/infra/modDA.rs @@ -7,7 +7,8 @@ use futures::StreamExt; use hotshot::{ traits::{ implementations::{ - Libp2pCommChannel, Libp2pNetwork, MemoryStorage, WebCommChannel, WebServerNetwork, + Libp2pCommChannel, Libp2pNetwork, MemoryStorage, NetworkingMetricsValue, + WebCommChannel, WebServerNetwork, }, NodeImplementation, }, @@ -701,7 +702,7 @@ where let node_config = config_builder.build().unwrap(); let underlying_quorum_network = Libp2pNetwork::new( - NoMetrics::boxed(), + NetworkingMetricsValue::new(), node_config, pubkey.clone(), Arc::new(RwLock::new( diff --git a/crates/hotshot/src/traits.rs b/crates/hotshot/src/traits.rs index a0e56cf86d..7d20468b44 100644 --- a/crates/hotshot/src/traits.rs +++ b/crates/hotshot/src/traits.rs @@ -17,6 +17,7 @@ pub mod implementations { memory_network::{DummyReliability, MasterMap, MemoryCommChannel, MemoryNetwork}, web_server_libp2p_fallback::{CombinedNetworks, WebServerWithFallbackCommChannel}, web_server_network::{WebCommChannel, WebServerNetwork}, + NetworkingMetricsValue, }, storage::memory_storage::MemoryStorage, // atomic_storage::AtomicStorage, }; diff --git a/crates/hotshot/src/traits/networking.rs b/crates/hotshot/src/traits/networking.rs index 3f197d6f94..aeaca0d815 100644 --- a/crates/hotshot/src/traits/networking.rs +++ b/crates/hotshot/src/traits/networking.rs @@ -9,17 +9,24 @@ pub mod libp2p_network; pub mod memory_network; pub mod web_server_libp2p_fallback; pub mod web_server_network; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + use custom_debug::Debug; -use hotshot_types::traits::metrics::{Counter, Gauge, Metrics}; +use hotshot_types::traits::metrics::{Counter, Gauge, Histogram, Label, Metrics}; pub use hotshot_types::traits::network::{ ChannelSendSnafu, CouldNotDeliverSnafu, FailedToDeserializeSnafu, FailedToSerializeSnafu, NetworkError, NetworkReliability, NoSuchNodeSnafu, ShutDownSnafu, }; -/// Contains the metrics that we're interested in from the networking interfaces +/// Contains several `NetworkingMetrics` that we're interested in from the networking interfaces #[derive(Clone, Debug)] -pub struct NetworkingMetrics { +pub struct NetworkingMetricsValue { #[allow(dead_code)] + /// The values that are being tracked + pub values: Arc>, /// A [`Gauge`] which tracks how many peers are connected pub connected_peers: Box, /// A [`Counter`] which tracks how many messages have been received @@ -36,10 +43,130 @@ pub struct NetworkingMetrics { // pub kademlia_buckets: Box, } +/// The wrapper with a string name for the networking metrics +#[derive(Clone, Debug)] +pub struct NetworkingMetrics { + /// a prefix which tracks the name of the metric + prefix: String, + /// a map of values + values: Arc>, +} + +/// the set of counters and gauges for the networking metrics +#[derive(Clone, Debug, Default)] +pub struct InnerNetworkingMetrics { + /// All the counters of the networking metrics + counters: HashMap, + /// All the gauges of the networking metrics + gauges: HashMap, + /// All the histograms of the networking metrics + histograms: HashMap>, + /// All the labels of the networking metrics + labels: HashMap, +} + impl NetworkingMetrics { - /// Create a new instance of this [`NetworkingMetrics`] struct, setting all the counters and gauges - pub(self) fn new(metrics: &dyn Metrics) -> Self { + /// For the creation and naming of gauge, counter, histogram and label. + pub fn sub(&self, name: String) -> Self { + let prefix = if self.prefix.is_empty() { + name + } else { + format!("{}-{name}", self.prefix) + }; Self { + prefix, + values: Arc::clone(&self.values), + } + } +} + +impl Metrics for NetworkingMetrics { + fn create_counter(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_gauge(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_histogram(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_label(&self, label: String) -> Box { + Box::new(self.sub(label)) + } + + fn subgroup(&self, subgroup_name: String) -> Box { + Box::new(self.sub(subgroup_name)) + } +} + +impl Counter for NetworkingMetrics { + fn add(&self, amount: usize) { + *self + .values + .lock() + .unwrap() + .counters + .entry(self.prefix.clone()) + .or_default() += amount; + } +} + +impl Gauge for NetworkingMetrics { + fn set(&self, amount: usize) { + *self + .values + .lock() + .unwrap() + .gauges + .entry(self.prefix.clone()) + .or_default() = amount; + } + fn update(&self, delta: i64) { + let mut values = self.values.lock().unwrap(); + let value = values.gauges.entry(self.prefix.clone()).or_default(); + let signed_value = i64::try_from(*value).unwrap_or(i64::MAX); + *value = usize::try_from(signed_value + delta).unwrap_or(0); + } +} + +impl Histogram for NetworkingMetrics { + fn add_point(&self, point: f64) { + self.values + .lock() + .unwrap() + .histograms + .entry(self.prefix.clone()) + .or_default() + .push(point); + } +} + +impl Label for NetworkingMetrics { + fn set(&self, value: String) { + *self + .values + .lock() + .unwrap() + .labels + .entry(self.prefix.clone()) + .or_default() = value; + } +} + +impl NetworkingMetricsValue { + /// Create a new instance of this [`NetworkingMetricsValue`] struct, setting all the counters and gauges + #[must_use] + pub fn new() -> Self { + let values = Arc::default(); + let metrics: Box = Box::new(NetworkingMetrics { + prefix: String::new(), + values: Arc::clone(&values), + }); + Self { + values, connected_peers: metrics.create_gauge(String::from("connected_peers"), None), incoming_message_count: metrics .create_counter(String::from("incoming_message_count"), None), @@ -47,10 +174,12 @@ impl NetworkingMetrics { .create_counter(String::from("outgoing_message_count"), None), message_failed_to_send: metrics .create_counter(String::from("message_failed_to_send"), None), - // gossipsub_mesh_connected: metrics - // .create_gauge(String::from("gossipsub_mesh_connected"), None), - // kademlia_entries: metrics.create_gauge(String::from("kademlia_entries"), None), - // kademlia_buckets: metrics.create_gauge(String::from("kademlia_buckets"), None), } } } + +impl Default for NetworkingMetricsValue { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index c010f8a8da..2ef23e5f2a 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -1,7 +1,7 @@ //! Libp2p based/production networking implementation //! This module provides a libp2p based networking implementation where each node in the //! network forms a tcp or udp connection to a subset of other nodes in the network -use super::NetworkingMetrics; +use super::NetworkingMetricsValue; use crate::NodeImplementation; use async_compatibility_layer::{ art::{async_block_on, async_sleep, async_spawn, async_timeout}, @@ -18,7 +18,6 @@ use hotshot_types::{ message::{Message, MessageKind}, traits::{ election::Membership, - metrics::{Metrics, NoMetrics}, network::{ CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, FailedToSerializeSnafu, NetworkError, NetworkMsg, TestableChannelImplementation, @@ -107,7 +106,7 @@ struct Libp2pNetworkInner { /// whether or not we've bootstrapped into the DHT yet is_bootstrapped: Arc, /// The networking metrics we're keeping track of - metrics: NetworkingMetrics, + metrics: NetworkingMetricsValue, /// topic map /// hash(hashset) -> topic /// btreemap ordered so is hashable @@ -228,7 +227,7 @@ where let da = da_keys.clone(); async_block_on(async move { Libp2pNetwork::new( - NoMetrics::boxed(), + NetworkingMetricsValue::new(), config, pubkey, bootstrap_addrs_ref, @@ -275,7 +274,7 @@ impl Libp2pNetwork { /// This will panic if there are less than 5 bootstrap nodes #[allow(clippy::too_many_arguments)] pub async fn new( - metrics: Box, + metrics: NetworkingMetricsValue, config: NetworkNodeConfig, pk: K, bootstrap_addrs: Arc, Multiaddr)>>>, @@ -333,7 +332,7 @@ impl Libp2pNetwork { is_ready: Arc::new(AtomicBool::new(false)), dht_timeout: Duration::from_secs(30), is_bootstrapped: Arc::new(AtomicBool::new(false)), - metrics: NetworkingMetrics::new(&*metrics), + metrics, topic_map, node_lookup_send, cache_gc_shutdown_send, @@ -445,6 +444,7 @@ impl Libp2pNetwork { .metrics .connected_peers .set(connected_num); + while !is_bootstrapped.load(Ordering::Relaxed) { async_sleep(Duration::from_secs(1)).await; } diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 4add5dd222..6a53082872 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -3,7 +3,7 @@ //! This module provides an in-memory only simulation of an actual network, useful for unit and //! integration tests. -use super::{FailedToSerializeSnafu, NetworkError, NetworkReliability, NetworkingMetrics}; +use super::{FailedToSerializeSnafu, NetworkError, NetworkReliability, NetworkingMetricsValue}; use crate::NodeImplementation; use async_compatibility_layer::{ art::{async_sleep, async_spawn}, @@ -19,7 +19,6 @@ use hotshot_types::{ message::{Message, MessageKind}, traits::{ election::Membership, - metrics::{Metrics, NoMetrics}, network::{ CommunicationChannel, ConnectedNetwork, NetworkMsg, TestableChannelImplementation, TestableNetworkingImplementation, TransmitType, ViewMessage, @@ -104,7 +103,7 @@ struct MemoryNetworkInner { in_flight_message_count: AtomicUsize, /// The networking metrics we're keeping track of - metrics: NetworkingMetrics, + metrics: NetworkingMetricsValue, } /// In memory only network simulator. @@ -133,7 +132,7 @@ impl MemoryNetwork { #[instrument(skip(metrics))] pub fn new( pub_key: K, - metrics: Box, + metrics: NetworkingMetricsValue, master_map: Arc>, reliability_config: Option>, ) -> MemoryNetwork { @@ -249,7 +248,7 @@ impl MemoryNetwork { direct_output: Mutex::new(direct_output), master_map: master_map.clone(), in_flight_message_count, - metrics: NetworkingMetrics::new(&*metrics), + metrics, }), }; master_map.map.insert(pub_key, mn.clone()); @@ -302,7 +301,7 @@ impl> Box::new(move |node_id| { let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1; let pubkey = TYPES::SignatureKey::from_private(&privkey); - MemoryNetwork::new(pubkey, NoMetrics::boxed(), master.clone(), None) + MemoryNetwork::new(pubkey, NetworkingMetricsValue::new(), master.clone(), None) }) } diff --git a/crates/types/src/traits/metrics.rs b/crates/types/src/traits/metrics.rs index e326fca9bc..fc69b5c077 100644 --- a/crates/types/src/traits/metrics.rs +++ b/crates/types/src/traits/metrics.rs @@ -10,7 +10,7 @@ use dyn_clone::DynClone; use std::fmt::Debug; /// The metrics type. -pub trait Metrics: Send + Sync { +pub trait Metrics: Send + Sync + DynClone + Debug { /// Create a [`Counter`] with an optional `unit_label`. /// /// The `unit_label` can be used to indicate what the unit of the value is, e.g. "kb" or "seconds" @@ -103,6 +103,7 @@ pub trait Label: Send + Sync + DynClone { /// Set the label value fn set(&self, value: String); } +dyn_clone::clone_trait_object!(Metrics); dyn_clone::clone_trait_object!(Gauge); dyn_clone::clone_trait_object!(Counter); dyn_clone::clone_trait_object!(Histogram); From 35c930dfea9205bc62db850fefcd066c1ea1154f Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Thu, 28 Sep 2023 22:30:49 -0700 Subject: [PATCH 10/18] revoke initialization of consensus metrics, no more NoMetrics Initialization --- crates/hotshot/examples/infra/modDA.rs | 4 +- crates/hotshot/src/lib.rs | 19 ++-- crates/testing/src/task_helpers.rs | 5 +- crates/testing/src/test_runner.rs | 4 +- crates/types/src/consensus.rs | 144 +++++++++++++++++++++++-- 5 files changed, 150 insertions(+), 26 deletions(-) diff --git a/crates/hotshot/examples/infra/modDA.rs b/crates/hotshot/examples/infra/modDA.rs index f888d575ab..a9c0402392 100644 --- a/crates/hotshot/examples/infra/modDA.rs +++ b/crates/hotshot/examples/infra/modDA.rs @@ -23,6 +23,7 @@ use hotshot_orchestrator::{ use hotshot_task::task::FilterEvent; use hotshot_types::HotShotConfig; use hotshot_types::{ + consensus::ConsensusMetricsValue, certificate::ViewSyncCertificate, data::{QuorumProposal, SequencingLeaf, TestableLeaf}, event::{Event, EventType}, @@ -31,7 +32,6 @@ use hotshot_types::{ election::{ CommitteeExchange, ConsensusExchange, Membership, QuorumExchange, ViewSyncExchange, }, - metrics::NoMetrics, network::CommunicationChannel, node_implementation::{ CommitteeEx, ExchangesType, NodeType, QuorumEx, SequencingExchanges, @@ -235,7 +235,7 @@ pub trait RunDA< MemoryStorage::empty(), exchanges, initializer, - NoMetrics::boxed(), + ConsensusMetricsValue::new(), ) .await .expect("Could not init hotshot") diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index 117153967e..f9bda9ae4c 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -57,7 +57,7 @@ use hotshot_task_impls::{events::SequencingHotShotEvent, network::NetworkTaskKin use hotshot_types::{ certificate::{DACertificate, ViewSyncCertificate}, - consensus::{BlockStore, Consensus, ConsensusMetrics, View, ViewInner, ViewQueue}, + consensus::{BlockStore, Consensus, ConsensusMetricsValue, View, ViewInner, ViewQueue}, data::{DAProposal, DeltasType, LeafType, QuorumProposal, SequencingLeaf}, error::StorageSnafu, message::{ @@ -67,7 +67,6 @@ use hotshot_types::{ traits::{ consensus_api::{ConsensusSharedApi, SequencingConsensusApi}, election::{ConsensusExchange, Membership, SignedCertificate}, - metrics::Metrics, network::{CommunicationChannel, NetworkError}, node_implementation::{ ChannelMaps, CommitteeEx, ExchangesType, NodeType, SendToTasks, SequencingQuorumEx, @@ -129,8 +128,8 @@ pub struct SystemContextInner> { /// Sender for [`Event`]s event_sender: RwLock>>>, - /// a reference to the metrics that the implementor is using. - _metrics: Box, + /// the metrics that the implementor is using. + _metrics: Arc, /// Transactions /// (this is shared btwn hotshot and `Consensus`) @@ -179,13 +178,11 @@ impl> SystemContext { storage: I::Storage, exchanges: I::Exchanges, initializer: HotShotInitializer, - metrics: Box, + metrics: ConsensusMetricsValue, ) -> Result> { debug!("Creating a new hotshot"); - let consensus_metrics = Arc::new(ConsensusMetrics::new( - &*metrics.subgroup("consensus".to_string()), - )); + let consensus_metrics = Arc::new(metrics); let anchored_leaf = initializer.inner; // insert to storage @@ -226,7 +223,7 @@ impl> SystemContext { // https://github.com/EspressoSystems/HotShot/issues/560 locked_view: anchored_leaf.get_view_number(), high_qc: anchored_leaf.get_justify_qc(), - metrics: consensus_metrics, + metrics: consensus_metrics.clone(), invalid_qc: 0, }; let consensus = Arc::new(RwLock::new(consensus)); @@ -244,7 +241,7 @@ impl> SystemContext { storage, exchanges: Arc::new(exchanges), event_sender: RwLock::default(), - _metrics: metrics, + _metrics: consensus_metrics.clone(), internal_event_stream: ChannelStream::new(), output_event_stream: ChannelStream::new(), }); @@ -403,7 +400,7 @@ impl> SystemContext { storage: I::Storage, exchanges: I::Exchanges, initializer: HotShotInitializer, - metrics: Box, + metrics: ConsensusMetricsValue, ) -> Result< ( SystemContextHandle, diff --git a/crates/testing/src/task_helpers.rs b/crates/testing/src/task_helpers.rs index 8e0f85a95c..515e48ba60 100644 --- a/crates/testing/src/task_helpers.rs +++ b/crates/testing/src/task_helpers.rs @@ -18,11 +18,10 @@ use hotshot_types::{ traits::{ consensus_api::ConsensusSharedApi, election::{ConsensusExchange, Membership, SignedCertificate}, - metrics::NoMetrics, node_implementation::{CommitteeEx, ExchangesType, NodeType, QuorumEx}, signature_key::EncodedSignature, state::ConsensusTime, - }, + }, consensus::ConsensusMetricsValue, }; pub async fn build_system_handle( @@ -81,7 +80,7 @@ pub async fn build_system_handle( storage, exchanges, initializer, - NoMetrics::boxed(), + ConsensusMetricsValue::new(), ) .await .expect("Could not init hotshot") diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index 056f0f8a26..cf6932a390 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -12,10 +12,10 @@ use hotshot_task::{ event_stream::ChannelStream, global_registry::GlobalRegistry, task_launcher::TaskRunner, }; use hotshot_types::{ + consensus::ConsensusMetricsValue, message::Message, traits::{ election::{ConsensusExchange, Membership}, - metrics::NoMetrics, network::CommunicationChannel, node_implementation::{ExchangesType, NodeType, QuorumCommChannel, QuorumEx}, signature_key::SignatureKey, @@ -276,7 +276,7 @@ where storage, exchanges, initializer, - NoMetrics::boxed(), + ConsensusMetricsValue::new(), ) .await .expect("Could not init hotshot") diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 0f9a825b87..b73e93b22c 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -11,7 +11,7 @@ use crate::{ data::LeafType, error::HotShotError, traits::{ - metrics::{Counter, Gauge, Histogram, Metrics}, + metrics::{Counter, Gauge, Histogram, Metrics, Label}, node_implementation::NodeType, }, }; @@ -19,7 +19,7 @@ use commit::{Commitment, Committable}; use derivative::Derivative; use std::{ collections::{hash_map::Entry, BTreeMap, HashMap}, - sync::Arc, + sync::{Arc, Mutex}, }; use tracing::error; @@ -65,16 +65,19 @@ pub struct Consensus> { /// A reference to the metrics trait #[debug(skip)] - pub metrics: Arc, + pub metrics: Arc, /// Amount of invalid QCs we've seen since the last commit /// Used for metrics. This resets to 0 on every decide event. pub invalid_qc: usize, } -/// The metrics being collected for the consensus algorithm -#[derive(Debug)] -pub struct ConsensusMetrics { +/// Contains several `ConsensusMetrics` that we're interested in from the consensus interfaces +#[derive(Clone, Debug)] +pub struct ConsensusMetricsValue { + #[allow(dead_code)] + /// The values that are being tracked + pub values: Arc>, /// The current view pub current_view: Box, /// The duration to collect votes in a view (only applies when this insance is the leader) @@ -115,11 +118,130 @@ pub struct ConsensusMetrics { pub failed_to_send_messages: Box, } +/// The wrapper with a string name for the networking metrics +#[derive(Clone, Debug)] +pub struct ConsensusMetrics { + /// a prefix which tracks the name of the metric + prefix: String, + /// a map of values + values: Arc>, +} + +/// the set of counters and gauges for the networking metrics +#[derive(Clone, Debug, Default)] +pub struct InnerConsensusMetrics { + /// All the counters of the networking metrics + counters: HashMap, + /// All the gauges of the networking metrics + gauges: HashMap, + /// All the histograms of the networking metrics + histograms: HashMap>, + /// All the labels of the networking metrics + labels: HashMap, +} + impl ConsensusMetrics { - /// Create a new instance of this [`ConsensusMetrics`] struct, setting all the counters and gauges + /// For the creation and naming of gauge, counter, histogram and label. + pub fn sub(&self, name: String) -> Self { + let prefix = if self.prefix.is_empty() { + name + } else { + format!("{}-{name}", self.prefix) + }; + Self { + prefix, + values: Arc::clone(&self.values), + } + } +} + +impl Metrics for ConsensusMetrics { + fn create_counter(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_gauge(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_histogram(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_label(&self, label: String) -> Box { + Box::new(self.sub(label)) + } + + fn subgroup(&self, subgroup_name: String) -> Box { + Box::new(self.sub(subgroup_name)) + } +} + +impl Counter for ConsensusMetrics { + fn add(&self, amount: usize) { + *self + .values + .lock() + .unwrap() + .counters + .entry(self.prefix.clone()) + .or_default() += amount; + } +} + +impl Gauge for ConsensusMetrics { + fn set(&self, amount: usize) { + *self + .values + .lock() + .unwrap() + .gauges + .entry(self.prefix.clone()) + .or_default() = amount; + } + fn update(&self, delta: i64) { + let mut values = self.values.lock().unwrap(); + let value = values.gauges.entry(self.prefix.clone()).or_default(); + let signed_value = i64::try_from(*value).unwrap_or(i64::MAX); + *value = usize::try_from(signed_value + delta).unwrap_or(0); + } +} + +impl Histogram for ConsensusMetrics { + fn add_point(&self, point: f64) { + self.values + .lock() + .unwrap() + .histograms + .entry(self.prefix.clone()) + .or_default() + .push(point); + } +} + +impl Label for ConsensusMetrics { + fn set(&self, value: String) { + *self + .values + .lock() + .unwrap() + .labels + .entry(self.prefix.clone()) + .or_default() = value; + } +} + +impl ConsensusMetricsValue { + /// Create a new instance of this [`ConsensusMetricsValue`] struct, setting all the counters and gauges #[must_use] - pub fn new(metrics: &dyn Metrics) -> Self { + pub fn new() -> Self { + let values = Arc::default(); + let metrics: Box = Box::new(ConsensusMetrics { + prefix: String::new(), + values: Arc::clone(&values), + }); Self { + values, current_view: metrics.create_gauge(String::from("current_view"), None), vote_validate_duration: metrics.create_histogram( String::from("vote_validate_duration"), @@ -166,6 +288,12 @@ impl ConsensusMetrics { } } +impl Default for ConsensusMetricsValue { + fn default() -> Self { + Self::new() + } +} + impl> Consensus { /// increment the current view /// NOTE may need to do gc here From 089edb250fd5a78cd5022646dfba3fec3e427cad Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Thu, 28 Sep 2023 22:38:52 -0700 Subject: [PATCH 11/18] fix lint --- crates/hotshot/examples/infra/modDA.rs | 2 +- crates/testing/src/task_helpers.rs | 3 ++- crates/types/src/consensus.rs | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/hotshot/examples/infra/modDA.rs b/crates/hotshot/examples/infra/modDA.rs index a9c0402392..2b63a8a794 100644 --- a/crates/hotshot/examples/infra/modDA.rs +++ b/crates/hotshot/examples/infra/modDA.rs @@ -23,8 +23,8 @@ use hotshot_orchestrator::{ use hotshot_task::task::FilterEvent; use hotshot_types::HotShotConfig; use hotshot_types::{ - consensus::ConsensusMetricsValue, certificate::ViewSyncCertificate, + consensus::ConsensusMetricsValue, data::{QuorumProposal, SequencingLeaf, TestableLeaf}, event::{Event, EventType}, message::{Message, SequencingMessage}, diff --git a/crates/testing/src/task_helpers.rs b/crates/testing/src/task_helpers.rs index 515e48ba60..7418028b53 100644 --- a/crates/testing/src/task_helpers.rs +++ b/crates/testing/src/task_helpers.rs @@ -13,6 +13,7 @@ use hotshot::{ use hotshot_task::event_stream::ChannelStream; use hotshot_task_impls::events::SequencingHotShotEvent; use hotshot_types::{ + consensus::ConsensusMetricsValue, data::{QuorumProposal, SequencingLeaf, VidScheme, ViewNumber}, message::{Message, Proposal}, traits::{ @@ -21,7 +22,7 @@ use hotshot_types::{ node_implementation::{CommitteeEx, ExchangesType, NodeType, QuorumEx}, signature_key::EncodedSignature, state::ConsensusTime, - }, consensus::ConsensusMetricsValue, + }, }; pub async fn build_system_handle( diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index b73e93b22c..6ba49fd211 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -11,7 +11,7 @@ use crate::{ data::LeafType, error::HotShotError, traits::{ - metrics::{Counter, Gauge, Histogram, Metrics, Label}, + metrics::{Counter, Gauge, Histogram, Label, Metrics}, node_implementation::NodeType, }, }; @@ -141,6 +141,7 @@ pub struct InnerConsensusMetrics { } impl ConsensusMetrics { + #[must_use] /// For the creation and naming of gauge, counter, histogram and label. pub fn sub(&self, name: String) -> Self { let prefix = if self.prefix.is_empty() { From ebfc29b8795294a2b463a16f36a3c7f17c90008b Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Sat, 30 Sep 2023 22:57:15 -0700 Subject: [PATCH 12/18] synced most recent block height --- crates/task-impls/src/consensus.rs | 9 +++++++-- crates/task-impls/src/da.rs | 6 ------ crates/types/src/consensus.rs | 10 ++++++---- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index dccf7919c3..24bccf2059 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -584,7 +584,7 @@ where // TODO ED Insert TC logic here // Construct the leaf. - let justify_qc = proposal.data.justify_qc; + let justify_qc = proposal.clone().data.justify_qc; let parent = if justify_qc.is_genesis() { self.genesis_leaf().await } else { @@ -969,7 +969,6 @@ where let mut consensus = self.consensus.write().await; consensus.high_qc = qc.clone(); - drop(consensus); // View may have already been updated by replica if they voted for this QC @@ -1168,6 +1167,12 @@ where proposer_id: self.api.public_key().to_bytes(), }; + let consensus = self.consensus.read().await; + consensus + .metrics + .last_synced_block_height + .set(usize::try_from(leaf.height).unwrap_or(0)); + let signature = self .quorum_exchange .sign_validating_or_commitment_proposal::(&leaf.commit()); diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index ff048ef469..c7df2854ef 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -646,12 +646,6 @@ where self.event_stream .publish(SequencingHotShotEvent::SendDABlockData(block.clone())) .await; - // if let Err(e) = self.api.send_da_broadcast(message.clone()).await { - // consensus.metrics.failed_to_send_messages.add(1); - // warn!(?message, ?e, "Could not broadcast leader proposal"); - // } else { - // consensus.metrics.outgoing_broadcast_messages.add(1); - // } self.event_stream .publish(SequencingHotShotEvent::DAProposalSend( message.clone(), diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 6ba49fd211..4b6b7bb0d6 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -78,6 +78,8 @@ pub struct ConsensusMetricsValue { #[allow(dead_code)] /// The values that are being tracked pub values: Arc>, + /// The number of last synced synced block height + pub last_synced_block_height: Box, /// The current view pub current_view: Box, /// The duration to collect votes in a view (only applies when this insance is the leader) @@ -114,8 +116,8 @@ pub struct ConsensusMetricsValue { pub direct_messages_received: Box, /// Total broadcast messages received pub broadcast_messages_received: Box, - /// Total number of messages which couldn't be sent - pub failed_to_send_messages: Box, + // Total number of messages which couldn't be sent + // pub failed_to_send_messages: Box, } /// The wrapper with a string name for the networking metrics @@ -243,6 +245,8 @@ impl ConsensusMetricsValue { }); Self { values, + last_synced_block_height: metrics + .create_gauge(String::from("last_synced_block_height"), None), current_view: metrics.create_gauge(String::from("current_view"), None), vote_validate_duration: metrics.create_histogram( String::from("vote_validate_duration"), @@ -281,8 +285,6 @@ impl ConsensusMetricsValue { .create_counter(String::from("direct_messages_received"), None), broadcast_messages_received: metrics .create_counter(String::from("broadcast_messages_received"), None), - failed_to_send_messages: metrics - .create_counter(String::from("failed_to_send_messages"), None), number_of_timeouts: metrics .create_counter(String::from("number_of_views_timed_out"), None), } From dfd61f5b62fcbce380a4c6c68cd321ebfe673715 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 2 Oct 2023 17:01:37 -0700 Subject: [PATCH 13/18] update last_synced_block_height, current_view, incoming_tx, outgoing_tx, num_timeouts etc. for metrics --- crates/hotshot/src/traits/networking.rs | 24 ++++++--- .../src/traits/networking/libp2p_network.rs | 8 +-- .../src/traits/networking/memory_network.rs | 12 ++--- crates/task-impls/src/consensus.rs | 17 ++++--- crates/types/src/consensus.rs | 51 ++++--------------- crates/types/src/data.rs | 4 ++ crates/types/src/traits/state.rs | 2 + 7 files changed, 52 insertions(+), 66 deletions(-) diff --git a/crates/hotshot/src/traits/networking.rs b/crates/hotshot/src/traits/networking.rs index aeaca0d815..17ba29d32d 100644 --- a/crates/hotshot/src/traits/networking.rs +++ b/crates/hotshot/src/traits/networking.rs @@ -29,10 +29,14 @@ pub struct NetworkingMetricsValue { pub values: Arc>, /// A [`Gauge`] which tracks how many peers are connected pub connected_peers: Box, - /// A [`Counter`] which tracks how many messages have been received - pub incoming_message_count: Box, - /// A [`Counter`] which tracks how many messages have been send - pub outgoing_message_count: Box, + /// A [`Counter`] which tracks how many messages have been received directly + pub incoming_direct_message_count: Box, + /// A [`Counter`] which tracks how many messages have been received by broadcast + pub incoming_broadcast_message_count: Box, + /// A [`Counter`] which tracks how many messages have been send directly + pub outgoing_direct_message_count: Box, + /// A [`Counter`] which tracks how many messages have been send by broadcast + pub outgoing_broadcast_message_count: Box, /// A [`Counter`] which tracks how many messages failed to send pub message_failed_to_send: Box, // A [`Gauge`] which tracks how many connected entries there are in the gossipsub mesh @@ -168,10 +172,14 @@ impl NetworkingMetricsValue { Self { values, connected_peers: metrics.create_gauge(String::from("connected_peers"), None), - incoming_message_count: metrics - .create_counter(String::from("incoming_message_count"), None), - outgoing_message_count: metrics - .create_counter(String::from("outgoing_message_count"), None), + incoming_direct_message_count: metrics + .create_counter(String::from("incoming_direct_message_count"), None), + incoming_broadcast_message_count: metrics + .create_counter(String::from("incoming_broadcast_message_count"), None), + outgoing_direct_message_count: metrics + .create_counter(String::from("outgoing_direct_message_count"), None), + outgoing_broadcast_message_count: metrics + .create_counter(String::from("outgoing_broadcast_message_count"), None), message_failed_to_send: metrics .create_counter(String::from("message_failed_to_send"), None), } diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 2ef23e5f2a..1a5550ef82 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -627,7 +627,7 @@ impl ConnectedNetwork for Libp2p match self.inner.handle.gossip(topic, &message).await { Ok(()) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_broadcast_message_count.add(1); Ok(()) } Err(e) => { @@ -675,7 +675,7 @@ impl ConnectedNetwork for Libp2p match self.inner.handle.direct_request(pid, &message).await { Ok(()) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_direct_message_count.add(1); Ok(()) } Err(e) => { @@ -706,7 +706,7 @@ impl ConnectedNetwork for Libp2p .drain_at_least_one() .await .map_err(|_x| NetworkError::ShutDown)?; - self.inner.metrics.incoming_message_count.add(result.len()); + self.inner.metrics.incoming_direct_message_count.add(result.len()); Ok(result) } TransmitType::Broadcast => { @@ -716,7 +716,7 @@ impl ConnectedNetwork for Libp2p .drain_at_least_one() .await .map_err(|_x| NetworkError::ShutDown)?; - self.inner.metrics.incoming_message_count.add(result.len()); + self.inner.metrics.incoming_direct_message_count.add(result.len()); Ok(result) } } diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 6a53082872..cd13aab788 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -264,7 +264,7 @@ impl MemoryNetwork { .fetch_add(1, Ordering::Relaxed); let input = self.inner.broadcast_input.read().await; if let Some(input) = &*input { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_broadcast_message_count.add(1); input.send(message).await } else { Err(SendError(message)) @@ -278,7 +278,7 @@ impl MemoryNetwork { .fetch_add(1, Ordering::Relaxed); let input = self.inner.direct_input.read().await; if let Some(input) = &*input { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_direct_message_count.add(1); input.send(message).await } else { Err(SendError(message)) @@ -355,7 +355,7 @@ impl ConnectedNetwork for Memory let res = node.broadcast_input(vec.clone()).await; match res { Ok(_) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_broadcast_message_count.add(1); trace!(?key, "Delivered message to remote"); } Err(e) => { @@ -380,7 +380,7 @@ impl ConnectedNetwork for Memory let res = node.direct_input(vec).await; match res { Ok(_) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_direct_message_count.add(1); trace!(?recipient, "Delivered message to remote"); Ok(()) } @@ -423,7 +423,7 @@ impl ConnectedNetwork for Memory self.inner .in_flight_message_count .fetch_sub(ret.len(), Ordering::Relaxed); - self.inner.metrics.incoming_message_count.add(ret.len()); + self.inner.metrics.incoming_direct_message_count.add(ret.len()); Ok(ret) } TransmitType::Broadcast => { @@ -438,7 +438,7 @@ impl ConnectedNetwork for Memory self.inner .in_flight_message_count .fetch_sub(ret.len(), Ordering::Relaxed); - self.inner.metrics.incoming_message_count.add(ret.len()); + self.inner.metrics.incoming_broadcast_message_count.add(ret.len()); Ok(ret) } } diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index 24bccf2059..c5bbc55933 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -537,6 +537,8 @@ where .await; } }); + let consensus = self.consensus.read().await; + consensus.metrics.current_view.set(*self.cur_view as usize); return true; } @@ -740,6 +742,10 @@ where // starting from the first iteration with a three chain, e.g. right after the else if case nested in the if case above if new_decide_reached { let mut leaf = leaf.clone(); + consensus + .metrics + .last_synced_block_height + .set(usize::try_from(leaf.height).unwrap_or(0)); // If the full block is available for this leaf, include it in the leaf // chain that we send to the client. @@ -1045,6 +1051,7 @@ where .await; debug!("View changed to {}", *new_view); + // ED Need to update the view here? What does otherwise? // self.update_view(qc.view_number + 1).await; @@ -1061,7 +1068,7 @@ where "Failed to publish proposal on view change. View = {:?}", self.cur_view ); - } + } } SequencingHotShotEvent::Timeout(view) => { // The view sync module will handle updating views in the case of timeout @@ -1074,6 +1081,8 @@ where "We received a timeout event in the consensus task for view {}!", *view ); + let consensus = self.consensus.read().await; + consensus.metrics.number_of_timeouts.add(1); } SequencingHotShotEvent::SendDABlockData(block) => { // ED TODO Should make sure this is actually the most recent block @@ -1167,12 +1176,6 @@ where proposer_id: self.api.public_key().to_bytes(), }; - let consensus = self.consensus.read().await; - consensus - .metrics - .last_synced_block_height - .set(usize::try_from(leaf.height).unwrap_or(0)); - let signature = self .quorum_exchange .sign_validating_or_commitment_proposal::(&leaf.commit()); diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 4b6b7bb0d6..de5921ce53 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -3,6 +3,7 @@ pub use crate::traits::node_implementation::ViewQueue; pub use crate::utils::{View, ViewInner}; use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock; +use displaydoc::Display; use std::collections::HashSet; use crate::utils::Terminator; @@ -82,14 +83,14 @@ pub struct ConsensusMetricsValue { pub last_synced_block_height: Box, /// The current view pub current_view: Box, - /// The duration to collect votes in a view (only applies when this insance is the leader) - pub vote_validate_duration: Box, - /// The duration we waited for txns before building the proposal - pub proposal_wait_duration: Box, - /// The duration to build the proposal - pub proposal_build_duration: Box, - /// The duration of each view, in seconds - pub view_duration: Box, + // The duration to collect votes in a view (only applies when this insance is the leader) + // pub vote_validate_duration: Box, + // The duration we waited for txns before building the proposal + // pub proposal_wait_duration: Box, + // The duration to build the proposal + // pub proposal_build_duration: Box, + // The duration of each view, in seconds + // pub view_duration: Box, /// Number of views that are in-flight since the last committed view pub number_of_views_since_last_commit: Box, /// Number of views that are in-flight since the last anchor view @@ -108,16 +109,6 @@ pub struct ConsensusMetricsValue { pub outstanding_transactions_memory_size: Box, /// Number of views that timed out pub number_of_timeouts: Box, - /// Total direct messages this node sent out - pub outgoing_direct_messages: Box, - /// Total broadcasts sent - pub outgoing_broadcast_messages: Box, - /// Total messages received - pub direct_messages_received: Box, - /// Total broadcast messages received - pub broadcast_messages_received: Box, - // Total number of messages which couldn't be sent - // pub failed_to_send_messages: Box, } /// The wrapper with a string name for the networking metrics @@ -130,7 +121,7 @@ pub struct ConsensusMetrics { } /// the set of counters and gauges for the networking metrics -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, Display)] pub struct InnerConsensusMetrics { /// All the counters of the networking metrics counters: HashMap, @@ -248,20 +239,6 @@ impl ConsensusMetricsValue { last_synced_block_height: metrics .create_gauge(String::from("last_synced_block_height"), None), current_view: metrics.create_gauge(String::from("current_view"), None), - vote_validate_duration: metrics.create_histogram( - String::from("vote_validate_duration"), - Some(String::from("seconds")), - ), - proposal_build_duration: metrics.create_histogram( - String::from("proposal_build_duration"), - Some(String::from("seconds")), - ), - proposal_wait_duration: metrics.create_histogram( - String::from("proposal_wait_duration"), - Some(String::from("seconds")), - ), - view_duration: metrics - .create_histogram(String::from("view_duration"), Some(String::from("seconds"))), number_of_views_since_last_commit: metrics .create_gauge(String::from("number_of_views_since_last_commit"), None), number_of_views_per_decide_event: metrics @@ -277,14 +254,6 @@ impl ConsensusMetricsValue { .create_gauge(String::from("outstanding_transactions"), None), outstanding_transactions_memory_size: metrics .create_gauge(String::from("outstanding_transactions_memory_size"), None), - outgoing_direct_messages: metrics - .create_counter(String::from("outgoing_direct_messages"), None), - outgoing_broadcast_messages: metrics - .create_counter(String::from("outgoing_broadcast_messages"), None), - direct_messages_received: metrics - .create_counter(String::from("direct_messages_received"), None), - broadcast_messages_received: metrics - .create_counter(String::from("broadcast_messages_received"), None), number_of_timeouts: metrics .create_counter(String::from("number_of_views_timed_out"), None), } diff --git a/crates/types/src/data.rs b/crates/types/src/data.rs index 080355a195..14b180b039 100644 --- a/crates/types/src/data.rs +++ b/crates/types/src/data.rs @@ -64,6 +64,10 @@ impl ConsensusTime for ViewNumber { fn new(n: u64) -> Self { Self(n) } + /// Returen the u64 format + fn get_u64(&self) -> u64 { + self.0 + } } impl Committable for ViewNumber { diff --git a/crates/types/src/traits/state.rs b/crates/types/src/traits/state.rs index 93ff3e6603..8e6c3758ae 100644 --- a/crates/types/src/traits/state.rs +++ b/crates/types/src/traits/state.rs @@ -95,6 +95,8 @@ pub trait ConsensusTime: } /// Create a new instance of this time unit fn new(val: u64) -> Self; + /// Get the u64 format of time + fn get_u64(&self) -> u64; } /// extra functions required on state to be usable by hotshot-testing From b5d77d10727a27a5d0cdd7b5204186e87ef6018e Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 2 Oct 2023 22:52:32 -0700 Subject: [PATCH 14/18] metrics for invalid qc, rejected txs, number of views per decide --- crates/hotshot/src/lib.rs | 1 - crates/task-impls/src/consensus.rs | 9 ++++-- crates/task-impls/src/transactions.rs | 3 ++ crates/types/src/consensus.rs | 44 ++++++++++++--------------- 4 files changed, 30 insertions(+), 27 deletions(-) diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index f9bda9ae4c..06dd78bfc5 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -224,7 +224,6 @@ impl> SystemContext { locked_view: anchored_leaf.get_view_number(), high_qc: anchored_leaf.get_justify_qc(), metrics: consensus_metrics.clone(), - invalid_qc: 0, }; let consensus = Arc::new(RwLock::new(consensus)); let txns = consensus.read().await.get_transactions(); diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index c5bbc55933..8656230761 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -539,6 +539,7 @@ where }); let consensus = self.consensus.read().await; consensus.metrics.current_view.set(*self.cur_view as usize); + consensus.metrics.number_of_views_since_last_decide.set((*self.cur_view as usize) - (consensus.last_decided_view.get_u64() as usize)); return true; } @@ -624,7 +625,7 @@ where .is_valid_cert(&justify_qc, parent_commitment) { error!("Invalid justify_qc in proposal!. parent commitment is {:?} justify qc is {:?}", parent_commitment, justify_qc.clone()); - + consensus.metrics.invalid_qc.update(1); message = self.quorum_exchange.create_no_message::( justify_qc_commitment, leaf_commitment, @@ -822,7 +823,11 @@ where .collect_garbage(old_anchor_view, new_anchor_view) .await; consensus.last_decided_view = new_anchor_view; - consensus.invalid_qc = 0; + consensus.metrics.invalid_qc.set(0); + consensus.metrics.last_decided_view.set(consensus.last_decided_view.get_u64() as usize); + let cur_number_of_views_per_decide_event = *self.cur_view - consensus.last_decided_view.get_u64(); + consensus.metrics.number_of_views_per_decide_event.add_point(cur_number_of_views_per_decide_event as f64); + // We're only storing the last QC. We could store more but we're realistically only going to retrieve the last one. if let Err(e) = self.api.store_leaf(old_anchor_view, leaf).await { diff --git a/crates/task-impls/src/transactions.rs b/crates/task-impls/src/transactions.rs index 8609708653..ee14e54d9c 100644 --- a/crates/task-impls/src/transactions.rs +++ b/crates/task-impls/src/transactions.rs @@ -116,6 +116,9 @@ where warn!("Conversion failed: {e}. Using the max value."); i64::MAX })); + } else { + // it's more like the calculation of duplicate transactions + consensus.metrics.rejected_transactions.add(1); } } }) diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index de5921ce53..5e9436ae9b 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -67,10 +67,6 @@ pub struct Consensus> { /// A reference to the metrics trait #[debug(skip)] pub metrics: Arc, - - /// Amount of invalid QCs we've seen since the last commit - /// Used for metrics. This resets to 0 on every decide event. - pub invalid_qc: usize, } /// Contains several `ConsensusMetrics` that we're interested in from the consensus interfaces @@ -81,6 +77,8 @@ pub struct ConsensusMetricsValue { pub values: Arc>, /// The number of last synced synced block height pub last_synced_block_height: Box, + /// The number of last decided view + pub last_decided_view: Box, /// The current view pub current_view: Box, // The duration to collect votes in a view (only applies when this insance is the leader) @@ -91,17 +89,17 @@ pub struct ConsensusMetricsValue { // pub proposal_build_duration: Box, // The duration of each view, in seconds // pub view_duration: Box, - /// Number of views that are in-flight since the last committed view - pub number_of_views_since_last_commit: Box, + /// Number of views that are in-flight since the last decided view + pub number_of_views_since_last_decide: Box, /// Number of views that are in-flight since the last anchor view pub number_of_views_per_decide_event: Box, - /// Number of invalid QCs between anchors - pub invalid_qc_views: Box, - /// Number of views that were discarded since from one achor to the next - pub discarded_views_per_decide_event: Box, - /// Views where no proposal was seen from one anchor to the next - pub empty_views_per_decide_event: Box, - /// Number of rejected transactions + /// Number of invalid QCs we've seen since the last commit. + pub invalid_qc: Box, + // Number of views that were discarded since from one anchor to the next + // pub discarded_views_per_decide_event: Box, + // Views where no proposal was seen from one anchor to the next + // pub empty_views_per_decide_event: Box, + /// Number of rejected transactions, it's more like duplicated transactions in current implementation pub rejected_transactions: Box, /// Number of outstanding transactions pub outstanding_transactions: Box, @@ -124,13 +122,13 @@ pub struct ConsensusMetrics { #[derive(Clone, Debug, Default, Display)] pub struct InnerConsensusMetrics { /// All the counters of the networking metrics - counters: HashMap, + pub counters: HashMap, /// All the gauges of the networking metrics - gauges: HashMap, + pub gauges: HashMap, /// All the histograms of the networking metrics - histograms: HashMap>, + pub histograms: HashMap>, /// All the labels of the networking metrics - labels: HashMap, + pub labels: HashMap, } impl ConsensusMetrics { @@ -238,16 +236,14 @@ impl ConsensusMetricsValue { values, last_synced_block_height: metrics .create_gauge(String::from("last_synced_block_height"), None), + last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None), current_view: metrics.create_gauge(String::from("current_view"), None), - number_of_views_since_last_commit: metrics - .create_gauge(String::from("number_of_views_since_last_commit"), None), + number_of_views_since_last_decide: metrics + .create_gauge(String::from("number_of_views_since_last_decide"), None), number_of_views_per_decide_event: metrics .create_histogram(String::from("number_of_views_per_decide_event"), None), - invalid_qc_views: metrics.create_histogram(String::from("invalid_qc_views"), None), - discarded_views_per_decide_event: metrics - .create_histogram(String::from("discarded_views_per_decide_event"), None), - empty_views_per_decide_event: metrics - .create_histogram(String::from("empty_views_per_decide_event"), None), + invalid_qc: metrics. + create_gauge(String::from("invalid_qc"), None), rejected_transactions: metrics .create_counter(String::from("rejected_transactions"), None), outstanding_transactions: metrics From aacd9accff959aee60410b54ef5550bfe10335a9 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 2 Oct 2023 23:15:55 -0700 Subject: [PATCH 15/18] fix lint --- .../src/traits/networking/libp2p_network.rs | 10 +++++-- .../src/traits/networking/memory_network.rs | 10 +++++-- crates/task-impls/src/consensus.rs | 26 +++++++++++++------ crates/types/src/consensus.rs | 3 +-- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 1a5550ef82..c7259ec23f 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -706,7 +706,10 @@ impl ConnectedNetwork for Libp2p .drain_at_least_one() .await .map_err(|_x| NetworkError::ShutDown)?; - self.inner.metrics.incoming_direct_message_count.add(result.len()); + self.inner + .metrics + .incoming_direct_message_count + .add(result.len()); Ok(result) } TransmitType::Broadcast => { @@ -716,7 +719,10 @@ impl ConnectedNetwork for Libp2p .drain_at_least_one() .await .map_err(|_x| NetworkError::ShutDown)?; - self.inner.metrics.incoming_direct_message_count.add(result.len()); + self.inner + .metrics + .incoming_direct_message_count + .add(result.len()); Ok(result) } } diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index cd13aab788..5fa6530680 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -423,7 +423,10 @@ impl ConnectedNetwork for Memory self.inner .in_flight_message_count .fetch_sub(ret.len(), Ordering::Relaxed); - self.inner.metrics.incoming_direct_message_count.add(ret.len()); + self.inner + .metrics + .incoming_direct_message_count + .add(ret.len()); Ok(ret) } TransmitType::Broadcast => { @@ -438,7 +441,10 @@ impl ConnectedNetwork for Memory self.inner .in_flight_message_count .fetch_sub(ret.len(), Ordering::Relaxed); - self.inner.metrics.incoming_broadcast_message_count.add(ret.len()); + self.inner + .metrics + .incoming_broadcast_message_count + .add(ret.len()); Ok(ret) } } diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index 8656230761..eae7854352 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -538,8 +538,14 @@ where } }); let consensus = self.consensus.read().await; - consensus.metrics.current_view.set(*self.cur_view as usize); - consensus.metrics.number_of_views_since_last_decide.set((*self.cur_view as usize) - (consensus.last_decided_view.get_u64() as usize)); + consensus + .metrics + .current_view + .set(usize::try_from(self.cur_view.get_u64()).unwrap()); + consensus.metrics.number_of_views_since_last_decide.set( + usize::try_from(self.cur_view.get_u64()).unwrap() + - usize::try_from(consensus.last_decided_view.get_u64()).unwrap(), + ); return true; } @@ -824,10 +830,15 @@ where .await; consensus.last_decided_view = new_anchor_view; consensus.metrics.invalid_qc.set(0); - consensus.metrics.last_decided_view.set(consensus.last_decided_view.get_u64() as usize); - let cur_number_of_views_per_decide_event = *self.cur_view - consensus.last_decided_view.get_u64(); - consensus.metrics.number_of_views_per_decide_event.add_point(cur_number_of_views_per_decide_event as f64); - + consensus.metrics.last_decided_view.set( + usize::try_from(consensus.last_decided_view.get_u64()).unwrap(), + ); + let cur_number_of_views_per_decide_event = + *self.cur_view - consensus.last_decided_view.get_u64(); + consensus + .metrics + .number_of_views_per_decide_event + .add_point(cur_number_of_views_per_decide_event as f64); // We're only storing the last QC. We could store more but we're realistically only going to retrieve the last one. if let Err(e) = self.api.store_leaf(old_anchor_view, leaf).await { @@ -1056,7 +1067,6 @@ where .await; debug!("View changed to {}", *new_view); - // ED Need to update the view here? What does otherwise? // self.update_view(qc.view_number + 1).await; @@ -1073,7 +1083,7 @@ where "Failed to publish proposal on view change. View = {:?}", self.cur_view ); - } + } } SequencingHotShotEvent::Timeout(view) => { // The view sync module will handle updating views in the case of timeout diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 5e9436ae9b..cdf4db9d85 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -242,8 +242,7 @@ impl ConsensusMetricsValue { .create_gauge(String::from("number_of_views_since_last_decide"), None), number_of_views_per_decide_event: metrics .create_histogram(String::from("number_of_views_per_decide_event"), None), - invalid_qc: metrics. - create_gauge(String::from("invalid_qc"), None), + invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None), rejected_transactions: metrics .create_counter(String::from("rejected_transactions"), None), outstanding_transactions: metrics From 03c6bd00e59d48c3fa19740bea2defe434b56dae Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Mon, 2 Oct 2023 23:27:47 -0700 Subject: [PATCH 16/18] fix lint --- crates/testing/src/task_helpers.rs | 2 +- crates/testing/tests/memory_network.rs | 54 +++++++++++++++++++------- crates/types/src/consensus.rs | 2 - 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/crates/testing/src/task_helpers.rs b/crates/testing/src/task_helpers.rs index 2e935dab61..17be6f7dfc 100644 --- a/crates/testing/src/task_helpers.rs +++ b/crates/testing/src/task_helpers.rs @@ -13,8 +13,8 @@ use hotshot::{ use hotshot_task::event_stream::ChannelStream; use hotshot_task_impls::events::SequencingHotShotEvent; use hotshot_types::{ - consensus::ConsensusMetricsValue, block_impl::{VIDBlockPayload, NUM_CHUNKS, NUM_STORAGE_NODES}, + consensus::ConsensusMetricsValue, data::{QuorumProposal, SequencingLeaf, VidScheme, ViewNumber}, message::{Message, Proposal}, traits::{ diff --git a/crates/testing/tests/memory_network.rs b/crates/testing/tests/memory_network.rs index d1a231a2c0..74acdcd85b 100644 --- a/crates/testing/tests/memory_network.rs +++ b/crates/testing/tests/memory_network.rs @@ -8,7 +8,7 @@ use hotshot::traits::election::static_committee::{ GeneralStaticCommittee, StaticElectionConfig, StaticVoteToken, }; use hotshot::traits::implementations::{ - MasterMap, MemoryCommChannel, MemoryNetwork, MemoryStorage, + MasterMap, MemoryCommChannel, MemoryNetwork, MemoryStorage, NetworkingMetricsValue, }; use hotshot::traits::NodeImplementation; use hotshot::types::bn254::{BLSPrivKey, BLSPubKey}; @@ -18,7 +18,6 @@ use hotshot_types::certificate::ViewSyncCertificate; use hotshot_types::data::{DAProposal, QuorumProposal, SequencingLeaf}; use hotshot_types::message::{Message, SequencingMessage}; use hotshot_types::traits::election::{CommitteeExchange, QuorumExchange, ViewSyncExchange}; -use hotshot_types::traits::metrics::NoMetrics; use hotshot_types::traits::network::TestableNetworkingImplementation; use hotshot_types::traits::network::{ConnectedNetwork, TransmitType}; use hotshot_types::traits::node_implementation::{ChannelMaps, NodeType, SequencingExchanges}; @@ -168,8 +167,7 @@ async fn memory_network_spawn_single() { let group: Arc, ::SignatureKey>> = MasterMap::new(); trace!(?group); - let pub_key = get_pubkey(); - let _network = MemoryNetwork::new(pub_key, NoMetrics::boxed(), group, Option::None); + let _pub_key = get_pubkey(); } // // Spawning a two MemoryNetworks and connecting them should produce no errors @@ -184,10 +182,8 @@ async fn memory_network_spawn_double() { let group: Arc, ::SignatureKey>> = MasterMap::new(); trace!(?group); - let pub_key_1 = get_pubkey(); - let _network_1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); - let pub_key_2 = get_pubkey(); - let _network_2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + let _pub_key_1 = get_pubkey(); + let _pub_key_2 = get_pubkey(); } // Check to make sure direct queue works @@ -207,10 +203,20 @@ async fn memory_network_direct_queue() { trace!(?group); let pub_key_1 = get_pubkey(); - let network1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + let network1 = MemoryNetwork::new( + pub_key_1, + NetworkingMetricsValue::new(), + group.clone(), + Option::None, + ); let pub_key_2 = get_pubkey(); - let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + let network2 = MemoryNetwork::new( + pub_key_2, + NetworkingMetricsValue::new(), + group, + Option::None, + ); let first_messages: Vec> = gen_messages(5, 100, pub_key_1); @@ -263,9 +269,19 @@ async fn memory_network_broadcast_queue() { MasterMap::new(); trace!(?group); let pub_key_1 = get_pubkey(); - let network1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + let network1 = MemoryNetwork::new( + pub_key_1, + NetworkingMetricsValue::new(), + group.clone(), + Option::None, + ); let pub_key_2 = get_pubkey(); - let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + let network2 = MemoryNetwork::new( + pub_key_2, + NetworkingMetricsValue::new(), + group, + Option::None, + ); let first_messages: Vec> = gen_messages(5, 100, pub_key_1); @@ -324,9 +340,19 @@ async fn memory_network_test_in_flight_message_count() { MasterMap::new(); trace!(?group); let pub_key_1 = get_pubkey(); - let network1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + let network1 = MemoryNetwork::new( + pub_key_1, + NetworkingMetricsValue::new(), + group.clone(), + Option::None, + ); let pub_key_2 = get_pubkey(); - let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + let network2 = MemoryNetwork::new( + pub_key_2, + NetworkingMetricsValue::new(), + group, + Option::None, + ); // Create some dummy messages let messages: Vec> = gen_messages(5, 100, pub_key_1); diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 987d8d52af..24a51e5b6c 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -2,9 +2,7 @@ pub use crate::traits::node_implementation::ViewQueue; pub use crate::utils::{View, ViewInner}; -use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock; use displaydoc::Display; -use std::collections::HashSet; use crate::utils::Terminator; use crate::{ From fe252cd191a93d4109991208c1a8460a95706612 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Tue, 10 Oct 2023 15:05:15 -0700 Subject: [PATCH 17/18] remove the metric rejected txs and some clean up --- crates/task-impls/src/transactions.rs | 3 --- crates/types/src/consensus.rs | 19 +------------------ 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/crates/task-impls/src/transactions.rs b/crates/task-impls/src/transactions.rs index b4936bc001..c682f5f38d 100644 --- a/crates/task-impls/src/transactions.rs +++ b/crates/task-impls/src/transactions.rs @@ -133,9 +133,6 @@ where warn!("Conversion failed: {e}. Using the max value."); i64::MAX })); - } else { - // it's more like the calculation of duplicate transactions - consensus.metrics.rejected_transactions.add(1); } } }) diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 24a51e5b6c..896c49ae93 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -64,7 +64,6 @@ pub struct Consensus> { /// Contains several `ConsensusMetrics` that we're interested in from the consensus interfaces #[derive(Clone, Debug)] pub struct ConsensusMetricsValue { - #[allow(dead_code)] /// The values that are being tracked pub values: Arc>, /// The number of last synced synced block height @@ -73,26 +72,12 @@ pub struct ConsensusMetricsValue { pub last_decided_view: Box, /// The current view pub current_view: Box, - // The duration to collect votes in a view (only applies when this insance is the leader) - // pub vote_validate_duration: Box, - // The duration we waited for txns before building the proposal - // pub proposal_wait_duration: Box, - // The duration to build the proposal - // pub proposal_build_duration: Box, - // The duration of each view, in seconds - // pub view_duration: Box, /// Number of views that are in-flight since the last decided view pub number_of_views_since_last_decide: Box, /// Number of views that are in-flight since the last anchor view pub number_of_views_per_decide_event: Box, /// Number of invalid QCs we've seen since the last commit. pub invalid_qc: Box, - // Number of views that were discarded since from one anchor to the next - // pub discarded_views_per_decide_event: Box, - // Views where no proposal was seen from one anchor to the next - // pub empty_views_per_decide_event: Box, - /// Number of rejected transactions, it's more like duplicated transactions in current implementation - pub rejected_transactions: Box, /// Number of outstanding transactions pub outstanding_transactions: Box, /// Memory size in bytes of the serialized transactions still outstanding @@ -235,14 +220,12 @@ impl ConsensusMetricsValue { number_of_views_per_decide_event: metrics .create_histogram(String::from("number_of_views_per_decide_event"), None), invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None), - rejected_transactions: metrics - .create_counter(String::from("rejected_transactions"), None), outstanding_transactions: metrics .create_gauge(String::from("outstanding_transactions"), None), outstanding_transactions_memory_size: metrics .create_gauge(String::from("outstanding_transactions_memory_size"), None), number_of_timeouts: metrics - .create_counter(String::from("number_of_views_timed_out"), None), + .create_counter(String::from("number_of_timeouts"), None), } } } From 20187f2e4ddb2a60992397547c4ce0b530b20886 Mon Sep 17 00:00:00 2001 From: Sishan Long Date: Wed, 11 Oct 2023 11:27:18 -0700 Subject: [PATCH 18/18] solve conflict --- Cargo.lock | 6 +++--- crates/hotshot/src/traits/networking/memory_network.rs | 6 +++--- crates/task-impls/src/consensus.rs | 8 +++----- crates/types/src/consensus.rs | 3 +-- 4 files changed, 10 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 355e077b79..7d6eecec8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2634,7 +2634,7 @@ dependencies = [ "custom_debug", "dashmap", "derivative", - "dyn-clone 1.0.14", + "dyn-clone 1.0.14 (git+https://github.com/dtolnay/dyn-clone?tag=1.0.14)", "either", "embed-doc-image", "espresso-systems-common 0.4.1", @@ -2862,7 +2862,7 @@ dependencies = [ "derivative", "digest 0.10.7", "displaydoc", - "dyn-clone 1.0.14", + "dyn-clone 1.0.14 (git+https://github.com/dtolnay/dyn-clone?tag=1.0.14)", "either", "espresso-systems-common 0.4.1", "ethereum-types", @@ -3376,7 +3376,7 @@ dependencies = [ "derivative", "displaydoc", "downcast-rs", - "dyn-clone 1.0.13", + "dyn-clone 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", "hashbrown 0.13.2", "itertools 0.10.5", "jf-utils", diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 7fd22aa274..c67d90147e 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -92,7 +92,7 @@ struct MemoryNetworkInner { /// The networking metrics we're keeping track of metrics: NetworkingMetricsValue, - + /// config to introduce unreliability to the network reliability_config: Option>>, } @@ -330,7 +330,7 @@ impl ConnectedNetwork for Memory let res = node.broadcast_input(vec.clone()).await; match res { Ok(_) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_broadcast_message_count.add(1); trace!(?key, "Delivered message to remote"); } Err(e) => { @@ -374,7 +374,7 @@ impl ConnectedNetwork for Memory let res = node.direct_input(vec).await; match res { Ok(_) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_direct_message_count.add(1); trace!(?recipient, "Delivered message to remote"); Ok(()) } diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index 5f4455ddc4..cac7d4d1f2 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -816,12 +816,10 @@ where // starting from the first iteration with a three chain, e.g. right after the else if case nested in the if case above if new_decide_reached { let mut leaf = leaf.clone(); - consensus - .metrics - .last_synced_block_height - .set(usize::try_from(leaf.height).unwrap_or(0)); - + .metrics + .last_synced_block_height + .set(usize::try_from(leaf.height).unwrap_or(0)); // If the full block is available for this leaf, include it in the leaf // chain that we send to the client. if let Some(block) = diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 896c49ae93..6574a73f8a 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -224,8 +224,7 @@ impl ConsensusMetricsValue { .create_gauge(String::from("outstanding_transactions"), None), outstanding_transactions_memory_size: metrics .create_gauge(String::from("outstanding_transactions_memory_size"), None), - number_of_timeouts: metrics - .create_counter(String::from("number_of_timeouts"), None), + number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None), } } }