Skip to content

Commit

Permalink
Add explicit bound to amortized clean-up
Browse files Browse the repository at this point in the history
This adds an explicit limit to the number of events processed during
amortized clean-up. Using that, we can (in a future PR) remove the timer
based mechanism for pruning data.
  • Loading branch information
frederikrothenberger committed Jun 3, 2024
1 parent 0aed4b8 commit 07b8ecd
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 59 deletions.
7 changes: 7 additions & 0 deletions src/internet_identity/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::stats::activity_stats::activity_counter::active_anchor_counter::Activ
use crate::stats::activity_stats::activity_counter::authn_method_counter::AuthnMethodCounter;
use crate::stats::activity_stats::activity_counter::domain_active_anchor_counter::DomainActiveAnchorCounter;
use crate::stats::activity_stats::ActivityStats;
use crate::stats::event_stats::EventKey;
use crate::storage::anchor::Anchor;
use crate::storage::MAX_ENTRIES;
use crate::{random_salt, Storage};
Expand Down Expand Up @@ -103,6 +104,11 @@ pub struct PersistentState {
// event_aggregations is expected to have a lot of entries, thus counting by iterating over it is not
// an option.
pub event_aggregations_count: u64,
// Key into the event_data BTreeMap where the 24h tracking window starts.
// This key is used to prune old entries from the 24h event aggregations.
// If it is `None`, then the 24h pruning window starts from the newest entry in the event_data
// BTreeMap minus 24h.
pub event_stats_24h_pruning_start: Option<EventKey>,
}

impl Default for PersistentState {
Expand All @@ -118,6 +124,7 @@ impl Default for PersistentState {
max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
event_data_count: 0,
event_aggregations_count: 0,
event_stats_24h_pruning_start: None,
}
}
}
Expand Down
166 changes: 122 additions & 44 deletions src/internet_identity/src/stats/event_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::stats::event_stats::event_aggregations::AGGREGATIONS;
use crate::stats::event_stats::Event::PruneEvent;
use crate::storage::Storage;
use crate::{state, DAY_NS, MINUTE_NS};
use candid::CandidType;
use ic_cdk::api::call::CallResult;
use ic_cdk::api::time;
use ic_cdk::{caller, id, trap};
Expand All @@ -73,12 +74,15 @@ use std::time::Duration;

/// This module defines the aggregations over the events.
mod event_aggregations;

pub use event_aggregations::*;

const MAX_EVENTS_TO_PRUNE: usize = 100;

#[cfg(test)]
mod tests;

#[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
#[derive(Deserialize, Serialize, CandidType, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
pub struct EventKey {
/// Timestamp of the event.
pub time: Timestamp,
Expand All @@ -104,6 +108,19 @@ impl EventKey {
counter: u16::MAX,
}
}

/// Returns the key immediately following `self` in sort order:
/// bumps the per-timestamp counter, rolling over into the next
/// timestamp tick once the counter is exhausted.
pub fn next_key(&self) -> Self {
    if self.counter == u16::MAX {
        // Counter saturated for this timestamp: move to the next one.
        Self {
            time: self.time + 1,
            counter: 0,
        }
    } else {
        Self {
            time: self.time,
            counter: self.counter + 1,
        }
    }
}
}

#[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -276,69 +293,130 @@ fn update_events_internal<M: Memory>(event: EventData, now: Timestamp, s: &mut S
s.event_data_count += 1;
});

let pruned_24h = if let Some((prev_key, _)) = s.event_data.iter_upper_bound(&current_key).next()
{
// `timestamp` denotes the time of the last event recorded just before `now`
// The difference (now - timestamp) is the time that has passed since the last event
// and hence the last time the daily stats were updated.
// Therefore, we need to prune all events from the daily aggregations that happened in
// that same difference, but 24 hours ago.
//
// |<----------------------- 24h ----------------------->|
// |--|--------------------------------------------------|--| --> time
// ^ ^ ^ ^
// | | | └ now
// | └ now - 24h (prune_window_end) └ timestamp
// └ timestamp - 24h (prune_window_start)

let prune_window_start = prev_key.time - DAY_NS;
let prune_window_end = now - DAY_NS;
s.event_data
.range(EventKey::min_key(prune_window_start)..=EventKey::max_key(prune_window_end))
.collect::<Vec<_>>()
} else {
// there is no event before `now` in the db, so the list of pruned events is empty
vec![]
};

let mut aggregations_db_wrapper = CountingAggregationsWrapper(&mut s.event_aggregations);
// Update 24h aggregations
AGGREGATIONS.iter().for_each(|aggregation| {
update_aggregation(
|(_, data)| aggregation.process_event(AggregationWindow::Day, data),
current_key.clone(),
event.clone(),
&pruned_24h,
&mut CountingAggregationsWrapper(&mut s.event_aggregations),
);
});
update_aggregations(
&current_key,
&event,
&prune_events_24h(now, &current_key, &s.event_data),
AggregationWindow::Day,
&mut aggregations_db_wrapper,
);

