Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(project-cache): Merge instead of replace project state channel in upstream #3952

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion relay-server/src/services/project_upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ impl UpstreamQuery for GetProjectStates {
/// The wrapper struct for the incoming external requests which also keeps addition information.
#[derive(Debug)]
struct ProjectStateChannel {
// Main broadcast channel.
channel: BroadcastChannel<ProjectFetchState>,
// Additional broadcast channels tracked from merge operations.
other: Vec<BroadcastChannel<ProjectFetchState>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could replace channel, other by a single SmallVec, to clarify that there is no real difference between the channels.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I explicitly have separate fields for this to guarantee there is always at least one channel, otherwise attaching requires a dance of channels.last().is_some().or_insert().

deadline: Instant,
no_cache: bool,
attempts: u64,
Expand All @@ -99,6 +102,7 @@ impl ProjectStateChannel {
Self {
no_cache,
channel: sender.into_channel(),
other: Vec::new(),
deadline: now + timeout,
attempts: 0,
errors: 0,
Expand All @@ -115,12 +119,35 @@ impl ProjectStateChannel {
}

pub fn send(self, state: ProjectFetchState) {
for channel in self.other {
channel.send(state.clone());
}
self.channel.send(state)
}

pub fn expired(&self) -> bool {
Instant::now() > self.deadline
}

pub fn merge(&mut self, channel: ProjectStateChannel) {
let ProjectStateChannel {
channel,
other,
deadline,
no_cache,
attempts,
errors,
pending,
} = channel;

self.other.push(channel);
self.other.extend(other);
self.deadline = self.deadline.max(deadline);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be the minimum? In theory the deadline could get bumped indefinitely here, which maybe isn't a bad thing?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think max is correct, matches the previous behavior more closely.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think min matches the previous behaviour more closely since we'd replace the newer entry with the older entry and the deadline is only computed once.

self.no_cache |= no_cache;
self.attempts += attempts;
self.errors += errors;
self.pending += pending;
}
}

/// The map of project keys with their project state channels.
Expand Down Expand Up @@ -269,6 +296,23 @@ impl UpstreamProjectSourceService {
}
}

/// Merges a [`ProjectStateChannel`] into the existing list of tracked channels.
///
/// A channel is removed when querying the upstream for the project,
/// when the upstream returns pending for this project it needs to be returned to
/// the list of channels. If there is already another request for the same project
/// outstanding those two requests must be merged.
fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) {
match self.state_channels.entry(key) {
Entry::Vacant(e) => {
e.insert(channel);
}
Entry::Occupied(mut e) => {
e.get_mut().merge(channel);
}
}
}

/// Executes an upstream request to fetch project configs.
///
/// This assumes that currently no request is running. If the upstream request fails or new
Expand Down Expand Up @@ -374,7 +418,7 @@ impl UpstreamProjectSourceService {
let mut result = "ok";
if response.pending.contains(&key) {
channel.pending += 1;
self.state_channels.insert(key, channel);
self.merge_channel(key, channel);
continue;
}
let state = response
Expand Down
Loading