Skip to content

Commit

Permalink
chore: log reservation pool at regular intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Sep 13, 2024
1 parent 7b18895 commit d2bfb85
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
1 change: 1 addition & 0 deletions datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ object_store = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
55 changes: 42 additions & 13 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use log::debug;
use parking_lot::Mutex;
use std::{
num::NonZeroUsize,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
};

/// A [`MemoryPool`] that enforces no limit
Expand Down Expand Up @@ -262,7 +265,7 @@ fn insufficient_capacity_err(
pub struct TrackConsumersPool<I> {
inner: I,
top: NonZeroUsize,
tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>,
tracked_consumers: Arc<Mutex<HashMap<MemoryConsumer, AtomicU64>>>,
}

impl<I: MemoryPool> TrackConsumersPool<I> {
Expand All @@ -271,28 +274,52 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
/// The `top` determines how many Top K [`MemoryConsumer`]s to include
/// in the reported [`DataFusionError::ResourcesExhausted`].
pub fn new(inner: I, top: NonZeroUsize) -> Self {
let tracked_consumers = Default::default();

let _captured: Arc<
parking_lot::lock_api::Mutex<
parking_lot::RawMutex,
HashMap<MemoryConsumer, AtomicU64>,
>,
> = Arc::clone(&tracked_consumers);
tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

println!(
"REPORT: {}, {}",
Utc::now(),
Self::_report_top(5, Arc::clone(&_captured))
);
}
});

Self {
inner,
top,
tracked_consumers: Default::default(),
tracked_consumers,
}
}

/// Determine if there are multiple [`MemoryConsumer`]s registered
/// which have the same name.
///
/// This is very tied to the implementation of the memory consumer.
fn has_multiple_consumers(&self, name: &String) -> bool {
fn has_multiple_consumers(
name: &String,
tracked_consumers: &Arc<Mutex<HashMap<MemoryConsumer, AtomicU64>>>,
) -> bool {
let consumer = MemoryConsumer::new(name);
let consumer_with_spill = consumer.clone().with_can_spill(true);
let guard = self.tracked_consumers.lock();
let guard = tracked_consumers.lock();
guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill)
}

/// The top consumers in a report string.
pub fn report_top(&self, top: usize) -> String {
let mut consumers = self
.tracked_consumers
fn _report_top(
top: usize,
tracked_consumers: Arc<Mutex<HashMap<MemoryConsumer, AtomicU64>>>,
) -> String {
let mut consumers = tracked_consumers
.lock()
.iter()
.map(|(consumer, reserved)| {
Expand All @@ -308,7 +335,7 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
consumers[0..std::cmp::min(top, consumers.len())]
.iter()
.map(|((name, can_spill), size)| {
if self.has_multiple_consumers(name) {
if Self::has_multiple_consumers(name, &tracked_consumers) {
format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size)
} else {
format!("{name} consumed {:?} bytes", size)
Expand All @@ -317,6 +344,11 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
.collect::<Vec<_>>()
.join(", ")
}

/// The top consumers in a report string.
pub fn report_top(&self, top: usize) -> String {
Self::_report_top(top, Arc::clone(&self.tracked_consumers))
}
}

impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
Expand Down Expand Up @@ -348,8 +380,6 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
.and_modify(|bytes| {
bytes.fetch_add(additional as u64, Ordering::AcqRel);
});

println!("REPORT: {}, {}", Utc::now(), self.report_top(5));
}

fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
Expand Down Expand Up @@ -384,7 +414,6 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
.and_modify(|bytes| {
bytes.fetch_add(additional as u64, Ordering::AcqRel);
});
println!("REPORT: {}, {}", Utc::now(), self.report_top(5));

Ok(())
}
Expand Down

0 comments on commit d2bfb85

Please sign in to comment.