Skip to content

Commit

Permalink
feat: Add bloom filter metric to ParquetExec (#8772)
Browse files Browse the repository at this point in the history
* sbbf metric for parquet.

* fix tests.

* integration-test

* fix clippy & tests

* fix clippy.

* add more comments

* rename make_int32_range

* update metric name.
  • Loading branch information
my-vegetable-has-exploded committed Jan 9, 2024
1 parent 72cfc80 commit b3e17e7
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 25 deletions.
17 changes: 12 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use crate::physical_plan::metrics::{
pub struct ParquetFileMetrics {
/// Number of times the predicate could not be evaluated
pub predicate_evaluation_errors: Count,
/// Number of row groups pruned using
pub row_groups_pruned: Count,
/// Number of row groups pruned by bloom filters
pub row_groups_pruned_bloom_filter: Count,
/// Number of row groups pruned by statistics
pub row_groups_pruned_statistics: Count,
/// Total number of bytes scanned
pub bytes_scanned: Count,
/// Total rows filtered out by predicates pushed into parquet scan
Expand All @@ -54,9 +56,13 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("predicate_evaluation_errors", partition);

let row_groups_pruned = MetricBuilder::new(metrics)
let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("row_groups_pruned", partition);
.counter("row_groups_pruned_bloom_filter", partition);

let row_groups_pruned_statistics = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("row_groups_pruned_statistics", partition);

let bytes_scanned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
Expand All @@ -79,7 +85,8 @@ impl ParquetFileMetrics {

Self {
predicate_evaluation_errors,
row_groups_pruned,
row_groups_pruned_bloom_filter,
row_groups_pruned_statistics,
bytes_scanned,
pushdown_rows_filtered,
pushdown_eval_time,
Expand Down
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use datafusion_physical_expr::{
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use object_store::path::Path;
use object_store::ObjectStore;
Expand Down Expand Up @@ -278,7 +279,17 @@ impl DisplayAs for ParquetExec {
let pruning_predicate_string = self
.pruning_predicate
.as_ref()
.map(|pre| format!(", pruning_predicate={}", pre.predicate_expr()))
.map(|pre| {
format!(
", pruning_predicate={}, required_guarantees=[{}]",
pre.predicate_expr(),
pre.literal_guarantees()
.iter()
.map(|item| format!("{}", item))
.collect_vec()
.join(", ")
)
})
.unwrap_or_default();

write!(f, "ParquetExec: ")?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
Ok(values) => {
// NB: false means don't scan row group
if !values[0] {
metrics.row_groups_pruned.add(1);
metrics.row_groups_pruned_statistics.add(1);
continue;
}
}
Expand Down Expand Up @@ -159,7 +159,7 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<
};

if prune_group {
metrics.row_groups_pruned.add(1);
metrics.row_groups_pruned_bloom_filter.add(1);
} else {
filtered.push(*idx);
}
Expand Down Expand Up @@ -1049,12 +1049,9 @@ mod tests {
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);

let expr = col(r#""String""#).in_list(
vec![
lit("Hello_Not_Exists"),
lit("Hello_Not_Exists2"),
lit("Hello_Not_Exists3"),
lit("Hello_Not_Exist4"),
],
(1..25)
.map(|i| lit(format!("Hello_Not_Exists{}", i)))
.collect::<Vec<_>>(),
false,
);
let expr = logical2physical(&expr, &schema);
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ impl PruningPredicate {
&self.predicate_expr
}

/// Returns a reference to the literal guarantees
pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
&self.literal_guarantees
}

/// Returns true if this pruning predicate can not prune anything.
///
/// This happens if the predicate is a literal `true` and
Expand Down
36 changes: 32 additions & 4 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ enum Scenario {
Timestamps,
Dates,
Int32,
Int32Range,
Float64,
Decimal,
DecimalLargePrecision,
Expand Down Expand Up @@ -113,12 +114,24 @@ impl TestOutput {
self.metric_value("predicate_evaluation_errors")
}

/// The number of times the pruning predicate evaluation errors
/// The number of row_groups pruned by bloom filter
fn row_groups_pruned_bloom_filter(&self) -> Option<usize> {
self.metric_value("row_groups_pruned_bloom_filter")
}

/// The number of row_groups pruned by statistics
fn row_groups_pruned_statistics(&self) -> Option<usize> {
self.metric_value("row_groups_pruned_statistics")
}

/// The number of row_groups pruned
fn row_groups_pruned(&self) -> Option<usize> {
self.metric_value("row_groups_pruned")
self.row_groups_pruned_bloom_filter()
.zip(self.row_groups_pruned_statistics())
.map(|(a, b)| a + b)
}

/// The number of times the pruning predicate evaluation errors
/// The number of row pages pruned
fn row_pages_pruned(&self) -> Option<usize> {
self.metric_value("page_index_rows_filtered")
}
Expand All @@ -145,7 +158,11 @@ impl ContextWithParquet {
mut config: SessionConfig,
) -> Self {
let file = match unit {
Unit::RowGroup => make_test_file_rg(scenario).await,
Unit::RowGroup => {
let config = config.options_mut();
config.execution.parquet.bloom_filter_enabled = true;
make_test_file_rg(scenario).await
}
Unit::Page => {
let config = config.options_mut();
config.execution.parquet.enable_page_index = true;
Expand Down Expand Up @@ -360,6 +377,13 @@ fn make_int32_batch(start: i32, end: i32) -> RecordBatch {
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

fn make_int32_range(start: i32, end: i32) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let v = vec![start, end];
let array = Arc::new(Int32Array::from(v)) as ArrayRef;
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

/// Return record batch with f64 vector
///
/// Columns are named
Expand Down Expand Up @@ -508,6 +532,9 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_int32_batch(5, 10),
]
}
Scenario::Int32Range => {
vec![make_int32_range(0, 10), make_int32_range(200000, 300000)]
}
Scenario::Float64 => {
vec![
make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
Expand Down Expand Up @@ -565,6 +592,7 @@ async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile {

let props = WriterProperties::builder()
.set_max_row_group_size(5)
.set_bloom_filter_enabled(true)
.build();

let batches = create_data_batch(scenario);
Expand Down
57 changes: 56 additions & 1 deletion datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! expected.
use datafusion::prelude::SessionConfig;
use datafusion_common::ScalarValue;
use itertools::Itertools;

use crate::parquet::Unit::RowGroup;
use crate::parquet::{ContextWithParquet, Scenario};
Expand Down Expand Up @@ -48,6 +49,38 @@ async fn test_prune(
);
}

/// check row group pruning by bloom filter and statistics independently
async fn test_prune_verbose(
case_data_type: Scenario,
sql: &str,
expected_errors: Option<usize>,
expected_row_group_pruned_sbbf: Option<usize>,
expected_row_group_pruned_statistics: Option<usize>,
expected_results: usize,
) {
let output = ContextWithParquet::new(case_data_type, RowGroup)
.await
.query(sql)
.await;

println!("{}", output.description());
assert_eq!(output.predicate_evaluation_errors(), expected_errors);
assert_eq!(
output.row_groups_pruned_bloom_filter(),
expected_row_group_pruned_sbbf
);
assert_eq!(
output.row_groups_pruned_statistics(),
expected_row_group_pruned_statistics
);
assert_eq!(
output.result_rows,
expected_results,
"{}",
output.description()
);
}

#[tokio::test]
async fn prune_timestamps_nanos() {
test_prune(
Expand Down Expand Up @@ -336,16 +369,38 @@ async fn prune_int32_eq_in_list() {
#[tokio::test]
async fn prune_int32_eq_in_list_2() {
// result of sql "SELECT * FROM t where in (1000)", prune all
test_prune(
// test whether statistics works
test_prune_verbose(
Scenario::Int32,
"SELECT * FROM t where i in (1000)",
Some(0),
Some(0),
Some(4),
0,
)
.await;
}

#[tokio::test]
async fn prune_int32_eq_large_in_list() {
// result of sql "SELECT * FROM t where i in (2050...2582)", prune all
// test whether sbbf works
test_prune_verbose(
Scenario::Int32Range,
format!(
"SELECT * FROM t where i in ({})",
(200050..200082).join(",")
)
.as_str(),
Some(0),
Some(1),
// we don't support pruning by statistics for in_list with more than 20 elements currently
Some(0),
0,
)
.await;
}

#[tokio::test]
async fn prune_int32_eq_in_list_negated() {
// result of sql "SELECT * FROM t where not in (1)" prune nothing
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,8 @@ async fn parquet_explain_analyze() {

// should contain aggregated stats
assert_contains!(&formatted, "output_rows=8");
assert_contains!(&formatted, "row_groups_pruned=0");
assert_contains!(&formatted, "row_groups_pruned_bloom_filter=0");
assert_contains!(&formatted, "row_groups_pruned_statistics=0");
}

#[tokio::test]
Expand All @@ -754,7 +755,8 @@ async fn parquet_explain_analyze_verbose() {
.to_string();

// should contain the raw per file stats (with the label)
assert_contains!(&formatted, "row_groups_pruned{partition=0");
assert_contains!(&formatted, "row_groups_pruned_bloom_filter{partition=0");
assert_contains!(&formatted, "row_groups_pruned_statistics{partition=0");
}

#[tokio::test]
Expand Down
34 changes: 34 additions & 0 deletions datafusion/physical-expr/src/utils/guarantee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{split_conjunction, PhysicalExpr};
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::Operator;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display, Formatter};
use std::sync::Arc;

/// Represents a guarantee that must be true for a boolean expression to
Expand Down Expand Up @@ -222,6 +223,33 @@ impl LiteralGuarantee {
}
}

impl Display for LiteralGuarantee {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self.guarantee {
Guarantee::In => write!(
f,
"{} in ({})",
self.column.name,
self.literals
.iter()
.map(|lit| lit.to_string())
.collect::<Vec<_>>()
.join(", ")
),
Guarantee::NotIn => write!(
f,
"{} not in ({})",
self.column.name,
self.literals
.iter()
.map(|lit| lit.to_string())
.collect::<Vec<_>>()
.join(", ")
),
}
}
}

/// Combines conjuncts (aka terms `AND`ed together) into [`LiteralGuarantee`]s,
/// preserving insert order
#[derive(Debug, Default)]
Expand Down Expand Up @@ -398,6 +426,7 @@ mod test {
use datafusion_common::ToDFSchema;
use datafusion_expr::expr_fn::*;
use datafusion_expr::{lit, Expr};
use itertools::Itertools;
use std::sync::OnceLock;

#[test]
Expand Down Expand Up @@ -691,6 +720,11 @@ mod test {
col("b").in_list(vec![lit(1), lit(2), lit(3)], true),
vec![not_in_guarantee("b", [1, 2, 3])],
);
// b IN (1,2,3,4...24)
test_analyze(
col("b").in_list((1..25).map(lit).collect_vec(), false),
vec![in_guarantee("b", 1..25)],
);
}

#[test]
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/repartition_scan.slt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Filter: parquet_table.column1 != Int32(42)
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]

# disable round robin repartitioning
statement ok
Expand All @@ -77,7 +77,7 @@ Filter: parquet_table.column1 != Int32(42)
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]

# enable round robin repartitioning again
statement ok
Expand All @@ -102,7 +102,7 @@ SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
--SortExec: expr=[column1@0 ASC NULLS LAST]
----CoalesceBatchesExec: target_batch_size=8192
------FilterExec: column1@0 != 42
--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]


## Read the files as though they are ordered
Expand Down Expand Up @@ -138,7 +138,7 @@ physical_plan
SortPreservingMergeExec: [column1@0 ASC NULLS LAST]
--CoalesceBatchesExec: target_batch_size=8192
----FilterExec: column1@0 != 42
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..197], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..201], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:201..403], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:197..394]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1
------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..197], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..201], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:201..403], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:197..394]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1, required_guarantees=[column1 not in (42)]

# Cleanup
statement ok
Expand Down

0 comments on commit b3e17e7

Please sign in to comment.