From 25f1d464429fc6bf5ee54ce3b82bbf286bed95ef Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 20 Aug 2024 11:59:20 -0700 Subject: [PATCH] test: demonstrate that the unreachable in SpawnedTask is reachable --- datafusion/common-runtime/Cargo.toml | 3 ++ datafusion/common-runtime/src/common.rs | 53 +++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/datafusion/common-runtime/Cargo.toml b/datafusion/common-runtime/Cargo.toml index c10436087675..b417d29df283 100644 --- a/datafusion/common-runtime/Cargo.toml +++ b/datafusion/common-runtime/Cargo.toml @@ -37,3 +37,6 @@ path = "src/lib.rs" [dependencies] tokio = { workspace = true } + +[dev-dependencies] +tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] } diff --git a/datafusion/common-runtime/src/common.rs b/datafusion/common-runtime/src/common.rs index 2f7ddb972f42..9abf1b577600 100644 --- a/datafusion/common-runtime/src/common.rs +++ b/datafusion/common-runtime/src/common.rs @@ -75,3 +75,56 @@ impl SpawnedTask { }) } } + +#[cfg(test)] +mod tests { + use super::*; + + use std::{ + future::{pending, Pending}, + sync::{Arc, Mutex}, + }; + + use tokio::runtime::Runtime; + + #[tokio::test] + #[should_panic( + expected = "entered unreachable code: SpawnedTask was cancelled unexpectedly" + )] + async fn runtime_shutdown() { + // capture the panic message + let panic_msg = Arc::new(Mutex::new(None)); + let captured_panic_msg = Arc::clone(&panic_msg); + std::panic::set_hook(Box::new(move |e| { + let mut guard = captured_panic_msg.lock().unwrap(); + *guard = Some(e.to_string()); + })); + + for _ in 0..30 { + let rt = Runtime::new().unwrap(); + let join = rt.spawn(async { + let task = SpawnedTask::spawn(async { + let fut: Pending<()> = pending(); + fut.await; + unreachable!("should never return"); + }); + task.join_unwind().await; + }); + + // caller shutdown their DF runtime (e.g. timeout, error in caller, etc) + rt.shutdown_background(); + + // race condition + // poll occurs during shutdown (buffered stream poll calls, etc) + let _ = join.await; + } + + // demonstrate that we hit the unreachable code + let maybe_panic = panic_msg.lock().unwrap().clone(); + assert_eq!( + maybe_panic, None, + "should not have rt thread panic, instead found {:?}", + maybe_panic + ); + } +}