Skip to content

Commit

Permalink
Use PhysicalExtensionCodec consisdently also when serializing
Browse files Browse the repository at this point in the history
  • Loading branch information
joroKr21 committed Apr 14, 2024
1 parent 5836e7e commit d5447f2
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 279 deletions.
60 changes: 37 additions & 23 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,22 @@ use crate::physical_plan::from_proto::{
parse_physical_window_expr_ext, parse_protobuf_file_scan_config,
parse_protobuf_file_scan_config_ext,
};
use crate::physical_plan::to_proto::{
serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
serialize_physical_window_expr,
};
use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
use crate::protobuf::physical_expr_node::ExprType;
use crate::protobuf::physical_plan_node::PhysicalPlanType;
use crate::protobuf::repartition_exec_node::PartitionMethod;
use crate::protobuf::{
self, window_agg_exec_node, PhysicalPlanNode, PhysicalSortExprNodeCollection,
};
use crate::protobuf::{self, window_agg_exec_node};

use self::to_proto::serialize_physical_expr;

pub mod from_proto;
pub mod to_proto;

impl AsExecutionPlan for PhysicalPlanNode {
impl AsExecutionPlan for protobuf::PhysicalPlanNode {
fn try_decode(buf: &[u8]) -> Result<Self>
where
Self: Sized,
Expand Down Expand Up @@ -1448,14 +1450,17 @@ impl AsExecutionPlan for PhysicalPlanNode {
let filter = exec
.filter_expr()
.iter()
.map(|expr| expr.to_owned().try_into())
.map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
.collect::<Result<Vec<_>>>()?;

let agg = exec
.aggr_expr()
.iter()
.map(|expr| expr.to_owned().try_into())
.map(|expr| {
serialize_physical_aggr_expr(expr.to_owned(), extension_codec)
})
.collect::<Result<Vec<_>>>()?;

let agg_names = exec
.aggr_expr()
.iter()
Expand Down Expand Up @@ -1555,7 +1560,10 @@ impl AsExecutionPlan for PhysicalPlanNode {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CsvScan(
protobuf::CsvScanExecNode {
base_conf: Some(exec.base_config().try_into()?),
base_conf: Some(serialize_file_scan_config(
exec.base_config(),
extension_codec,
)?),
has_header: exec.has_header(),
delimiter: byte_to_string(exec.delimiter(), "delimiter")?,
quote: byte_to_string(exec.quote(), "quote")?,
Expand All @@ -1580,7 +1588,10 @@ impl AsExecutionPlan for PhysicalPlanNode {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
protobuf::ParquetScanExecNode {
base_conf: Some(exec.base_config().try_into()?),
base_conf: Some(serialize_file_scan_config(
exec.base_config(),
extension_codec,
)?),
predicate,
},
)),
Expand All @@ -1591,7 +1602,10 @@ impl AsExecutionPlan for PhysicalPlanNode {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::AvroScan(
protobuf::AvroScanExecNode {
base_conf: Some(exec.base_config().try_into()?),
base_conf: Some(serialize_file_scan_config(
exec.base_config(),
extension_codec,
)?),
},
)),
});
Expand Down Expand Up @@ -1687,7 +1701,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
}

if let Some(union) = plan.downcast_ref::<UnionExec>() {
let mut inputs: Vec<PhysicalPlanNode> = vec![];
let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
for input in union.inputs() {
inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
input.to_owned(),
Expand All @@ -1702,7 +1716,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
}

if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
let mut inputs: Vec<PhysicalPlanNode> = vec![];
let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
for input in interleave.inputs() {
inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
input.to_owned(),
Expand Down Expand Up @@ -1808,11 +1822,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
extension_codec,
)?;

let window_expr =
exec.window_expr()
.iter()
.map(|e| e.clone().try_into())
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
let window_expr = exec
.window_expr()
.iter()
.map(|e| serialize_physical_window_expr(e.clone(), extension_codec))
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

let partition_keys = exec
.partition_keys
Expand All @@ -1838,11 +1852,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
extension_codec,
)?;

let window_expr =
exec.window_expr()
.iter()
.map(|e| e.clone().try_into())
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
let window_expr = exec
.window_expr()
.iter()
.map(|e| serialize_physical_window_expr(e.clone(), extension_codec))
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;

let partition_keys = exec
.partition_keys
Expand Down Expand Up @@ -1900,7 +1914,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(sort_expr)
})
.collect::<Result<Vec<_>>>()?;
Some(PhysicalSortExprNodeCollection {
Some(protobuf::PhysicalSortExprNodeCollection {
physical_sort_expr_nodes: expr,
})
}
Expand Down Expand Up @@ -2043,7 +2057,7 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
}

fn into_physical_plan(
node: &Option<Box<PhysicalPlanNode>>,
node: &Option<Box<protobuf::PhysicalPlanNode>>,
registry: &dyn FunctionRegistry,
runtime: &RuntimeEnv,
extension_codec: &dyn PhysicalExtensionCodec,
Expand Down
Loading

0 comments on commit d5447f2

Please sign in to comment.