diff --git a/src/internet_identity/src/state.rs b/src/internet_identity/src/state.rs
index f2a2c6503d..72345847a7 100644
--- a/src/internet_identity/src/state.rs
+++ b/src/internet_identity/src/state.rs
@@ -4,6 +4,7 @@ use crate::stats::activity_stats::activity_counter::active_anchor_counter::ActiveAnchorCounter;
 use crate::stats::activity_stats::activity_counter::authn_method_counter::AuthnMethodCounter;
 use crate::stats::activity_stats::activity_counter::domain_active_anchor_counter::DomainActiveAnchorCounter;
 use crate::stats::activity_stats::ActivityStats;
+use crate::stats::event_stats::EventKey;
 use crate::storage::anchor::Anchor;
 use crate::storage::MAX_ENTRIES;
 use crate::{random_salt, Storage};
@@ -103,6 +104,11 @@ pub struct PersistentState {
     // event_aggregations is expected to have a lot of entries, thus counting by iterating over it is not
     // an option.
     pub event_aggregations_count: u64,
+    // Key into the event_data BTreeMap where the 24h tracking window starts.
+    // This key is used to remove old entries from the 24h event aggregations.
+    // If it is `None`, then the 24h window starts from the newest entry in the event_data
+    // BTreeMap minus 24h.
+    pub event_stats_24h_start: Option<EventKey>,
 }
 
 impl Default for PersistentState {
@@ -118,6 +124,7 @@ impl Default for PersistentState {
             max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
             event_data_count: 0,
             event_aggregations_count: 0,
+            event_stats_24h_start: None,
         }
     }
 }
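The new `event_stats_24h_start` field is best read as a resume cursor with a fallback. Below is a minimal sketch of that interpretation, using a plain `BTreeMap` and simplified stand-ins for `EventKey` and the event map (the names and types here are illustrative only, not the module's actual API):

```rust
use std::collections::BTreeMap;

const DAY_NS: u64 = 24 * 60 * 60 * 1_000_000_000;

// Simplified stand-in for the (time, counter) event key.
type Key = (u64, u16);

/// Where the 24h tracking window starts: either the persisted cursor,
/// or, if none was stored yet, 24h before the newest recorded event.
fn window_start(cursor: Option<Key>, events: &BTreeMap<Key, String>) -> Option<Key> {
    cursor.or_else(|| {
        events
            .last_key_value()
            .map(|((time, _), _)| (time.saturating_sub(DAY_NS), 0))
    })
}

fn main() {
    let mut events = BTreeMap::new();
    events.insert((2 * DAY_NS, 0), "event".to_string());
    // No cursor stored yet: fall back to the newest entry minus 24h.
    assert_eq!(window_start(None, &events), Some((DAY_NS, 0)));
    // A stored cursor takes precedence.
    assert_eq!(window_start(Some((0, 7)), &events), Some((0, 7)));
    println!("window start resolved");
}
```

Keeping the fallback in code instead of migrating state means an upgrade needs no backfill: the first event recorded after the upgrade recomputes the window start from the newest entry.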
diff --git a/src/internet_identity/src/stats/event_stats.rs b/src/internet_identity/src/stats/event_stats.rs
index df38c05e17..53f8ef04d4 100644
--- a/src/internet_identity/src/stats/event_stats.rs
+++ b/src/internet_identity/src/stats/event_stats.rs
@@ -59,6 +59,7 @@ use crate::stats::event_stats::event_aggregations::AGGREGATIONS;
 use crate::stats::event_stats::Event::PruneEvent;
 use crate::storage::Storage;
 use crate::{state, DAY_NS, MINUTE_NS};
+use candid::CandidType;
 use ic_cdk::api::call::CallResult;
 use ic_cdk::api::time;
 use ic_cdk::{caller, id, trap};
@@ -73,12 +74,17 @@ use std::time::Duration;
 
 /// This module defines the aggregations over the events.
 mod event_aggregations;
+
 pub use event_aggregations::*;
 
+/// Number of past events to process per aggregation window in a single update call.
+/// This limit will be applied twice, to the 24h and 30d aggregations.
+const MAX_EVENTS_TO_PROCESS: usize = 100;
+
 #[cfg(test)]
 mod tests;
 
-#[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
+#[derive(Deserialize, Serialize, CandidType, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
 pub struct EventKey {
     /// Timestamp of the event.
     pub time: Timestamp,
@@ -104,6 +110,19 @@ impl EventKey {
             counter: u16::MAX,
         }
     }
+
+    pub fn next_key(&self) -> Self {
+        match self.counter {
+            u16::MAX => Self {
+                time: self.time + 1,
+                counter: 0,
+            },
+            _ => Self {
+                time: self.time,
+                counter: self.counter + 1,
+            },
+        }
+    }
 }
 
 #[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug)]
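`EventKey` orders lexicographically by `(time, counter)` via the derived `Ord`, so `next_key` should produce the immediate successor, spilling into the timestamp once the counter is saturated. A standalone check of that property (a local copy of the struct, for illustration; the real type lives in `event_stats.rs`):

```rust
#[derive(Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
struct EventKey {
    time: u64,
    counter: u16,
}

impl EventKey {
    fn next_key(&self) -> Self {
        match self.counter {
            u16::MAX => Self { time: self.time + 1, counter: 0 },
            _ => Self { time: self.time, counter: self.counter + 1 },
        }
    }
}

fn main() {
    let plain = EventKey { time: 42, counter: 7 };
    assert_eq!(plain.next_key(), EventKey { time: 42, counter: 8 });

    // At u16::MAX the counter wraps to 0 and the timestamp is bumped instead,
    // so the successor still sorts strictly after the original key.
    let saturated = EventKey { time: 42, counter: u16::MAX };
    let next = saturated.next_key();
    assert_eq!(next, EventKey { time: 43, counter: 0 });
    assert!(next > saturated);
    println!("next_key preserves ordering");
}
```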
+ /// + /// ``` + /// |<----------------------- 24h ----------------------->| + /// |--|--------------------------------------------------|--| --> time + /// ^ ^ ^ ^ + /// | | | └ current_key + /// | └ now - 24h └ previous event + /// └ previous event timestamp - 24h + /// ``` + fn window_start_from_current_key( + current_key: &EventKey, + db: &StableBTreeMap, + ) -> Option { + db.iter_upper_bound(current_key) + .next() + .map(|(k, _)| EventKey::min_key(k.time - DAY_NS)) + } + + let window_start = + // Load the window start from persistent state. This value will be set if events have been + // removed from the 24h window before. + state::persistent_state(|s| s.event_stats_24h_start.clone()).or_else(|| { + // Alternatively, calculate it from the current key. This is necessary in two cases: + // - the events have never been removed before because they are not yet 24h old. + // - this is the first event after an II upgrade from a version that did not have this + // state variable to track the beginning of the 24h window. + window_start_from_current_key(current_key, db) + }); + + let Some(start_key) = window_start else { + // there is no key to start from, so the list of removed events is empty. + return vec![]; + }; + + // Always aim to collect events up to 24h ago, even if past update calls did not manage due to + // the MAX_EVENTS_TO_PROCESS limit. + let window_end_timestamp = now - DAY_NS; + let events = db + .range(start_key..=EventKey::max_key(window_end_timestamp)) + .take(MAX_EVENTS_TO_PROCESS) + .collect::>(); + + // update the persistent state with they key _after_ the one being pointed to by the last event + // so that the next amortized clean-up can continue from there. + if let Some((k, _)) = events.last() { + state::persistent_state_mut(|s| { + s.event_stats_24h_start = Some(k.next_key()); + }); + } + events +} + +/// Removes events older than the retention period (30d). /// Returns a vec of tuples of the pruned events and their timestamps. +/// Prunes at most [MAX_EVENTS_TO_PROCESS]. 
@@ -339,6 +421,7 @@ fn prune_events<M: Memory>(
 
     let pruned_events: Vec<_> = db
         .range(..=EventKey::max_key(now - RETENTION_PERIOD))
+        .take(MAX_EVENTS_TO_PROCESS)
         .collect();
     for entry in &pruned_events {
         let entry: &(EventKey, EventData) = entry;
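The 30d path applies the same bound but really deletes entries. Here is the collect-then-remove shape sketched on a std `BTreeMap` (an assumed simplification; `StableBTreeMap` exposes a similar `range`, but this is not its exact API): keys are materialized first because the range iterator borrows the map, so removal happens only after the iterator is dropped.

```rust
use std::collections::BTreeMap;

const MAX_EVENTS_TO_PROCESS: usize = 100;

/// Delete at most MAX_EVENTS_TO_PROCESS entries older than `cutoff`,
/// returning the pruned (key, value) pairs.
fn prune_bounded(db: &mut BTreeMap<u64, String>, cutoff: u64) -> Vec<(u64, String)> {
    // Materialize the doomed keys first: the range iterator borrows `db`,
    // so mutation has to happen after it is dropped.
    let pruned: Vec<(u64, String)> = db
        .range(..=cutoff)
        .take(MAX_EVENTS_TO_PROCESS)
        .map(|(k, v)| (*k, v.clone()))
        .collect();
    for (key, _) in &pruned {
        db.remove(key);
    }
    pruned
}

fn main() {
    let mut db: BTreeMap<u64, String> = (0..150).map(|t| (t, format!("event-{t}"))).collect();
    let pruned = prune_bounded(&mut db, 200);
    // Only 100 of the 150 eligible entries go in one call; the rest wait
    // for the next call, keeping per-call work bounded.
    assert_eq!(pruned.len(), 100);
    assert_eq!(db.len(), 50);
    println!("pruned {} entries, {} remain", pruned.len(), db.len());
}
```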
diff --git a/src/internet_identity/src/stats/event_stats/tests.rs b/src/internet_identity/src/stats/event_stats/tests.rs
index b6e9c8047a..f09f7e294f 100644
--- a/src/internet_identity/src/stats/event_stats/tests.rs
+++ b/src/internet_identity/src/stats/event_stats/tests.rs
@@ -262,7 +262,7 @@ fn should_store_multiple_events_and_aggregate_expected_weight_count() {
 }
 
 #[test]
-fn should_prune_daily_events_after_24h() {
+fn should_remove_daily_events_after_24h() {
     let mut storage = test_storage();
     let event = EventData {
         event: Event::PrepareDelegation(PrepareDelegationEvent {
@@ -397,6 +397,96 @@ fn should_prune_monthly_events_after_30d() {
     );
 }
 
+#[test]
+fn should_remove_at_most_100_events_24h() {
+    let mut storage = test_storage();
+    let event = EventData {
+        event: Event::PrepareDelegation(PrepareDelegationEvent {
+            ii_domain: Some(IIDomain::Ic0App),
+            frontend: EXAMPLE_URL.to_string(),
+            session_duration_ns: to_ns(SESS_DURATION_SEC),
+        }),
+    };
+    let aggregation_key = AggregationKey::new(
+        PrepareDelegationCount,
+        Day,
+        Some(IIDomain::Ic0App),
+        EXAMPLE_URL.to_string(),
+    );
+
+    for _ in 0..107 {
+        update_events_internal(event.clone(), TIMESTAMP, &mut storage);
+    }
+
+    update_events_internal(event.clone(), TIMESTAMP + DAY_NS, &mut storage);
+    assert_event_count_consistent(&mut storage);
+
+    // Of the 107 initial events, 100 should be removed, 1 was added to trigger the clean-up
+    // --> 8 expected events
+    assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 8);
+
+    // Since the number of events exceeds the amortized clean-up limit of 100 events, the 24h window
+    // is expected to start at the time of the first event not cleaned up, which is the event with
+    // counter 100 at time TIMESTAMP.
+    assert_eq!(
+        persistent_state(|s| s.event_stats_24h_start.clone()).unwrap(),
+        EventKey {
+            time: TIMESTAMP,
+            counter: 100
+        }
+    );
+
+    update_events_internal(event.clone(), TIMESTAMP + 2 * DAY_NS, &mut storage);
+    assert_event_count_consistent(&mut storage);
+
+    // Clean up again after another 24h, leaving only the event that triggered the clean-up
+    // --> 1 expected event
+    assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 1);
+
+    // the 24h window is now expected to be up-to-date, starting 24h before the last event added
+    assert_eq!(
+        persistent_state(|s| s.event_stats_24h_start.clone()).unwrap(),
+        EventKey {
+            time: TIMESTAMP + DAY_NS,
+            counter: 108
+        }
+    );
+}
+
+#[test]
+fn should_prune_at_most_100_events_30d() {
+    let mut storage = test_storage();
+    let event = EventData {
+        event: Event::PrepareDelegation(PrepareDelegationEvent {
+            ii_domain: Some(IIDomain::Ic0App),
+            frontend: EXAMPLE_URL.to_string(),
+            session_duration_ns: to_ns(SESS_DURATION_SEC),
+        }),
+    };
+    let aggregation_key = AggregationKey::new(
+        PrepareDelegationCount,
+        Month,
+        Some(IIDomain::Ic0App),
+        EXAMPLE_URL.to_string(),
+    );
+
+    for _ in 0..107 {
+        update_events_internal(event.clone(), TIMESTAMP, &mut storage);
+    }
+
+    update_events_internal(event.clone(), TIMESTAMP + 30 * DAY_NS, &mut storage);
+    assert_event_count_consistent(&mut storage);
+
+    // Of the 107 initial events, 100 should be pruned, 1 was added to trigger the pruning
+    // --> 8 expected events
+    assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 8);
+    assert_eq!(storage.event_data.len(), 8);
+
+    update_events_internal(event.clone(), TIMESTAMP + 60 * DAY_NS, &mut storage);
+    assert_event_count_consistent(&mut storage);
+
+    // Prune again after another 30d, leaving only the event that triggered the pruning
+    // --> 1 expected event
+    assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 1);
+    assert_eq!(storage.event_data.len(), 1);
+}
+
 #[test]
 fn should_account_for_dapps_changing_session_lifetime() {
     let mut storage = test_storage();
@@ -701,6 +791,16 @@ fn should_prune_zero_weighted_events() {
     assert_eq!(storage.event_data.len(), 1);
 }
 
+#[test]
+fn should_wrap_event_key_counter_correctly() {
+    let key = EventKey {
+        time: TIMESTAMP,
+        counter: u16::MAX,
+    };
+    let next_key = key.next_key();
+    assert!(next_key > key);
+}
+
 /// Make sure the cached count values are consistent with the actual data
 fn assert_event_count_consistent(storage: &mut Storage<Rc<RefCell<Vec<u8>>>>) {
     assert_eq!(
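For reviewers cross-checking the magic numbers in `should_remove_at_most_100_events_24h`: the arithmetic below mirrors the test's two aggregation assertions and the expected cursor positions (plain arithmetic only, no II code involved):

```rust
fn main() {
    // 107 events land at TIMESTAMP (counters 0..=106).
    // The trigger event at TIMESTAMP + DAY_NS is number 108 overall (counter 107).
    let total = 107 + 1;
    // The first clean-up removes at most 100 of the aged-out events.
    let after_first = total - 100;
    assert_eq!(after_first, 8); // first aggregation assertion in the test
    // Cursor: the first event not yet removed, i.e. (TIMESTAMP, counter 100).

    // The second trigger at TIMESTAMP + 2 * DAY_NS (counter 108) removes the
    // 7 leftovers at TIMESTAMP plus the counter-107 event: 8 removals.
    let after_second = after_first + 1 - 8;
    assert_eq!(after_second, 1); // second aggregation assertion
    // Cursor: next_key of (TIMESTAMP + DAY_NS, 107) = (TIMESTAMP + DAY_NS, 108).
    println!("expected aggregation counts: {after_first}, then {after_second}");
}
```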
diff --git a/src/internet_identity/src/storage/storable_persistent_state.rs b/src/internet_identity/src/storage/storable_persistent_state.rs
index d7d675db90..b186729819 100644
--- a/src/internet_identity/src/storage/storable_persistent_state.rs
+++ b/src/internet_identity/src/storage/storable_persistent_state.rs
@@ -4,6 +4,7 @@ use crate::stats::activity_stats::activity_counter::active_anchor_counter::ActiveAnchorCounter;
 use crate::stats::activity_stats::activity_counter::authn_method_counter::AuthnMethodCounter;
 use crate::stats::activity_stats::activity_counter::domain_active_anchor_counter::DomainActiveAnchorCounter;
 use crate::stats::activity_stats::ActivityStats;
+use crate::stats::event_stats::EventKey;
 use candid::{CandidType, Deserialize};
 use ic_stable_structures::storable::Bound;
 use ic_stable_structures::Storable;
@@ -30,6 +31,7 @@ pub struct StorablePersistentState {
     event_data_count: Option<u64>,
     // opt of backwards compatibility
     event_aggregations_count: Option<u64>,
+    event_stats_24h_start: Option<EventKey>,
 }
 
 impl Storable for StorablePersistentState {
@@ -66,6 +68,7 @@ impl From<PersistentState> for StorablePersistentState {
             max_inflight_captchas: s.max_inflight_captchas,
             event_data_count: Some(s.event_data_count),
             event_aggregations_count: Some(s.event_aggregations_count),
+            event_stats_24h_start: s.event_stats_24h_start,
         }
     }
 }
@@ -82,6 +85,7 @@ impl From<StorablePersistentState> for PersistentState {
             max_inflight_captchas: s.max_inflight_captchas,
             event_data_count: s.event_data_count.unwrap_or_default(),
             event_aggregations_count: s.event_aggregations_count.unwrap_or_default(),
+            event_stats_24h_start: s.event_stats_24h_start,
         }
     }
 }
@@ -120,6 +124,7 @@ mod tests {
             max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
             event_data_count: Some(0),
             event_aggregations_count: Some(0),
+            event_stats_24h_start: None,
         };
 
         assert_eq!(StorablePersistentState::default(), expected_defaults);
@@ -137,6 +142,7 @@ mod tests {
             max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
             event_data_count: 0,
             event_aggregations_count: 0,
+            event_stats_24h_start: None,
         };
         assert_eq!(PersistentState::default(), expected_defaults);
     }
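As with the two count fields above it, the new field must decode from state written by a release that never contained it, hence the `Option`. A minimal Candid round-trip demonstrating the mechanism with hypothetical two-version structs (not the actual II types): Candid decodes an absent `opt` field as `None`.

```rust
use candid::{CandidType, Decode, Deserialize, Encode};

#[derive(CandidType, Deserialize)]
struct StateV1 {
    event_data_count: u64,
}

#[derive(CandidType, Deserialize)]
struct StateV2 {
    event_data_count: u64,
    // Missing in V1 encodings; Candid fills absent opt fields with None.
    event_stats_24h_start: Option<u64>,
}

fn main() {
    // Encode with the old layout, decode with the new one.
    let old_bytes = Encode!(&StateV1 { event_data_count: 3 }).unwrap();
    let upgraded = Decode!(&old_bytes, StateV2).unwrap();
    assert_eq!(upgraded.event_data_count, 3);
    assert_eq!(upgraded.event_stats_24h_start, None);
    println!("old state decodes with event_stats_24h_start = None");
}
```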
diff --git a/src/internet_identity/tests/integration/aggregation_stats.rs b/src/internet_identity/tests/integration/aggregation_stats.rs
index b0913b951b..2ef1516df9 100644
--- a/src/internet_identity/tests/integration/aggregation_stats.rs
+++ b/src/internet_identity/tests/integration/aggregation_stats.rs
@@ -122,6 +122,90 @@ fn should_report_at_most_100_entries() -> Result<(), CallError> {
     Ok(())
 }
 
+#[test]
+fn should_remove_at_most_100_entries_24h() -> Result<(), CallError> {
+    let env = env();
+    let canister_id = install_ii_canister(&env, II_WASM.clone());
+    let ii_origin = "ic0.app";
+    let identity_nr = create_identity(&env, canister_id, ii_origin);
+
+    for i in 0..102 {
+        delegation_for_origin(
+            &env,
+            canister_id,
+            identity_nr,
+            &format!("https://some-dapp-{}", i),
+        )?;
+    }
+
+    env.advance_time(Duration::from_secs(60 * 60 * 24));
+    delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?;
+
+    // 100 entries should have been removed, leaving 3
+    let aggregations = api::stats(&env, canister_id)?.event_aggregations;
+    assert_eq!(
+        aggregations
+            .get(&aggregation_key(PD_COUNT, "24h", ii_origin))
+            .unwrap()
+            .len(),
+        3
+    );
+
+    // The rest should have been removed, leaving 1
+    delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?;
+    let aggregations = api::stats(&env, canister_id)?.event_aggregations;
+    assert_eq!(
+        aggregations
+            .get(&aggregation_key(PD_COUNT, "24h", ii_origin))
+            .unwrap()
+            .len(),
+        1
+    );
+    Ok(())
+}
+
+#[test]
+fn should_prune_at_most_100_entries_30d() -> Result<(), CallError> {
+    let env = env();
+    let canister_id = install_ii_canister(&env, II_WASM.clone());
+    let ii_origin = "ic0.app";
+    let identity_nr = create_identity(&env, canister_id, ii_origin);
+
+    for i in 0..102 {
+        delegation_for_origin(
+            &env,
+            canister_id,
+            identity_nr,
+            &format!("https://some-dapp-{}", i),
+        )?;
+    }
+
+    env.advance_time(Duration::from_secs(60 * 60 * 24 * 30));
+    delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?;
+
+    // 100 entries should have been pruned, leaving 3
+    let aggregations = api::stats(&env, canister_id)?.event_aggregations;
+    assert_eq!(
+        aggregations
+            .get(&aggregation_key(PD_COUNT, "30d", ii_origin))
+            .unwrap()
+            .len(),
+        3
+    );
+
+    // The rest should have been pruned, leaving 1
+    delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?;
+    let aggregations = api::stats(&env, canister_id)?.event_aggregations;
+    assert_eq!(
+        aggregations
+            .get(&aggregation_key(PD_COUNT, "30d", ii_origin))
+            .unwrap()
+            .len(),
+        1
+    );
+    Ok(())
+}
+
 #[test]
 fn should_keep_aggregations_across_upgrades() -> Result<(), CallError> {
     const II_ORIGIN: &str = "ic0.app";