Skip to content

Commit

Permalink
refactor(11523): enable TrackConsumersPool to be used in runtime metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Jul 30, 2024
1 parent c3ce60f commit c8c0196
Showing 1 changed file with 37 additions and 3 deletions.
40 changes: 37 additions & 3 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
}

/// The top consumers in a report string.
fn report_top(&self) -> String {
pub fn report_top(&self, top: usize) -> String {
let mut consumers = self
.tracked_consumers
.lock()
Expand All @@ -303,7 +303,7 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
.collect::<Vec<_>>();
consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering

consumers[0..std::cmp::min(self.top.into(), consumers.len())]
consumers[0..std::cmp::min(top, consumers.len())]
.iter()
.map(|((name, can_spill), size)| {
if self.has_multiple_consumers(name) {
Expand Down Expand Up @@ -367,7 +367,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
DataFusionError::ResourcesExhausted(
provide_top_memory_consumers_to_error_msg(
e.to_owned(),
self.report_top(),
self.report_top(self.top.into()),
),
)
}
Expand Down Expand Up @@ -652,4 +652,38 @@ mod tests {
));
test_per_pool_type(tracked_greedy_pool);
}

#[test]
fn test_tracked_consumers_pool_use_beyond_errors() {
let upcasted: Arc<dyn std::any::Any + Send + Sync> =
Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
.downcast::<TrackConsumersPool<GreedyMemoryPool>>()
.unwrap();
// set r1=20
let mut r1 = MemoryConsumer::new("r1").register(&pool);
r1.grow(20);
// set r2=15
let mut r2 = MemoryConsumer::new("r2").register(&pool);
r2.grow(15);
// set r3=45
let mut r3 = MemoryConsumer::new("r3").register(&pool);
r3.grow(45);

let downcasted = upcasted
.downcast::<TrackConsumersPool<GreedyMemoryPool>>()
.unwrap();

// Test: can get runtime metrics, even without an error thrown
let expected = "r3 consumed 45 bytes, r1 consumed 20 bytes";
let res = downcasted.report_top(2);
assert_eq!(
res, expected,
"should provide list of top memory consumers, instead found {:?}",
res
);
}
}

0 comments on commit c8c0196

Please sign in to comment.