Skip to content

Commit

Permalink
Minor: Add Column::from(Tableref, &FieldRef), Expr::from(Column)
Browse files Browse the repository at this point in the history
…and `Expr::from(Tableref, &FieldRef)` (apache#10178)

* Minor: Add `Column::from(Tableref, &FieldRef)`

* Add Expr::from()

* fix docs

* Fix doc test
  • Loading branch information
alamb committed Apr 23, 2024
1 parent da82cec commit 089a42a
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 45 deletions.
4 changes: 1 addition & 3 deletions benchmarks/src/tpch/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ impl ConvertOpt {
.schema()
.iter()
.take(schema.fields.len() - 1)
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.map(Expr::from)
.collect();

csv = csv.select(selection)?;
Expand Down
11 changes: 10 additions & 1 deletion datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Column

use arrow_schema::Field;
use arrow_schema::{Field, FieldRef};

use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
Expand Down Expand Up @@ -63,6 +63,8 @@ impl Column {
}

/// Create Column from unqualified name.
///
/// Alias for `Column::new_unqualified`
pub fn from_name(name: impl Into<String>) -> Self {
Self {
relation: None,
Expand Down Expand Up @@ -346,6 +348,13 @@ impl From<(Option<&TableReference>, &Field)> for Column {
}
}

/// Create a column, use qualifier and field name
impl From<(Option<&TableReference>, &FieldRef)> for Column {
fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
Self::new(relation.cloned(), field.name())
}
}

impl FromStr for Column {
type Err = Infallible;

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,7 @@ impl DataFrame {
col_exists = true;
new_column.clone()
} else {
col(Column::from((qualifier, field.as_ref())))
col(Column::from((qualifier, field)))
}
})
.collect();
Expand Down Expand Up @@ -1402,9 +1402,9 @@ impl DataFrame {
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
col(Column::from((qualifier, field.as_ref()))).alias(new_name)
col(Column::from((qualifier, field))).alias(new_name)
} else {
col(Column::from((qualifier, field.as_ref())))
col(Column::from((qualifier, field)))
}
})
.collect::<Vec<_>>();
Expand Down
11 changes: 2 additions & 9 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1261,15 +1261,8 @@ impl DefaultPhysicalPlanner {

// Remove temporary projected columns
if left_projected || right_projected {
let final_join_result = join_schema
.iter()
.map(|(qualifier, field)| {
Expr::Column(datafusion_common::Column::from((
qualifier,
field.as_ref(),
)))
})
.collect::<Vec<_>>();
let final_join_result =
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
let projection = LogicalPlan::Projection(Projection::try_new(
final_join_result,
Arc::new(new_join),
Expand Down
42 changes: 41 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
Signature,
};

use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, FieldRef};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{
internal_err, plan_err, Column, DFSchema, Result, ScalarValue, TableReference,
Expand Down Expand Up @@ -84,6 +84,29 @@ use sqlparser::ast::NullTreatment;
/// assert_eq!(binary_expr.op, Operator::Eq);
/// }
/// ```
///
/// ## Return a list of [`Expr::Column`] from a schema's columns
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::{DFSchema, Column};
/// # use datafusion_expr::Expr;
///
/// let arrow_schema = Schema::new(vec![
/// Field::new("c1", DataType::Int32, false),
/// Field::new("c2", DataType::Float64, false),
/// ]);
/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
///
/// // Form a list of expressions for each item in the schema
/// let exprs: Vec<_> = df_schema.iter()
/// .map(Expr::from)
/// .collect();
///
/// assert_eq!(exprs, vec![
/// Expr::from(Column::from_qualified_name("t1.c1")),
/// Expr::from(Column::from_qualified_name("t1.c2")),
/// ]);
/// ```
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub enum Expr {
/// An expression with a specific name.
Expand Down Expand Up @@ -190,6 +213,23 @@ impl Default for Expr {
}
}

/// Create an [`Expr`] from a [`Column`]
impl From<Column> for Expr {
fn from(value: Column) -> Self {
Expr::Column(value)
}
}

/// Create an [`Expr`] from an optional qualifier and a [`FieldRef`]. This is
/// useful for creating [`Expr`] from a [`DFSchema`].
///
/// See example on [`Expr`]
impl<'a> From<(Option<&'a TableReference>, &'a FieldRef)> for Expr {
fn from(value: (Option<&'a TableReference>, &'a FieldRef)) -> Self {
Expr::from(Column::from(value))
}
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
pub expr: Box<Expr>,
Expand Down
8 changes: 1 addition & 7 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,7 @@ pub fn coerce_plan_expr_for_schema(
Ok(LogicalPlan::Projection(projection))
}
_ => {
let exprs: Vec<Expr> = plan
.schema()
.iter()
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.collect();
let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();

let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
let add_project = new_exprs.iter().any(|expr| expr.try_into_col().is_err());
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,7 @@ pub fn unnest_with_options(
return Ok(input);
}
};
qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref())));
qualified_columns.push(Column::from((unnest_qualifier, &unnested_field)));
unnested_fields.insert(index, unnested_field);
}

