From 267d13e6a91d1e4735dff4beb1911be9b30d34b2 Mon Sep 17 00:00:00 2001
From: Frederik Rothenberger
Date: Mon, 3 Jun 2024 13:50:01 +0200
Subject: [PATCH 1/3] Add explicit bound to amortized clean-up

This adds an explicit limit to the number of events processed during
amortized clean-up. Using that, we can (in a future PR) remove the
timer-based mechanism for pruning data.
---
 src/internet_identity/src/state.rs            |   7 +
 .../src/stats/event_stats.rs                  | 166 +++++++++++++-----
 .../src/stats/event_stats/tests.rs            |  84 +++++++++
 .../src/storage/storable_persistent_state.rs  |   6 +
 .../tests/integration/aggregation_stats.rs    |  84 +++++++++
 5 files changed, 303 insertions(+), 44 deletions(-)

diff --git a/src/internet_identity/src/state.rs b/src/internet_identity/src/state.rs
index f2a2c6503d..f2dca2c957 100644
--- a/src/internet_identity/src/state.rs
+++ b/src/internet_identity/src/state.rs
@@ -4,6 +4,7 @@ use crate::stats::activity_stats::activity_counter::active_anchor_counter::Activ
 use crate::stats::activity_stats::activity_counter::authn_method_counter::AuthnMethodCounter;
 use crate::stats::activity_stats::activity_counter::domain_active_anchor_counter::DomainActiveAnchorCounter;
 use crate::stats::activity_stats::ActivityStats;
+use crate::stats::event_stats::EventKey;
 use crate::storage::anchor::Anchor;
 use crate::storage::MAX_ENTRIES;
 use crate::{random_salt, Storage};
@@ -103,6 +104,11 @@ pub struct PersistentState {
     // event_aggregations is expected to have a lot of entries, thus counting by iterating over it is not
     // an option.
     pub event_aggregations_count: u64,
+    // Key into the event_data BTreeMap where the 24h tracking window starts.
+    // This key is used to prune old entries from the 24h event aggregations.
+    // If it is `None`, then the 24h pruning window starts from the newest entry in the event_data
+    // BTreeMap minus 24h.
+    pub event_stats_24h_pruning_start: Option<EventKey>,
 }

 impl Default for PersistentState {
@@ -118,6 +124,7 @@
             max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
             event_data_count: 0,
             event_aggregations_count: 0,
+            event_stats_24h_pruning_start: None,
         }
     }
 }

diff --git a/src/internet_identity/src/stats/event_stats.rs b/src/internet_identity/src/stats/event_stats.rs
index df38c05e17..9829e13416 100644
--- a/src/internet_identity/src/stats/event_stats.rs
+++ b/src/internet_identity/src/stats/event_stats.rs
@@ -59,6 +59,7 @@ use crate::stats::event_stats::event_aggregations::AGGREGATIONS;
 use crate::stats::event_stats::Event::PruneEvent;
 use crate::storage::Storage;
 use crate::{state, DAY_NS, MINUTE_NS};
+use candid::CandidType;
 use ic_cdk::api::call::CallResult;
 use ic_cdk::api::time;
 use ic_cdk::{caller, id, trap};
@@ -73,12 +74,15 @@ use std::time::Duration;
 /// This module defines the aggregations over the events.
 mod event_aggregations;
+
 pub use event_aggregations::*;

+const MAX_EVENTS_TO_PRUNE: usize = 100;
+
 #[cfg(test)]
 mod tests;

-#[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
+#[derive(Deserialize, Serialize, CandidType, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
 pub struct EventKey {
     /// Timestamp of the event.
     pub time: Timestamp,
@@ -104,6 +108,19 @@ impl EventKey {
             counter: u16::MAX,
         }
     }
+
+    pub fn next_key(&self) -> Self {
+        match self.counter {
+            u16::MAX => Self {
+                time: self.time + 1,
+                counter: 0,
+            },
+            _ => Self {
+                time: self.time,
+                counter: self.counter + 1,
+            },
+        }
+    }
 }

 #[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug)]
@@ -276,62 +293,122 @@ fn update_events_internal<M: Memory>(event: EventData, now: Timestamp, s: &mut S
         s.event_data_count += 1;
     });

-    let pruned_24h = if let Some((prev_key, _)) = s.event_data.iter_upper_bound(&current_key).next()
-    {
-        // `timestamp` denotes the time of the last event recorded just before `now`.
-        // The difference (now - timestamp) is the time that has passed since the last event
-        // and hence the last time the daily stats were updated.
-        // Therefore, we need to prune all events from the daily aggregations that happened in
-        // that same difference, but 24 hours ago.
-        //
-        //        |<----------------------- 24h ----------------------->|
-        //     |--|--------------------------------------------------|--| --> time
-        //     ^  ^                                                  ^  ^
-        //     |  |                                                  |  └ now
-        //     |  └ now - 24h (prune_window_end)                     └ timestamp
-        //     └ timestamp - 24h (prune_window_start)
-
-        let prune_window_start = prev_key.time - DAY_NS;
-        let prune_window_end = now - DAY_NS;
-        s.event_data
-            .range(EventKey::min_key(prune_window_start)..=EventKey::max_key(prune_window_end))
-            .collect::<Vec<_>>()
-    } else {
-        // there is no event before `now` in the db, so the list of pruned events is empty
-        vec![]
-    };
-
+    let mut aggregations_db_wrapper = CountingAggregationsWrapper(&mut s.event_aggregations);
     // Update 24h aggregations
-    AGGREGATIONS.iter().for_each(|aggregation| {
-        update_aggregation(
-            |(_, data)| aggregation.process_event(AggregationWindow::Day, data),
-            current_key.clone(),
-            event.clone(),
-            &pruned_24h,
-            &mut CountingAggregationsWrapper(&mut s.event_aggregations),
-        );
-    });
+    update_aggregations(
+        &current_key,
+        &event,
+        &prune_events_24h(now, &current_key, &s.event_data),
+        AggregationWindow::Day,
+        &mut aggregations_db_wrapper,
+    );

     // This pruning _deletes_ the data older than 30 days. Do this after the 24h aggregation
     // otherwise the daily stats become inaccurate on the unlikely event that there is no activity
     // for 30 days.
-    let pruned_30d = prune_events(&mut s.event_data, now);
-
+    let pruned_30d = prune_events_30d(&mut s.event_data, now);
     // Update 30d aggregations
+    update_aggregations(
+        &current_key,
+        &event,
+        &pruned_30d,
+        AggregationWindow::Month,
+        &mut aggregations_db_wrapper,
+    );
+}
+
+/// Iterates over all aggregations and updates them based on the new event and the pruned events.
+fn update_aggregations<M: Memory>(
+    event_key: &EventKey,
+    event_data: &EventData,
+    pruned_events: &[(EventKey, EventData)],
+    window: AggregationWindow,
+    aggregations_db: &mut CountingAggregationsWrapper<M>,
+) {
     AGGREGATIONS.iter().for_each(|aggregation| {
         update_aggregation(
-            |(_, data)| aggregation.process_event(AggregationWindow::Month, data),
-            current_key.clone(),
-            event.clone(),
-            &pruned_30d,
-            &mut CountingAggregationsWrapper(&mut s.event_aggregations),
+            |(_, data)| aggregation.process_event(window.clone(), data),
+            event_key.clone(),
+            event_data.clone(),
+            pruned_events,
+            aggregations_db,
         );
     });
 }
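The refactored `update_aggregations` applies every aggregation twice per call: the incoming event is added to its bucket, and each event that just left the window is subtracted again. A minimal sketch of that bookkeeping, assuming plain string buckets and unit weights in place of the real `AggregationKey` and the weights computed by `process_event`:

```rust
use std::collections::HashMap;

/// Illustrative only: string keys stand in for AggregationKey, unit weights
/// stand in for the weights computed by process_event.
fn update_window_counts(
    counts: &mut HashMap<String, u64>,
    new_event: &str,
    events_leaving_window: &[&str],
) {
    // The new event enters the window and increments its bucket.
    *counts.entry(new_event.to_string()).or_insert(0) += 1;
    // Events that aged out of the window are subtracted again; empty buckets
    // are dropped (cf. should_prune_zero_weighted_events in the tests).
    for e in events_leaving_window {
        if let Some(c) = counts.get_mut(*e) {
            *c = c.saturating_sub(1);
            if *c == 0 {
                counts.remove(*e);
            }
        }
    }
}

fn main() {
    let mut counts = HashMap::new();
    update_window_counts(&mut counts, "https://some-dapp.com", &[]);
    update_window_counts(&mut counts, "https://other-dapp.com", &["https://some-dapp.com"]);
    assert_eq!(counts.len(), 1); // only the event still inside the window remains
}
```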
-/// Adds an event to the event_data map and simultaneously removes events older than the retention period (30d).
+/// Collects events older than 24h that need to be removed from the 24h aggregations.
+/// Given events are kept for 30 days, the events are not deleted from the supplied `db`.
+/// Instead, this function simply updates the `event_stats_24h_pruning_start` state variable
+/// that denotes the first event that should be pruned in the next call.
+///
+/// Returns a vec of tuples of the pruned events and their timestamps, at most [MAX_EVENTS_TO_PRUNE].
+fn prune_events_24h<M: Memory>(
+    now: Timestamp,
+    current_key: &EventKey,
+    db: &StableBTreeMap<EventKey, EventData, M>,
+) -> Vec<(EventKey, EventData)> {
+    /// Calculates the 24h window start based on the current key:
+    /// - The current key is used to find the last event right before it.
+    /// - The timestamp of that key is then shifted back 24h.
+    ///
+    /// This assumes that the 24h window has been correctly pruned in the past, including up to the
+    /// previous event.
+    ///
+    /// ```
+    ///    |<----------------------- 24h ----------------------->|
+    /// |--|--------------------------------------------------|--| --> time
+    /// ^  ^                                                  ^  ^
+    /// |  |                                                  |  └ current_key
+    /// |  └ now - 24h (prune_window_end)                     └ previous event
+    /// └ previous event timestamp - 24h (prune_window_start)
+    /// ```
+    fn prune_window_start_from_current_key<M: Memory>(
+        current_key: &EventKey,
+        db: &StableBTreeMap<EventKey, EventData, M>,
+    ) -> Option<EventKey> {
+        db.iter_upper_bound(current_key)
+            .next()
+            .map(|(k, _)| EventKey::min_key(k.time - DAY_NS))
+    }
+
+    let prune_window_start =
+        // Continue pruning from the last key. This value will be set if the 24h window has been pruned
+        // before.
+        state::persistent_state(|s| s.event_stats_24h_pruning_start.clone()).or_else(|| {
+            // Alternatively, calculate it from the current key. This is necessary in two cases:
+            // - the events have never been pruned before because they are not yet 24h old.
+            // - this is the first event after an II upgrade from a version that did not have this
+            //   state variable to track the beginning of the 24h window.
+            prune_window_start_from_current_key(current_key, db)
+        });
+
+    let Some(prune_start_key) = prune_window_start else {
+        // there is no key to start pruning from, so the list of pruned events is empty.
+        return vec![];
+    };
+
+    // Always aim to prune up to 24h ago, even if past attempts did not manage due to the
+    // MAX_EVENTS_TO_PRUNE limit.
+    let prune_window_end = now - DAY_NS;
+    let events = db
+        .range(prune_start_key..=EventKey::max_key(prune_window_end))
+        .take(MAX_EVENTS_TO_PRUNE)
+        .collect::<Vec<_>>();
+
+    // update the persistent state with the key _after_ the one being pointed to by the last event
+    // so that the next amortized pruning can continue from there.
+    if let Some((k, _)) = events.last() {
+        state::persistent_state_mut(|s| {
+            s.event_stats_24h_pruning_start = Some(k.next_key());
+        });
+    }
+    events
+}
+
+/// Removes events older than the retention period (30d).
 /// Returns a vec of tuples of the pruned events and their timestamps.
-fn prune_events(
+/// Prunes at most [MAX_EVENTS_TO_PRUNE].
+fn prune_events_30d<M: Memory>(
     db: &mut StableBTreeMap<EventKey, EventData, M>,
     now: Timestamp,
 ) -> Vec<(EventKey, EventData)> {
@@ -339,6 +416,7 @@ fn prune_events<M: Memory>(
     let pruned_events: Vec<_> = db
         .range(..=EventKey::max_key(now - RETENTION_PERIOD))
+        .take(MAX_EVENTS_TO_PRUNE)
         .collect();
     for entry in &pruned_events {
         let entry: &(EventKey, EventData) = entry;

diff --git a/src/internet_identity/src/stats/event_stats/tests.rs b/src/internet_identity/src/stats/event_stats/tests.rs
index b6e9c8047a..c520f62837 100644
--- a/src/internet_identity/src/stats/event_stats/tests.rs
+++ b/src/internet_identity/src/stats/event_stats/tests.rs
@@ -397,6 +397,90 @@ fn should_prune_monthly_events_after_30d() {
     );
 }

+#[test]
+fn should_prune_at_most_100_events_24h() {
+    let mut storage = test_storage();
+    let event = EventData {
+        event: Event::PrepareDelegation(PrepareDelegationEvent {
+            ii_domain: Some(IIDomain::Ic0App),
+            frontend: EXAMPLE_URL.to_string(),
+            session_duration_ns: to_ns(SESS_DURATION_SEC),
+        }),
+    };
+    let aggregation_key = AggregationKey::new(
+        PrepareDelegationCount,
+        Day,
+        Some(IIDomain::Ic0App),
+        EXAMPLE_URL.to_string(),
+    );
+
+    for _ in 0..107 {
+        update_events_internal(event.clone(), TIMESTAMP, &mut storage);
+    }
+
+    update_events_internal(event.clone(), TIMESTAMP + DAY_NS, &mut storage);
+    assert_event_count_consistent(&mut storage);
+
+    // Of the 107 initial events, 100 should be pruned, 1 was added to trigger the pruning
+    // --> 8 expected events
+    assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 8);
+    assert_eq!(
+        persistent_state(|s| s.event_stats_24h_pruning_start.clone()).unwrap(),
+        EventKey {
+            time: TIMESTAMP,
+            counter: 100
+        }
+    );
+    update_events_internal(event.clone(), TIMESTAMP + 2 * DAY_NS, &mut storage);
+    assert_event_count_consistent(&mut storage);
+    // Prune again, after another 24h leaving only the event that triggered the pruning
+    // --> 1 expected event
+    assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 1);
+    assert_eq!(
+        persistent_state(|s| s.event_stats_24h_pruning_start.clone()).unwrap(),
+        EventKey {
+            time: TIMESTAMP + DAY_NS,
+            counter: 108
+        }
+    );
+}
+
+#[test]
+fn should_prune_at_most_100_events_30d() {
+    let mut storage = test_storage();
+    let event = EventData {
+        event: Event::PrepareDelegation(PrepareDelegationEvent {
+            ii_domain: Some(IIDomain::Ic0App),
+            frontend: EXAMPLE_URL.to_string(),
+            session_duration_ns: to_ns(SESS_DURATION_SEC),
+        }),
+    };
+    let aggregation_key = AggregationKey::new(
+        PrepareDelegationCount,
+        Month,
+        Some(IIDomain::Ic0App),
+        EXAMPLE_URL.to_string(),
+    );
+
+    for _ in 0..107 {
+        update_events_internal(event.clone(), TIMESTAMP, &mut storage);
+    }
+
+    update_events_internal(event.clone(), TIMESTAMP + 30 * DAY_NS, &mut storage);
+    assert_event_count_consistent(&mut storage);
+
+    // Of the 107 initial events, 100 should be pruned, 1 was added to trigger the pruning
+    // --> 8 expected events
+    assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 8);
+    assert_eq!(storage.event_data.len(), 8);
+    update_events_internal(event.clone(), TIMESTAMP + 60 * DAY_NS, &mut storage);
+    assert_event_count_consistent(&mut storage);
+    // Prune again, after another 30d leaving only the event that triggered the pruning
+    // --> 1 expected event
+    assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 1);
+    assert_eq!(storage.event_data.len(), 1);
+}
+
 #[test]
 fn should_account_for_dapps_changing_session_lifetime() {
     let mut storage = test_storage();
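The two tests above pin down the amortized bound from both directions. The pattern itself, removing at most a fixed number of expired entries from an ordered map per call, can be sketched independently of `ic-stable-structures`; a minimal sketch, assuming `u64` timestamp keys in place of `EventKey` and `std::collections::BTreeMap` in place of `StableBTreeMap`:

```rust
use std::collections::BTreeMap;

const MAX_EVENTS_TO_PRUNE: usize = 100;

/// Each call removes at most MAX_EVENTS_TO_PRUNE entries at or below the
/// cutoff, so no single update call does unbounded work; repeated calls
/// eventually catch up, mirroring prune_events_30d above.
fn prune_expired(db: &mut BTreeMap<u64, String>, cutoff: u64) -> Vec<(u64, String)> {
    let keys: Vec<u64> = db
        .range(..=cutoff)
        .take(MAX_EVENTS_TO_PRUNE)
        .map(|(k, _)| *k)
        .collect();
    keys.into_iter()
        .filter_map(|k| db.remove(&k).map(|v| (k, v)))
        .collect()
}

fn main() {
    let mut db: BTreeMap<u64, String> = (0..250u64).map(|t| (t, format!("event-{t}"))).collect();
    // 200 entries are expired; the bound spreads their removal over three calls.
    assert_eq!(prune_expired(&mut db, 199).len(), 100);
    assert_eq!(prune_expired(&mut db, 199).len(), 100);
    assert_eq!(prune_expired(&mut db, 199).len(), 0);
    assert_eq!(db.len(), 50);
}
```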
diff --git a/src/internet_identity/src/storage/storable_persistent_state.rs b/src/internet_identity/src/storage/storable_persistent_state.rs
index d7d675db90..2b1a081013 100644
--- a/src/internet_identity/src/storage/storable_persistent_state.rs
+++ b/src/internet_identity/src/storage/storable_persistent_state.rs
@@ -4,6 +4,7 @@ use crate::stats::activity_stats::activity_counter::active_anchor_counter::Activ
 use crate::stats::activity_stats::activity_counter::authn_method_counter::AuthnMethodCounter;
 use crate::stats::activity_stats::activity_counter::domain_active_anchor_counter::DomainActiveAnchorCounter;
 use crate::stats::activity_stats::ActivityStats;
+use crate::stats::event_stats::EventKey;
 use candid::{CandidType, Deserialize};
 use ic_stable_structures::storable::Bound;
 use ic_stable_structures::Storable;
@@ -30,6 +31,7 @@ pub struct StorablePersistentState {
     event_data_count: Option<u64>, // opt for backwards compatibility
     event_aggregations_count: Option<u64>,
+    event_stats_24h_pruning_start: Option<EventKey>,
 }

 impl Storable for StorablePersistentState {
@@ -66,6 +68,7 @@ impl From<PersistentState> for StorablePersistentState {
             max_inflight_captchas: s.max_inflight_captchas,
             event_data_count: Some(s.event_data_count),
             event_aggregations_count: Some(s.event_aggregations_count),
+            event_stats_24h_pruning_start: s.event_stats_24h_pruning_start,
         }
     }
 }
@@ -82,6 +85,7 @@ impl From<StorablePersistentState> for PersistentState {
             max_inflight_captchas: s.max_inflight_captchas,
             event_data_count: s.event_data_count.unwrap_or_default(),
             event_aggregations_count: s.event_aggregations_count.unwrap_or_default(),
+            event_stats_24h_pruning_start: s.event_stats_24h_pruning_start,
         }
     }
 }
@@ -120,6 +124,7 @@ mod tests {
             max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
             event_data_count: Some(0),
             event_aggregations_count: Some(0),
+            event_stats_24h_pruning_start: None,
         };

         assert_eq!(StorablePersistentState::default(), expected_defaults);
@@ -137,6 +142,7 @@
             max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
             event_data_count: 0,
             event_aggregations_count: 0,
+            event_stats_24h_pruning_start: None,
         };
         assert_eq!(PersistentState::default(), expected_defaults);
     }

diff --git a/src/internet_identity/tests/integration/aggregation_stats.rs b/src/internet_identity/tests/integration/aggregation_stats.rs
index b0913b951b..a404abef10 100644
--- a/src/internet_identity/tests/integration/aggregation_stats.rs
+++ b/src/internet_identity/tests/integration/aggregation_stats.rs
@@ -122,6 +122,90 @@ fn should_report_at_most_100_entries() -> Result<(), CallError> {
     Ok(())
 }

+#[test]
+fn should_prune_at_most_100_entries_24h() -> Result<(), CallError> {
+    let env = env();
+    let canister_id = install_ii_canister(&env, II_WASM.clone());
+    let ii_origin = "ic0.app";
+    let identity_nr = create_identity(&env, canister_id, ii_origin);
+
+    for i in 0..102 {
+        delegation_for_origin(
+            &env,
+            canister_id,
+            identity_nr,
+            &format!("https://some-dapp-{}", i),
+        )?;
+    }
+
+    env.advance_time(Duration::from_secs(60 * 60 * 24));
+    delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?;
+
+    // 100 entries should have been pruned, leaving 3
+    let aggregations = api::stats(&env, canister_id)?.event_aggregations;
+    assert_eq!(
+        aggregations
+            .get(&aggregation_key(PD_COUNT, "24h", ii_origin))
+            .unwrap()
+            .len(),
+        3
+    );
+
+    // The rest should have been pruned, leaving 1
+    delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?;
+    let aggregations = api::stats(&env, canister_id)?.event_aggregations;
+    assert_eq!(
+        aggregations
+            .get(&aggregation_key(PD_COUNT, "24h", ii_origin))
+            .unwrap()
+            .len(),
+        1
+    );
+    Ok(())
+}
+
+#[test]
+fn should_prune_at_most_100_entries_30d() -> Result<(), CallError> {
+    let env = env();
+    let canister_id = install_ii_canister(&env, II_WASM.clone());
+    let ii_origin = "ic0.app";
+    let identity_nr = create_identity(&env, canister_id, ii_origin);
+
+    for i in 0..102 {
+        delegation_for_origin(
+            &env,
+            canister_id,
+            identity_nr,
+            &format!("https://some-dapp-{}", i),
+        )?;
+    }
+
+    env.advance_time(Duration::from_secs(60 * 60 * 24 * 30));
+    delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?;
+
+    // 100 entries should have been pruned, leaving 3
+    let aggregations = api::stats(&env, canister_id)?.event_aggregations;
+    assert_eq!(
+        aggregations
+            .get(&aggregation_key(PD_COUNT, "30d", ii_origin))
+            .unwrap()
+            .len(),
+        3
+    );
+
+    // The rest should have been pruned, leaving 1
+    delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?;
+    let aggregations = api::stats(&env, canister_id)?.event_aggregations;
+    assert_eq!(
+        aggregations
+            .get(&aggregation_key(PD_COUNT, "30d", ii_origin))
+            .unwrap()
+            .len(),
+        1
+    );
+    Ok(())
+}
+
 #[test]
 fn should_keep_aggregations_across_upgrades() -> Result<(), CallError> {
     const II_ORIGIN: &str = "ic0.app";
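Before the renames in the next patch, it is worth pinning down why `next_key` from PATCH 1/3 is a safe resume cursor: keys order lexicographically by `(time, counter)`, so when the `u16` counter is exhausted the successor rolls over into the next timestamp and is still strictly greater. A standalone mirror of the key type (simplified to bare fields):

```rust
/// Mirror of EventKey's ordering-relevant parts; the derive order makes
/// comparisons lexicographic over (time, counter), as in the patch.
#[derive(Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
struct EventKey {
    time: u64,
    counter: u16,
}

impl EventKey {
    fn next_key(&self) -> Self {
        match self.counter {
            // Counter exhausted: roll over into the next timestamp.
            u16::MAX => Self {
                time: self.time + 1,
                counter: 0,
            },
            _ => Self {
                time: self.time,
                counter: self.counter + 1,
            },
        }
    }
}

fn main() {
    let key = EventKey { time: 42, counter: u16::MAX };
    let next = key.next_key();
    // Strictly greater, so a range resuming at `next` can never revisit `key`.
    assert!(next > key);
    assert_eq!(next, EventKey { time: 43, counter: 0 });
}
```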
From 2052b1305acc4e09d2b2fbfffad7dfeaf36421e6 Mon Sep 17 00:00:00 2001
From: Frederik Rothenberger
Date: Tue, 4 Jun 2024 11:19:45 +0200
Subject: [PATCH 2/3] Remove notion of pruning for 24h event aggregations
 window

Since the underlying event data is not deleted, the word "prune" is
misleading in this context. It is replaced with "remove from aggregation".
---
 src/internet_identity/src/state.rs            |  8 +--
 .../src/stats/event_stats.rs                  | 55 ++++++++++---------
 .../src/stats/event_stats/tests.rs            | 28 ++++++++--
 .../src/storage/storable_persistent_state.rs  | 10 ++--
 .../tests/integration/aggregation_stats.rs    |  2 +-
 5 files changed, 62 insertions(+), 41 deletions(-)

diff --git a/src/internet_identity/src/state.rs b/src/internet_identity/src/state.rs
index f2dca2c957..72345847a7 100644
--- a/src/internet_identity/src/state.rs
+++ b/src/internet_identity/src/state.rs
@@ -105,10 +105,10 @@ pub struct PersistentState {
     // an option.
     pub event_aggregations_count: u64,
     // Key into the event_data BTreeMap where the 24h tracking window starts.
-    // This key is used to prune old entries from the 24h event aggregations.
+    // This key is used to remove old entries from the 24h event aggregations.
-    // If it is `None`, then the 24h pruning window starts from the newest entry in the event_data
+    // If it is `None`, then the 24h window starts from the newest entry in the event_data
     // BTreeMap minus 24h.
-    pub event_stats_24h_pruning_start: Option<EventKey>,
+    pub event_stats_24h_start: Option<EventKey>,
 }

 impl Default for PersistentState {
@@ -124,7 +124,7 @@
             max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
             event_data_count: 0,
             event_aggregations_count: 0,
-            event_stats_24h_pruning_start: None,
+            event_stats_24h_start: None,
         }
     }
 }

diff --git a/src/internet_identity/src/stats/event_stats.rs b/src/internet_identity/src/stats/event_stats.rs
index 9829e13416..277418b70c 100644
--- a/src/internet_identity/src/stats/event_stats.rs
+++ b/src/internet_identity/src/stats/event_stats.rs
@@ -77,7 +77,9 @@ mod event_aggregations;

 pub use event_aggregations::*;

-const MAX_EVENTS_TO_PRUNE: usize = 100;
+/// Number of past events to process per aggregation window in a single update call.
+/// This limit will be applied twice, to the 24h and 30d aggregations.
+const MAX_EVENTS_TO_PROCESS: usize = 100;

 #[cfg(test)]
 mod tests;
@@ -294,11 +296,14 @@ fn update_events_internal<M: Memory>(event: EventData, now: Timestamp, s: &mut S
     });

     let mut aggregations_db_wrapper = CountingAggregationsWrapper(&mut s.event_aggregations);
+    // Collect events that should no longer be reflected in the 24h aggregations. Does _not_ delete
+    // the underlying events.
+    let removed_24h = events_to_remove_from_24h_aggregations(now, &current_key, &s.event_data);
     // Update 24h aggregations
     update_aggregations(
         &current_key,
         &event,
-        &prune_events_24h(now, &current_key, &s.event_data),
+        &removed_24h,
         AggregationWindow::Day,
         &mut aggregations_db_wrapper,
     );
@@ -306,7 +311,7 @@
     // This pruning _deletes_ the data older than 30 days. Do this after the 24h aggregation
     // otherwise the daily stats become inaccurate on the unlikely event that there is no activity
     // for 30 days.
-    let pruned_30d = prune_events_30d(&mut s.event_data, now);
+    let pruned_30d = prune_events(&mut s.event_data, now);
     // Update 30d aggregations
     update_aggregations(
@@ -338,11 +343,11 @@ fn update_aggregations<M: Memory>(

 /// Collects events older than 24h that need to be removed from the 24h aggregations.
 /// Given events are kept for 30 days, the events are not deleted from the supplied `db`.
-/// Instead, this function simply updates the `event_stats_24h_pruning_start` state variable
-/// that denotes the first event that should be pruned in the next call.
+/// Instead, this function simply updates the `event_stats_24h_start` state variable
+/// that denotes the first event that should be removed from the 24h window in the next call.
 ///
-/// Returns a vec of tuples of the pruned events and their timestamps, at most [MAX_EVENTS_TO_PRUNE].
+/// Returns a vec of tuples of the pruned events and their timestamps, at most [MAX_EVENTS_TO_PROCESS].
-fn prune_events_24h<M: Memory>(
+fn events_to_remove_from_24h_aggregations<M: Memory>(
     now: Timestamp,
     current_key: &EventKey,
     db: &StableBTreeMap<EventKey, EventData, M>,
 ) -> Vec<(EventKey, EventData)> {
@@ -359,10 +364,10 @@
     /// |--|--------------------------------------------------|--| --> time
     /// ^  ^                                                  ^  ^
     /// |  |                                                  |  └ current_key
-    /// |  └ now - 24h (prune_window_end)                     └ previous event
-    /// └ previous event timestamp - 24h (prune_window_start)
+    /// |  └ now - 24h                                        └ previous event
+    /// └ previous event timestamp - 24h
     /// ```
-    fn prune_window_start_from_current_key<M: Memory>(
+    fn removal_start_from_current_key<M: Memory>(
         current_key: &EventKey,
         db: &StableBTreeMap<EventKey, EventData, M>,
     ) -> Option<EventKey> {
@@ -371,35 +376,35 @@
             .map(|(k, _)| EventKey::min_key(k.time - DAY_NS))
     }

-    let prune_window_start =
+    let window_start =
         // Continue pruning from the last key. This value will be set if the 24h window has been pruned
         // before.
-        state::persistent_state(|s| s.event_stats_24h_pruning_start.clone()).or_else(|| {
+        state::persistent_state(|s| s.event_stats_24h_start.clone()).or_else(|| {
             // Alternatively, calculate it from the current key. This is necessary in two cases:
             // - the events have never been pruned before because they are not yet 24h old.
             // - this is the first event after an II upgrade from a version that did not have this
             //   state variable to track the beginning of the 24h window.
-            prune_window_start_from_current_key(current_key, db)
+            removal_start_from_current_key(current_key, db)
         });

-    let Some(prune_start_key) = prune_window_start else {
-        // there is no key to start pruning from, so the list of pruned events is empty.
+    let Some(start_key) = window_start else {
+        // there is no key to start from, so the list of removed events is empty.
         return vec![];
     };

-    // Always aim to prune up to 24h ago, even if past attempts did not manage due to the
-    // MAX_EVENTS_TO_PRUNE limit.
-    let prune_window_end = now - DAY_NS;
+    // Always aim to collect events up to 24h ago, even if past update calls did not manage due to
+    // the MAX_EVENTS_TO_PROCESS limit.
+    let window_end_timestamp = now - DAY_NS;
     let events = db
-        .range(prune_start_key..=EventKey::max_key(prune_window_end))
-        .take(MAX_EVENTS_TO_PRUNE)
+        .range(start_key..=EventKey::max_key(window_end_timestamp))
+        .take(MAX_EVENTS_TO_PROCESS)
         .collect::<Vec<_>>();

     // update the persistent state with the key _after_ the one being pointed to by the last event
-    // so that the next amortized pruning can continue from there.
+    // so that the next amortized clean-up can continue from there.
     if let Some((k, _)) = events.last() {
         state::persistent_state_mut(|s| {
-            s.event_stats_24h_pruning_start = Some(k.next_key());
+            s.event_stats_24h_start = Some(k.next_key());
         });
     }
     events
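The fallback derivation of the window start can be mirrored with `std`'s `BTreeMap`, assuming `iter_upper_bound(&k).next()` yields the last entry strictly below `k` (which `range(..k).next_back()` reproduces):

```rust
use std::collections::BTreeMap;

const DAY_NS: u64 = 24 * 60 * 60 * 1_000_000_000;

/// Sketch only: u64 nanosecond timestamps stand in for EventKey. The last
/// event before `current_key` was already aggregated with a window ending
/// 24h before it, so removal resumes from (its timestamp - 24h).
fn window_start_from_current_key(current_key: u64, db: &BTreeMap<u64, &str>) -> Option<u64> {
    db.range(..current_key).next_back().map(|(t, _)| t - DAY_NS)
}

fn main() {
    let mut db = BTreeMap::new();
    db.insert(30 * DAY_NS, "a");
    db.insert(31 * DAY_NS, "b");
    // For a new event on day 32, the start derives from "b" on day 31.
    assert_eq!(window_start_from_current_key(32 * DAY_NS, &db), Some(30 * DAY_NS));
}
```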
@@ -407,8 +412,8 @@
 /// Removes events older than the retention period (30d).
 /// Returns a vec of tuples of the pruned events and their timestamps.
-/// Prunes at most [MAX_EVENTS_TO_PRUNE].
-fn prune_events_30d<M: Memory>(
+/// Prunes at most [MAX_EVENTS_TO_PROCESS].
+fn prune_events<M: Memory>(
     db: &mut StableBTreeMap<EventKey, EventData, M>,
     now: Timestamp,
 ) -> Vec<(EventKey, EventData)> {
@@ -416,7 +421,7 @@
     let pruned_events: Vec<_> = db
         .range(..=EventKey::max_key(now - RETENTION_PERIOD))
-        .take(MAX_EVENTS_TO_PRUNE)
+        .take(MAX_EVENTS_TO_PROCESS)
         .collect();
     for entry in &pruned_events {
         let entry: &(EventKey, EventData) = entry;

diff --git a/src/internet_identity/src/stats/event_stats/tests.rs b/src/internet_identity/src/stats/event_stats/tests.rs
index c520f62837..f09f7e294f 100644
--- a/src/internet_identity/src/stats/event_stats/tests.rs
+++ b/src/internet_identity/src/stats/event_stats/tests.rs
@@ -262,7 +262,7 @@
 }

 #[test]
-fn should_prune_daily_events_after_24h() {
+fn should_remove_daily_events_after_24h() {
     let mut storage = test_storage();
     let event = EventData {
         event: Event::PrepareDelegation(PrepareDelegationEvent {
@@ -397,6 +398,7 @@
 }

 #[test]
-fn should_prune_at_most_100_events_24h() {
+fn should_remove_at_most_100_events_24h() {
     let mut storage = test_storage();
     let event = EventData {
         event: Event::PrepareDelegation(PrepareDelegationEvent {
@@ -421,11 +421,15 @@ fn should_remove_at_most_100_events_24h() {
     update_events_internal(event.clone(), TIMESTAMP + DAY_NS, &mut storage);
     assert_event_count_consistent(&mut storage);
-    // Of the 107 initial events, 100 should be pruned, 1 was added to trigger the pruning
+    // Of the 107 initial events, 100 should be removed, 1 was added to trigger the clean-up
     // --> 8 expected events
     assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 8);
+
+    // Since the number of events exceeds the amortized clean-up limit of 100 events, the 24h window
+    // is expected to start at the time of the first event not cleaned up, which is the event with
+    // counter 100 at time TIMESTAMP.
     assert_eq!(
-        persistent_state(|s| s.event_stats_24h_pruning_start.clone()).unwrap(),
+        persistent_state(|s| s.event_stats_24h_start.clone()).unwrap(),
         EventKey {
             time: TIMESTAMP,
             counter: 100
         }
     );
     update_events_internal(event.clone(), TIMESTAMP + 2 * DAY_NS, &mut storage);
     assert_event_count_consistent(&mut storage);
-    // Prune again, after another 24h leaving only the event that triggered the pruning
+    // Clean-up again, after another 24h leaving only the event that triggered the clean-up
     // --> 1 expected event
     assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 1);
+
+    // the 24h window is now expected to be up-to-date starting 24h before the last event added
     assert_eq!(
-        persistent_state(|s| s.event_stats_24h_pruning_start.clone()).unwrap(),
+        persistent_state(|s| s.event_stats_24h_start.clone()).unwrap(),
         EventKey {
             time: TIMESTAMP + DAY_NS,
             counter: 108
@@ -785,6 +791,16 @@ fn should_prune_zero_weighted_events() {
     assert_eq!(storage.event_data.len(), 1);
 }

+#[test]
+fn should_wrap_event_key_counter_correctly() {
+    let key = EventKey {
+        time: TIMESTAMP,
+        counter: u16::MAX,
+    };
+    let next_key = key.next_key();
+    assert!(next_key > key);
+}
+
 /// Make sure the cached count values are consistent with the actual data
 fn assert_event_count_consistent(storage: &mut Storage<Rc<RefCell<Vec<u8>>>>) {
     assert_eq!(

diff --git a/src/internet_identity/src/storage/storable_persistent_state.rs b/src/internet_identity/src/storage/storable_persistent_state.rs
index 2b1a081013..b186729819 100644
--- a/src/internet_identity/src/storage/storable_persistent_state.rs
+++ b/src/internet_identity/src/storage/storable_persistent_state.rs
@@ -31,7 +31,7 @@ pub struct StorablePersistentState {
     event_data_count: Option<u64>, // opt for backwards compatibility
     event_aggregations_count: Option<u64>,
-    event_stats_24h_pruning_start: Option<EventKey>,
+    event_stats_24h_start: Option<EventKey>,
 }

 impl Storable for StorablePersistentState {
@@ -68,7 +68,7 @@ impl From<PersistentState> for StorablePersistentState {
         max_inflight_captchas: s.max_inflight_captchas,
         event_data_count: Some(s.event_data_count),
         event_aggregations_count: Some(s.event_aggregations_count),
-        event_stats_24h_pruning_start: s.event_stats_24h_pruning_start,
+        event_stats_24h_start: s.event_stats_24h_start,
     }
   }
 }
@@ -85,7 +85,7 @@ impl From<StorablePersistentState> for PersistentState {
         max_inflight_captchas: s.max_inflight_captchas,
         event_data_count: s.event_data_count.unwrap_or_default(),
         event_aggregations_count: s.event_aggregations_count.unwrap_or_default(),
-        event_stats_24h_pruning_start: s.event_stats_24h_pruning_start,
+        event_stats_24h_start: s.event_stats_24h_start,
     }
   }
 }
@@ -124,7 +124,7 @@
         max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
         event_data_count: Some(0),
         event_aggregations_count: Some(0),
-        event_stats_24h_pruning_start: None,
+        event_stats_24h_start: None,
     };

     assert_eq!(StorablePersistentState::default(), expected_defaults);
@@ -137,7 +142,7 @@
         max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
         event_data_count: 0,
         event_aggregations_count: 0,
-        event_stats_24h_pruning_start: None,
+        event_stats_24h_start: None,
     };
     assert_eq!(PersistentState::default(), expected_defaults);
 }
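The `opt` fields in `StorablePersistentState` are what make this rename upgrade-safe across canister versions. A minimal sketch of the decode behavior, assuming Candid encoding for the stored state (as the `candid::{CandidType, Deserialize}` import suggests) and simplified stand-in types:

```rust
use candid::{CandidType, Decode, Deserialize, Encode};

#[derive(CandidType, Deserialize)]
struct StateV1 {
    event_data_count: Option<u64>,
}

#[derive(CandidType, Deserialize)]
struct StateV2 {
    event_data_count: Option<u64>,
    // New field: must be opt so that bytes written by V1 still decode.
    event_stats_24h_start: Option<u64>,
}

fn main() {
    // Bytes written by the old version lack the new field entirely.
    let old_bytes = Encode!(&StateV1 { event_data_count: Some(7) }).unwrap();
    // Candid's upgrade rules fill a missing opt field with None.
    let upgraded = Decode!(&old_bytes, StateV2).unwrap();
    assert_eq!(upgraded.event_data_count, Some(7));
    assert_eq!(upgraded.event_stats_24h_start, None);
}
```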
diff --git a/src/internet_identity/tests/integration/aggregation_stats.rs b/src/internet_identity/tests/integration/aggregation_stats.rs
index a404abef10..2ef1516df9 100644
--- a/src/internet_identity/tests/integration/aggregation_stats.rs
+++ b/src/internet_identity/tests/integration/aggregation_stats.rs
@@ -123,7 +123,7 @@ fn should_report_at_most_100_entries() -> Result<(), CallError> {
 }

 #[test]
-fn should_prune_at_most_100_entries_24h() -> Result<(), CallError> {
+fn should_remove_at_most_100_entries_24h() -> Result<(), CallError> {
     let env = env();
     let canister_id = install_ii_canister(&env, II_WASM.clone());
     let ii_origin = "ic0.app";

From 9229b91b75ffb602bfed91b1c2c1de400807bae3 Mon Sep 17 00:00:00 2001
From: Frederik Rothenberger
Date: Tue, 4 Jun 2024 14:12:33 +0200
Subject: [PATCH 3/3] Clarify function name and comments
---
 src/internet_identity/src/stats/event_stats.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/internet_identity/src/stats/event_stats.rs b/src/internet_identity/src/stats/event_stats.rs
index 277418b70c..53f8ef04d4 100644
--- a/src/internet_identity/src/stats/event_stats.rs
+++ b/src/internet_identity/src/stats/event_stats.rs
@@ -367,7 +367,7 @@ fn events_to_remove_from_24h_aggregations<M: Memory>(
     /// |  └ now - 24h                                        └ previous event
     /// └ previous event timestamp - 24h
     /// ```
-    fn removal_start_from_current_key<M: Memory>(
+    fn window_start_from_current_key<M: Memory>(
         current_key: &EventKey,
         db: &StableBTreeMap<EventKey, EventData, M>,
     ) -> Option<EventKey> {
@@ -377,14 +377,14 @@ fn events_to_remove_from_24h_aggregations<M: Memory>(
     }

     let window_start =
-        // Continue pruning from the last key. This value will be set if the 24h window has been pruned
-        // before.
+        // Load the window start from persistent state. This value will be set if events have been
+        // removed from the 24h window before.
         state::persistent_state(|s| s.event_stats_24h_start.clone()).or_else(|| {
             // Alternatively, calculate it from the current key. This is necessary in two cases:
-            // - the events have never been pruned before because they are not yet 24h old.
+            // - the events have never been removed before because they are not yet 24h old.
             // - this is the first event after an II upgrade from a version that did not have this
             //   state variable to track the beginning of the 24h window.
-            removal_start_from_current_key(current_key, db)
+            window_start_from_current_key(current_key, db)
         });

     let Some(start_key) = window_start else {