Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(system): Instrument spawned tasks #3852

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
disallowed-methods = [
{ path = "tokio::spawn", reason = "use `relay_system::spawn` instead" },
]

2 changes: 1 addition & 1 deletion relay-server/src/services/cogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Service for CogsService {
type Interface = CogsReport;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
let mut shutdown = Controller::shutdown_handle();

loop {
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/global_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl GlobalConfigService {
let upstream_relay = self.upstream.clone();
let internal_tx = self.internal_tx.clone();

tokio::spawn(async move {
relay_system::spawn!(async move {
metric!(timer(RelayTimers::GlobalConfigRequestDuration), {
let query = GetGlobalConfig::new();
let res = upstream_relay.send(SendQuery(query)).await;
Expand Down Expand Up @@ -360,7 +360,7 @@ impl Service for GlobalConfigService {
type Interface = GlobalConfigManager;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
let mut shutdown_handle = Controller::shutdown_handle();

relay_log::info!("global config service starting");
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl Service for HealthCheckService {
// Add 10% buffer to the internal timeouts to avoid race conditions.
let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1);

tokio::spawn(async move {
relay_system::spawn!(async move {
let shutdown = Controller::shutdown_handle();

while shutdown.get().is_none() {
Expand All @@ -215,7 +215,7 @@ impl Service for HealthCheckService {
update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok();
});

tokio::spawn(async move {
relay_system::spawn!(async move {
while let Some(HealthCheck(message, sender)) = rx.recv().await {
let update = update_rx.borrow();

Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/metrics/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Service for AggregatorService {
type Interface = Aggregator;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms));
let mut shutdown = Controller::shutdown_handle();

Expand Down Expand Up @@ -331,7 +331,7 @@ mod tests {
type Interface = TestInterface;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
while let Some(message) = rx.recv().await {
let buckets = message.0.buckets;
relay_log::debug!(?buckets, "received buckets");
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/metrics/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Service for RouterService {
type Interface = Aggregator;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
let mut router = StartedRouter::start(self);
relay_log::info!("metrics router started");

Expand Down Expand Up @@ -117,7 +117,7 @@ impl StartedRouter {
.chain(Some(self.default.send(AcceptsMetrics)))
.collect::<FuturesUnordered<_>>();

tokio::spawn(async move {
relay_system::spawn!(async move {
let mut accepts = true;
while let Some(req) = requests.next().await {
accepts &= req.unwrap_or_default();
Expand Down
8 changes: 4 additions & 4 deletions relay-server/src/services/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ impl HttpOutcomeProducer {

let upstream_relay = self.upstream_relay.clone();

tokio::spawn(async move {
relay_system::spawn!(async move {
match upstream_relay.send(SendQuery(request)).await {
Ok(_) => relay_log::trace!("outcome batch sent"),
Err(error) => {
Expand All @@ -683,7 +683,7 @@ impl Service for HttpOutcomeProducer {
type Interface = TrackRawOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
loop {
tokio::select! {
// Prioritize flush over receiving messages to prevent starving.
Expand Down Expand Up @@ -776,7 +776,7 @@ impl Service for ClientReportOutcomeProducer {
type Interface = TrackOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
loop {
tokio::select! {
// Prioritize flush over receiving messages to prevent starving.
Expand Down Expand Up @@ -1037,7 +1037,7 @@ impl Service for OutcomeProducerService {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
let Self { config, inner } = self;

tokio::spawn(async move {
relay_system::spawn!(async move {
let broker = inner.start();

relay_log::info!("OutcomeProducer started.");
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/outcome_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Service for OutcomeAggregator {
type Interface = TrackOutcome;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
let mut shutdown = Controller::shutdown_handle();
relay_log::info!("outcome aggregator started");

Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2814,7 +2814,7 @@ impl Service for EnvelopeProcessorService {
type Interface = EnvelopeProcessor;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
while let Some(message) = rx.recv().await {
let service = self.clone();
self.inner
Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ impl ProjectCacheBroker {
let source = self.source.clone();
let sender = self.state_tx.clone();

tokio::spawn(async move {
relay_system::spawn!(async move {
// Wait on the new attempt time when set.
if let Some(next_attempt) = next_attempt {
tokio::time::sleep_until(next_attempt).await;
Expand Down Expand Up @@ -1148,7 +1148,7 @@ impl Service for ProjectCacheService {
let outcome_aggregator = services.outcome_aggregator.clone();
let test_store = services.test_store.clone();

tokio::spawn(async move {
relay_system::spawn!(async move {
let mut ticker = tokio::time::interval(config.cache_eviction_interval());
relay_log::info!("project cache started");

Expand Down Expand Up @@ -1429,7 +1429,7 @@ mod tests {
}

// Emulate the project cache service loop.
tokio::task::spawn(async move {
relay_system::spawn!(async move {
loop {
select! {

Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/project_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn spawn_poll_local_states(
poll_local_states(&project_path, &tx).await;

// Start a background loop that polls periodically:
tokio::spawn(async move {
relay_system::spawn!(async move {
// To avoid running two load tasks simultaneously at startup, we delay the interval by one period:
let start_at = Instant::now() + period;
let mut ticker = tokio::time::interval_at(start_at, period);
Expand All @@ -166,7 +166,7 @@ impl Service for LocalProjectSourceService {
// collect the result, the producer will block, which is acceptable.
let (state_tx, mut state_rx) = mpsc::channel(1);

tokio::spawn(async move {
relay_system::spawn!(async move {
relay_log::info!("project local cache started");

// Start the background task that periodically reloads projects from disk:
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/project_upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ impl UpstreamProjectSourceService {
let channels = self.prepare_batches();
let upstream_relay = self.upstream_relay.clone();

tokio::spawn(async move {
relay_system::spawn!(async move {
let responses = Self::fetch_states(config, upstream_relay, channels).await;
// Send back all resolved responses and also unused channels.
// These responses will be handled by `handle_responses` function.
Expand Down Expand Up @@ -532,7 +532,7 @@ impl Service for UpstreamProjectSourceService {
type Interface = UpstreamProjectSource;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
relay_log::info!("project upstream cache started");
loop {
tokio::select! {
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/relays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl RelayCacheService {

let fetch_tx = self.fetch_tx();
let upstream_relay = self.upstream_relay.clone();
tokio::spawn(async move {
relay_system::spawn!(async move {
let request = GetRelays {
relay_ids: channels.keys().cloned().collect(),
};
Expand Down Expand Up @@ -335,7 +335,7 @@ impl Service for RelayCacheService {
type Interface = RelayCache;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
relay_log::info!("key cache started");

loop {
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,15 @@ impl Service for HttpServer {
relay_log::info!("spawning http server");
relay_log::info!(" listening on http://{}/", config.listen_addr());
relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1);
tokio::spawn(server.serve(app));
relay_system::spawn!(server.serve(app));
}
Err(err) => {
relay_log::error!("Failed to start the HTTP server: {err}");
std::process::exit(1);
}
}

tokio::spawn(async move {
relay_system::spawn!(async move {
let Shutdown { timeout } = Controller::shutdown_handle().notified().await;
relay_log::info!("Shutting down HTTP server");

Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/services/spooler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ impl BufferService {
BufferState::Disk(ref disk) => {
let db = disk.db.clone();
let project_cache = self.services.project_cache.clone();
tokio::spawn(async move {
relay_system::spawn!(async move {
match OnDisk::get_spooled_index(&db).await {
Ok(index) => {
relay_log::trace!(
Expand Down Expand Up @@ -1255,7 +1255,7 @@ impl Service for BufferService {
type Interface = Buffer;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
let mut shutdown = Controller::shutdown_handle();

loop {
Expand Down Expand Up @@ -1574,7 +1574,7 @@ mod tests {
type Interface = TestHealth;

fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
loop {
tokio::select! {
Some(TestHealth::SpoolHealth(sender)) = rx.recv() => self.buffer.send(Health(sender)),
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl Service for RelayStats {
return;
};

tokio::spawn(async move {
relay_system::spawn!(async move {
loop {
let _ = tokio::join!(
self.upstream_status(),
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ impl Service for StoreService {
fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
let this = Arc::new(self);

tokio::spawn(async move {
relay_system::spawn!(async move {
relay_log::info!("store forwarder started");

while let Some(message) = rx.recv().await {
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/test_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl relay_system::Service for TestStoreService {
type Interface = TestStore;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_system::spawn!(async move {
while let Some(message) = rx.recv().await {
self.handle_message(message);
}
Expand Down
8 changes: 4 additions & 4 deletions relay-server/src/services/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1344,7 +1344,7 @@ impl ConnectionMonitor {
// Only take action if we exceeded the grace period.
if first_error + self.client.config.http_outage_grace_period() <= now {
let return_tx = return_tx.clone();
let task = tokio::spawn(Self::connect(self.client.clone(), return_tx));
let task = relay_system::spawn!(Self::connect(self.client.clone(), return_tx));
self.state = ConnectionState::Reconnecting(task);
}
}
Expand Down Expand Up @@ -1429,7 +1429,7 @@ impl UpstreamBroker {
let client = self.client.clone();
let action_tx = self.action_tx.clone();

tokio::spawn(async move {
relay_system::spawn!(async move {
let send_start = Instant::now();
let result = client.send(entry.request.as_mut()).await;
emit_response_metrics(send_start, &entry, &result);
Expand Down Expand Up @@ -1512,7 +1512,7 @@ impl Service for UpstreamRelayService {
state: AuthState::Unknown,
tx: action_tx.clone(),
};
tokio::spawn(auth.run());
relay_system::spawn!(auth.run());

// Main broker that serializes public and internal messages, as well as maintains connection
// and authentication state.
Expand All @@ -1525,7 +1525,7 @@ impl Service for UpstreamRelayService {
action_tx,
};

tokio::spawn(async move {
relay_system::spawn!(async move {
loop {
tokio::select! {
biased;
Expand Down
1 change: 1 addition & 0 deletions relay-system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ relay-statsd = { workspace = true }
tokio = { workspace = true, features = ["rt", "signal", "macros", "sync", "time"] }

[dev-dependencies]
insta = { workspace = true }
relay-statsd = { workspace = true, features = ["test"] }
tokio = { workspace = true, features = ["test-util"] }
5 changes: 3 additions & 2 deletions relay-system/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl ShutdownHandle {
/// type Interface = ();
///
/// fn spawn_handler(self, mut rx: relay_system::Receiver<Self::Interface>) {
/// tokio::spawn(async move {
/// relay_system::spawn!(async move {
/// let mut shutdown = Controller::shutdown_handle();
///
/// loop {
Expand Down Expand Up @@ -166,8 +166,9 @@ pub struct Controller;

impl Controller {
/// Starts a controller that monitors shutdown signals.
#[track_caller]
pub fn start(shutdown_timeout: Duration) {
tokio::spawn(monitor_shutdown(shutdown_timeout));
crate::spawn!(monitor_shutdown(shutdown_timeout));
}

/// Manually initiates the shutdown process of the system.
Expand Down
2 changes: 2 additions & 0 deletions relay-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
#![allow(clippy::derive_partial_eq_without_eq)]

mod controller;
mod runtime;
mod service;
mod statsd;

pub use self::controller::*;
pub use self::runtime::*;
pub use self::service::*;
Loading
Loading