diff --git a/datafusion/core/tests/expr_api/parse_sql_expr.rs b/datafusion/core/tests/expr_api/parse_sql_expr.rs index 991579b5a350..a3defceee247 100644 --- a/datafusion/core/tests/expr_api/parse_sql_expr.rs +++ b/datafusion/core/tests/expr_api/parse_sql_expr.rs @@ -17,10 +17,12 @@ use arrow_schema::{DataType, Field, Schema}; use datafusion::prelude::{CsvReadOptions, SessionContext}; +use datafusion_common::DFSchema; use datafusion_common::{DFSchemaRef, Result, ToDFSchema}; +use datafusion_expr::col; +use datafusion_expr::lit; use datafusion_expr::Expr; use datafusion_sql::unparser::Unparser; - /// A schema like: /// /// a: Int32 (possibly with nulls) @@ -85,6 +87,18 @@ async fn round_trip_dataframe(sql: &str) -> Result<()> { Ok(()) } +#[tokio::test] +async fn roundtrip_qualified_schema() -> Result<()> { + let sql = "a < 5 OR a = 8"; + let expr = col("t.a").lt(lit(5_i64)).or(col("t.a").eq(lit(8_i64))); + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let df_schema = DFSchema::try_from_qualified_schema("t", &schema).unwrap(); + let ctx = SessionContext::new(); + let parsed_expr = ctx.parse_sql_expr(sql, &df_schema)?; + assert_eq!(parsed_expr, expr); + Ok(()) +} + fn unparse_sql_expr(expr: &Expr) -> Result { let unparser = Unparser::default(); diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index c0863839dba1..3c77ffaa17f6 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -109,7 +109,7 @@ fn distribute_by() -> Result<()> { // regression test for https://github.com/apache/datafusion/issues/3234 let sql = "SELECT col_int32, col_utf8 FROM test DISTRIBUTE BY (col_utf8)"; let plan = test_sql(sql)?; - let expected = "Repartition: DistributeBy(col_utf8)\ + let expected = "Repartition: DistributeBy(test.col_utf8)\ \n TableScan: test projection=[col_int32, col_utf8]"; assert_eq!(expected, format!("{plan:?}")); Ok(()) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index f8979bde3086..9b8356701a40 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -26,6 +26,7 @@ use datafusion_expr::planner::PlannerResult; use datafusion_expr::{Case, Expr}; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use datafusion_expr::UNNAMED_TABLE; impl<'a, S: ContextProvider> SqlToRel<'a, S> { pub(super) fn sql_identifier_to_expr( @@ -50,40 +51,35 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // compound identifiers, but this is not a compound // identifier. (e.g. it is "foo.bar" not foo.bar) let normalize_ident = self.normalizer.normalize(id); - match schema.field_with_unqualified_name(normalize_ident.as_str()) { - Ok(_) => { - // found a match without a qualified name, this is a inner table column - Ok(Expr::Column(Column { - relation: None, - name: normalize_ident, - })) - } - Err(_) => { - // check the outer_query_schema and try to find a match - if let Some(outer) = planner_context.outer_query_schema() { - match outer.qualified_field_with_unqualified_name( - normalize_ident.as_str(), - ) { - Ok((qualifier, field)) => { - // found an exact match on a qualified name in the outer plan schema, so this is an outer reference column - Ok(Expr::OuterReferenceColumn( - field.data_type().clone(), - Column::from((qualifier, field)), - )) - } - Err(_) => Ok(Expr::Column(Column { - relation: None, - name: normalize_ident, - })), - } - } else { - Ok(Expr::Column(Column { - relation: None, - name: normalize_ident, - })) - } + + // Check for qualified field with unqualified name + if let Ok((qualifier, _)) = + schema.qualified_field_with_unqualified_name(normalize_ident.as_str()) + { + return Ok(Expr::Column(Column { + relation: qualifier.filter(|q| q.table() != UNNAMED_TABLE).cloned(), + name: normalize_ident, + })); + } + + // Check the outer query schema + if let Some(outer) = planner_context.outer_query_schema() { + if let Ok((qualifier, field)) = + outer.qualified_field_with_unqualified_name(normalize_ident.as_str()) + { + // Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column + return Ok(Expr::OuterReferenceColumn( + field.data_type().clone(), + Column::from((qualifier, field)), + )); } } + + // Default case + Ok(Expr::Column(Column { + relation: None, + name: normalize_ident, + })) } } diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 3291560383df..511f97c4750e 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -3274,7 +3274,7 @@ fn test_offset_before_limit() { #[test] fn test_distribute_by() { let sql = "select id from person distribute by state"; - let expected = "Repartition: DistributeBy(state)\ + let expected = "Repartition: DistributeBy(person.state)\ \n Projection: person.id\ \n TableScan: person"; quick_test(sql, expected); diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index a3cc10e1eeb8..b7d466d8bf82 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -4077,7 +4077,7 @@ FROM (SELECT c, b, a, SUM(d) as sum1 DISTRIBUTE BY a ---- logical_plan -01)Repartition: DistributeBy(a) +01)Repartition: DistributeBy(multiple_ordered_table_with_pk.a) 02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1 03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]] 04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]