Skip to content

Commit

Permalink
add tokio_metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mpurins-coralogix committed Sep 21, 2023
1 parent 63adfb6 commit ace60bb
Show file tree
Hide file tree
Showing 9 changed files with 14,949 additions and 109 deletions.
201 changes: 107 additions & 94 deletions datafusion-cli/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ uuid = { version = "1.0", features = ["v4"] }
xz2 = { version = "0.1", optional = true }
zstd = { version = "0.12", optional = true, default-features = false }

tokio-metrics = "0.3.0"

[dev-dependencies]
async-trait = "0.1.53"
Expand Down
11 changes: 9 additions & 2 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ fn build_file_list_recurse(
pub(crate) fn spawn_buffered(
mut input: SendableRecordBatchStream,
buffer: usize,
monitor: Option<Arc<tokio_metrics::TaskMonitor>>,
) -> SendableRecordBatchStream {
// Use tokio only if running from a tokio context (#2201)
if tokio::runtime::Handle::try_current().is_err() {
Expand All @@ -108,13 +109,19 @@ pub(crate) fn spawn_buffered(

let sender = builder.tx();

builder.spawn(async move {
let task = async move {
while let Some(item) = input.next().await {
if sender.send(item).await.is_err() {
return;
}
}
});
};

if let Some(monitor) = monitor {
builder.spawn(monitor.instrument(task));
} else {
builder.spawn(task);
}

builder.build()
}
Expand Down
25 changes: 17 additions & 8 deletions datafusion/core/src/physical_plan/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ impl ExecutionPlan for RepartitionExec {
self.name(),
partition
);

let monitor = context
.session_config()
.get_extension::<tokio_metrics::TaskMonitor>();
// lock mutexes
let mut state = self.state.lock();

Expand Down Expand Up @@ -522,15 +526,20 @@ impl ExecutionPlan for RepartitionExec {

let r_metrics = RepartitionMetrics::new(i, partition, &self.metrics);

let task = Self::pull_from_input(
self.input.clone(),
i,
txs.clone(),
self.partitioning.clone(),
r_metrics,
context.clone(),
);
let input_task: JoinHandle<Result<()>> =
tokio::spawn(Self::pull_from_input(
self.input.clone(),
i,
txs.clone(),
self.partitioning.clone(),
r_metrics,
context.clone(),
));
if let Some(monitor) = monitor.as_deref() {
tokio::spawn(monitor.instrument(task))
} else {
tokio::spawn(task)
};

// In a separate task, wait for each input to be done
// (and pass along any errors, including panic!s)
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ impl ExternalSorter {
.into_iter()
.map(|batch| {
let metrics = self.metrics.baseline.intermediate();
Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1))
Ok(spawn_buffered(
self.sort_batch_stream(batch, metrics)?,
1,
None,
))
})
.collect::<Result<_>>()?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion_physical_expr::{
};

use log::{debug, trace};
use tokio_metrics::TaskMonitor;

/// Sort preserving merge execution plan
///
Expand Down Expand Up @@ -193,6 +194,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
"Start SortPreservingMergeExec::execute for partition: {}",
partition
);
let monitor = context.session_config().get_extension::<TaskMonitor>();
if 0 != partition {
return Err(DataFusionError::Internal(format!(
"SortPreservingMergeExec invalid partition {partition}"
Expand Down Expand Up @@ -221,7 +223,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
let receivers = (0..input_partitions)
.map(|partition| {
let stream = self.input.execute(partition, context.clone())?;
Ok(spawn_buffered(stream, 1))
Ok(spawn_buffered(stream, 1, monitor.clone()))
})
.collect::<Result<_>>()?;

Expand Down
14 changes: 12 additions & 2 deletions datafusion/core/src/physical_plan/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ impl RecordBatchReceiverStreamBuilder {
) {
let output = self.tx();

self.spawn(async move {
let monitor = context
.session_config()
.get_extension::<tokio_metrics::TaskMonitor>();

let task = async move {
let mut stream = match input.execute(partition, context) {
Err(e) => {
// If send fails, the plan being torn down, there
Expand Down Expand Up @@ -146,7 +150,13 @@ impl RecordBatchReceiverStreamBuilder {
return;
}
}
});
};

if let Some(monitor) = monitor {
self.spawn(monitor.instrument(task));
} else {
self.spawn(task);
}
}

/// Create a stream of all `RecordBatch`es written to `tx`
Expand Down
Loading

0 comments on commit ace60bb

Please sign in to comment.