// This pruning _deletes_ the data older than 30 days. Do this after the 24h aggregation
// otherwise the daily stats become inaccurate on the unlikely event that there is no activity
// for 30 days.
let pruned_30d = prune_events(&mut s.event_data, now);

let pruned_30d = prune_events_30d(&mut s.event_data, now);
// Update 30d aggregations
update_aggregations(
&current_key,
&event,
&pruned_30d,
AggregationWindow::Month,
&mut aggregations_db_wrapper,
);
}

/// Applies the new event and the expired (pruned) events to every registered
/// aggregation for the given time `window`, writing results through
/// `aggregations_db` so the aggregation entry count stays accurate.
fn update_aggregations<M: Memory>(
    event_key: &EventKey,
    event_data: &EventData,
    pruned_events: &[(EventKey, EventData)],
    window: AggregationWindow,
    aggregations_db: &mut CountingAggregationsWrapper<M>,
) {
    for aggregation in AGGREGATIONS.iter() {
        update_aggregation(
            |(_, data)| aggregation.process_event(window.clone(), data),
            event_key.clone(),
            event_data.clone(),
            pruned_events,
            aggregations_db,
        );
    }
}

/// Adds an event to the event_data map and simultaneously removes events older than the retention period (30d).
/// Collects events older than 24h that need to be removed from the 24h aggregations.
/// Given events are kept for 30 days, the events are not deleted from the supplied `db`.
/// Instead, this function simply updates the `event_stats_24h_pruning_start` state variable
/// that denotes the first event that should be pruned in the next call.
///
/// Returns a vec of tuples of the pruned events and their timestamps, at most [MAX_EVENTS_TO_PRUNE].
fn prune_events_24h<M: Memory>(
now: Timestamp,
current_key: &EventKey,
db: &StableBTreeMap<EventKey, EventData, M>,
) -> Vec<(EventKey, EventData)> {
/// Calculates the 24h window start based on the current key:
/// - The current key is used to find the last event right before it.
/// - The timestamp of that key is then shifted back 24h.
///
/// This assumes that the 24h window has been correctly pruned in the past, including up to the
/// previous event.
///
/// ```
/// |<----------------------- 24h ----------------------->|
/// |--|--------------------------------------------------|--| --> time
/// ^ ^ ^ ^
/// | | | └ current_key
/// | └ now - 24h (prune_window_end) └ previous event
/// └ previous event timestamp - 24h (prune_window_start)
/// ```
fn prune_window_start_from_current_key<M: Memory>(
current_key: &EventKey,
db: &StableBTreeMap<EventKey, EventData, M>,
) -> Option<EventKey> {
db.iter_upper_bound(current_key)
.next()
.map(|(k, _)| EventKey::min_key(k.time - DAY_NS))
}

let prune_window_start =
// Continue pruning from the last key. This value will be set if the 24h window has been pruned
// before.
state::persistent_state(|s| s.event_stats_24h_pruning_start.clone()).or_else(|| {
// Alternatively, calculate it from the current key. This is necessary in two cases:
// - the events have never been pruned before because they are not yet 24h old.
// - this is the first event after an II upgrade from a version that did not have this
// state variable to track the beginning of the 24h window.
prune_window_start_from_current_key(current_key, db)
});

let Some(prune_start_key) = prune_window_start else {
// there is no key to start pruning from, so the list of pruned events is empty.
return vec![];
};

// Always aim to prune up to 24h ago, event if past attempts did not manage due to the
// MAX_EVENTS_TO_PRUNE limit.
let prune_window_end = now - DAY_NS;
let events = db
.range(prune_start_key..=EventKey::max_key(prune_window_end))
.take(MAX_EVENTS_TO_PRUNE)
.collect::<Vec<_>>();

// update the persistent state with they key _after_ the one being pointed to by the last event
// so that the next amortized pruning can continue from there.
if let Some((k, _)) = events.last() {
state::persistent_state_mut(|s| {
s.event_stats_24h_pruning_start = Some(k.next_key());
});
}
events
}

/// Removes events older than the retention period (30d).
/// Returns a vec of tuples of the pruned events and their timestamps.
fn prune_events<M: Memory>(
/// Prunes at most [MAX_EVENTS_TO_PRUNE].
fn prune_events_30d<M: Memory>(
db: &mut StableBTreeMap<EventKey, EventData, M>,
now: Timestamp,
) -> Vec<(EventKey, EventData)> {
const RETENTION_PERIOD: u64 = 30 * DAY_NS;

let pruned_events: Vec<_> = db
.range(..=EventKey::max_key(now - RETENTION_PERIOD))
.take(MAX_EVENTS_TO_PRUNE)
.collect();
for entry in &pruned_events {
let entry: &(EventKey, EventData) = entry;
Expand Down
Loading

0 comments on commit 07b8ecd

Please sign in to comment.