From 0c3b9648c0bb3e5417d4d54ce5522e1f23497f38 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Tue, 23 Jul 2024 14:46:24 +0200 Subject: [PATCH 1/3] ref(system): Instrument spawned tasks --- clippy.toml | 4 ++++ relay-system/src/lib.rs | 2 ++ relay-system/src/runtime.rs | 37 +++++++++++++++++++++++++++++++ relay-system/src/statsd.rs | 43 +++++++++++++++++++++++++++++++++++-- 4 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 clippy.toml create mode 100644 relay-system/src/runtime.rs diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 0000000000..8a2a07de38 --- /dev/null +++ b/clippy.toml @@ -0,0 +1,4 @@ +disallowed-methods = [ + { path = "tokio::spawn", reason = "use `relay_system::spawn` instead" }, +] + diff --git a/relay-system/src/lib.rs b/relay-system/src/lib.rs index 7c7a69a7bd..749007b664 100644 --- a/relay-system/src/lib.rs +++ b/relay-system/src/lib.rs @@ -23,8 +23,10 @@ #![allow(clippy::derive_partial_eq_without_eq)] mod controller; +mod runtime; mod service; mod statsd; pub use self::controller::*; +pub use self::runtime::*; pub use self::service::*; diff --git a/relay-system/src/runtime.rs b/relay-system/src/runtime.rs new file mode 100644 index 0000000000..d5c3f1041c --- /dev/null +++ b/relay-system/src/runtime.rs @@ -0,0 +1,37 @@ +use std::panic::Location; +use std::time::Instant; + +use futures::Future; +use tokio::task::JoinHandle; + +use crate::statsd::{SystemCounters, SystemTimers}; + +/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it. +/// +/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`]. +#[track_caller] +pub fn spawn(future: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + let location = Location::caller(); + let task_id = format!("{}:{}", location.file(), location.line()); + + relay_statsd::metric!( + counter(SystemCounters::RuntimeTasksCreated) += 1, + id = &task_id + ); + + let created = Instant::now(); + tokio::spawn(async move { + let result = future.await; + + relay_statsd::metric!( + timer(SystemTimers::RuntimeTasksFinished) = created.elapsed(), + id = &task_id + ); + + result + }) +} diff --git a/relay-system/src/statsd.rs b/relay-system/src/statsd.rs index c2fbb6b131..5845699f1f 100644 --- a/relay-system/src/statsd.rs +++ b/relay-system/src/statsd.rs @@ -1,4 +1,43 @@ -use relay_statsd::GaugeMetric; +use relay_statsd::{CounterMetric, GaugeMetric, TimerMetric}; + +/// Counter metrics for Relay system components. +pub enum SystemCounters { + /// Number of runtime tasks created/spawned. + /// + /// Every call to [`spawn`](`crate::spawn`) increases this counter by one. + /// + /// This metric is tagged with: + /// - `id`: A unique identifier for the task, derived from its location in code. + RuntimeTasksCreated, +} + +impl CounterMetric for SystemCounters { + fn name(&self) -> &'static str { + match self { + Self::RuntimeTasksCreated => "runtime.task.spawn.created", + } + } +} + +/// Timer metrics for Relay system components. +pub enum SystemTimers { + /// Duration how long a runtime task was alive for. + /// + /// This metric is emitted once for each task passed to [`spawn`](crate::spawn) + /// with the time it took for the passed task to terminate. + /// + /// This metric is tagged with: + /// - `id`: A unique identifier for the task, derived from its location in code. + RuntimeTasksFinished, +} + +impl TimerMetric for SystemTimers { + fn name(&self) -> &'static str { + match self { + Self::RuntimeTasksFinished => "runtime.task.spawn.finished", + } + } +} /// Gauge metrics for Relay system components. pub enum SystemGauges { @@ -16,7 +55,7 @@ pub enum SystemGauges { impl GaugeMetric for SystemGauges { fn name(&self) -> &'static str { match *self { - SystemGauges::ServiceBackPressure => "service.back_pressure", + Self::ServiceBackPressure => "service.back_pressure", } } } From 8abf61f3ee991afd2c26bdaa858bb98e052d3d23 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Wed, 24 Jul 2024 13:59:11 +0200 Subject: [PATCH 2/3] replace all uses --- relay-server/src/services/cogs.rs | 2 +- relay-server/src/services/global_config.rs | 4 ++-- relay-server/src/services/health_check.rs | 4 ++-- relay-server/src/services/metrics/aggregator.rs | 4 ++-- relay-server/src/services/metrics/router.rs | 4 ++-- relay-server/src/services/outcome.rs | 8 ++++---- relay-server/src/services/outcome_aggregator.rs | 2 +- relay-server/src/services/processor.rs | 2 +- relay-server/src/services/project_cache.rs | 6 +++--- relay-server/src/services/project_local.rs | 4 ++-- relay-server/src/services/project_upstream.rs | 4 ++-- relay-server/src/services/relays.rs | 4 ++-- relay-server/src/services/server.rs | 4 ++-- relay-server/src/services/spooler/mod.rs | 6 +++--- relay-server/src/services/stats.rs | 2 +- relay-server/src/services/store.rs | 2 +- relay-server/src/services/test_store.rs | 2 +- relay-server/src/services/upstream.rs | 8 ++++---- relay-system/src/controller.rs | 5 +++-- relay-system/src/runtime.rs | 1 + relay-system/src/service.rs | 4 ++-- relay-test/src/lib.rs | 2 +- 22 files changed, 43 insertions(+), 41 deletions(-) diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs index acbeb790ab..453e3de0a3 100644 --- a/relay-server/src/services/cogs.rs +++ b/relay-server/src/services/cogs.rs @@ -106,7 +106,7 @@ impl Service for CogsService { type Interface = CogsReport; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut shutdown = Controller::shutdown_handle(); loop { diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index bc08352626..a194fc9b04 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -284,7 +284,7 @@ impl GlobalConfigService { let upstream_relay = self.upstream.clone(); let internal_tx = self.internal_tx.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { metric!(timer(RelayTimers::GlobalConfigRequestDuration), { let query = GetGlobalConfig::new(); let res = upstream_relay.send(SendQuery(query)).await; @@ -360,7 +360,7 @@ impl Service for GlobalConfigService { type Interface = GlobalConfigManager; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut shutdown_handle = Controller::shutdown_handle(); relay_log::info!("global config service starting"); diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index a32750297c..ce8c778cbf 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -198,7 +198,7 @@ impl Service for HealthCheckService { // Add 10% buffer to the internal timeouts to avoid race conditions. let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1); - tokio::spawn(async move { + relay_system::spawn(async move { let shutdown = Controller::shutdown_handle(); while shutdown.get().is_none() { @@ -215,7 +215,7 @@ impl Service for HealthCheckService { update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok(); }); - tokio::spawn(async move { + relay_system::spawn(async move { while let Some(HealthCheck(message, sender)) = rx.recv().await { let update = update_rx.borrow(); diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index e76bf78bda..378f1b04c3 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -229,7 +229,7 @@ impl Service for AggregatorService { type Interface = Aggregator; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); let mut shutdown = Controller::shutdown_handle(); @@ -331,7 +331,7 @@ mod tests { type Interface = TestInterface; fn spawn_handler(self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { while let Some(message) = rx.recv().await { let buckets = message.0.buckets; relay_log::debug!(?buckets, "received buckets"); diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 76b063e752..13072c5c88 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -44,7 +44,7 @@ impl Service for RouterService { type Interface = Aggregator; fn spawn_handler(self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut router = StartedRouter::start(self); relay_log::info!("metrics router started"); @@ -117,7 +117,7 @@ impl StartedRouter { .chain(Some(self.default.send(AcceptsMetrics))) .collect::>(); - tokio::spawn(async move { + relay_system::spawn(async move { let mut accepts = true; while let Some(req) = requests.next().await { accepts &= req.unwrap_or_default(); diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 55a4a8b73a..d48a696474 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -657,7 +657,7 @@ impl HttpOutcomeProducer { let upstream_relay = self.upstream_relay.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { match upstream_relay.send(SendQuery(request)).await { Ok(_) => relay_log::trace!("outcome batch sent"), Err(error) => { @@ -683,7 +683,7 @@ impl Service for HttpOutcomeProducer { type Interface = TrackRawOutcome; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { loop { tokio::select! { // Prioritize flush over receiving messages to prevent starving. @@ -776,7 +776,7 @@ impl Service for ClientReportOutcomeProducer { type Interface = TrackOutcome; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { loop { tokio::select! { // Prioritize flush over receiving messages to prevent starving. @@ -1037,7 +1037,7 @@ impl Service for OutcomeProducerService { fn spawn_handler(self, mut rx: relay_system::Receiver) { let Self { config, inner } = self; - tokio::spawn(async move { + relay_system::spawn(async move { let broker = inner.start(); relay_log::info!("OutcomeProducer started."); diff --git a/relay-server/src/services/outcome_aggregator.rs b/relay-server/src/services/outcome_aggregator.rs index 0a13cbf361..8728255d4a 100644 --- a/relay-server/src/services/outcome_aggregator.rs +++ b/relay-server/src/services/outcome_aggregator.rs @@ -139,7 +139,7 @@ impl Service for OutcomeAggregator { type Interface = TrackOutcome; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut shutdown = Controller::shutdown_handle(); relay_log::info!("outcome aggregator started"); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 99de389281..8006fdf691 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2814,7 +2814,7 @@ impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; fn spawn_handler(self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { while let Some(message) = rx.recv().await { let service = self.clone(); self.inner diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 910606b265..35189de481 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -724,7 +724,7 @@ impl ProjectCacheBroker { let source = self.source.clone(); let sender = self.state_tx.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { // Wait on the new attempt time when set. if let Some(next_attempt) = next_attempt { tokio::time::sleep_until(next_attempt).await; @@ -1148,7 +1148,7 @@ impl Service for ProjectCacheService { let outcome_aggregator = services.outcome_aggregator.clone(); let test_store = services.test_store.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { let mut ticker = tokio::time::interval(config.cache_eviction_interval()); relay_log::info!("project cache started"); @@ -1429,7 +1429,7 @@ mod tests { } // Emulate the project cache service loop. - tokio::task::spawn(async move { + relay_system::spawn(async move { loop { select! { diff --git a/relay-server/src/services/project_local.rs b/relay-server/src/services/project_local.rs index 2307462b3c..e21b13130a 100644 --- a/relay-server/src/services/project_local.rs +++ b/relay-server/src/services/project_local.rs @@ -146,7 +146,7 @@ async fn spawn_poll_local_states( poll_local_states(&project_path, &tx).await; // Start a background loop that polls periodically: - tokio::spawn(async move { + relay_system::spawn(async move { // To avoid running two load tasks simultaneously at startup, we delay the interval by one period: let start_at = Instant::now() + period; let mut ticker = tokio::time::interval_at(start_at, period); @@ -166,7 +166,7 @@ impl Service for LocalProjectSourceService { // collect the result, the producer will block, which is acceptable. let (state_tx, mut state_rx) = mpsc::channel(1); - tokio::spawn(async move { + relay_system::spawn(async move { relay_log::info!("project local cache started"); // Start the background task that periodically reloads projects from disk: diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 668d3196ae..1f3d22ade3 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -481,7 +481,7 @@ impl UpstreamProjectSourceService { let channels = self.prepare_batches(); let upstream_relay = self.upstream_relay.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { let responses = Self::fetch_states(config, upstream_relay, channels).await; // Send back all resolved responses and also unused channels. // These responses will be handled by `handle_responses` function. @@ -532,7 +532,7 @@ impl Service for UpstreamProjectSourceService { type Interface = UpstreamProjectSource; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { relay_log::info!("project upstream cache started"); loop { tokio::select! { diff --git a/relay-server/src/services/relays.rs b/relay-server/src/services/relays.rs index 922b1c7dda..5f2d2c37eb 100644 --- a/relay-server/src/services/relays.rs +++ b/relay-server/src/services/relays.rs @@ -238,7 +238,7 @@ impl RelayCacheService { let fetch_tx = self.fetch_tx(); let upstream_relay = self.upstream_relay.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { let request = GetRelays { relay_ids: channels.keys().cloned().collect(), }; @@ -335,7 +335,7 @@ impl Service for RelayCacheService { type Interface = RelayCache; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { relay_log::info!("key cache started"); loop { diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index 32a7c532c1..b4ae639b3b 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -135,7 +135,7 @@ impl Service for HttpServer { relay_log::info!("spawning http server"); relay_log::info!(" listening on http://{}/", config.listen_addr()); relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1); - tokio::spawn(server.serve(app)); + relay_system::spawn(server.serve(app)); } Err(err) => { relay_log::error!("Failed to start the HTTP server: {err}"); @@ -143,7 +143,7 @@ impl Service for HttpServer { } } - tokio::spawn(async move { + relay_system::spawn(async move { let Shutdown { timeout } = Controller::shutdown_handle().notified().await; relay_log::info!("Shutting down HTTP server"); diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index bce1b33ad5..b99c178117 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -1184,7 +1184,7 @@ impl BufferService { BufferState::Disk(ref disk) => { let db = disk.db.clone(); let project_cache = self.services.project_cache.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { match OnDisk::get_spooled_index(&db).await { Ok(index) => { relay_log::trace!( @@ -1255,7 +1255,7 @@ impl Service for BufferService { type Interface = Buffer; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { let mut shutdown = Controller::shutdown_handle(); loop { @@ -1574,7 +1574,7 @@ mod tests { type Interface = TestHealth; fn spawn_handler(self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { loop { tokio::select! { Some(TestHealth::SpoolHealth(sender)) = rx.recv() => self.buffer.send(Health(sender)), diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index a2a1a960d7..2babe2bc10 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -123,7 +123,7 @@ impl Service for RelayStats { return; }; - tokio::spawn(async move { + relay_system::spawn(async move { loop { let _ = tokio::join!( self.upstream_status(), diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 1e7d3ec2c4..c2cdf2b0d9 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1088,7 +1088,7 @@ impl Service for StoreService { fn spawn_handler(self, mut rx: relay_system::Receiver) { let this = Arc::new(self); - tokio::spawn(async move { + relay_system::spawn(async move { relay_log::info!("store forwarder started"); while let Some(message) = rx.recv().await { diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs index c61621fc0c..589c8c8c05 100644 --- a/relay-server/src/services/test_store.rs +++ b/relay-server/src/services/test_store.rs @@ -135,7 +135,7 @@ impl relay_system::Service for TestStoreService { type Interface = TestStore; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { + relay_system::spawn(async move { while let Some(message) = rx.recv().await { self.handle_message(message); } diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs index b2e53e4b29..86efe021a7 100644 --- a/relay-server/src/services/upstream.rs +++ b/relay-server/src/services/upstream.rs @@ -1344,7 +1344,7 @@ impl ConnectionMonitor { // Only take action if we exceeded the grace period. if first_error + self.client.config.http_outage_grace_period() <= now { let return_tx = return_tx.clone(); - let task = tokio::spawn(Self::connect(self.client.clone(), return_tx)); + let task = relay_system::spawn(Self::connect(self.client.clone(), return_tx)); self.state = ConnectionState::Reconnecting(task); } } @@ -1429,7 +1429,7 @@ impl UpstreamBroker { let client = self.client.clone(); let action_tx = self.action_tx.clone(); - tokio::spawn(async move { + relay_system::spawn(async move { let send_start = Instant::now(); let result = client.send(entry.request.as_mut()).await; emit_response_metrics(send_start, &entry, &result); @@ -1512,7 +1512,7 @@ impl Service for UpstreamRelayService { state: AuthState::Unknown, tx: action_tx.clone(), }; - tokio::spawn(auth.run()); + relay_system::spawn(auth.run()); // Main broker that serializes public and internal messages, as well as maintains connection // and authentication state. @@ -1525,7 +1525,7 @@ impl Service for UpstreamRelayService { action_tx, }; - tokio::spawn(async move { + relay_system::spawn(async move { loop { tokio::select! { biased; diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 60c5c829a1..acec9d0d4e 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -130,7 +130,7 @@ impl ShutdownHandle { /// type Interface = (); /// /// fn spawn_handler(self, mut rx: relay_system::Receiver) { -/// tokio::spawn(async move { +/// relay_system::spawn(async move { /// let mut shutdown = Controller::shutdown_handle(); /// /// loop { @@ -166,8 +166,9 @@ pub struct Controller; impl Controller { /// Starts a controller that monitors shutdown signals. + #[track_caller] pub fn start(shutdown_timeout: Duration) { - tokio::spawn(monitor_shutdown(shutdown_timeout)); + crate::spawn(monitor_shutdown(shutdown_timeout)); } /// Manually initiates the shutdown process of the system. diff --git a/relay-system/src/runtime.rs b/relay-system/src/runtime.rs index d5c3f1041c..abfada72c7 100644 --- a/relay-system/src/runtime.rs +++ b/relay-system/src/runtime.rs @@ -10,6 +10,7 @@ use crate::statsd::{SystemCounters, SystemTimers}; /// /// This is in instrumented spawn variant of Tokio's [`tokio::spawn`]. #[track_caller] +#[allow(clippy::disallowed_methods)] pub fn spawn(future: F) -> JoinHandle where F: Future + Send + 'static, diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index a09d019e13..7d858618fa 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -932,7 +932,7 @@ pub fn channel(name: &'static str) -> (Addr, Receiver) { /// type Interface = MyMessage; /// /// fn spawn_handler(self, mut rx: Receiver) { -/// tokio::spawn(async move { +/// relay_system::spawn(async move { /// while let Some(message) = rx.recv().await { /// // handle the message /// } @@ -1047,7 +1047,7 @@ mod tests { type Interface = MockMessage; fn spawn_handler(self, mut rx: Receiver) { - tokio::spawn(async move { + crate::spawn(async move { while rx.recv().await.is_some() { tokio::time::sleep(BACKLOG_INTERVAL * 2).await; } diff --git a/relay-test/src/lib.rs b/relay-test/src/lib.rs index 138c76724d..6eebc52828 100644 --- a/relay-test/src/lib.rs +++ b/relay-test/src/lib.rs @@ -44,7 +44,7 @@ where { let (addr, mut rx) = channel(name); - let handle = tokio::spawn(async move { + let handle = relay_system::spawn(async move { while let Some(msg) = rx.recv().await { f(&mut state, msg); } From ab73c97b1a888e2e1870fed37fc91e216672fdaa Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 25 Jul 2024 11:59:04 +0200 Subject: [PATCH 3/3] switch to a macro --- Cargo.lock | 1 + relay-server/src/services/cogs.rs | 2 +- relay-server/src/services/global_config.rs | 4 +- relay-server/src/services/health_check.rs | 4 +- .../src/services/metrics/aggregator.rs | 4 +- relay-server/src/services/metrics/router.rs | 4 +- relay-server/src/services/outcome.rs | 8 +-- .../src/services/outcome_aggregator.rs | 2 +- relay-server/src/services/processor.rs | 2 +- relay-server/src/services/project_cache.rs | 6 +-- relay-server/src/services/project_local.rs | 4 +- relay-server/src/services/project_upstream.rs | 4 +- relay-server/src/services/relays.rs | 4 +- relay-server/src/services/server.rs | 4 +- relay-server/src/services/spooler/mod.rs | 6 +-- relay-server/src/services/stats.rs | 2 +- relay-server/src/services/store.rs | 2 +- relay-server/src/services/test_store.rs | 2 +- relay-server/src/services/upstream.rs | 8 +-- relay-system/Cargo.toml | 1 + relay-system/src/controller.rs | 4 +- relay-system/src/runtime.rs | 49 ++++++++++++++++--- relay-system/src/service.rs | 4 +- relay-test/src/lib.rs | 2 +- 24 files changed, 84 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 205363ea1c..154013b224 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4150,6 +4150,7 @@ name = "relay-system" version = "24.7.0" dependencies = [ "futures", + "insta", "once_cell", "relay-log", "relay-statsd", diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs index 453e3de0a3..e17f73c72c 100644 --- a/relay-server/src/services/cogs.rs +++ b/relay-server/src/services/cogs.rs @@ -106,7 +106,7 @@ impl Service for CogsService { type Interface = CogsReport; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { let mut shutdown = Controller::shutdown_handle(); loop { diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index a194fc9b04..b4096da474 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -284,7 +284,7 @@ impl GlobalConfigService { let upstream_relay = self.upstream.clone(); let internal_tx = self.internal_tx.clone(); - relay_system::spawn(async move { + relay_system::spawn!(async move { metric!(timer(RelayTimers::GlobalConfigRequestDuration), { let query = GetGlobalConfig::new(); let res = upstream_relay.send(SendQuery(query)).await; @@ -360,7 +360,7 @@ impl Service for GlobalConfigService { type Interface = GlobalConfigManager; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { let mut shutdown_handle = Controller::shutdown_handle(); relay_log::info!("global config service starting"); diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index ce8c778cbf..e94983e303 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -198,7 +198,7 @@ impl Service for HealthCheckService { // Add 10% buffer to the internal timeouts to avoid race conditions. let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1); - relay_system::spawn(async move { + relay_system::spawn!(async move { let shutdown = Controller::shutdown_handle(); while shutdown.get().is_none() { @@ -215,7 +215,7 @@ impl Service for HealthCheckService { update_tx.send(StatusUpdate::new(Status::Unhealthy)).ok(); }); - relay_system::spawn(async move { + relay_system::spawn!(async move { while let Some(HealthCheck(message, sender)) = rx.recv().await { let update = update_rx.borrow(); diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 378f1b04c3..c4d656379f 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -229,7 +229,7 @@ impl Service for AggregatorService { type Interface = Aggregator; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); let mut shutdown = Controller::shutdown_handle(); @@ -331,7 +331,7 @@ mod tests { type Interface = TestInterface; fn spawn_handler(self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { while let Some(message) = rx.recv().await { let buckets = message.0.buckets; relay_log::debug!(?buckets, "received buckets"); diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 13072c5c88..5f44913c94 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -44,7 +44,7 @@ impl Service for RouterService { type Interface = Aggregator; fn spawn_handler(self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { let mut router = StartedRouter::start(self); relay_log::info!("metrics router started"); @@ -117,7 +117,7 @@ impl StartedRouter { .chain(Some(self.default.send(AcceptsMetrics))) .collect::>(); - relay_system::spawn(async move { + relay_system::spawn!(async move { let mut accepts = true; while let Some(req) = requests.next().await { accepts &= req.unwrap_or_default(); diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index d48a696474..53832f00b7 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -657,7 +657,7 @@ impl HttpOutcomeProducer { let upstream_relay = self.upstream_relay.clone(); - relay_system::spawn(async move { + relay_system::spawn!(async move { match upstream_relay.send(SendQuery(request)).await { Ok(_) => relay_log::trace!("outcome batch sent"), Err(error) => { @@ -683,7 +683,7 @@ impl Service for HttpOutcomeProducer { type Interface = TrackRawOutcome; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { loop { tokio::select! { // Prioritize flush over receiving messages to prevent starving. @@ -776,7 +776,7 @@ impl Service for ClientReportOutcomeProducer { type Interface = TrackOutcome; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { loop { tokio::select! { // Prioritize flush over receiving messages to prevent starving. @@ -1037,7 +1037,7 @@ impl Service for OutcomeProducerService { fn spawn_handler(self, mut rx: relay_system::Receiver) { let Self { config, inner } = self; - relay_system::spawn(async move { + relay_system::spawn!(async move { let broker = inner.start(); relay_log::info!("OutcomeProducer started."); diff --git a/relay-server/src/services/outcome_aggregator.rs b/relay-server/src/services/outcome_aggregator.rs index 8728255d4a..91b7561e0c 100644 --- a/relay-server/src/services/outcome_aggregator.rs +++ b/relay-server/src/services/outcome_aggregator.rs @@ -139,7 +139,7 @@ impl Service for OutcomeAggregator { type Interface = TrackOutcome; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { let mut shutdown = Controller::shutdown_handle(); relay_log::info!("outcome aggregator started"); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 8006fdf691..5f74372d64 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2814,7 +2814,7 @@ impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; fn spawn_handler(self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { while let Some(message) = rx.recv().await { let service = self.clone(); self.inner diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 35189de481..ffe897e795 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -724,7 +724,7 @@ impl ProjectCacheBroker { let source = self.source.clone(); let sender = self.state_tx.clone(); - relay_system::spawn(async move { + relay_system::spawn!(async move { // Wait on the new attempt time when set. if let Some(next_attempt) = next_attempt { tokio::time::sleep_until(next_attempt).await; @@ -1148,7 +1148,7 @@ impl Service for ProjectCacheService { let outcome_aggregator = services.outcome_aggregator.clone(); let test_store = services.test_store.clone(); - relay_system::spawn(async move { + relay_system::spawn!(async move { let mut ticker = tokio::time::interval(config.cache_eviction_interval()); relay_log::info!("project cache started"); @@ -1429,7 +1429,7 @@ mod tests { } // Emulate the project cache service loop. - relay_system::spawn(async move { + relay_system::spawn!(async move { loop { select! { diff --git a/relay-server/src/services/project_local.rs b/relay-server/src/services/project_local.rs index e21b13130a..9584de99ca 100644 --- a/relay-server/src/services/project_local.rs +++ b/relay-server/src/services/project_local.rs @@ -146,7 +146,7 @@ async fn spawn_poll_local_states( poll_local_states(&project_path, &tx).await; // Start a background loop that polls periodically: - relay_system::spawn(async move { + relay_system::spawn!(async move { // To avoid running two load tasks simultaneously at startup, we delay the interval by one period: let start_at = Instant::now() + period; let mut ticker = tokio::time::interval_at(start_at, period); @@ -166,7 +166,7 @@ impl Service for LocalProjectSourceService { // collect the result, the producer will block, which is acceptable. let (state_tx, mut state_rx) = mpsc::channel(1); - relay_system::spawn(async move { + relay_system::spawn!(async move { relay_log::info!("project local cache started"); // Start the background task that periodically reloads projects from disk: diff --git a/relay-server/src/services/project_upstream.rs b/relay-server/src/services/project_upstream.rs index 1f3d22ade3..b5931317d6 100644 --- a/relay-server/src/services/project_upstream.rs +++ b/relay-server/src/services/project_upstream.rs @@ -481,7 +481,7 @@ impl UpstreamProjectSourceService { let channels = self.prepare_batches(); let upstream_relay = self.upstream_relay.clone(); - relay_system::spawn(async move { + relay_system::spawn!(async move { let responses = Self::fetch_states(config, upstream_relay, channels).await; // Send back all resolved responses and also unused channels. // These responses will be handled by `handle_responses` function. @@ -532,7 +532,7 @@ impl Service for UpstreamProjectSourceService { type Interface = UpstreamProjectSource; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { relay_log::info!("project upstream cache started"); loop { tokio::select! { diff --git a/relay-server/src/services/relays.rs b/relay-server/src/services/relays.rs index 5f2d2c37eb..8d10a33b33 100644 --- a/relay-server/src/services/relays.rs +++ b/relay-server/src/services/relays.rs @@ -238,7 +238,7 @@ impl RelayCacheService { let fetch_tx = self.fetch_tx(); let upstream_relay = self.upstream_relay.clone(); - relay_system::spawn(async move { + relay_system::spawn!(async move { let request = GetRelays { relay_ids: channels.keys().cloned().collect(), }; @@ -335,7 +335,7 @@ impl Service for RelayCacheService { type Interface = RelayCache; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { relay_log::info!("key cache started"); loop { diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index b4ae639b3b..56e1c822b3 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -135,7 +135,7 @@ impl Service for HttpServer { relay_log::info!("spawning http server"); relay_log::info!(" listening on http://{}/", config.listen_addr()); relay_statsd::metric!(counter(RelayCounters::ServerStarting) += 1); - relay_system::spawn(server.serve(app)); + relay_system::spawn!(server.serve(app)); } Err(err) => { relay_log::error!("Failed to start the HTTP server: {err}"); @@ -143,7 +143,7 @@ impl Service for HttpServer { } } - relay_system::spawn(async move { + relay_system::spawn!(async move { let Shutdown { timeout } = Controller::shutdown_handle().notified().await; relay_log::info!("Shutting down HTTP server"); diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index b99c178117..15994368ec 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -1184,7 +1184,7 @@ impl BufferService { BufferState::Disk(ref disk) => { let db = disk.db.clone(); let project_cache = self.services.project_cache.clone(); - relay_system::spawn(async move { + relay_system::spawn!(async move { match OnDisk::get_spooled_index(&db).await { Ok(index) => { relay_log::trace!( @@ -1255,7 +1255,7 @@ impl Service for BufferService { type Interface = Buffer; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { let mut shutdown = Controller::shutdown_handle(); loop { @@ -1574,7 +1574,7 @@ mod tests { type Interface = TestHealth; fn spawn_handler(self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { loop { tokio::select! { Some(TestHealth::SpoolHealth(sender)) = rx.recv() => self.buffer.send(Health(sender)), diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index 2babe2bc10..de7d952481 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -123,7 +123,7 @@ impl Service for RelayStats { return; }; - relay_system::spawn(async move { + relay_system::spawn!(async move { loop { let _ = tokio::join!( self.upstream_status(), diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index c2cdf2b0d9..95348823d8 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1088,7 +1088,7 @@ impl Service for StoreService { fn spawn_handler(self, mut rx: relay_system::Receiver) { let this = Arc::new(self); - relay_system::spawn(async move { + relay_system::spawn!(async move { relay_log::info!("store forwarder started"); while let Some(message) = rx.recv().await { diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs index 589c8c8c05..f9a2cc9d12 100644 --- a/relay-server/src/services/test_store.rs +++ b/relay-server/src/services/test_store.rs @@ -135,7 +135,7 @@ impl relay_system::Service for TestStoreService { type Interface = TestStore; fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - relay_system::spawn(async move { + relay_system::spawn!(async move { while let Some(message) = rx.recv().await { self.handle_message(message); } diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs index 86efe021a7..9fbee8c812 100644 --- a/relay-server/src/services/upstream.rs +++ b/relay-server/src/services/upstream.rs @@ -1344,7 +1344,7 @@ impl ConnectionMonitor { // Only take action if we exceeded the grace period. if first_error + self.client.config.http_outage_grace_period() <= now { let return_tx = return_tx.clone(); - let task = relay_system::spawn(Self::connect(self.client.clone(), return_tx)); + let task = relay_system::spawn!(Self::connect(self.client.clone(), return_tx)); self.state = ConnectionState::Reconnecting(task); } } @@ -1429,7 +1429,7 @@ impl UpstreamBroker { let client = self.client.clone(); let action_tx = self.action_tx.clone(); - relay_system::spawn(async move { + relay_system::spawn!(async move { let send_start = Instant::now(); let result = client.send(entry.request.as_mut()).await; emit_response_metrics(send_start, &entry, &result); @@ -1512,7 +1512,7 @@ impl Service for UpstreamRelayService { state: AuthState::Unknown, tx: action_tx.clone(), }; - relay_system::spawn(auth.run()); + relay_system::spawn!(auth.run()); // Main broker that serializes public and internal messages, as well as maintains connection // and authentication state. @@ -1525,7 +1525,7 @@ impl Service for UpstreamRelayService { action_tx, }; - relay_system::spawn(async move { + relay_system::spawn!(async move { loop { tokio::select! { biased; diff --git a/relay-system/Cargo.toml b/relay-system/Cargo.toml index d0024e8de1..79c02c83a7 100644 --- a/relay-system/Cargo.toml +++ b/relay-system/Cargo.toml @@ -20,5 +20,6 @@ relay-statsd = { workspace = true } tokio = { workspace = true, features = ["rt", "signal", "macros", "sync", "time"] } [dev-dependencies] +insta = { workspace = true } relay-statsd = { workspace = true, features = ["test"] } tokio = { workspace = true, features = ["test-util"] } diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index acec9d0d4e..3997c8c286 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -130,7 +130,7 @@ impl ShutdownHandle { /// type Interface = (); /// /// fn spawn_handler(self, mut rx: relay_system::Receiver) { -/// relay_system::spawn(async move { +/// relay_system::spawn!(async move { /// let mut shutdown = Controller::shutdown_handle(); /// /// loop { @@ -168,7 +168,7 @@ impl Controller { /// Starts a controller that monitors shutdown signals. #[track_caller] pub fn start(shutdown_timeout: Duration) { - crate::spawn(monitor_shutdown(shutdown_timeout)); + crate::spawn!(monitor_shutdown(shutdown_timeout)); } /// Manually initiates the shutdown process of the system. diff --git a/relay-system/src/runtime.rs b/relay-system/src/runtime.rs index abfada72c7..c310541904 100644 --- a/relay-system/src/runtime.rs +++ b/relay-system/src/runtime.rs @@ -1,4 +1,3 @@ -use std::panic::Location; use std::time::Instant; use futures::Future; @@ -9,19 +8,28 @@ use crate::statsd::{SystemCounters, SystemTimers}; /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it. /// /// This is in instrumented spawn variant of Tokio's [`tokio::spawn`]. -#[track_caller] +#[macro_export] +macro_rules! spawn { + ($future:expr) => {{ + static _TASK_ID: ::std::sync::OnceLock = ::std::sync::OnceLock::new(); + let task_id = _TASK_ID.get_or_init(|| { + let location = ::std::panic::Location::caller(); + format!("{}:{}", location.file(), location.line()) + }); + $crate::_spawn_inner(task_id, $future) + }}; +} + +#[doc(hidden)] #[allow(clippy::disallowed_methods)] -pub fn spawn(future: F) -> JoinHandle +pub fn _spawn_inner(task_id: &'static str, future: F) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { - let location = Location::caller(); - let task_id = format!("{}:{}", location.file(), location.line()); - relay_statsd::metric!( counter(SystemCounters::RuntimeTasksCreated) += 1, - id = &task_id + id = task_id ); let created = Instant::now(); @@ -30,9 +38,34 @@ where relay_statsd::metric!( timer(SystemTimers::RuntimeTasksFinished) = created.elapsed(), - id = &task_id + id = task_id ); result }) } + +#[cfg(test)] +mod tests { + use insta::assert_debug_snapshot; + + #[test] + fn test_spawn_spawns_a_future() { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + let captures = relay_statsd::with_capturing_test_client(|| { + rt.block_on(async { + let _ = crate::spawn!(async {}).await; + }) + }); + + assert_debug_snapshot!(captures, @r###" + [ + "runtime.task.spawn.created:1|c|#id:relay-system/src/runtime.rs:60", + "runtime.task.spawn.finished:0|ms|#id:relay-system/src/runtime.rs:60", + ] + "###); + } +} diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index 7d858618fa..388a07459f 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -932,7 +932,7 @@ pub fn channel(name: &'static str) -> (Addr, Receiver) { /// type Interface = MyMessage; /// /// fn spawn_handler(self, mut rx: Receiver) { -/// relay_system::spawn(async move { +/// relay_system::spawn!(async move { /// while let Some(message) = rx.recv().await { /// // handle the message /// } @@ -1047,7 +1047,7 @@ mod tests { type Interface = MockMessage; fn spawn_handler(self, mut rx: Receiver) { - crate::spawn(async move { + crate::spawn!(async move { while rx.recv().await.is_some() { tokio::time::sleep(BACKLOG_INTERVAL * 2).await; } diff --git a/relay-test/src/lib.rs b/relay-test/src/lib.rs index 6eebc52828..7466245e96 100644 --- a/relay-test/src/lib.rs +++ b/relay-test/src/lib.rs @@ -44,7 +44,7 @@ where { let (addr, mut rx) = channel(name); - let handle = relay_system::spawn(async move { + let handle = relay_system::spawn!(async move { while let Some(msg) = rx.recv().await { f(&mut state, msg); }