Expand Down
9 changes: 2 additions & 7 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,7 @@ fn get_exprs_except_skipped(
columns_to_skip: HashSet<Column>,
) -> Vec<Expr> {
if columns_to_skip.is_empty() {
schema
.iter()
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.collect::<Vec<Expr>>()
schema.iter().map(Expr::from).collect::<Vec<Expr>>()
} else {
schema
.columns()
Expand Down Expand Up @@ -855,7 +850,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
match expr {
Expr::Column(col) => {
let (qualifier, field) = plan.schema().qualified_field_from_column(col)?;
Ok(Expr::Column(Column::from((qualifier, field))))
Ok(Expr::from(Column::from((qualifier, field))))
}
_ => Ok(Expr::Column(Column::from_name(expr.display_name()?))),
}
Expand Down
7 changes: 2 additions & 5 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ fn build_common_expr_project_plan(

for (qualifier, field) in input.schema().iter() {
if fields_set.insert(qualified_name(qualifier, field.name())) {
project_exprs.push(Expr::Column(Column::from((qualifier, field.as_ref()))));
project_exprs.push(Expr::from((qualifier, field)));
}
}

Expand All @@ -525,10 +525,7 @@ fn build_recover_project_plan(
schema: &DFSchema,
input: LogicalPlan,
) -> Result<LogicalPlan> {
let col_exprs = schema
.iter()
.map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field.as_ref()))))
.collect();
let col_exprs = schema.iter().map(Expr::from).collect();
Ok(LogicalPlan::Projection(Projection::try_new(
col_exprs,
Arc::new(input),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/replace_distinct_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
.skip(on_expr.len())
.zip(schema.iter())
.map(|((new_qualifier, new_field), (old_qualifier, old_field))| {
Ok(col(Column::from((new_qualifier, new_field.as_ref())))
Ok(col(Column::from((new_qualifier, new_field)))
.alias_qualified(old_qualifier.cloned(), old_field.name()))
})
.collect::<Result<Vec<Expr>>>()?;
Expand Down
8 changes: 3 additions & 5 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use sqlparser::ast::{ArrayAgg, Expr as SQLExpr, JsonOperator, TrimWhereField, Va
use sqlparser::parser::ParserError::ParserError;

use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, plan_err, Column, DFSchema,
Result, ScalarValue,
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
ScalarValue,
};
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::expr::InList;
Expand Down Expand Up @@ -142,9 +142,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
_ => false,
}) {
Some((qualifier, df_field)) => {
Expr::Column(Column::from((qualifier, df_field.as_ref())))
}
Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
None => Expr::Column(col),
}
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1307,8 +1307,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
))
} else {
datafusion_expr::Expr::Column(Column::from((
qualifier,
field.as_ref(),
qualifier, field,
)))
}
}
Expand Down

0 comments on commit 089a42a

Please sign in to comment.