Skip to content

Commit

Permalink
Use PhysicalExtensionCodec consistently
Browse files Browse the repository at this point in the history
  • Loading branch information
joroKr21 committed Apr 14, 2024
1 parent f60c1bc commit 5836e7e
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 88 deletions.
88 changes: 72 additions & 16 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ impl From<&protobuf::PhysicalColumn> for Column {
/// # Arguments
///
/// * `proto` - Input proto with physical sort expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_expr(
proto: &protobuf::PhysicalSortExprNode,
registry: &dyn FunctionRegistry,
Expand All @@ -102,9 +103,10 @@ pub fn parse_physical_sort_expr(
/// # Arguments
///
/// * `proto` - Input proto with vector of physical sort expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_exprs(
proto: &[protobuf::PhysicalSortExprNode],
registry: &dyn FunctionRegistry,
Expand All @@ -123,25 +125,39 @@ pub fn parse_physical_sort_exprs(
///
/// # Arguments
///
/// * `proto` - Input proto with physical window exprression node.
/// * `proto` - Input proto with physical window expression node.
/// * `name` - Name of the window expression.
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
pub fn parse_physical_window_expr(
proto: &protobuf::PhysicalWindowExprNode,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
) -> Result<Arc<dyn WindowExpr>> {
let codec = DefaultPhysicalExtensionCodec {};
parse_physical_window_expr_ext(
proto,
registry,
input_schema,
&DefaultPhysicalExtensionCodec {},
)
}

// TODO: Make this the public function on next major release.
pub(crate) fn parse_physical_window_expr_ext(
proto: &protobuf::PhysicalWindowExprNode,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
let window_node_expr =
parse_physical_exprs(&proto.args, registry, input_schema, &codec)?;
parse_physical_exprs(&proto.args, registry, input_schema, codec)?;

let partition_by =
parse_physical_exprs(&proto.partition_by, registry, input_schema, &codec)?;
parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?;

let order_by =
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, &codec)?;
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?;

let window_frame = proto
.window_frame
Expand Down Expand Up @@ -187,9 +203,10 @@ where
/// # Arguments
///
/// * `proto` - Input proto with physical expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_expr(
proto: &protobuf::PhysicalExprNode,
registry: &dyn FunctionRegistry,
Expand All @@ -213,13 +230,15 @@ pub fn parse_physical_expr(
registry,
"left",
input_schema,
codec,
)?,
logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
parse_required_physical_expr(
binary_expr.r.as_deref(),
registry,
"right",
input_schema,
codec,
)?,
)),
ExprType::AggregateExpr(_) => {
Expand All @@ -241,6 +260,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?))
}
ExprType::IsNotNullExpr(e) => {
Expand All @@ -249,20 +269,23 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?))
}
ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
e.expr.as_deref(),
registry,
"expr",
input_schema,
codec,
)?)),
ExprType::Negative(e) => {
Arc::new(NegativeExpr::new(parse_required_physical_expr(
e.expr.as_deref(),
registry,
"expr",
input_schema,
codec,
)?))
}
ExprType::InList(e) => in_list(
Expand All @@ -271,6 +294,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
parse_physical_exprs(&e.list, registry, input_schema, codec)?,
&e.negated,
Expand All @@ -290,12 +314,14 @@ pub fn parse_physical_expr(
registry,
"when_expr",
input_schema,
codec,
)?,
parse_required_physical_expr(
e.then_expr.as_ref(),
registry,
"then_expr",
input_schema,
codec,
)?,
))
})
Expand All @@ -311,6 +337,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
convert_required!(e.arrow_type)?,
None,
Expand All @@ -321,6 +348,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
convert_required!(e.arrow_type)?,
)),
Expand Down Expand Up @@ -371,12 +399,14 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
parse_required_physical_expr(
like_expr.pattern.as_deref(),
registry,
"pattern",
input_schema,
codec,
)?,
)),
};
Expand All @@ -389,9 +419,9 @@ fn parse_required_physical_expr(
registry: &dyn FunctionRegistry,
field: &str,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
let codec = DefaultPhysicalExtensionCodec {};
expr.map(|e| parse_physical_expr(e, registry, input_schema, &codec))
expr.map(|e| parse_physical_expr(e, registry, input_schema, codec))
.transpose()?
.ok_or_else(|| {
DataFusionError::Internal(format!("Missing required field {field:?}"))
Expand Down Expand Up @@ -433,15 +463,29 @@ pub fn parse_protobuf_hash_partitioning(
partitioning: Option<&protobuf::PhysicalHashRepartition>,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
) -> Result<Option<Partitioning>> {
parse_protobuf_hash_partitioning_ext(
partitioning,
registry,
input_schema,
&DefaultPhysicalExtensionCodec {},
)
}

// TODO: Make this the public function on next major release.
fn parse_protobuf_hash_partitioning_ext(
partitioning: Option<&protobuf::PhysicalHashRepartition>,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
match partitioning {
Some(hash_part) => {
let codec = DefaultPhysicalExtensionCodec {};
let expr = parse_physical_exprs(
&hash_part.hash_expr,
registry,
input_schema,
&codec,
codec,
)?;

Ok(Some(Partitioning::Hash(
Expand All @@ -456,6 +500,19 @@ pub fn parse_protobuf_hash_partitioning(
pub fn parse_protobuf_file_scan_config(
proto: &protobuf::FileScanExecConf,
registry: &dyn FunctionRegistry,
) -> Result<FileScanConfig> {
parse_protobuf_file_scan_config_ext(
proto,
registry,
&DefaultPhysicalExtensionCodec {},
)
}

// TODO: Make this the public function on next major release.
pub(crate) fn parse_protobuf_file_scan_config_ext(
proto: &protobuf::FileScanExecConf,
registry: &dyn FunctionRegistry,
codec: &dyn PhysicalExtensionCodec,
) -> Result<FileScanConfig> {
let schema: Arc<Schema> = Arc::new(convert_required!(proto.schema)?);
let projection = proto
Expand Down Expand Up @@ -489,7 +546,7 @@ pub fn parse_protobuf_file_scan_config(
.collect::<Result<Vec<_>>>()?;

// Remove partition columns from the schema after recreating table_partition_cols
// because the partition columns are not in the file. They are present to allow the
// because the partition columns are not in the file. They are present to allow
// the partition column types to be reconstructed after serde.
let file_schema = Arc::new(Schema::new(
schema
Expand All @@ -502,12 +559,11 @@ pub fn parse_protobuf_file_scan_config(

let mut output_ordering = vec![];
for node_collection in &proto.output_ordering {
let codec = DefaultPhysicalExtensionCodec {};
let sort_expr = parse_physical_sort_exprs(
&node_collection.physical_sort_expr_nodes,
registry,
&schema,
&codec,
codec,
)?;
output_ordering.push(sort_expr);
}
Expand Down
51 changes: 27 additions & 24 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,8 @@ use std::convert::TryInto;
use std::fmt::Debug;
use std::sync::Arc;

use self::from_proto::parse_physical_window_expr;
use self::to_proto::serialize_physical_expr;

use crate::common::{byte_to_string, proto_error, str_to_byte};
use crate::convert_required;
use crate::physical_plan::from_proto::{
parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
parse_protobuf_file_scan_config,
};
use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
use crate::protobuf::physical_expr_node::ExprType;
use crate::protobuf::physical_plan_node::PhysicalPlanType;
use crate::protobuf::repartition_exec_node::PartitionMethod;
use crate::protobuf::{
self, window_agg_exec_node, PhysicalPlanNode, PhysicalSortExprNodeCollection,
};
use prost::bytes::BufMut;
use prost::Message;

use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -79,8 +65,22 @@ use datafusion::physical_plan::{
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::ScalarUDF;

use prost::bytes::BufMut;
use prost::Message;
use crate::common::{byte_to_string, proto_error, str_to_byte};
use crate::convert_required;
use crate::physical_plan::from_proto::{
parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
parse_physical_window_expr_ext, parse_protobuf_file_scan_config,
parse_protobuf_file_scan_config_ext,
};
use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
use crate::protobuf::physical_expr_node::ExprType;
use crate::protobuf::physical_plan_node::PhysicalPlanType;
use crate::protobuf::repartition_exec_node::PartitionMethod;
use crate::protobuf::{
self, window_agg_exec_node, PhysicalPlanNode, PhysicalSortExprNodeCollection,
};

use self::to_proto::serialize_physical_expr;

pub mod from_proto;
pub mod to_proto;
Expand Down Expand Up @@ -188,9 +188,10 @@ impl AsExecutionPlan for PhysicalPlanNode {
}
}
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
parse_protobuf_file_scan_config(
parse_protobuf_file_scan_config_ext(
scan.base_conf.as_ref().unwrap(),
registry,
extension_codec,
)?,
scan.has_header,
str_to_byte(&scan.delimiter, "delimiter")?,
Expand Down Expand Up @@ -230,12 +231,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
Default::default(),
)))
}
PhysicalPlanType::AvroScan(scan) => {
Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
PhysicalPlanType::AvroScan(scan) => Ok(Arc::new(AvroExec::new(
parse_protobuf_file_scan_config_ext(
scan.base_conf.as_ref().unwrap(),
registry,
)?)))
}
extension_codec,
)?,
))),
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> = into_physical_plan(
&coalesce_batches.input,
Expand Down Expand Up @@ -334,10 +336,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
.window_expr
.iter()
.map(|window_expr| {
parse_physical_window_expr(
parse_physical_window_expr_ext(
window_expr,
registry,
input_schema.as_ref(),
extension_codec,
)
})
.collect::<Result<Vec<_>, _>>()?;
Expand Down
Loading

0 comments on commit 5836e7e

Please sign in to comment.