From d2bfb85476f039e290365218769678ba736f8768 Mon Sep 17 00:00:00 2001
From: wiedld
Date: Thu, 12 Sep 2024 22:50:51 -0700
Subject: [PATCH] chore: log reservation pool at regular intervals

---
Reviewer notes (ignored by `git am`):
- The periodic reporter is only started when `TrackConsumersPool::new` runs
  inside a Tokio runtime; `tokio::spawn` panics when no runtime is entered.
- The reporter task holds a `Weak` reference to the consumer map and exits
  once the pool (the last strong reference) has been dropped.

 datafusion/execution/Cargo.toml              |  1 +
 datafusion/execution/src/memory_pool/pool.rs | 60 ++++++++++++++++----
 2 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml
index fb2e7e914fe5..3aea88ed88af 100644
--- a/datafusion/execution/Cargo.toml
+++ b/datafusion/execution/Cargo.toml
@@ -48,4 +48,5 @@ object_store = { workspace = true }
 parking_lot = { workspace = true }
 rand = { workspace = true }
 tempfile = { workspace = true }
+tokio = { workspace = true }
 url = { workspace = true }
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index cf5f72656859..fa91d7c7e4c7 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -23,7 +23,10 @@ use log::debug;
 use parking_lot::Mutex;
 use std::{
     num::NonZeroUsize,
-    sync::atomic::{AtomicU64, AtomicUsize, Ordering},
+    sync::{
+        atomic::{AtomicU64, AtomicUsize, Ordering},
+        Arc,
+    },
 };
 
 /// A [`MemoryPool`] that enforces no limit
@@ -262,7 +265,7 @@ fn insufficient_capacity_err(
 pub struct TrackConsumersPool<I> {
     inner: I,
     top: NonZeroUsize,
-    tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>,
+    tracked_consumers: Arc<Mutex<HashMap<MemoryConsumer, AtomicU64>>>,
 }
 
 impl<I: MemoryPool> TrackConsumersPool<I> {
@@ -271,10 +274,35 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
     /// The `top` determines how many Top K [`MemoryConsumer`]s to include
     /// in the reported [`DataFusionError::ResourcesExhausted`].
     pub fn new(inner: I, top: NonZeroUsize) -> Self {
+        let tracked_consumers: Arc<Mutex<HashMap<MemoryConsumer, AtomicU64>>> =
+            Default::default();
+
+        // `tokio::spawn` panics outside of a Tokio runtime, so the periodic
+        // report is only started when a runtime is currently entered.
+        if let Ok(handle) = tokio::runtime::Handle::try_current() {
+            // Hold a weak handle so the task ends once the pool is dropped
+            // instead of keeping the map alive and printing forever.
+            let captured = Arc::downgrade(&tracked_consumers);
+            handle.spawn(async move {
+                loop {
+                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+                    let Some(tracked) = captured.upgrade() else {
+                        break;
+                    };
+                    println!(
+                        "REPORT: {}, {}",
+                        Utc::now(),
+                        Self::_report_top(5, tracked)
+                    );
+                }
+            });
+        }
+
         Self {
             inner,
             top,
-            tracked_consumers: Default::default(),
+            tracked_consumers,
         }
     }
 
@@ -282,17 +310,22 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
     /// which have the same name.
     ///
     /// This is very tied to the implementation of the memory consumer.
-    fn has_multiple_consumers(&self, name: &String) -> bool {
+    fn has_multiple_consumers(
+        name: &String,
+        tracked_consumers: &Arc<Mutex<HashMap<MemoryConsumer, AtomicU64>>>,
+    ) -> bool {
         let consumer = MemoryConsumer::new(name);
         let consumer_with_spill = consumer.clone().with_can_spill(true);
-        let guard = self.tracked_consumers.lock();
+        let guard = tracked_consumers.lock();
         guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill)
     }
 
-    /// The top consumers in a report string.
-    pub fn report_top(&self, top: usize) -> String {
-        let mut consumers = self
-            .tracked_consumers
+    /// Builds the report string for at most `top` entries of `tracked_consumers`.
+    fn _report_top(
+        top: usize,
+        tracked_consumers: Arc<Mutex<HashMap<MemoryConsumer, AtomicU64>>>,
+    ) -> String {
+        let mut consumers = tracked_consumers
             .lock()
             .iter()
             .map(|(consumer, reserved)| {
@@ -308,7 +341,7 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
         consumers[0..std::cmp::min(top, consumers.len())]
             .iter()
             .map(|((name, can_spill), size)| {
-                if self.has_multiple_consumers(name) {
+                if Self::has_multiple_consumers(name, &tracked_consumers) {
                     format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size)
                 } else {
                     format!("{name} consumed {:?} bytes", size)
@@ -317,6 +350,11 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
             .collect::<Vec<_>>()
             .join(", ")
     }
+
+    /// The top consumers in a report string.
+    pub fn report_top(&self, top: usize) -> String {
+        Self::_report_top(top, Arc::clone(&self.tracked_consumers))
+    }
 }
 
 impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
@@ -348,8 +386,6 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
             .and_modify(|bytes| {
                 bytes.fetch_add(additional as u64, Ordering::AcqRel);
             });
-
-        println!("REPORT: {}, {}", Utc::now(), self.report_top(5));
     }
 
     fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
@@ -384,7 +420,6 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
             .and_modify(|bytes| {
                 bytes.fetch_add(additional as u64, Ordering::AcqRel);
             });
-        println!("REPORT: {}, {}", Utc::now(), self.report_top(5));
         Ok(())
     }
 