Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unhandled hook to PruningPredicate #12606

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

adriangb
Copy link
Contributor

I have a secondary index with min/max stats columns that is compatible with PruningPredicate's rewrites.

I now want to add an index for point lookups (I plan on implementing it as a column with distinct array values, but that's a bit of an implementation detail).

The point is that when PruningPredicate encounters this column (for which there are no stats, and which it doesn't recognize because I only pass in Fields for which there are stats) it currently returns true such that a_column_with_stats = 123 and a_point_lookup_column = 'abc' becomes a_column_with_stats_min <= 123 and a_column_with_stats_max >= 123 and true (ignoring nulls, maybe simplifying other bits) but I want it to become a_column_with_stats_min <= 123 and a_column_with_stats_max >= 123 and a_point_lookup_column @> '{abc}'::text[] or something like that.

I don't think it's reasonable to add APIs to DataFusion for this specific case since it depends on implementation details outside of DataFusion's control, but I also can't easily work around it on my end (I'd have to re-implement all of PruningPredicate). So I'm hoping that adding this hook is acceptable 😄

@github-actions github-actions bot added the core Core DataFusion crate label Sep 24, 2024
@adriangb
Copy link
Contributor Author

cc @alamb would appreciate a review!

Comment on lines +483 to +487
pub trait UnhandledPredicateHook {
/// Called when a predicate can not be handled by DataFusion's transformation rules
/// or is referencing a column that is not in the schema.
fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a closure but I had issues with lifetimes, etc. Having the trait also gives it a useful name 😄

The other API questions are:

  • Should this be mutable? I think implementers can just use interior mutability if needed.
  • Should this make it easier to say "use the existing expression"? I don't think that's a common case, and the current APIs use &Arc<dyn PhysicalExpr> -> Arc<dyn PhysicalExpr> as well. Plus it's as easy as a Clone on an Arc.

@alamb
Copy link
Contributor

alamb commented Sep 25, 2024

I now want to add an index for point lookups (I plan on implementing it as a column with distinct array values, but that's a bit of an implementation detail).

The point is that when PruningPredicate encounters this column (for which there are no stats, and which it doesn't recognize because I only pass in Fields for which there are stats) it currently returns true such that a_column_with_stats = 123 and a_point_lookup_column = 'abc' becomes a_column_with_stats_min <= 123 and a_column_with_stats_max >= 123 and true (ignoring nulls, maybe simplifying other bits) but I want it to become a_column_with_stats_min <= 123 and a_column_with_stats_max >= 123 and a_point_lookup_column @> '{abc}'::text[] or something like that.

Perhaps you can rewrite the predicate before passing it to the parquet exec or the PruningPredicate? I don't fully understand what a_point_lookup_column @> '{abc}'::text[] means but it seems like you could easily do that rewrite / substitution before PruningPredicate.

I don't understand the benefit that is obtained by doing the rewrite during the pruning predicate rewrite 🤔

@alamb
Copy link
Contributor

alamb commented Sep 25, 2024

It might also be good to look at https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html#method.literal_guarantees which you might be able to use to apply you index

@adriangb
Copy link
Contributor Author

The issue is that PruningPredicate discards (by returning true) any predicates it doesn't itself know how to rewrite. So if I do the rewrite before calling PruningPredicate then that rewrite is lost.

@adriangb
Copy link
Contributor Author

I don't fully understand what a_point_lookup_column @> '{abc}'::text[]

Basically I want to take the predicate a_point_lookup_column = 'abc' and transform that into a filter in my index.
I've chosen to store this is a_point_lookup_column_distinct text[] for the case of a UTF8 column called a_point_lookup_column. This column is then stored alongside other stats columns so you end up with something like (file_path text, row_group int, a_stats_column_min double, a_stats_column_max double, a_stats_column_null_count int, a_point_lookup_column_distinct text[]).

@adriangb
Copy link
Contributor Author

It might also be good to look at https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html#method.literal_guarantees which you might be able to use to apply you index

I admit I'm still a bit confused about LiteralGuarantees but it seems to me that to use them I'd have to have all of the data in memory.
The whole point of this is that I can store a relatively large amount of data (say 1M 16 character strings) and rip through filtering them by letting the system storing them (in my case a Postgres database with a GIN index on the array column) give me back just the row groups that matched the predicate, without ever moving all of that data over the wire to build LiteralGuarantees from it.

@adriangb
Copy link
Contributor Author

adriangb commented Sep 26, 2024

I'll add that I've been using this (as in this change + an actual implementation that uses it) in production for a couple days now and it works amazingly. It's taken some queries from >3s to <1s (from downloading all of a column for all of time to a <100ms lookup in a Postgres index).

@alamb
Copy link
Contributor

alamb commented Sep 27, 2024

Basically I want to take the predicate a_point_lookup_column = 'abc' and transform that into a filter in my index.
I've chosen to store this is a_point_lookup_column_distinct text[] for the case of a UTF8 column called a_point_lookup_column. This column is then stored alongside other stats columns so you end up with something like (file_path text, row_group int, a_stats_column_min double, a_stats_column_max double, a_stats_column_null_count int, a_point_lookup_column_distinct text[]).

I see -- what I am not understanding is why you need to do this rewrite as part of the PruningPredicate logic (which is already complicated). WHy can't you do the rewrite/transformation before passing the predicate to PruningPredicate ?

@adriangb
Copy link
Contributor Author

adriangb commented Sep 27, 2024

Here's an example:

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::{common::DFSchema, physical_optimizer::pruning::PruningPredicate, prelude::*};

fn main() {
    let ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, true)]));
    let df_schema = DFSchema::try_from(schema.clone()).unwrap();

    // An expression that PruningPredicate doesn't understand becomes `true`
    let expr = ctx.parse_sql_expr("col = ANY([1, 2])", &df_schema).unwrap();
    println!("expr: {:?}", expr);
    let phys_expr = ctx.create_physical_expr(expr, &df_schema).unwrap();
    println!("phys_expr: {:?}", phys_expr);
    let pruning = PruningPredicate::try_new(phys_expr, schema.clone()).unwrap();
    let pruning_expr = pruning.predicate_expr().clone();
    println!("pruning_expr: {:?}", pruning_expr);
    // pruning_expr: Literal { value: Boolean(true) }

    // An expression referencing columns that don't have statistics collected (i.e. aren't int the schema)
    // causes an Err
    let expr = ctx.parse_sql_expr("other = 1", &df_schema).unwrap();
    println!("expr: {:?}", expr);
    let phys_expr = ctx.create_physical_expr(expr, &df_schema).unwrap();
    println!("phys_expr: {:?}", phys_expr);
    PruningPredicate::try_new(phys_expr, schema.clone()).unwrap();
    // SchemaError(FieldNotFound { field: Column { relation: None, name: "other" }, valid_fields: [Column { relation: None, name: "col" }] }, Some(""))
}

If I do the rewrite before PruningPredicate then I end up with just true.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants