Skip to content

Commit

Permalink
Parsing SQL strings to Exprs with the qualified schema (#11562)
Browse files Browse the repository at this point in the history
* Parsing SQL strings to Exprs wtih the qualified schema

* refactor code
  • Loading branch information
Lordworms committed Jul 25, 2024
1 parent 886e8ac commit c951824
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 36 deletions.
16 changes: 15 additions & 1 deletion datafusion/core/tests/expr_api/parse_sql_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::{CsvReadOptions, SessionContext};
use datafusion_common::DFSchema;
use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
use datafusion_expr::col;
use datafusion_expr::lit;
use datafusion_expr::Expr;
use datafusion_sql::unparser::Unparser;

/// A schema like:
///
/// a: Int32 (possibly with nulls)
Expand Down Expand Up @@ -85,6 +87,18 @@ async fn round_trip_dataframe(sql: &str) -> Result<()> {
Ok(())
}

#[tokio::test]
async fn roundtrip_qualified_schema() -> Result<()> {
let sql = "a < 5 OR a = 8";
let expr = col("t.a").lt(lit(5_i64)).or(col("t.a").eq(lit(8_i64)));
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let df_schema = DFSchema::try_from_qualified_schema("t", &schema).unwrap();
let ctx = SessionContext::new();
let parsed_expr = ctx.parse_sql_expr(sql, &df_schema)?;
assert_eq!(parsed_expr, expr);
Ok(())
}

fn unparse_sql_expr(expr: &Expr) -> Result<String> {
let unparser = Unparser::default();

Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn distribute_by() -> Result<()> {
// regression test for https://github.com/apache/datafusion/issues/3234
let sql = "SELECT col_int32, col_utf8 FROM test DISTRIBUTE BY (col_utf8)";
let plan = test_sql(sql)?;
let expected = "Repartition: DistributeBy(col_utf8)\
let expected = "Repartition: DistributeBy(test.col_utf8)\
\n TableScan: test projection=[col_int32, col_utf8]";
assert_eq!(expected, format!("{plan:?}"));
Ok(())
Expand Down
60 changes: 28 additions & 32 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use datafusion_expr::planner::PlannerResult;
use datafusion_expr::{Case, Expr};

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_expr::UNNAMED_TABLE;

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn sql_identifier_to_expr(
Expand All @@ -50,40 +51,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// compound identifiers, but this is not a compound
// identifier. (e.g. it is "foo.bar" not foo.bar)
let normalize_ident = self.normalizer.normalize(id);
match schema.field_with_unqualified_name(normalize_ident.as_str()) {
Ok(_) => {
// found a match without a qualified name, this is a inner table column
Ok(Expr::Column(Column {
relation: None,
name: normalize_ident,
}))
}
Err(_) => {
// check the outer_query_schema and try to find a match
if let Some(outer) = planner_context.outer_query_schema() {
match outer.qualified_field_with_unqualified_name(
normalize_ident.as_str(),
) {
Ok((qualifier, field)) => {
// found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
Ok(Expr::OuterReferenceColumn(
field.data_type().clone(),
Column::from((qualifier, field)),
))
}
Err(_) => Ok(Expr::Column(Column {
relation: None,
name: normalize_ident,
})),
}
} else {
Ok(Expr::Column(Column {
relation: None,
name: normalize_ident,
}))
}

// Check for qualified field with unqualified name
if let Ok((qualifier, _)) =
schema.qualified_field_with_unqualified_name(normalize_ident.as_str())
{
return Ok(Expr::Column(Column {
relation: qualifier.filter(|q| q.table() != UNNAMED_TABLE).cloned(),
name: normalize_ident,
}));
}

// Check the outer query schema
if let Some(outer) = planner_context.outer_query_schema() {
if let Ok((qualifier, field)) =
outer.qualified_field_with_unqualified_name(normalize_ident.as_str())
{
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
return Ok(Expr::OuterReferenceColumn(
field.data_type().clone(),
Column::from((qualifier, field)),
));
}
}

// Default case
Ok(Expr::Column(Column {
relation: None,
name: normalize_ident,
}))
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3274,7 +3274,7 @@ fn test_offset_before_limit() {
#[test]
fn test_distribute_by() {
let sql = "select id from person distribute by state";
let expected = "Repartition: DistributeBy(state)\
let expected = "Repartition: DistributeBy(person.state)\
\n Projection: person.id\
\n TableScan: person";
quick_test(sql, expected);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4077,7 +4077,7 @@ FROM (SELECT c, b, a, SUM(d) as sum1
DISTRIBUTE BY a
----
logical_plan
01)Repartition: DistributeBy(a)
01)Repartition: DistributeBy(multiple_ordered_table_with_pk.a)
02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]
Expand Down

0 comments on commit c951824

Please sign in to comment.