parquet: Add finer metrics on operations covered by `time_elapsed_opening` (#12585)

* Rename 'pushdown_eval_time' to 'row_pushdown_eval_time'

As it does not measure evaluation of row group-level pushdown filters

* Add statistics_eval_time and bloom_filter_eval_time metrics

* Add metadata_load_time metric

* Add docs

* Move timers to the callees
progval committed Sep 25, 2024
1 parent 6a2d88d commit b821929
Showing 6 changed files with 48 additions and 9 deletions.
30 changes: 25 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
@@ -43,14 +43,20 @@ pub struct ParquetFileMetrics {
pub pushdown_rows_pruned: Count,
/// Total rows passed predicates pushed into parquet scan
pub pushdown_rows_matched: Count,
/// Total time spent evaluating pushdown filters
pub pushdown_eval_time: Time,
/// Total time spent evaluating row-level pushdown filters
pub row_pushdown_eval_time: Time,
/// Total time spent evaluating row group-level statistics filters
pub statistics_eval_time: Time,
/// Total time spent evaluating row group Bloom Filters
pub bloom_filter_eval_time: Time,
/// Total rows filtered out by parquet page index
pub page_index_rows_pruned: Count,
/// Total rows passed through the parquet page index
pub page_index_rows_matched: Count,
/// Total time spent evaluating parquet page index filters
pub page_index_eval_time: Time,
/// Total time spent reading and parsing metadata from the footer
pub metadata_load_time: Time,
}

impl ParquetFileMetrics {
@@ -91,9 +97,16 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("pushdown_rows_matched", partition);

let pushdown_eval_time = MetricBuilder::new(metrics)
let row_pushdown_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("pushdown_eval_time", partition);
.subset_time("row_pushdown_eval_time", partition);
let statistics_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("statistics_eval_time", partition);
let bloom_filter_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("bloom_filter_eval_time", partition);

let page_index_rows_pruned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("page_index_rows_pruned", partition);
@@ -105,6 +118,10 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.subset_time("page_index_eval_time", partition);

let metadata_load_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("metadata_load_time", partition);

Self {
predicate_evaluation_errors,
row_groups_matched_bloom_filter,
@@ -114,10 +131,13 @@ impl ParquetFileMetrics {
bytes_scanned,
pushdown_rows_pruned,
pushdown_rows_matched,
pushdown_eval_time,
row_pushdown_eval_time,
page_index_rows_pruned,
page_index_rows_matched,
statistics_eval_time,
bloom_filter_eval_time,
page_index_eval_time,
metadata_load_time,
}
}
}
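For context, the `Time` metrics registered above are driven in two ways by this commit: an explicit `timer()` / `stop()` pair in `ParquetOpener` (where the timed region ends partway through the function, after an awaited load), and a scoped guard in the row group pruning code that records elapsed time when it is dropped. A minimal sketch of both patterns, assuming the `datafusion::physical_plan::metrics` API; the function and variable names here are illustrative only:

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn timer_patterns_sketch() {
    let metrics = ExecutionPlanMetricsSet::new();
    let partition = 0;

    // Register a named "subset" time metric, as ParquetFileMetrics::new does above.
    let statistics_eval_time =
        MetricBuilder::new(&metrics).subset_time("statistics_eval_time", partition);

    // Pattern 1: explicit start/stop, as used for metadata_load_time, so the
    // timer can be stopped in the middle of the enclosing function.
    let mut timer = statistics_eval_time.timer();
    // ... timed work ...
    timer.stop();

    // Pattern 2: scoped guard, as used in the row group pruning functions;
    // the elapsed time is accumulated into the metric when the guard drops.
    {
        let _timer_guard = statistics_eval_time.timer();
        // ... timed work ...
    }
}
```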
12 changes: 10 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -1873,8 +1873,16 @@ mod tests {
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 5);
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 2);
assert!(
get_value(&metrics, "pushdown_eval_time") > 0,
"no eval time in metrics: {metrics:#?}"
get_value(&metrics, "row_pushdown_eval_time") > 0,
"no pushdown eval time in metrics: {metrics:#?}"
);
assert!(
get_value(&metrics, "statistics_eval_time") > 0,
"no statistics eval time in metrics: {metrics:#?}"
);
assert!(
get_value(&metrics, "bloom_filter_eval_time") > 0,
"no Bloom Filter eval time in metrics: {metrics:#?}"
);
}

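The `get_value` helper these assertions call lies outside the shown hunk. A minimal sketch of what such a helper can look like, assuming `MetricsSet::sum_by_name` and `MetricValue::as_usize` from DataFusion's metrics module (the actual helper in `mod.rs` may differ):

```rust
use datafusion::physical_plan::metrics::MetricsSet;

// Hypothetical helper mirroring the assertions above: sum every metric with
// the given name (across files and partitions) and return the total.
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
    match metrics.sum_by_name(metric_name) {
        Some(v) => v.as_usize(),
        None => panic!("metric '{metric_name}' not found in\n{metrics:#?}"),
    }
}
```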
@@ -118,6 +118,7 @@ impl FileOpener for ParquetOpener {
Ok(Box::pin(async move {
let options = ArrowReaderOptions::new().with_page_index(enable_page_index);

let mut metadata_timer = file_metrics.metadata_load_time.timer();
let metadata =
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
let mut schema = metadata.schema().clone();
@@ -133,6 +134,8 @@ impl FileOpener for ParquetOpener {
let metadata =
ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?;

metadata_timer.stop();

let mut builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);

@@ -531,7 +531,7 @@ pub fn build_row_filter(
) -> Result<Option<RowFilter>> {
let rows_pruned = &file_metrics.pushdown_rows_pruned;
let rows_matched = &file_metrics.pushdown_rows_matched;
let time = &file_metrics.pushdown_eval_time;
let time = &file_metrics.row_pushdown_eval_time;

// Split into conjuncts:
// `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
@@ -109,6 +109,9 @@ impl RowGroupAccessPlanFilter {
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) {
// scoped timer updates on drop
let _timer_guard = metrics.statistics_eval_time.timer();

assert_eq!(groups.len(), self.access_plan.len());
// Indexes of row groups still to scan
let row_group_indexes = self.access_plan.row_group_indexes();
@@ -158,6 +161,9 @@ impl RowGroupAccessPlanFilter {
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) {
// scoped timer updates on drop
let _timer_guard = metrics.bloom_filter_eval_time.timer();

assert_eq!(builder.metadata().num_row_groups(), self.access_plan.len());
for idx in 0..self.access_plan.len() {
if !self.access_plan.should_scan(idx) {
4 changes: 3 additions & 1 deletion docs/source/user-guide/explain-usage.md
@@ -235,7 +235,9 @@ When predicate pushdown is enabled, `ParquetExec` gains the following metrics:
- `pushdown_rows_pruned`: rows that were tested by any of the above filters and did not pass at least one of them (this should be the sum of `page_index_rows_matched`, `row_groups_pruned_bloom_filter`, and `row_groups_pruned_statistics`)
- `predicate_evaluation_errors`: number of times evaluating the filter expression failed (expected to be zero in normal operation)
- `num_predicate_creation_errors`: number of errors creating predicates (expected to be zero in normal operation)
- `pushdown_eval_time`: time spent evaluating these filters
- `bloom_filter_eval_time`: time spent parsing and evaluating Bloom Filters
- `statistics_eval_time`: time spent parsing and evaluating row group-level statistics
- `row_pushdown_eval_time`: time spent evaluating row-level filters
- `page_index_eval_time`: time required to evaluate the page index filters
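
Not part of this commit, but as an illustration of where these timings surface: when predicate pushdown is enabled, running `EXPLAIN ANALYZE` over a Parquet-backed table prints a plan whose `ParquetExec` annotations include the metric names listed above. The table name, file path, and predicate below are hypothetical:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical file path and table name, for illustration only.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;

    // The ParquetExec line of the output reports per-file metrics such as
    // row_pushdown_eval_time, statistics_eval_time, bloom_filter_eval_time,
    // page_index_eval_time, and metadata_load_time.
    let df = ctx
        .sql("EXPLAIN ANALYZE SELECT * FROM t WHERE a > 10")
        .await?;
    df.show().await?;
    Ok(())
}
```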

## Partitions and Execution
