diff --git a/src/internet_identity/src/state.rs b/src/internet_identity/src/state.rs index f2a2c6503d..f2dca2c957 100644 --- a/src/internet_identity/src/state.rs +++ b/src/internet_identity/src/state.rs @@ -4,6 +4,7 @@ use crate::stats::activity_stats::activity_counter::active_anchor_counter::Activ use crate::stats::activity_stats::activity_counter::authn_method_counter::AuthnMethodCounter; use crate::stats::activity_stats::activity_counter::domain_active_anchor_counter::DomainActiveAnchorCounter; use crate::stats::activity_stats::ActivityStats; +use crate::stats::event_stats::EventKey; use crate::storage::anchor::Anchor; use crate::storage::MAX_ENTRIES; use crate::{random_salt, Storage}; @@ -103,6 +104,11 @@ pub struct PersistentState { // event_aggregations is expected to have a lot of entries, thus counting by iterating over it is not // an option. pub event_aggregations_count: u64, + // Key into the event_data BTreeMap where the 24h tracking window starts. + // This key is used to prune old entries from the 24h event aggregations. + // If it is `None`, then the 24h pruning window starts from the newest entry in the event_data + // BTreeMap minus 24h. 
+ pub event_stats_24h_pruning_start: Option<EventKey>, } impl Default for PersistentState { @@ -118,6 +124,7 @@ impl Default for PersistentState { max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS, event_data_count: 0, event_aggregations_count: 0, + event_stats_24h_pruning_start: None, } } } diff --git a/src/internet_identity/src/stats/event_stats.rs b/src/internet_identity/src/stats/event_stats.rs index df38c05e17..9829e13416 100644 --- a/src/internet_identity/src/stats/event_stats.rs +++ b/src/internet_identity/src/stats/event_stats.rs @@ -59,6 +59,7 @@ use crate::stats::event_stats::event_aggregations::AGGREGATIONS; use crate::stats::event_stats::Event::PruneEvent; use crate::storage::Storage; use crate::{state, DAY_NS, MINUTE_NS}; +use candid::CandidType; use ic_cdk::api::call::CallResult; use ic_cdk::api::time; use ic_cdk::{caller, id, trap}; @@ -73,12 +74,15 @@ use std::time::Duration; /// This module defines the aggregations over the events. mod event_aggregations; + pub use event_aggregations::*; +const MAX_EVENTS_TO_PRUNE: usize = 100; + #[cfg(test)] mod tests; -#[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)] +#[derive(Deserialize, Serialize, CandidType, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)] pub struct EventKey { /// Timestamp of the event. 
pub time: Timestamp, @@ -104,6 +108,19 @@ impl EventKey { counter: u16::MAX, } } + + pub fn next_key(&self) -> Self { + match self.counter { + u16::MAX => Self { + time: self.time + 1, + counter: 0, + }, + _ => Self { + time: self.time, + counter: self.counter + 1, + }, + } + } } #[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug)] @@ -276,62 +293,122 @@ fn update_events_internal(event: EventData, now: Timestamp, s: &mut S s.event_data_count += 1; }); - let pruned_24h = if let Some((prev_key, _)) = s.event_data.iter_upper_bound(&current_key).next() - { - // `timestamp` denotes the time of the last event recorded just before `now` - // The difference (now - timestamp) is the time that has passed since the last event - // and hence the last time the daily stats were updated. - // Therefore, we need to prune all events from the daily aggregations that happened in - // that same difference, but 24 hours ago. - // - // |<----------------------- 24h ----------------------->| - // |--|--------------------------------------------------|--| --> time - // ^ ^ ^ ^ - // | | | └ now - // | └ now - 24h (prune_window_end) └ timestamp - // └ timestamp - 24h (prune_window_start) - - let prune_window_start = prev_key.time - DAY_NS; - let prune_window_end = now - DAY_NS; - s.event_data - .range(EventKey::min_key(prune_window_start)..=EventKey::max_key(prune_window_end)) - .collect::<Vec<_>>() - } else { - // there is no event before `now` in the db, so the list of pruned events is empty - vec![] - }; - + let mut aggregations_db_wrapper = CountingAggregationsWrapper(&mut s.event_aggregations); // Update 24h aggregations - AGGREGATIONS.iter().for_each(|aggregation| { - update_aggregation( - |(_, data)| aggregation.process_event(AggregationWindow::Day, data), - current_key.clone(), - event.clone(), - &pruned_24h, - &mut CountingAggregationsWrapper(&mut s.event_aggregations), - ); - }); + update_aggregations( + &current_key, + &event, + &prune_events_24h(now, &current_key, &s.event_data), + 
AggregationWindow::Day, + &mut aggregations_db_wrapper, + ); // This pruning _deletes_ the data older than 30 days. Do this after the 24h aggregation // otherwise the daily stats become inaccurate on the unlikely event that there is no activity // for 30 days. - let pruned_30d = prune_events(&mut s.event_data, now); - + let pruned_30d = prune_events_30d(&mut s.event_data, now); // Update 30d aggregations + update_aggregations( + &current_key, + &event, + &pruned_30d, + AggregationWindow::Month, + &mut aggregations_db_wrapper, + ); +} + +/// Iterates over all aggregations and updates them based on the new event and the pruned events. +fn update_aggregations( + event_key: &EventKey, + event_data: &EventData, + pruned_events: &[(EventKey, EventData)], + window: AggregationWindow, + aggregations_db: &mut CountingAggregationsWrapper, +) { AGGREGATIONS.iter().for_each(|aggregation| { update_aggregation( - |(_, data)| aggregation.process_event(AggregationWindow::Month, data), - current_key.clone(), - event.clone(), - &pruned_30d, - &mut CountingAggregationsWrapper(&mut s.event_aggregations), + |(_, data)| aggregation.process_event(window.clone(), data), + event_key.clone(), + event_data.clone(), + pruned_events, + aggregations_db, ); }); } -/// Adds an event to the event_data map and simultaneously removes events older than the retention period (30d). +/// Collects events older than 24h that need to be removed from the 24h aggregations. +/// Given events are kept for 30 days, the events are not deleted from the supplied `db`. +/// Instead, this function simply updates the `event_stats_24h_pruning_start` state variable +/// that denotes the first event that should be pruned in the next call. +/// +/// Returns a vec of tuples of the pruned events and their timestamps, at most [MAX_EVENTS_TO_PRUNE]. 
+fn prune_events_24h( + now: Timestamp, + current_key: &EventKey, + db: &StableBTreeMap, +) -> Vec<(EventKey, EventData)> { + /// Calculates the 24h window start based on the current key: + /// - The current key is used to find the last event right before it. + /// - The timestamp of that key is then shifted back 24h. + /// + /// This assumes that the 24h window has been correctly pruned in the past, including up to the + /// previous event. + /// + /// ``` + /// |<----------------------- 24h ----------------------->| + /// |--|--------------------------------------------------|--| --> time + /// ^ ^ ^ ^ + /// | | | └ current_key + /// | └ now - 24h (prune_window_end) └ previous event + /// └ previous event timestamp - 24h (prune_window_start) + /// ``` + fn prune_window_start_from_current_key( + current_key: &EventKey, + db: &StableBTreeMap, + ) -> Option<EventKey> { + db.iter_upper_bound(current_key) + .next() + .map(|(k, _)| EventKey::min_key(k.time - DAY_NS)) + } + + let prune_window_start = + // Continue pruning from the last key. This value will be set if the 24h window has been pruned + // before. + state::persistent_state(|s| s.event_stats_24h_pruning_start.clone()).or_else(|| { + // Alternatively, calculate it from the current key. This is necessary in two cases: + // - the events have never been pruned before because they are not yet 24h old. + // - this is the first event after an II upgrade from a version that did not have this + // state variable to track the beginning of the 24h window. + prune_window_start_from_current_key(current_key, db) + }); + + let Some(prune_start_key) = prune_window_start else { + // there is no key to start pruning from, so the list of pruned events is empty. + return vec![]; + }; + + // Always aim to prune up to 24h ago, even if past attempts did not manage due to the + // MAX_EVENTS_TO_PRUNE limit. 
+ let prune_window_end = now - DAY_NS; + let events = db + .range(prune_start_key..=EventKey::max_key(prune_window_end)) + .take(MAX_EVENTS_TO_PRUNE) + .collect::<Vec<_>>(); + + // update the persistent state with the key _after_ the one being pointed to by the last event + // so that the next amortized pruning can continue from there. + if let Some((k, _)) = events.last() { + state::persistent_state_mut(|s| { + s.event_stats_24h_pruning_start = Some(k.next_key()); + }); + } + events +} + +/// Removes events older than the retention period (30d). /// Returns a vec of tuples of the pruned events and their timestamps. -fn prune_events( +/// Prunes at most [MAX_EVENTS_TO_PRUNE]. +fn prune_events_30d( db: &mut StableBTreeMap, now: Timestamp, ) -> Vec<(EventKey, EventData)> { @@ -339,6 +416,7 @@ fn prune_events( let pruned_events: Vec<_> = db .range(..=EventKey::max_key(now - RETENTION_PERIOD)) + .take(MAX_EVENTS_TO_PRUNE) .collect(); for entry in &pruned_events { let entry: &(EventKey, EventData) = entry; diff --git a/src/internet_identity/src/stats/event_stats/tests.rs b/src/internet_identity/src/stats/event_stats/tests.rs index b6e9c8047a..a0f8788cf7 100644 --- a/src/internet_identity/src/stats/event_stats/tests.rs +++ b/src/internet_identity/src/stats/event_stats/tests.rs @@ -217,7 +217,7 @@ fn should_store_multiple_events_and_aggregate_expected_weight_count() { PrepareDelegationCount, Day, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), 2 @@ -229,7 +229,7 @@ fn should_store_multiple_events_and_aggregate_expected_weight_count() { PrepareDelegationSessionSeconds, Day, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), SESS_DURATION_SEC * 2 @@ -241,7 +241,7 @@ fn should_store_multiple_events_and_aggregate_expected_weight_count() { PrepareDelegationCount, Month, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), 2 @@ -253,7 +253,7 @@ fn 
should_store_multiple_events_and_aggregate_expected_weight_count() { PrepareDelegationSessionSeconds, Month, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), SESS_DURATION_SEC * 2 @@ -286,7 +286,7 @@ fn should_prune_daily_events_after_24h() { PrepareDelegationCount, Day, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), 1 @@ -298,7 +298,7 @@ fn should_prune_daily_events_after_24h() { PrepareDelegationSessionSeconds, Day, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), SESS_DURATION_SEC @@ -310,7 +310,7 @@ fn should_prune_daily_events_after_24h() { PrepareDelegationCount, Month, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), 2 @@ -322,7 +322,7 @@ fn should_prune_daily_events_after_24h() { PrepareDelegationSessionSeconds, Month, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), SESS_DURATION_SEC * 2 @@ -354,7 +354,7 @@ fn should_prune_monthly_events_after_30d() { PrepareDelegationCount, Day, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), 1 @@ -366,7 +366,7 @@ fn should_prune_monthly_events_after_30d() { PrepareDelegationSessionSeconds, Day, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), SESS_DURATION_SEC @@ -378,7 +378,7 @@ fn should_prune_monthly_events_after_30d() { PrepareDelegationCount, Month, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), 1 @@ -390,13 +390,97 @@ fn should_prune_monthly_events_after_30d() { PrepareDelegationSessionSeconds, Month, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), SESS_DURATION_SEC ); } +#[test] +fn should_prune_at_most_100_events_24h() { + let mut storage = test_storage(); + let event = EventData { + event: 
Event::PrepareDelegation(PrepareDelegationEvent { + ii_domain: Some(IIDomain::Ic0App), + frontend: EXAMPLE_URL.to_string(), + session_duration_ns: to_ns(SESS_DURATION_SEC), + }), + }; + let aggregation_key = AggregationKey::new( + PrepareDelegationCount, + Day, + Some(IIDomain::Ic0App), + EXAMPLE_URL.to_string(), + ); + + for _ in 0..107 { + update_events_internal(event.clone(), TIMESTAMP, &mut storage); + } + + update_events_internal(event.clone(), TIMESTAMP + DAY_NS, &mut storage); + assert_event_count_consistent(&mut storage); + + // Of the 107 initial events, 100 should be pruned, 1 was added to trigger the pruning + // --> 8 expected events + assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 8); + assert_eq!( + persistent_state(|s| s.event_stats_24h_pruning_start.clone()).unwrap(), + EventKey { + time: TIMESTAMP, + counter: 100 + } + ); + update_events_internal(event.clone(), TIMESTAMP + 2 * DAY_NS, &mut storage); + assert_event_count_consistent(&mut storage); + // Prune again, after another 24h leaving only the event that triggered the pruning + // --> 1 expected events + assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 1); + assert_eq!( + persistent_state(|s| s.event_stats_24h_pruning_start.clone()).unwrap(), + EventKey { + time: TIMESTAMP + DAY_NS, + counter: 108 + } + ); +} + +#[test] +fn should_prune_at_most_100_events_30d() { + let mut storage = test_storage(); + let event = EventData { + event: Event::PrepareDelegation(PrepareDelegationEvent { + ii_domain: Some(IIDomain::Ic0App), + frontend: EXAMPLE_URL.to_string(), + session_duration_ns: to_ns(SESS_DURATION_SEC), + }), + }; + let aggregation_key = AggregationKey::new( + PrepareDelegationCount, + Month, + Some(IIDomain::Ic0App), + EXAMPLE_URL.to_string(), + ); + + for _ in 0..107 { + update_events_internal(event.clone(), TIMESTAMP, &mut storage); + } + + update_events_internal(event.clone(), TIMESTAMP + 30 * DAY_NS, &mut storage); + 
assert_event_count_consistent(&mut storage); + + // Of the 107 initial events, 100 should be pruned, 1 was added to trigger the pruning + // --> 8 expected events + assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 8); + assert_eq!(storage.event_data.len(), 8); + update_events_internal(event.clone(), TIMESTAMP + 60 * DAY_NS, &mut storage); + assert_event_count_consistent(&mut storage); + // Prune again, after another 30d leaving only the event that triggered the pruning + // --> 1 expected events + assert_eq!(storage.event_aggregations.get(&aggregation_key).unwrap(), 1); + assert_eq!(storage.event_data.len(), 1); +} + #[test] fn should_account_for_dapps_changing_session_lifetime() { let mut storage = test_storage(); @@ -417,7 +501,7 @@ fn should_account_for_dapps_changing_session_lifetime() { PrepareDelegationSessionSeconds, Day, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), 1800 @@ -440,7 +524,7 @@ fn should_account_for_dapps_changing_session_lifetime() { PrepareDelegationSessionSeconds, Day, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), 2700 @@ -463,7 +547,7 @@ fn should_account_for_dapps_changing_session_lifetime() { PrepareDelegationSessionSeconds, Day, Some(IIDomain::Ic0App), - EXAMPLE_URL.to_string() + EXAMPLE_URL.to_string(), )) .unwrap(), 1 diff --git a/src/internet_identity/src/storage/storable_persistent_state.rs b/src/internet_identity/src/storage/storable_persistent_state.rs index d7d675db90..2b1a081013 100644 --- a/src/internet_identity/src/storage/storable_persistent_state.rs +++ b/src/internet_identity/src/storage/storable_persistent_state.rs @@ -4,6 +4,7 @@ use crate::stats::activity_stats::activity_counter::active_anchor_counter::Activ use crate::stats::activity_stats::activity_counter::authn_method_counter::AuthnMethodCounter; use crate::stats::activity_stats::activity_counter::domain_active_anchor_counter::DomainActiveAnchorCounter; use 
crate::stats::activity_stats::ActivityStats; +use crate::stats::event_stats::EventKey; use candid::{CandidType, Deserialize}; use ic_stable_structures::storable::Bound; use ic_stable_structures::Storable; @@ -30,6 +31,7 @@ pub struct StorablePersistentState { event_data_count: Option<u64>, // opt of backwards compatibility event_aggregations_count: Option<u64>, + event_stats_24h_pruning_start: Option<EventKey>, } impl Storable for StorablePersistentState { @@ -66,6 +68,7 @@ impl From<PersistentState> for StorablePersistentState { max_inflight_captchas: s.max_inflight_captchas, event_data_count: Some(s.event_data_count), event_aggregations_count: Some(s.event_aggregations_count), + event_stats_24h_pruning_start: s.event_stats_24h_pruning_start, } } } @@ -82,6 +85,7 @@ impl From<StorablePersistentState> for PersistentState { max_inflight_captchas: s.max_inflight_captchas, event_data_count: s.event_data_count.unwrap_or_default(), event_aggregations_count: s.event_aggregations_count.unwrap_or_default(), + event_stats_24h_pruning_start: s.event_stats_24h_pruning_start, } } } @@ -120,6 +124,7 @@ mod tests { max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS, event_data_count: Some(0), event_aggregations_count: Some(0), + event_stats_24h_pruning_start: None, }; assert_eq!(StorablePersistentState::default(), expected_defaults); @@ -137,6 +142,7 @@ mod tests { max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS, event_data_count: 0, event_aggregations_count: 0, + event_stats_24h_pruning_start: None, }; assert_eq!(PersistentState::default(), expected_defaults); } diff --git a/src/internet_identity/tests/integration/aggregation_stats.rs b/src/internet_identity/tests/integration/aggregation_stats.rs index b0913b951b..a404abef10 100644 --- a/src/internet_identity/tests/integration/aggregation_stats.rs +++ b/src/internet_identity/tests/integration/aggregation_stats.rs @@ -122,6 +122,90 @@ fn should_report_at_most_100_entries() -> Result<(), CallError> { Ok(()) } +#[test] +fn should_prune_at_most_100_entries_24h() -> Result<(), 
CallError> { + let env = env(); + let canister_id = install_ii_canister(&env, II_WASM.clone()); + let ii_origin = "ic0.app"; + let identity_nr = create_identity(&env, canister_id, ii_origin); + + for i in 0..102 { + delegation_for_origin( + &env, + canister_id, + identity_nr, + &format!("https://some-dapp-{}", i), + )?; + } + + env.advance_time(Duration::from_secs(60 * 60 * 24)); + delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?; + + // 100 entries should have been pruned, leaving 3 + let aggregations = api::stats(&env, canister_id)?.event_aggregations; + assert_eq!( + aggregations + .get(&aggregation_key(PD_COUNT, "24h", ii_origin)) + .unwrap() + .len(), + 3 + ); + + // The rest should have been pruned, leaving 1 + delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?; + let aggregations = api::stats(&env, canister_id)?.event_aggregations; + assert_eq!( + aggregations + .get(&aggregation_key(PD_COUNT, "24h", ii_origin)) + .unwrap() + .len(), + 1 + ); + Ok(()) +} + +#[test] +fn should_prune_at_most_100_entries_30d() -> Result<(), CallError> { + let env = env(); + let canister_id = install_ii_canister(&env, II_WASM.clone()); + let ii_origin = "ic0.app"; + let identity_nr = create_identity(&env, canister_id, ii_origin); + + for i in 0..102 { + delegation_for_origin( + &env, + canister_id, + identity_nr, + &format!("https://some-dapp-{}", i), + )?; + } + + env.advance_time(Duration::from_secs(60 * 60 * 24 * 30)); + delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?; + + // 100 entries should have been pruned, leaving 3 + let aggregations = api::stats(&env, canister_id)?.event_aggregations; + assert_eq!( + aggregations + .get(&aggregation_key(PD_COUNT, "30d", ii_origin)) + .unwrap() + .len(), + 3 + ); + + // The rest should have been pruned, leaving 1 + delegation_for_origin(&env, canister_id, identity_nr, "https://some-dapp.com")?; + let aggregations = api::stats(&env, 
canister_id)?.event_aggregations; + assert_eq!( + aggregations + .get(&aggregation_key(PD_COUNT, "30d", ii_origin)) + .unwrap() + .len(), + 1 + ); + Ok(()) +} + #[test] fn should_keep_aggregations_across_upgrades() -> Result<(), CallError> { const II_ORIGIN: &str = "ic0.app";