diff --git a/Cargo.lock b/Cargo.lock index a306cd66a7..7d6eecec8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2003,6 +2003,11 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23d2f3407d9a573d666de4b5bdf10569d73ca9478087346697dcbae6244bfbcd" +[[package]] +name = "dyn-clone" +version = "1.0.14" +source = "git+https://github.com/dtolnay/dyn-clone?tag=1.0.14#cee99471c46f9f512640aa03c680a547ac72c22c" + [[package]] name = "ed25519" version = "2.2.2" @@ -2629,6 +2634,7 @@ dependencies = [ "custom_debug", "dashmap", "derivative", + "dyn-clone 1.0.14 (git+https://github.com/dtolnay/dyn-clone?tag=1.0.14)", "either", "embed-doc-image", "espresso-systems-common 0.4.1", @@ -2856,6 +2862,7 @@ dependencies = [ "derivative", "digest 0.10.7", "displaydoc", + "dyn-clone 1.0.14 (git+https://github.com/dtolnay/dyn-clone?tag=1.0.14)", "either", "espresso-systems-common 0.4.1", "ethereum-types", @@ -3369,7 +3376,7 @@ dependencies = [ "derivative", "displaydoc", "downcast-rs", - "dyn-clone", + "dyn-clone 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", "hashbrown 0.13.2", "itertools 0.10.5", "jf-utils", diff --git a/crates/hotshot/Cargo.toml b/crates/hotshot/Cargo.toml index d7816208f1..77f9f63d4e 100644 --- a/crates/hotshot/Cargo.toml +++ b/crates/hotshot/Cargo.toml @@ -111,6 +111,7 @@ serde = { workspace = true, features = ["rc"] } snafu = { workspace = true } surf-disco = { workspace = true } time = { workspace = true } +dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.14" } tracing = { workspace = true } typenum = { workspace = true } diff --git a/crates/hotshot/examples/infra/modDA.rs b/crates/hotshot/examples/infra/modDA.rs index ef188a7849..a06f023159 100644 --- a/crates/hotshot/examples/infra/modDA.rs +++ b/crates/hotshot/examples/infra/modDA.rs @@ -7,7 +7,8 @@ use futures::StreamExt; use hotshot::{ traits::{ implementations::{ - Libp2pCommChannel, Libp2pNetwork, MemoryStorage, WebCommChannel, WebServerNetwork, + Libp2pCommChannel, Libp2pNetwork, MemoryStorage, NetworkingMetricsValue, + WebCommChannel, WebServerNetwork, }, NodeImplementation, }, @@ -23,6 +24,7 @@ use hotshot_task::task::FilterEvent; use hotshot_types::{ block_impl::{VIDBlockPayload, VIDTransaction}, certificate::ViewSyncCertificate, + consensus::ConsensusMetricsValue, data::{QuorumProposal, SequencingLeaf, TestableLeaf}, event::{Event, EventType}, message::{Message, SequencingMessage}, @@ -30,7 +32,6 @@ use hotshot_types::{ election::{ CommitteeExchange, ConsensusExchange, Membership, QuorumExchange, ViewSyncExchange, }, - metrics::NoMetrics, network::CommunicationChannel, node_implementation::{ CommitteeEx, ExchangesType, NodeType, QuorumEx, SequencingExchanges, @@ -249,7 +250,7 @@ pub trait RunDA< MemoryStorage::empty(), exchanges, initializer, - NoMetrics::boxed(), + ConsensusMetricsValue::new(), ) .await .expect("Could not init hotshot") @@ -690,7 +691,7 @@ where let node_config = config_builder.build().unwrap(); let underlying_quorum_network = Libp2pNetwork::new( - NoMetrics::boxed(), + NetworkingMetricsValue::new(), node_config, pubkey.clone(), Arc::new(RwLock::new( diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index 4c59597bdd..ae6e438147 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -57,7 +57,7 @@ use hotshot_task_impls::{events::SequencingHotShotEvent, network::NetworkTaskKin use hotshot_types::{ block_impl::{VIDBlockPayload, VIDTransaction}, certificate::{DACertificate, ViewSyncCertificate}, - consensus::{BlockStore, Consensus, ConsensusMetrics, View, ViewInner, ViewQueue}, + consensus::{BlockStore, Consensus, ConsensusMetricsValue, View, ViewInner, ViewQueue}, data::{DAProposal, DeltasType, LeafType, QuorumProposal, SequencingLeaf}, error::StorageSnafu, message::{ @@ -67,7 +67,6 @@ use hotshot_types::{ traits::{ consensus_api::{ConsensusSharedApi, SequencingConsensusApi}, election::{ConsensusExchange, Membership, SignedCertificate}, - metrics::Metrics, network::{CommunicationChannel, NetworkError}, node_implementation::{ ChannelMaps, CommitteeEx, ExchangesType, NodeType, SendToTasks, SequencingQuorumEx, @@ -129,8 +128,8 @@ pub struct SystemContextInner> { /// Sender for [`Event`]s event_sender: RwLock>>>, - /// a reference to the metrics that the implementor is using. - _metrics: Box, + /// the metrics that the implementor is using. + _metrics: Arc, /// The hotstuff implementation consensus: Arc>>, @@ -174,13 +173,11 @@ impl> SystemContext { storage: I::Storage, exchanges: I::Exchanges, initializer: HotShotInitializer, - metrics: Box, + metrics: ConsensusMetricsValue, ) -> Result> { debug!("Creating a new hotshot"); - let consensus_metrics = Arc::new(ConsensusMetrics::new( - &*metrics.subgroup("consensus".to_string()), - )); + let consensus_metrics = Arc::new(metrics); let anchored_leaf = initializer.inner; // insert to storage @@ -219,8 +216,7 @@ impl> SystemContext { // https://github.com/EspressoSystems/HotShot/issues/560 locked_view: anchored_leaf.get_view_number(), high_qc: anchored_leaf.get_justify_qc(), - metrics: consensus_metrics, - invalid_qc: 0, + metrics: consensus_metrics.clone(), }; let consensus = Arc::new(RwLock::new(consensus)); @@ -234,7 +230,7 @@ impl> SystemContext { storage, exchanges: Arc::new(exchanges), event_sender: RwLock::default(), - _metrics: metrics, + _metrics: consensus_metrics.clone(), internal_event_stream: ChannelStream::new(), output_event_stream: ChannelStream::new(), }); @@ -393,7 +389,7 @@ impl> SystemContext { storage: I::Storage, exchanges: I::Exchanges, initializer: HotShotInitializer, - metrics: Box, + metrics: ConsensusMetricsValue, ) -> Result< ( SystemContextHandle, diff --git a/crates/hotshot/src/traits.rs b/crates/hotshot/src/traits.rs index e29c33a999..4452420213 100644 --- a/crates/hotshot/src/traits.rs +++ b/crates/hotshot/src/traits.rs @@ -17,6 +17,7 @@ pub mod implementations { libp2p_network::{Libp2pCommChannel, Libp2pNetwork, PeerInfoVec}, memory_network::{MasterMap, MemoryCommChannel, MemoryNetwork}, web_server_network::{WebCommChannel, WebServerNetwork}, + NetworkingMetricsValue, }, storage::memory_storage::MemoryStorage, // atomic_storage::AtomicStorage, }; diff --git a/crates/hotshot/src/traits/networking.rs b/crates/hotshot/src/traits/networking.rs index 357e7a4da3..a14e4ead54 100644 --- a/crates/hotshot/src/traits/networking.rs +++ b/crates/hotshot/src/traits/networking.rs @@ -9,23 +9,34 @@ pub mod combined_network; pub mod libp2p_network; pub mod memory_network; pub mod web_server_network; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; +use custom_debug::Debug; +use hotshot_types::traits::metrics::{Counter, Gauge, Histogram, Label, Metrics}; pub use hotshot_types::traits::network::{ ChannelSendSnafu, CouldNotDeliverSnafu, FailedToDeserializeSnafu, FailedToSerializeSnafu, NetworkError, NetworkReliability, NoSuchNodeSnafu, ShutDownSnafu, }; -use hotshot_types::traits::metrics::{Counter, Gauge, Metrics}; - -/// Contains the metrics that we're interested in from the networking interfaces -pub struct NetworkingMetrics { +/// Contains several `NetworkingMetrics` that we're interested in from the networking interfaces +#[derive(Clone, Debug)] +pub struct NetworkingMetricsValue { #[allow(dead_code)] + /// The values that are being tracked + pub values: Arc>, /// A [`Gauge`] which tracks how many peers are connected pub connected_peers: Box, - /// A [`Counter`] which tracks how many messages have been received - pub incoming_message_count: Box, - /// A [`Counter`] which tracks how many messages have been send - pub outgoing_message_count: Box, + /// A [`Counter`] which tracks how many messages have been received directly + pub incoming_direct_message_count: Box, + /// A [`Counter`] which tracks how many messages have been received by broadcast + pub incoming_broadcast_message_count: Box, + /// A [`Counter`] which tracks how many messages have been send directly + pub outgoing_direct_message_count: Box, + /// A [`Counter`] which tracks how many messages have been send by broadcast + pub outgoing_broadcast_message_count: Box, /// A [`Counter`] which tracks how many messages failed to send pub message_failed_to_send: Box, // A [`Gauge`] which tracks how many connected entries there are in the gossipsub mesh @@ -36,21 +47,147 @@ pub struct NetworkingMetrics { // pub kademlia_buckets: Box, } +/// The wrapper with a string name for the networking metrics +#[derive(Clone, Debug)] +pub struct NetworkingMetrics { + /// a prefix which tracks the name of the metric + prefix: String, + /// a map of values + values: Arc>, +} + +/// the set of counters and gauges for the networking metrics +#[derive(Clone, Debug, Default)] +pub struct InnerNetworkingMetrics { + /// All the counters of the networking metrics + counters: HashMap, + /// All the gauges of the networking metrics + gauges: HashMap, + /// All the histograms of the networking metrics + histograms: HashMap>, + /// All the labels of the networking metrics + labels: HashMap, +} + impl NetworkingMetrics { - /// Create a new instance of this [`NetworkingMetrics`] struct, setting all the counters and gauges - pub(self) fn new(metrics: &dyn Metrics) -> Self { + /// For the creation and naming of gauge, counter, histogram and label. + pub fn sub(&self, name: String) -> Self { + let prefix = if self.prefix.is_empty() { + name + } else { + format!("{}-{name}", self.prefix) + }; + Self { + prefix, + values: Arc::clone(&self.values), + } + } +} + +impl Metrics for NetworkingMetrics { + fn create_counter(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_gauge(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_histogram(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_label(&self, label: String) -> Box { + Box::new(self.sub(label)) + } + + fn subgroup(&self, subgroup_name: String) -> Box { + Box::new(self.sub(subgroup_name)) + } +} + +impl Counter for NetworkingMetrics { + fn add(&self, amount: usize) { + *self + .values + .lock() + .unwrap() + .counters + .entry(self.prefix.clone()) + .or_default() += amount; + } +} + +impl Gauge for NetworkingMetrics { + fn set(&self, amount: usize) { + *self + .values + .lock() + .unwrap() + .gauges + .entry(self.prefix.clone()) + .or_default() = amount; + } + fn update(&self, delta: i64) { + let mut values = self.values.lock().unwrap(); + let value = values.gauges.entry(self.prefix.clone()).or_default(); + let signed_value = i64::try_from(*value).unwrap_or(i64::MAX); + *value = usize::try_from(signed_value + delta).unwrap_or(0); + } +} + +impl Histogram for NetworkingMetrics { + fn add_point(&self, point: f64) { + self.values + .lock() + .unwrap() + .histograms + .entry(self.prefix.clone()) + .or_default() + .push(point); + } +} + +impl Label for NetworkingMetrics { + fn set(&self, value: String) { + *self + .values + .lock() + .unwrap() + .labels + .entry(self.prefix.clone()) + .or_default() = value; + } +} + +impl NetworkingMetricsValue { + /// Create a new instance of this [`NetworkingMetricsValue`] struct, setting all the counters and gauges + #[must_use] + pub fn new() -> Self { + let values = Arc::default(); + let metrics: Box = Box::new(NetworkingMetrics { + prefix: String::new(), + values: Arc::clone(&values), + }); Self { + values, connected_peers: metrics.create_gauge(String::from("connected_peers"), None), - incoming_message_count: metrics - .create_counter(String::from("incoming_message_count"), None), - outgoing_message_count: metrics - .create_counter(String::from("outgoing_message_count"), None), + incoming_direct_message_count: metrics + .create_counter(String::from("incoming_direct_message_count"), None), + incoming_broadcast_message_count: metrics + .create_counter(String::from("incoming_broadcast_message_count"), None), + outgoing_direct_message_count: metrics + .create_counter(String::from("outgoing_direct_message_count"), None), + outgoing_broadcast_message_count: metrics + .create_counter(String::from("outgoing_broadcast_message_count"), None), message_failed_to_send: metrics .create_counter(String::from("message_failed_to_send"), None), - // gossipsub_mesh_connected: metrics - // .create_gauge(String::from("gossipsub_mesh_connected"), None), - // kademlia_entries: metrics.create_gauge(String::from("kademlia_entries"), None), - // kademlia_buckets: metrics.create_gauge(String::from("kademlia_buckets"), None), } } } + +impl Default for NetworkingMetricsValue { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 52ed2b73bc..20e6718955 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -1,8 +1,7 @@ //! Libp2p based/production networking implementation //! This module provides a libp2p based networking implementation where each node in the //! network forms a tcp or udp connection to a subset of other nodes in the network - -use super::NetworkingMetrics; +use super::NetworkingMetricsValue; use crate::NodeImplementation; use async_compatibility_layer::{ art::{async_block_on, async_sleep, async_spawn}, @@ -19,7 +18,6 @@ use hotshot_types::{ message::{Message, MessageKind}, traits::{ election::Membership, - metrics::{Metrics, NoMetrics}, network::{ CommunicationChannel, ConnectedNetwork, ConsensusIntentEvent, FailedToSerializeSnafu, NetworkError, NetworkMsg, TestableChannelImplementation, @@ -78,6 +76,7 @@ impl Debug for Libp2pNetwork { pub type PeerInfoVec = Arc, Multiaddr)>>>; /// The underlying state of the libp2p network +#[derive(Debug)] struct Libp2pNetworkInner { /// this node's public key pk: K, @@ -105,7 +104,7 @@ struct Libp2pNetworkInner { /// whether or not we've bootstrapped into the DHT yet is_bootstrapped: Arc, /// The networking metrics we're keeping track of - metrics: NetworkingMetrics, + metrics: NetworkingMetricsValue, /// topic map /// hash(hashset) -> topic /// btreemap ordered so is hashable @@ -226,7 +225,7 @@ where let da = da_keys.clone(); async_block_on(async move { Libp2pNetwork::new( - NoMetrics::boxed(), + NetworkingMetricsValue::new(), config, pubkey, bootstrap_addrs_ref, @@ -273,7 +272,7 @@ impl Libp2pNetwork { /// This will panic if there are less than 5 bootstrap nodes #[allow(clippy::too_many_arguments)] pub async fn new( - metrics: Box, + metrics: NetworkingMetricsValue, config: NetworkNodeConfig, pk: K, bootstrap_addrs: Arc, Multiaddr)>>>, @@ -317,7 +316,7 @@ impl Libp2pNetwork { let (broadcast_send, broadcast_recv) = unbounded(); let (node_lookup_send, node_lookup_recv) = unbounded(); - let result = Libp2pNetwork { + let mut result = Libp2pNetwork { inner: Arc::new(Libp2pNetworkInner { handle: network_handle, broadcast_recv, @@ -330,7 +329,7 @@ impl Libp2pNetwork { is_ready: Arc::new(AtomicBool::new(false)), dht_timeout: Duration::from_secs(30), is_bootstrapped: Arc::new(AtomicBool::new(false)), - metrics: NetworkingMetrics::new(&*metrics), + metrics, topic_map, node_lookup_send, // Start the latest view from 0. "Latest" refers to "most recent view we are polling for @@ -376,13 +375,14 @@ impl Libp2pNetwork { } /// Initiates connection to the outside world - fn spawn_connect(&self, id: usize) { + fn spawn_connect(&mut self, id: usize) { let pk = self.inner.pk.clone(); let bootstrap_ref = self.inner.bootstrap_addrs.clone(); let num_bootstrap = self.inner.bootstrap_addrs_len; let handle = self.inner.handle.clone(); let is_bootstrapped = self.inner.is_bootstrapped.clone(); let node_type = self.inner.handle.config().node_type; + let metrics_connected_peers = self.inner.clone(); async_spawn({ let is_ready = self.inner.is_ready.clone(); async move { @@ -411,6 +411,12 @@ impl Libp2pNetwork { .await .unwrap(); + let connected_num = handle.num_connected().await?; + metrics_connected_peers + .metrics + .connected_peers + .set(connected_num); + while !is_bootstrapped.load(Ordering::Relaxed) { async_sleep(Duration::from_secs(1)).await; } @@ -592,7 +598,7 @@ impl ConnectedNetwork for Libp2p match self.inner.handle.gossip(topic, &message).await { Ok(()) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_broadcast_message_count.add(1); Ok(()) } Err(e) => { @@ -640,7 +646,7 @@ impl ConnectedNetwork for Libp2p match self.inner.handle.direct_request(pid, &message).await { Ok(()) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_direct_message_count.add(1); Ok(()) } Err(e) => { @@ -671,7 +677,10 @@ impl ConnectedNetwork for Libp2p .drain_at_least_one() .await .map_err(|_x| NetworkError::ShutDown)?; - self.inner.metrics.incoming_message_count.add(result.len()); + self.inner + .metrics + .incoming_direct_message_count + .add(result.len()); Ok(result) } TransmitType::Broadcast => { @@ -681,7 +690,10 @@ impl ConnectedNetwork for Libp2p .drain_at_least_one() .await .map_err(|_x| NetworkError::ShutDown)?; - self.inner.metrics.incoming_message_count.add(result.len()); + self.inner + .metrics + .incoming_direct_message_count + .add(result.len()); Ok(result) } } diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index 9d9e751ee5..c67d90147e 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -3,7 +3,7 @@ //! This module provides an in-memory only simulation of an actual network, useful for unit and //! integration tests. -use super::{FailedToSerializeSnafu, NetworkError, NetworkReliability, NetworkingMetrics}; +use super::{FailedToSerializeSnafu, NetworkError, NetworkReliability, NetworkingMetricsValue}; use crate::NodeImplementation; use async_compatibility_layer::{ art::async_spawn, @@ -19,7 +19,6 @@ use hotshot_types::{ message::{Message, MessageKind}, traits::{ election::Membership, - metrics::{Metrics, NoMetrics}, network::{ CommunicationChannel, ConnectedNetwork, NetworkMsg, TestableChannelImplementation, TestableNetworkingImplementation, TransmitType, ViewMessage, @@ -75,6 +74,7 @@ enum Combo { } /// Internal state for a `MemoryNetwork` instance +#[derive(Debug)] struct MemoryNetworkInner { /// Input for broadcast messages broadcast_input: RwLock>>>, @@ -91,7 +91,7 @@ struct MemoryNetworkInner { in_flight_message_count: AtomicUsize, /// The networking metrics we're keeping track of - metrics: NetworkingMetrics, + metrics: NetworkingMetricsValue, /// config to introduce unreliability to the network reliability_config: Option>>, @@ -123,7 +123,7 @@ impl MemoryNetwork { #[instrument(skip(metrics))] pub fn new( pub_key: K, - metrics: Box, + metrics: NetworkingMetricsValue, master_map: Arc>, reliability_config: Option>>, ) -> MemoryNetwork { @@ -203,7 +203,7 @@ impl MemoryNetwork { direct_output: Mutex::new(direct_output), master_map: master_map.clone(), in_flight_message_count, - metrics: NetworkingMetrics::new(&*metrics), + metrics, reliability_config, }), }; @@ -220,7 +220,7 @@ impl MemoryNetwork { .fetch_add(1, Ordering::Relaxed); let input = self.inner.broadcast_input.read().await; if let Some(input) = &*input { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_broadcast_message_count.add(1); input.send(message).await } else { Err(SendError(message)) @@ -234,7 +234,7 @@ impl MemoryNetwork { .fetch_add(1, Ordering::Relaxed); let input = self.inner.direct_input.read().await; if let Some(input) = &*input { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_direct_message_count.add(1); input.send(message).await } else { Err(SendError(message)) @@ -257,7 +257,7 @@ impl> Box::new(move |node_id| { let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1; let pubkey = TYPES::SignatureKey::from_private(&privkey); - MemoryNetwork::new(pubkey, NoMetrics::boxed(), master.clone(), None) + MemoryNetwork::new(pubkey, NetworkingMetricsValue::new(), master.clone(), None) }) } @@ -330,7 +330,7 @@ impl ConnectedNetwork for Memory let res = node.broadcast_input(vec.clone()).await; match res { Ok(_) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_broadcast_message_count.add(1); trace!(?key, "Delivered message to remote"); } Err(e) => { @@ -374,7 +374,7 @@ impl ConnectedNetwork for Memory let res = node.direct_input(vec).await; match res { Ok(_) => { - self.inner.metrics.outgoing_message_count.add(1); + self.inner.metrics.outgoing_direct_message_count.add(1); trace!(?recipient, "Delivered message to remote"); Ok(()) } @@ -418,7 +418,10 @@ impl ConnectedNetwork for Memory self.inner .in_flight_message_count .fetch_sub(ret.len(), Ordering::Relaxed); - self.inner.metrics.incoming_message_count.add(ret.len()); + self.inner + .metrics + .incoming_direct_message_count + .add(ret.len()); Ok(ret) } TransmitType::Broadcast => { @@ -433,7 +436,10 @@ impl ConnectedNetwork for Memory self.inner .in_flight_message_count .fetch_sub(ret.len(), Ordering::Relaxed); - self.inner.metrics.incoming_message_count.add(ret.len()); + self.inner + .metrics + .incoming_broadcast_message_count + .add(ret.len()); Ok(ret) } } diff --git a/crates/task-impls/src/consensus.rs b/crates/task-impls/src/consensus.rs index 11a98de85c..cac7d4d1f2 100644 --- a/crates/task-impls/src/consensus.rs +++ b/crates/task-impls/src/consensus.rs @@ -544,6 +544,15 @@ where .await; } }); + let consensus = self.consensus.read().await; + consensus + .metrics + .current_view + .set(usize::try_from(self.cur_view.get_u64()).unwrap()); + consensus.metrics.number_of_views_since_last_decide.set( + usize::try_from(self.cur_view.get_u64()).unwrap() + - usize::try_from(consensus.last_decided_view.get_u64()).unwrap(), + ); return true; } @@ -590,7 +599,7 @@ where // TODO ED Insert TC logic here // Construct the leaf. - let justify_qc = proposal.data.justify_qc; + let justify_qc = proposal.clone().data.justify_qc; let parent = if justify_qc.is_genesis() { self.genesis_leaf().await } else { @@ -623,7 +632,7 @@ where if invalid { error!("Invalid justify_qc in proposal! parent commitment is {:?} justify qc is {:?}", parent_commitment, justify_qc.clone()); - + consensus.metrics.invalid_qc.update(1); message = self.quorum_exchange.create_no_message::( justify_qc_commitment, leaf_commitment, @@ -807,7 +816,10 @@ where // starting from the first iteration with a three chain, e.g. right after the else if case nested in the if case above if new_decide_reached { let mut leaf = leaf.clone(); - + consensus + .metrics + .last_synced_block_height + .set(usize::try_from(leaf.height).unwrap_or(0)); // If the full block is available for this leaf, include it in the leaf // chain that we send to the client. if let Some(block) = @@ -884,7 +896,16 @@ where .collect_garbage(old_anchor_view, new_anchor_view) .await; consensus.last_decided_view = new_anchor_view; - consensus.invalid_qc = 0; + consensus.metrics.invalid_qc.set(0); + consensus.metrics.last_decided_view.set( + usize::try_from(consensus.last_decided_view.get_u64()).unwrap(), + ); + let cur_number_of_views_per_decide_event = + *self.cur_view - consensus.last_decided_view.get_u64(); + consensus + .metrics + .number_of_views_per_decide_event + .add_point(cur_number_of_views_per_decide_event as f64); // We're only storing the last QC. We could store more but we're realistically only going to retrieve the last one. if let Err(e) = self.api.store_leaf(old_anchor_view, leaf).await { @@ -1033,7 +1054,6 @@ where let mut consensus = self.consensus.write().await; consensus.high_qc = qc.clone(); - drop(consensus); // View may have already been updated by replica if they voted for this QC @@ -1139,6 +1159,8 @@ where "We received a timeout event in the consensus task for view {}!", *view ); + let consensus = self.consensus.read().await; + consensus.metrics.number_of_timeouts.add(1); } SequencingHotShotEvent::SendDABlockData(block) => { // ED TODO Should make sure this is actually the most recent block diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs index 1100f4e078..5aadf0e162 100644 --- a/crates/task-impls/src/da.rs +++ b/crates/task-impls/src/da.rs @@ -646,7 +646,6 @@ where self.event_stream .publish(SequencingHotShotEvent::SendDABlockData(block.clone())) .await; - self.event_stream .publish(SequencingHotShotEvent::DAProposalSend( message.clone(), diff --git a/crates/testing/src/task_helpers.rs b/crates/testing/src/task_helpers.rs index f4e7fe49a8..17be6f7dfc 100644 --- a/crates/testing/src/task_helpers.rs +++ b/crates/testing/src/task_helpers.rs @@ -14,12 +14,12 @@ use hotshot_task::event_stream::ChannelStream; use hotshot_task_impls::events::SequencingHotShotEvent; use hotshot_types::{ block_impl::{VIDBlockPayload, NUM_CHUNKS, NUM_STORAGE_NODES}, + consensus::ConsensusMetricsValue, data::{QuorumProposal, SequencingLeaf, VidScheme, ViewNumber}, message::{Message, Proposal}, traits::{ consensus_api::ConsensusSharedApi, election::{ConsensusExchange, Membership, SignedCertificate}, - metrics::NoMetrics, node_implementation::{CommitteeEx, ExchangesType, NodeType, QuorumEx}, signature_key::EncodedSignature, state::{ConsensusTime, TestableBlock}, @@ -82,7 +82,7 @@ pub async fn build_system_handle( storage, exchanges, initializer, - NoMetrics::boxed(), + ConsensusMetricsValue::new(), ) .await .expect("Could not init hotshot") diff --git a/crates/testing/src/test_runner.rs b/crates/testing/src/test_runner.rs index 056f0f8a26..cf6932a390 100644 --- a/crates/testing/src/test_runner.rs +++ b/crates/testing/src/test_runner.rs @@ -12,10 +12,10 @@ use hotshot_task::{ event_stream::ChannelStream, global_registry::GlobalRegistry, task_launcher::TaskRunner, }; use hotshot_types::{ + consensus::ConsensusMetricsValue, message::Message, traits::{ election::{ConsensusExchange, Membership}, - metrics::NoMetrics, network::CommunicationChannel, node_implementation::{ExchangesType, NodeType, QuorumCommChannel, QuorumEx}, signature_key::SignatureKey, @@ -276,7 +276,7 @@ where storage, exchanges, initializer, - NoMetrics::boxed(), + ConsensusMetricsValue::new(), ) .await .expect("Could not init hotshot") diff --git a/crates/testing/tests/memory_network.rs b/crates/testing/tests/memory_network.rs index 27f6ae6531..a4ee4282e7 100644 --- a/crates/testing/tests/memory_network.rs +++ b/crates/testing/tests/memory_network.rs @@ -8,7 +8,7 @@ use hotshot::traits::election::static_committee::{ GeneralStaticCommittee, StaticElectionConfig, StaticVoteToken, }; use hotshot::traits::implementations::{ - MasterMap, MemoryCommChannel, MemoryNetwork, MemoryStorage, + MasterMap, MemoryCommChannel, MemoryNetwork, MemoryStorage, NetworkingMetricsValue, }; use hotshot::traits::NodeImplementation; use hotshot::types::bn254::{BLSPrivKey, BLSPubKey}; @@ -18,7 +18,6 @@ use hotshot_types::certificate::ViewSyncCertificate; use hotshot_types::data::{DAProposal, QuorumProposal, SequencingLeaf}; use hotshot_types::message::{Message, SequencingMessage}; use hotshot_types::traits::election::{CommitteeExchange, QuorumExchange, ViewSyncExchange}; -use hotshot_types::traits::metrics::NoMetrics; use hotshot_types::traits::network::TestableNetworkingImplementation; use hotshot_types::traits::network::{ConnectedNetwork, TransmitType}; use hotshot_types::traits::node_implementation::{ChannelMaps, NodeType, SequencingExchanges}; @@ -168,8 +167,7 @@ async fn memory_network_spawn_single() { let group: Arc, ::SignatureKey>> = MasterMap::new(); trace!(?group); - let pub_key = get_pubkey(); - let _network = MemoryNetwork::new(pub_key, NoMetrics::boxed(), group, Option::None); + let _pub_key = get_pubkey(); } // // Spawning a two MemoryNetworks and connecting them should produce no errors @@ -184,10 +182,8 @@ async fn memory_network_spawn_double() { let group: Arc, ::SignatureKey>> = MasterMap::new(); trace!(?group); - let pub_key_1 = get_pubkey(); - let _network_1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); - let pub_key_2 = get_pubkey(); - let _network_2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + let _pub_key_1 = get_pubkey(); + let _pub_key_2 = get_pubkey(); } // Check to make sure direct queue works @@ -207,10 +203,20 @@ async fn memory_network_direct_queue() { trace!(?group); let pub_key_1 = get_pubkey(); - let network1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + let network1 = MemoryNetwork::new( + pub_key_1, + NetworkingMetricsValue::new(), + group.clone(), + Option::None, + ); let pub_key_2 = get_pubkey(); - let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + let network2 = MemoryNetwork::new( + pub_key_2, + NetworkingMetricsValue::new(), + group, + Option::None, + ); let first_messages: Vec> = gen_messages(5, 100, pub_key_1); @@ -263,9 +269,19 @@ async fn memory_network_broadcast_queue() { MasterMap::new(); trace!(?group); let pub_key_1 = get_pubkey(); - let network1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + let network1 = MemoryNetwork::new( + pub_key_1, + NetworkingMetricsValue::new(), + group.clone(), + Option::None, + ); let pub_key_2 = get_pubkey(); - let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + let network2 = MemoryNetwork::new( + pub_key_2, + NetworkingMetricsValue::new(), + group, + Option::None, + ); let first_messages: Vec> = gen_messages(5, 100, pub_key_1); @@ -324,9 +340,19 @@ async fn memory_network_test_in_flight_message_count() { MasterMap::new(); trace!(?group); let pub_key_1 = get_pubkey(); - let network1 = MemoryNetwork::new(pub_key_1, NoMetrics::boxed(), group.clone(), Option::None); + let network1 = MemoryNetwork::new( + pub_key_1, + NetworkingMetricsValue::new(), + group.clone(), + Option::None, + ); let pub_key_2 = get_pubkey(); - let network2 = MemoryNetwork::new(pub_key_2, NoMetrics::boxed(), group, Option::None); + let network2 = MemoryNetwork::new( + pub_key_2, + NetworkingMetricsValue::new(), + group, + Option::None, + ); // Create some dummy messages let messages: Vec> = gen_messages(5, 100, pub_key_1); diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index c0eb7a11e0..e0532c2e66 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -50,6 +50,7 @@ time = { workspace = true } tracing = { workspace = true } ethereum-types = { workspace = true } typenum = { workspace = true } +dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.14" } [dev-dependencies] serde_json = "1.0.107" diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index e10900aa7e..6574a73f8a 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -2,6 +2,7 @@ pub use crate::traits::node_implementation::ViewQueue; pub use crate::utils::{View, ViewInner}; +use displaydoc::Display; use crate::utils::Terminator; use crate::{ @@ -9,7 +10,7 @@ use crate::{ data::LeafType, error::HotShotError, traits::{ - metrics::{Counter, Gauge, Histogram, Metrics}, + metrics::{Counter, Gauge, Histogram, Label, Metrics}, node_implementation::NodeType, }, }; @@ -17,7 +18,7 @@ use commit::{Commitment, Committable}; use derivative::Derivative; use std::{ collections::{hash_map::Entry, BTreeMap, HashMap}, - sync::Arc, + sync::{Arc, Mutex}, }; use tracing::error; @@ -57,106 +58,183 @@ pub struct Consensus> { /// A reference to the metrics trait #[debug(skip)] - pub metrics: Arc, - - /// Amount of invalid QCs we've seen since the last commit - /// Used for metrics. This resets to 0 on every decide event. - pub invalid_qc: usize, + pub metrics: Arc, } -/// The metrics being collected for the consensus algorithm -pub struct ConsensusMetrics { +/// Contains several `ConsensusMetrics` that we're interested in from the consensus interfaces +#[derive(Clone, Debug)] +pub struct ConsensusMetricsValue { + /// The values that are being tracked + pub values: Arc>, + /// The number of last synced synced block height + pub last_synced_block_height: Box, + /// The number of last decided view + pub last_decided_view: Box, /// The current view pub current_view: Box, - /// The duration to collect votes in a view (only applies when this insance is the leader) - pub vote_validate_duration: Box, - /// The duration we waited for txns before building the proposal - pub proposal_wait_duration: Box, - /// The duration to build the proposal - pub proposal_build_duration: Box, - /// The duration of each view, in seconds - pub view_duration: Box, - /// Number of views that are in-flight since the last committed view - pub number_of_views_since_last_commit: Box, + /// Number of views that are in-flight since the last decided view + pub number_of_views_since_last_decide: Box, /// Number of views that are in-flight since the last anchor view pub number_of_views_per_decide_event: Box, - /// Number of invalid QCs between anchors - pub invalid_qc_views: Box, - /// Number of views that were discarded since from one achor to the next - pub discarded_views_per_decide_event: Box, - /// Views where no proposal was seen from one anchor to the next - pub empty_views_per_decide_event: Box, - /// Number of rejected transactions - pub rejected_transactions: Box, + /// Number of invalid QCs we've seen since the last commit. + pub invalid_qc: Box, /// Number of outstanding transactions pub outstanding_transactions: Box, /// Memory size in bytes of the serialized transactions still outstanding pub outstanding_transactions_memory_size: Box, /// Number of views that timed out pub number_of_timeouts: Box, - /// Total direct messages this node sent out - pub outgoing_direct_messages: Box, - /// Total broadcasts sent - pub outgoing_broadcast_messages: Box, - /// Total messages received - pub direct_messages_received: Box, - /// Total broadcast messages received - pub broadcast_messages_received: Box, - /// Total number of messages which couldn't be sent - pub failed_to_send_messages: Box, +} + +/// The wrapper with a string name for the networking metrics +#[derive(Clone, Debug)] +pub struct ConsensusMetrics { + /// a prefix which tracks the name of the metric + prefix: String, + /// a map of values + values: Arc>, +} + +/// the set of counters and gauges for the networking metrics +#[derive(Clone, Debug, Default, Display)] +pub struct InnerConsensusMetrics { + /// All the counters of the networking metrics + pub counters: HashMap, + /// All the gauges of the networking metrics + pub gauges: HashMap, + /// All the histograms of the networking metrics + pub histograms: HashMap>, + /// All the labels of the networking metrics + pub labels: HashMap, } impl ConsensusMetrics { - /// Create a new instance of this [`ConsensusMetrics`] struct, setting all the counters and gauges #[must_use] - pub fn new(metrics: &dyn Metrics) -> Self { + /// For the creation and naming of gauge, counter, histogram and label. + pub fn sub(&self, name: String) -> Self { + let prefix = if self.prefix.is_empty() { + name + } else { + format!("{}-{name}", self.prefix) + }; + Self { + prefix, + values: Arc::clone(&self.values), + } + } +} + +impl Metrics for ConsensusMetrics { + fn create_counter(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_gauge(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_histogram(&self, label: String, _unit_label: Option) -> Box { + Box::new(self.sub(label)) + } + + fn create_label(&self, label: String) -> Box { + Box::new(self.sub(label)) + } + + fn subgroup(&self, subgroup_name: String) -> Box { + Box::new(self.sub(subgroup_name)) + } +} + +impl Counter for ConsensusMetrics { + fn add(&self, amount: usize) { + *self + .values + .lock() + .unwrap() + .counters + .entry(self.prefix.clone()) + .or_default() += amount; + } +} + +impl Gauge for ConsensusMetrics { + fn set(&self, amount: usize) { + *self + .values + .lock() + .unwrap() + .gauges + .entry(self.prefix.clone()) + .or_default() = amount; + } + fn update(&self, delta: i64) { + let mut values = self.values.lock().unwrap(); + let value = values.gauges.entry(self.prefix.clone()).or_default(); + let signed_value = i64::try_from(*value).unwrap_or(i64::MAX); + *value = usize::try_from(signed_value + delta).unwrap_or(0); + } +} + +impl Histogram for ConsensusMetrics { + fn add_point(&self, point: f64) { + self.values + .lock() + .unwrap() + .histograms + .entry(self.prefix.clone()) + .or_default() + .push(point); + } +} + +impl Label for ConsensusMetrics { + fn set(&self, value: String) { + *self + .values + .lock() + .unwrap() + .labels + .entry(self.prefix.clone()) + .or_default() = value; + } +} + +impl ConsensusMetricsValue { + /// Create a new instance of this [`ConsensusMetricsValue`] struct, setting all the counters and gauges + #[must_use] + pub fn new() -> Self { + let values = Arc::default(); + let metrics: Box = Box::new(ConsensusMetrics { + prefix: String::new(), + values: Arc::clone(&values), + }); Self { + values, + last_synced_block_height: metrics + .create_gauge(String::from("last_synced_block_height"), None), + last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None), current_view: metrics.create_gauge(String::from("current_view"), None), - vote_validate_duration: metrics.create_histogram( - String::from("vote_validate_duration"), - Some(String::from("seconds")), - ), - proposal_build_duration: metrics.create_histogram( - String::from("proposal_build_duration"), - Some(String::from("seconds")), - ), - proposal_wait_duration: metrics.create_histogram( - String::from("proposal_wait_duration"), - Some(String::from("seconds")), - ), - view_duration: metrics - .create_histogram(String::from("view_duration"), Some(String::from("seconds"))), - number_of_views_since_last_commit: metrics - .create_gauge(String::from("number_of_views_since_last_commit"), None), + number_of_views_since_last_decide: metrics + .create_gauge(String::from("number_of_views_since_last_decide"), None), number_of_views_per_decide_event: metrics .create_histogram(String::from("number_of_views_per_decide_event"), None), - invalid_qc_views: metrics.create_histogram(String::from("invalid_qc_views"), None), - discarded_views_per_decide_event: metrics - .create_histogram(String::from("discarded_views_per_decide_event"), None), - empty_views_per_decide_event: metrics - .create_histogram(String::from("empty_views_per_decide_event"), None), - rejected_transactions: metrics - .create_counter(String::from("rejected_transactions"), None), + invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None), outstanding_transactions: metrics .create_gauge(String::from("outstanding_transactions"), None), outstanding_transactions_memory_size: metrics .create_gauge(String::from("outstanding_transactions_memory_size"), None), - outgoing_direct_messages: metrics - .create_counter(String::from("outgoing_direct_messages"), None), - outgoing_broadcast_messages: metrics - .create_counter(String::from("outgoing_broadcast_messages"), None), - direct_messages_received: metrics - .create_counter(String::from("direct_messages_received"), None), - broadcast_messages_received: metrics - .create_counter(String::from("broadcast_messages_received"), None), - failed_to_send_messages: metrics - .create_counter(String::from("failed_to_send_messages"), None), - number_of_timeouts: metrics - .create_counter(String::from("number_of_views_timed_out"), None), + number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None), } } } +impl Default for ConsensusMetricsValue { + fn default() -> Self { + Self::new() + } +} + impl> Consensus { /// increment the current view /// NOTE may need to do gc here diff --git a/crates/types/src/data.rs b/crates/types/src/data.rs index 44a1937161..1a02a6b9a7 100644 --- a/crates/types/src/data.rs +++ b/crates/types/src/data.rs @@ -63,6 +63,10 @@ impl ConsensusTime for ViewNumber { fn new(n: u64) -> Self { Self(n) } + /// Returen the u64 format + fn get_u64(&self) -> u64 { + self.0 + } } impl Committable for ViewNumber { diff --git a/crates/types/src/traits/metrics.rs b/crates/types/src/traits/metrics.rs index 0253e129d5..fc69b5c077 100644 --- a/crates/types/src/traits/metrics.rs +++ b/crates/types/src/traits/metrics.rs @@ -6,8 +6,11 @@ //! - [`Histogram`]: stores multiple float values based for a graph (example usage: CPU %) //! - [`Label`]: Stores the last string (example usage: current version, network online/offline) +use dyn_clone::DynClone; +use std::fmt::Debug; + /// The metrics type. -pub trait Metrics: Send + Sync { +pub trait Metrics: Send + Sync + DynClone + Debug { /// Create a [`Counter`] with an optional `unit_label`. /// /// The `unit_label` can be used to indicate what the unit of the value is, e.g. "kb" or "seconds" @@ -76,12 +79,12 @@ impl Label for NoMetrics { } /// An ever-incrementing counter -pub trait Counter: Send + Sync { +pub trait Counter: Send + Sync + Debug + DynClone { /// Add a value to the counter fn add(&self, amount: usize); } /// A gauge that stores the latest value. -pub trait Gauge: Send + Sync { +pub trait Gauge: Send + Sync + Debug + DynClone { /// Set the gauge value fn set(&self, amount: usize); @@ -90,16 +93,21 @@ pub trait Gauge: Send + Sync { } /// A histogram which will record a series of points. -pub trait Histogram: Send + Sync { +pub trait Histogram: Send + Sync + Debug + DynClone { /// Add a point to this histogram. fn add_point(&self, point: f64); } /// A label that stores the last string value. -pub trait Label: Send + Sync { +pub trait Label: Send + Sync + DynClone { /// Set the label value fn set(&self, value: String); } +dyn_clone::clone_trait_object!(Metrics); +dyn_clone::clone_trait_object!(Gauge); +dyn_clone::clone_trait_object!(Counter); +dyn_clone::clone_trait_object!(Histogram); +dyn_clone::clone_trait_object!(Label); #[cfg(test)] mod test { @@ -109,6 +117,7 @@ mod test { sync::{Arc, Mutex}, }; + #[derive(Debug, Clone)] struct TestMetrics { prefix: String, values: Arc>, diff --git a/crates/types/src/traits/state.rs b/crates/types/src/traits/state.rs index bd8fe7d2ed..1db9e0754e 100644 --- a/crates/types/src/traits/state.rs +++ b/crates/types/src/traits/state.rs @@ -90,6 +90,8 @@ pub trait ConsensusTime: } /// Create a new instance of this time unit fn new(val: u64) -> Self; + /// Get the u64 format of time + fn get_u64(&self) -> u64; } /// extra functions required on state to be usable by hotshot-testing