-
Notifications
You must be signed in to change notification settings - Fork 91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(project-cache): Merge instead of replace project state channel in upstream #3952
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,7 +79,10 @@ impl UpstreamQuery for GetProjectStates { | |
/// The wrapper struct for the incoming external requests which also keeps addition information. | ||
#[derive(Debug)] | ||
struct ProjectStateChannel { | ||
// Main broadcast channel. | ||
channel: BroadcastChannel<ProjectFetchState>, | ||
// Additional broadcast channels tracked from merge operations. | ||
other: Vec<BroadcastChannel<ProjectFetchState>>, | ||
deadline: Instant, | ||
no_cache: bool, | ||
attempts: u64, | ||
|
@@ -99,6 +102,7 @@ impl ProjectStateChannel { | |
Self { | ||
no_cache, | ||
channel: sender.into_channel(), | ||
other: Vec::new(), | ||
deadline: now + timeout, | ||
attempts: 0, | ||
errors: 0, | ||
|
@@ -115,12 +119,35 @@ impl ProjectStateChannel { | |
} | ||
|
||
pub fn send(self, state: ProjectFetchState) { | ||
for channel in self.other { | ||
channel.send(state.clone()); | ||
} | ||
self.channel.send(state) | ||
} | ||
|
||
pub fn expired(&self) -> bool { | ||
Instant::now() > self.deadline | ||
} | ||
|
||
pub fn merge(&mut self, channel: ProjectStateChannel) { | ||
let ProjectStateChannel { | ||
channel, | ||
other, | ||
deadline, | ||
no_cache, | ||
attempts, | ||
errors, | ||
pending, | ||
} = channel; | ||
|
||
self.other.push(channel); | ||
self.other.extend(other); | ||
self.deadline = self.deadline.max(deadline); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be the minimum? In theory the deadline could get bumped indefinitely here, which maybe isn't a bad thing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think max is correct, matches the previous behavior more closely. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think |
||
self.no_cache |= no_cache; | ||
self.attempts += attempts; | ||
self.errors += errors; | ||
self.pending += pending; | ||
} | ||
} | ||
|
||
/// The map of project keys with their project state channels. | ||
|
@@ -269,6 +296,23 @@ impl UpstreamProjectSourceService { | |
} | ||
} | ||
|
||
/// Merges a [`ProjectStateChannel`] into the existing list of tracked channels. | ||
/// | ||
/// A channel is removed when querying the upstream for the project, | ||
/// when the upstream returns pending for this project it needs to be returned to | ||
/// the list of channels. If there is already another request for the same project | ||
/// outstanding those two requests must be merged. | ||
fn merge_channel(&mut self, key: ProjectKey, channel: ProjectStateChannel) { | ||
match self.state_channels.entry(key) { | ||
Entry::Vacant(e) => { | ||
e.insert(channel); | ||
} | ||
Entry::Occupied(mut e) => { | ||
e.get_mut().merge(channel); | ||
} | ||
} | ||
} | ||
|
||
/// Executes an upstream request to fetch project configs. | ||
/// | ||
/// This assumes that currently no request is running. If the upstream request fails or new | ||
|
@@ -374,7 +418,7 @@ impl UpstreamProjectSourceService { | |
let mut result = "ok"; | ||
if response.pending.contains(&key) { | ||
channel.pending += 1; | ||
self.state_channels.insert(key, channel); | ||
self.merge_channel(key, channel); | ||
continue; | ||
} | ||
let state = response | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could replace
channel, other
by a singleSmallVec
, to clarify that there is no real difference between the channels.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I explicitly have separate fields for this to guarantee there is always at least one channel, otherwise attaching requires a dance of
channels.last().is_some().or_insert()
.