diff --git a/sn_networking/src/metrics/bad_node.rs b/sn_networking/src/metrics/bad_node.rs index 2499c1862e..b9a6ab2877 100644 --- a/sn_networking/src/metrics/bad_node.rs +++ b/sn_networking/src/metrics/bad_node.rs @@ -21,10 +21,24 @@ use strum::IntoEnumIterator; const UPDATE_INTERVAL: Duration = Duration::from_secs(20); +#[cfg(not(test))] +const MAX_EVICTED_CLOSE_GROUP_PEERS: usize = 5 * CLOSE_GROUP_SIZE; +#[cfg(test)] +const MAX_EVICTED_CLOSE_GROUP_PEERS: usize = CLOSE_GROUP_SIZE + 2; + pub struct BadNodeMetrics { shunned_count_across_time_frames: ShunnedCountAcrossTimeFrames, - shunned_by_close_group: Gauge, - shunned_by_old_close_group: Gauge, + shunned_by_close_group: ShunnedByCloseGroup, +} + +pub enum BadNodeMetricsMsg { + ShunnedByPeer(PeerId), + CloseGroupUpdated(Vec), +} + +struct ShunnedByCloseGroup { + metric_current_group: Gauge, + metric_old_group: Gauge, // trackers close_group_peers: Vec, @@ -34,11 +48,6 @@ pub struct BadNodeMetrics { old_close_group_peers_that_have_shunned_us: HashSet, } -pub enum BadNodeMetricsMsg { - ShunnedByPeer(PeerId), - CloseGroupUpdated(Vec), -} - /// A struct to record the the number of reports against our node across different time frames. struct ShunnedCountAcrossTimeFrames { metric: Family, @@ -113,13 +122,15 @@ impl BadNodeMetrics { metric: time_based_shunned_count, shunned_report_tracker: Vec::new(), }, - shunned_by_close_group, - shunned_by_old_close_group, - - close_group_peers: Vec::new(), - old_close_group_peers: Vec::new(), - old_close_group_peers_that_have_shunned_us: HashSet::new(), - close_group_peers_that_have_shunned_us: HashSet::new(), + shunned_by_close_group: ShunnedByCloseGroup { + metric_current_group: shunned_by_close_group, + metric_old_group: shunned_by_old_close_group, + + close_group_peers: Vec::new(), + old_close_group_peers: Vec::new(), + old_close_group_peers_that_have_shunned_us: HashSet::new(), + close_group_peers_that_have_shunned_us: HashSet::new(), + }, }; let (tx, mut rx) = tokio::sync::mpsc::channel(10); @@ -133,35 +144,11 @@ impl BadNodeMetrics { match msg { Some(BadNodeMetricsMsg::ShunnedByPeer(peer)) => { bad_node_metrics.shunned_count_across_time_frames.record_shunned_metric(); - - // increment the metric if the peer is in the close group (new or old) and hasn't shunned us before - if bad_node_metrics.close_group_peers.contains(&peer) { - if !bad_node_metrics - .close_group_peers_that_have_shunned_us - .contains(&peer) - { - bad_node_metrics.shunned_by_close_group.inc(); - bad_node_metrics - .close_group_peers_that_have_shunned_us - .insert(peer); - } - } else if bad_node_metrics - .old_close_group_peers - .iter() - .any(|(p, _)| p == &peer) - && !bad_node_metrics - .old_close_group_peers_that_have_shunned_us - .contains(&peer) - { - bad_node_metrics.shunned_by_old_close_group.inc(); - bad_node_metrics - .old_close_group_peers_that_have_shunned_us - .insert(peer); - } + bad_node_metrics.shunned_by_close_group.record_shunned_metric(peer); } Some(BadNodeMetricsMsg::CloseGroupUpdated(new_closest_peers)) => { - bad_node_metrics.update_close_group_peers(new_closest_peers); + bad_node_metrics.shunned_by_close_group.update_close_group_peers(new_closest_peers); } None => break, } @@ -169,13 +156,32 @@ impl BadNodeMetrics { } _ = update_interval.tick() => { - bad_node_metrics.shunned_count_across_time_frames.try_update(); + bad_node_metrics.shunned_count_across_time_frames.try_update_state(); } } } }); tx } +} + +impl ShunnedByCloseGroup { + pub(crate) fn record_shunned_metric(&mut self, peer: PeerId) { + // increment the metric if the peer is in the close group (new or old) and hasn't shunned us before + if self.close_group_peers.contains(&peer) { + if !self.close_group_peers_that_have_shunned_us.contains(&peer) { + self.metric_current_group.inc(); + self.close_group_peers_that_have_shunned_us.insert(peer); + } + } else if self.old_close_group_peers.iter().any(|(p, _)| p == &peer) + && !self + .old_close_group_peers_that_have_shunned_us + .contains(&peer) + { + self.metric_old_group.inc(); + self.old_close_group_peers_that_have_shunned_us.insert(peer); + } + } pub(crate) fn update_close_group_peers(&mut self, new_closest_peers: Vec) { let new_members: Vec = new_closest_peers @@ -195,11 +201,11 @@ impl BadNodeMetrics { .old_close_group_peers_that_have_shunned_us .contains(new_member) { - self.shunned_by_old_close_group.dec(); + self.metric_old_group.dec(); self.old_close_group_peers_that_have_shunned_us .remove(new_member); - self.shunned_by_close_group.inc(); + self.metric_current_group.inc(); self.close_group_peers_that_have_shunned_us .insert(*new_member); } @@ -214,11 +220,11 @@ impl BadNodeMetrics { .close_group_peers_that_have_shunned_us .contains(evicted_member) { - self.shunned_by_close_group.dec(); + self.metric_current_group.dec(); self.close_group_peers_that_have_shunned_us .remove(evicted_member); - self.shunned_by_old_close_group.inc(); + self.metric_old_group.inc(); self.old_close_group_peers_that_have_shunned_us .insert(*evicted_member); } @@ -228,22 +234,24 @@ impl BadNodeMetrics { debug!("The close group has been updated. The new members are {new_members:?}. The evicted members are {evicted_members:?}"); self.close_group_peers = new_closest_peers; - if self.old_close_group_peers.len() > 5 * CLOSE_GROUP_SIZE { + if self.old_close_group_peers.len() > MAX_EVICTED_CLOSE_GROUP_PEERS { // clean the oldest Instant ones self.old_close_group_peers - .sort_by_key(|(_, instant)| *instant); + .sort_by_key(|(_, instant)| std::cmp::Reverse(*instant)); // get the list of the peers that are about to be truncated - let truncated_peers = self.old_close_group_peers.split_off(5 * CLOSE_GROUP_SIZE); + let truncated_peers = self + .old_close_group_peers + .split_off(MAX_EVICTED_CLOSE_GROUP_PEERS); // remove tracking for the truncated peers for (peer, _) in truncated_peers { if self .old_close_group_peers_that_have_shunned_us .remove(&peer) { - self.shunned_by_old_close_group.dec(); + self.metric_old_group.dec(); } if self.close_group_peers_that_have_shunned_us.remove(&peer) { - self.shunned_by_close_group.dec(); + self.metric_current_group.dec(); } } } @@ -267,7 +275,7 @@ impl ShunnedCountAcrossTimeFrames { } } - fn try_update(&mut self) { + fn try_update_state(&mut self) { let now = Instant::now(); let mut idx_to_remove = Vec::new(); @@ -299,9 +307,10 @@ impl ShunnedCountAcrossTimeFrames { #[cfg(test)] mod tests { use super::*; + use eyre::Result; #[test] - fn update_should_move_to_next_state() -> eyre::Result<()> { + fn update_should_move_to_next_timeframe() -> Result<()> { let mut shunned_metrics = ShunnedCountAcrossTimeFrames { metric: Family::default(), shunned_report_tracker: Vec::new(), @@ -325,7 +334,7 @@ mod tests { std::thread::sleep(std::time::Duration::from_secs( current_state.get_duration_sec() + 1, )); - shunned_metrics.try_update(); + shunned_metrics.try_update_state(); let current_state = shunned_metrics.shunned_report_tracker[0].least_bucket_it_fits_in; assert!(matches!(current_state, TimeFrameType::LastHour)); // all the counters except LastTenMinutes should be 1 @@ -347,7 +356,7 @@ mod tests { std::thread::sleep(std::time::Duration::from_secs( current_state.get_duration_sec() + 1, )); - shunned_metrics.try_update(); + shunned_metrics.try_update_state(); let current_state = shunned_metrics.shunned_report_tracker[0].least_bucket_it_fits_in; assert!(matches!(current_state, TimeFrameType::LastSixHours)); // all the counters except LastTenMinutes and LastHour should be 1 @@ -369,7 +378,7 @@ mod tests { std::thread::sleep(std::time::Duration::from_secs( current_state.get_duration_sec() + 1, )); - shunned_metrics.try_update(); + shunned_metrics.try_update_state(); let current_state = shunned_metrics.shunned_report_tracker[0].least_bucket_it_fits_in; assert!(matches!(current_state, TimeFrameType::LastDay)); // all the counters except LastTenMinutes, LastHour and LastSixHours should be 1 @@ -394,7 +403,7 @@ mod tests { std::thread::sleep(std::time::Duration::from_secs( current_state.get_duration_sec() + 1, )); - shunned_metrics.try_update(); + shunned_metrics.try_update_state(); let current_state = shunned_metrics.shunned_report_tracker[0].least_bucket_it_fits_in; assert!(matches!(current_state, TimeFrameType::LastWeek)); // all the counters except LastTenMinutes, LastHour, LastSixHours and LastDay should be 1 @@ -420,7 +429,7 @@ mod tests { std::thread::sleep(std::time::Duration::from_secs( current_state.get_duration_sec() + 1, )); - shunned_metrics.try_update(); + shunned_metrics.try_update_state(); assert_eq!(shunned_metrics.shunned_report_tracker.len(), 0); // all the counters except Indefinite should be 0 for variant in TimeFrameType::iter() { @@ -436,4 +445,222 @@ mod tests { Ok(()) } + + #[test] + fn metrics_should_not_be_updated_if_close_group_is_not_set() -> Result<()> { + let mut close_group_shunned = ShunnedByCloseGroup { + metric_current_group: Gauge::default(), + metric_old_group: Gauge::default(), + + close_group_peers: Vec::new(), + old_close_group_peers: Vec::new(), + close_group_peers_that_have_shunned_us: HashSet::new(), + old_close_group_peers_that_have_shunned_us: HashSet::new(), + }; + + close_group_shunned.record_shunned_metric(PeerId::random()); + assert_eq!(close_group_shunned.metric_current_group.get(), 0); + assert_eq!(close_group_shunned.metric_old_group.get(), 0); + + Ok(()) + } + + #[test] + fn close_group_shunned_metric_should_be_updated_on_new_report() -> Result<()> { + let mut close_group_shunned = ShunnedByCloseGroup { + metric_current_group: Gauge::default(), + metric_old_group: Gauge::default(), + + close_group_peers: Vec::new(), + old_close_group_peers: Vec::new(), + close_group_peers_that_have_shunned_us: HashSet::new(), + old_close_group_peers_that_have_shunned_us: HashSet::new(), + }; + close_group_shunned.update_close_group_peers(vec![ + PeerId::random(), + PeerId::random(), + PeerId::random(), + PeerId::random(), + PeerId::random(), + ]); + // report by a peer in the close group should increment the metric + close_group_shunned.record_shunned_metric(close_group_shunned.close_group_peers[0]); + assert_eq!(close_group_shunned.metric_current_group.get(), 1); + assert_eq!(close_group_shunned.metric_old_group.get(), 0); + + // report by same peer should not increment the metric + close_group_shunned.record_shunned_metric(close_group_shunned.close_group_peers[0]); + assert_eq!(close_group_shunned.metric_current_group.get(), 1); + assert_eq!(close_group_shunned.metric_old_group.get(), 0); + + // report by a different peer should increment the metric + close_group_shunned.record_shunned_metric(close_group_shunned.close_group_peers[1]); + assert_eq!(close_group_shunned.metric_current_group.get(), 2); + assert_eq!(close_group_shunned.metric_old_group.get(), 0); + + // report by a peer that is not in the close group should not increment the metric + close_group_shunned.record_shunned_metric(PeerId::random()); + assert_eq!(close_group_shunned.metric_current_group.get(), 2); + assert_eq!(close_group_shunned.metric_old_group.get(), 0); + + Ok(()) + } + + #[test] + fn change_in_close_group_should_update_the_metrics() -> Result<()> { + let mut close_group_shunned = ShunnedByCloseGroup { + metric_current_group: Gauge::default(), + metric_old_group: Gauge::default(), + + close_group_peers: Vec::new(), + old_close_group_peers: Vec::new(), + close_group_peers_that_have_shunned_us: HashSet::new(), + old_close_group_peers_that_have_shunned_us: HashSet::new(), + }; + close_group_shunned.update_close_group_peers(vec![ + PeerId::random(), + PeerId::random(), + PeerId::random(), + PeerId::random(), + PeerId::random(), + ]); + let old_member = close_group_shunned.close_group_peers[0]; + close_group_shunned.record_shunned_metric(old_member); + assert_eq!(close_group_shunned.metric_current_group.get(), 1); + assert_eq!(close_group_shunned.metric_old_group.get(), 0); + + // update close group + close_group_shunned.update_close_group_peers(vec![ + PeerId::random(), + close_group_shunned.close_group_peers[1], + close_group_shunned.close_group_peers[2], + close_group_shunned.close_group_peers[3], + close_group_shunned.close_group_peers[4], + ]); + + // the peer that shunned us before should now be in the old group + assert_eq!(close_group_shunned.metric_current_group.get(), 0); + assert_eq!(close_group_shunned.metric_old_group.get(), 1); + + // report by the old member should not increment the metric + close_group_shunned.record_shunned_metric(old_member); + assert_eq!(close_group_shunned.metric_current_group.get(), 0); + assert_eq!(close_group_shunned.metric_old_group.get(), 1); + + // update close group with old member + close_group_shunned.update_close_group_peers(vec![ + old_member, + close_group_shunned.close_group_peers[1], + close_group_shunned.close_group_peers[2], + close_group_shunned.close_group_peers[3], + close_group_shunned.close_group_peers[4], + ]); + + // the metrics of current_group and old_group should be updated + assert_eq!(close_group_shunned.metric_current_group.get(), 1); + assert_eq!(close_group_shunned.metric_old_group.get(), 0); + + Ok(()) + } + + #[test] + fn update_close_group_metrics_on_reaching_max_evicted_peer_count() -> Result<()> { + let mut close_group_shunned = ShunnedByCloseGroup { + metric_current_group: Gauge::default(), + metric_old_group: Gauge::default(), + + close_group_peers: Vec::new(), + old_close_group_peers: Vec::new(), + close_group_peers_that_have_shunned_us: HashSet::new(), + old_close_group_peers_that_have_shunned_us: HashSet::new(), + }; + close_group_shunned.update_close_group_peers(vec![ + PeerId::random(), + PeerId::random(), + PeerId::random(), + PeerId::random(), + PeerId::random(), + ]); + + // evict 1 members + let old_member_1 = close_group_shunned.close_group_peers[0]; + close_group_shunned.update_close_group_peers(vec![ + close_group_shunned.close_group_peers[1], + close_group_shunned.close_group_peers[2], + close_group_shunned.close_group_peers[3], + close_group_shunned.close_group_peers[4], + PeerId::random(), + ]); + + // evict 1 members + let old_member_2 = close_group_shunned.close_group_peers[0]; + close_group_shunned.update_close_group_peers(vec![ + close_group_shunned.close_group_peers[1], + close_group_shunned.close_group_peers[2], + close_group_shunned.close_group_peers[3], + close_group_shunned.close_group_peers[4], + PeerId::random(), + ]); + + // report by the evicted members should increment the old group metric + close_group_shunned.record_shunned_metric(old_member_1); + assert_eq!(close_group_shunned.metric_current_group.get(), 0); + assert_eq!(close_group_shunned.metric_old_group.get(), 1); + close_group_shunned.record_shunned_metric(old_member_2); + assert_eq!(close_group_shunned.metric_current_group.get(), 0); + assert_eq!(close_group_shunned.metric_old_group.get(), 2); + + // evict all the members + close_group_shunned.update_close_group_peers(vec![ + PeerId::random(), + PeerId::random(), + PeerId::random(), + PeerId::random(), + PeerId::random(), + ]); + + // the metrics should still remain the same + assert_eq!(close_group_shunned.metric_current_group.get(), 0); + assert_eq!(close_group_shunned.metric_old_group.get(), 2); + + // evict 1 more members to cross the threshold + close_group_shunned.update_close_group_peers(vec![ + close_group_shunned.close_group_peers[1], + close_group_shunned.close_group_peers[2], + close_group_shunned.close_group_peers[3], + close_group_shunned.close_group_peers[4], + PeerId::random(), + ]); + + // the metric from the member_1 should be removed + assert_eq!(close_group_shunned.metric_current_group.get(), 0); + assert_eq!(close_group_shunned.metric_old_group.get(), 1); + assert!(!close_group_shunned + .old_close_group_peers + .iter() + .any(|(p, _)| p == &old_member_1)); + assert!(close_group_shunned + .old_close_group_peers + .iter() + .any(|(p, _)| p == &old_member_2)); + + // evict 1 more member + close_group_shunned.update_close_group_peers(vec![ + close_group_shunned.close_group_peers[1], + close_group_shunned.close_group_peers[2], + close_group_shunned.close_group_peers[3], + close_group_shunned.close_group_peers[4], + PeerId::random(), + ]); + + // the metric from the member_2 should be removed + assert_eq!(close_group_shunned.metric_current_group.get(), 0); + assert_eq!(close_group_shunned.metric_old_group.get(), 0); + assert!(!close_group_shunned + .old_close_group_peers + .iter() + .any(|(p, _)| p == &old_member_1)); + + Ok(()) + } }