Skip to content

Commit

Permalink
Merge pull request #1852 from EspressoSystems/sishan/basic_metrics
Browse files Browse the repository at this point in the history
Sishan/basic metrics
  • Loading branch information
dailinsubjam committed Oct 11, 2023
2 parents 3fc6194 + 20187f2 commit cb42183
Show file tree
Hide file tree
Showing 18 changed files with 465 additions and 163 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hotshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ serde = { workspace = true, features = ["rc"] }
snafu = { workspace = true }
surf-disco = { workspace = true }
time = { workspace = true }
dyn-clone = { git = "https://github.com/dtolnay/dyn-clone", tag = "1.0.14" }

tracing = { workspace = true }
typenum = { workspace = true }
Expand Down
9 changes: 5 additions & 4 deletions crates/hotshot/examples/infra/modDA.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use futures::StreamExt;
use hotshot::{
traits::{
implementations::{
Libp2pCommChannel, Libp2pNetwork, MemoryStorage, WebCommChannel, WebServerNetwork,
Libp2pCommChannel, Libp2pNetwork, MemoryStorage, NetworkingMetricsValue,
WebCommChannel, WebServerNetwork,
},
NodeImplementation,
},
Expand All @@ -23,14 +24,14 @@ use hotshot_task::task::FilterEvent;
use hotshot_types::{
block_impl::{VIDBlockPayload, VIDTransaction},
certificate::ViewSyncCertificate,
consensus::ConsensusMetricsValue,
data::{QuorumProposal, SequencingLeaf, TestableLeaf},
event::{Event, EventType},
message::{Message, SequencingMessage},
traits::{
election::{
CommitteeExchange, ConsensusExchange, Membership, QuorumExchange, ViewSyncExchange,
},
metrics::NoMetrics,
network::CommunicationChannel,
node_implementation::{
CommitteeEx, ExchangesType, NodeType, QuorumEx, SequencingExchanges,
Expand Down Expand Up @@ -249,7 +250,7 @@ pub trait RunDA<
MemoryStorage::empty(),
exchanges,
initializer,
NoMetrics::boxed(),
ConsensusMetricsValue::new(),
)
.await
.expect("Could not init hotshot")
Expand Down Expand Up @@ -690,7 +691,7 @@ where

let node_config = config_builder.build().unwrap();
let underlying_quorum_network = Libp2pNetwork::new(
NoMetrics::boxed(),
NetworkingMetricsValue::new(),
node_config,
pubkey.clone(),
Arc::new(RwLock::new(
Expand Down
20 changes: 8 additions & 12 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use hotshot_task_impls::{events::SequencingHotShotEvent, network::NetworkTaskKin
use hotshot_types::{
block_impl::{VIDBlockPayload, VIDTransaction},
certificate::{DACertificate, ViewSyncCertificate},
consensus::{BlockStore, Consensus, ConsensusMetrics, View, ViewInner, ViewQueue},
consensus::{BlockStore, Consensus, ConsensusMetricsValue, View, ViewInner, ViewQueue},
data::{DAProposal, DeltasType, LeafType, QuorumProposal, SequencingLeaf},
error::StorageSnafu,
message::{
Expand All @@ -67,7 +67,6 @@ use hotshot_types::{
traits::{
consensus_api::{ConsensusSharedApi, SequencingConsensusApi},
election::{ConsensusExchange, Membership, SignedCertificate},
metrics::Metrics,
network::{CommunicationChannel, NetworkError},
node_implementation::{
ChannelMaps, CommitteeEx, ExchangesType, NodeType, SendToTasks, SequencingQuorumEx,
Expand Down Expand Up @@ -129,8 +128,8 @@ pub struct SystemContextInner<TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// Sender for [`Event`]s
event_sender: RwLock<Option<BroadcastSender<Event<TYPES, I::Leaf>>>>,

/// a reference to the metrics that the implementor is using.
_metrics: Box<dyn Metrics>,
/// the metrics that the implementor is using.
_metrics: Arc<ConsensusMetricsValue>,

/// The hotstuff implementation
consensus: Arc<RwLock<Consensus<TYPES, I::Leaf>>>,
Expand Down Expand Up @@ -174,13 +173,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
storage: I::Storage,
exchanges: I::Exchanges,
initializer: HotShotInitializer<TYPES, I::Leaf>,
metrics: Box<dyn Metrics>,
metrics: ConsensusMetricsValue,
) -> Result<Self, HotShotError<TYPES>> {
debug!("Creating a new hotshot");

let consensus_metrics = Arc::new(ConsensusMetrics::new(
&*metrics.subgroup("consensus".to_string()),
));
let consensus_metrics = Arc::new(metrics);
let anchored_leaf = initializer.inner;

// insert to storage
Expand Down Expand Up @@ -219,8 +216,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
// https://github.com/EspressoSystems/HotShot/issues/560
locked_view: anchored_leaf.get_view_number(),
high_qc: anchored_leaf.get_justify_qc(),
metrics: consensus_metrics,
invalid_qc: 0,
metrics: consensus_metrics.clone(),
};
let consensus = Arc::new(RwLock::new(consensus));

Expand All @@ -234,7 +230,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
storage,
exchanges: Arc::new(exchanges),
event_sender: RwLock::default(),
_metrics: metrics,
_metrics: consensus_metrics.clone(),
internal_event_stream: ChannelStream::new(),
output_event_stream: ChannelStream::new(),
});
Expand Down Expand Up @@ -393,7 +389,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
storage: I::Storage,
exchanges: I::Exchanges,
initializer: HotShotInitializer<TYPES, I::Leaf>,
metrics: Box<dyn Metrics>,
metrics: ConsensusMetricsValue,
) -> Result<
(
SystemContextHandle<TYPES, I>,
Expand Down
1 change: 1 addition & 0 deletions crates/hotshot/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod implementations {
libp2p_network::{Libp2pCommChannel, Libp2pNetwork, PeerInfoVec},
memory_network::{MasterMap, MemoryCommChannel, MemoryNetwork},
web_server_network::{WebCommChannel, WebServerNetwork},
NetworkingMetricsValue,
},
storage::memory_storage::MemoryStorage, // atomic_storage::AtomicStorage,
};
Expand Down
173 changes: 155 additions & 18 deletions crates/hotshot/src/traits/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,34 @@ pub mod combined_network;
pub mod libp2p_network;
pub mod memory_network;
pub mod web_server_network;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use custom_debug::Debug;
use hotshot_types::traits::metrics::{Counter, Gauge, Histogram, Label, Metrics};
pub use hotshot_types::traits::network::{
ChannelSendSnafu, CouldNotDeliverSnafu, FailedToDeserializeSnafu, FailedToSerializeSnafu,
NetworkError, NetworkReliability, NoSuchNodeSnafu, ShutDownSnafu,
};

use hotshot_types::traits::metrics::{Counter, Gauge, Metrics};

/// Contains the metrics that we're interested in from the networking interfaces
pub struct NetworkingMetrics {
/// Contains several `NetworkingMetrics` that we're interested in from the networking interfaces
#[derive(Clone, Debug)]
pub struct NetworkingMetricsValue {
#[allow(dead_code)]
/// The values that are being tracked
pub values: Arc<Mutex<InnerNetworkingMetrics>>,
/// A [`Gauge`] which tracks how many peers are connected
pub connected_peers: Box<dyn Gauge>,
/// A [`Counter`] which tracks how many messages have been received
pub incoming_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages have been send
pub outgoing_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages have been received directly
pub incoming_direct_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages have been received by broadcast
pub incoming_broadcast_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages have been send directly
pub outgoing_direct_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages have been send by broadcast
pub outgoing_broadcast_message_count: Box<dyn Counter>,
/// A [`Counter`] which tracks how many messages failed to send
pub message_failed_to_send: Box<dyn Counter>,
// A [`Gauge`] which tracks how many connected entries there are in the gossipsub mesh
Expand All @@ -36,21 +47,147 @@ pub struct NetworkingMetrics {
// pub kademlia_buckets: Box<dyn Gauge>,
}

/// The wrapper with a string name for the networking metrics
#[derive(Clone, Debug)]
pub struct NetworkingMetrics {
/// a prefix which tracks the name of the metric
prefix: String,
/// a map of values
values: Arc<Mutex<InnerNetworkingMetrics>>,
}

/// the set of counters and gauges for the networking metrics
#[derive(Clone, Debug, Default)]
pub struct InnerNetworkingMetrics {
/// All the counters of the networking metrics
counters: HashMap<String, usize>,
/// All the gauges of the networking metrics
gauges: HashMap<String, usize>,
/// All the histograms of the networking metrics
histograms: HashMap<String, Vec<f64>>,
/// All the labels of the networking metrics
labels: HashMap<String, String>,
}

impl NetworkingMetrics {
/// Create a new instance of this [`NetworkingMetrics`] struct, setting all the counters and gauges
pub(self) fn new(metrics: &dyn Metrics) -> Self {
/// For the creation and naming of gauge, counter, histogram and label.
pub fn sub(&self, name: String) -> Self {
let prefix = if self.prefix.is_empty() {
name
} else {
format!("{}-{name}", self.prefix)
};
Self {
prefix,
values: Arc::clone(&self.values),
}
}
}

impl Metrics for NetworkingMetrics {
fn create_counter(&self, label: String, _unit_label: Option<String>) -> Box<dyn Counter> {
Box::new(self.sub(label))
}

fn create_gauge(&self, label: String, _unit_label: Option<String>) -> Box<dyn Gauge> {
Box::new(self.sub(label))
}

fn create_histogram(&self, label: String, _unit_label: Option<String>) -> Box<dyn Histogram> {
Box::new(self.sub(label))
}

fn create_label(&self, label: String) -> Box<dyn Label> {
Box::new(self.sub(label))
}

fn subgroup(&self, subgroup_name: String) -> Box<dyn Metrics> {
Box::new(self.sub(subgroup_name))
}
}

impl Counter for NetworkingMetrics {
fn add(&self, amount: usize) {
*self
.values
.lock()
.unwrap()
.counters
.entry(self.prefix.clone())
.or_default() += amount;
}
}

impl Gauge for NetworkingMetrics {
fn set(&self, amount: usize) {
*self
.values
.lock()
.unwrap()
.gauges
.entry(self.prefix.clone())
.or_default() = amount;
}
fn update(&self, delta: i64) {
let mut values = self.values.lock().unwrap();
let value = values.gauges.entry(self.prefix.clone()).or_default();
let signed_value = i64::try_from(*value).unwrap_or(i64::MAX);
*value = usize::try_from(signed_value + delta).unwrap_or(0);
}
}

impl Histogram for NetworkingMetrics {
fn add_point(&self, point: f64) {
self.values
.lock()
.unwrap()
.histograms
.entry(self.prefix.clone())
.or_default()
.push(point);
}
}

impl Label for NetworkingMetrics {
fn set(&self, value: String) {
*self
.values
.lock()
.unwrap()
.labels
.entry(self.prefix.clone())
.or_default() = value;
}
}

impl NetworkingMetricsValue {
/// Create a new instance of this [`NetworkingMetricsValue`] struct, setting all the counters and gauges
#[must_use]
pub fn new() -> Self {
let values = Arc::default();
let metrics: Box<dyn Metrics> = Box::new(NetworkingMetrics {
prefix: String::new(),
values: Arc::clone(&values),
});
Self {
values,
connected_peers: metrics.create_gauge(String::from("connected_peers"), None),
incoming_message_count: metrics
.create_counter(String::from("incoming_message_count"), None),
outgoing_message_count: metrics
.create_counter(String::from("outgoing_message_count"), None),
incoming_direct_message_count: metrics
.create_counter(String::from("incoming_direct_message_count"), None),
incoming_broadcast_message_count: metrics
.create_counter(String::from("incoming_broadcast_message_count"), None),
outgoing_direct_message_count: metrics
.create_counter(String::from("outgoing_direct_message_count"), None),
outgoing_broadcast_message_count: metrics
.create_counter(String::from("outgoing_broadcast_message_count"), None),
message_failed_to_send: metrics
.create_counter(String::from("message_failed_to_send"), None),
// gossipsub_mesh_connected: metrics
// .create_gauge(String::from("gossipsub_mesh_connected"), None),
// kademlia_entries: metrics.create_gauge(String::from("kademlia_entries"), None),
// kademlia_buckets: metrics.create_gauge(String::from("kademlia_buckets"), None),
}
}
}

impl Default for NetworkingMetricsValue {
fn default() -> Self {
Self::new()
}
}
Loading

0 comments on commit cb42183

Please sign in to comment.