Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parsing SQL strings to Exprs with the qualified schema #11562

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion datafusion/core/tests/expr_api/parse_sql_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::{CsvReadOptions, SessionContext};
use datafusion_common::DFSchema;
use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
use datafusion_expr::col;
use datafusion_expr::lit;
use datafusion_expr::Expr;
use datafusion_sql::unparser::Unparser;

/// A schema like:
///
/// a: Int32 (possibly with nulls)
Expand Down Expand Up @@ -85,6 +87,18 @@ async fn round_trip_dataframe(sql: &str) -> Result<()> {
Ok(())
}

#[tokio::test]
async fn roundtrip_qualified_schema() -> Result<()> {
let sql = "a < 5 OR a = 8";
let expr = col("t.a").lt(lit(5_i64)).or(col("t.a").eq(lit(8_i64)));
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let df_schema = DFSchema::try_from_qualified_schema("t", &schema).unwrap();
let ctx = SessionContext::new();
let parsed_expr = ctx.parse_sql_expr(sql, &df_schema)?;
assert_eq!(parsed_expr, expr);
Ok(())
}

fn unparse_sql_expr(expr: &Expr) -> Result<String> {
let unparser = Unparser::default();

Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn distribute_by() -> Result<()> {
// regression test for https://github.com/apache/datafusion/issues/3234
let sql = "SELECT col_int32, col_utf8 FROM test DISTRIBUTE BY (col_utf8)";
let plan = test_sql(sql)?;
let expected = "Repartition: DistributeBy(col_utf8)\
let expected = "Repartition: DistributeBy(test.col_utf8)\
\n TableScan: test projection=[col_int32, col_utf8]";
assert_eq!(expected, format!("{plan:?}"));
Ok(())
Expand Down
60 changes: 28 additions & 32 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use datafusion_expr::planner::PlannerResult;
use datafusion_expr::{Case, Expr};

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_expr::UNNAMED_TABLE;

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn sql_identifier_to_expr(
Expand All @@ -50,40 +51,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// compound identifiers, but this is not a compound
// identifier. (e.g. it is "foo.bar" not foo.bar)
let normalize_ident = self.normalizer.normalize(id);
match schema.field_with_unqualified_name(normalize_ident.as_str()) {
Ok(_) => {
// found a match without a qualified name, this is a inner table column
Ok(Expr::Column(Column {
relation: None,
name: normalize_ident,
}))
}
Err(_) => {
// check the outer_query_schema and try to find a match
if let Some(outer) = planner_context.outer_query_schema() {
match outer.qualified_field_with_unqualified_name(
normalize_ident.as_str(),
) {
Ok((qualifier, field)) => {
// found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
Ok(Expr::OuterReferenceColumn(
field.data_type().clone(),
Column::from((qualifier, field)),
))
}
Err(_) => Ok(Expr::Column(Column {
relation: None,
name: normalize_ident,
})),
}
} else {
Ok(Expr::Column(Column {
relation: None,
name: normalize_ident,
}))
}

// Check for qualified field with unqualified name
if let Ok((qualifier, _)) =
schema.qualified_field_with_unqualified_name(normalize_ident.as_str())
Lordworms marked this conversation as resolved.
Show resolved Hide resolved
{
return Ok(Expr::Column(Column {
relation: qualifier.filter(|q| q.table() != UNNAMED_TABLE).cloned(),
name: normalize_ident,
}));
}

// Check the outer query schema
if let Some(outer) = planner_context.outer_query_schema() {
if let Ok((qualifier, field)) =
outer.qualified_field_with_unqualified_name(normalize_ident.as_str())
{
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
return Ok(Expr::OuterReferenceColumn(
field.data_type().clone(),
Column::from((qualifier, field)),
));
}
}

// Default case
Ok(Expr::Column(Column {
relation: None,
name: normalize_ident,
}))
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3274,7 +3274,7 @@ fn test_offset_before_limit() {
#[test]
fn test_distribute_by() {
let sql = "select id from person distribute by state";
let expected = "Repartition: DistributeBy(state)\
let expected = "Repartition: DistributeBy(person.state)\
\n Projection: person.id\
\n TableScan: person";
quick_test(sql, expected);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4077,7 +4077,7 @@ FROM (SELECT c, b, a, SUM(d) as sum1
DISTRIBUTE BY a
----
logical_plan
01)Repartition: DistributeBy(a)
01)Repartition: DistributeBy(multiple_ordered_table_with_pk.a)
02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]
Expand Down