Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
andylokandy committed Oct 13, 2023
1 parent 9677029 commit bb522f5
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 91 deletions.
192 changes: 102 additions & 90 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,51 @@ impl TableSchema {
}
}

/// Flatten all leaf columns in the order of column id.
pub fn flatten_leaf_columns(&self) -> Self {
let mut fields = self.leaf_and_intermediate_fields();
/// Flatten all columns that user can directly refer to, e.g., `a`, `a:b`.
pub fn flatten_referable_columns(&self) -> Self {
fn collect_in_field(
field: &TableField,
fields: &mut Vec<TableField>,
is_nullable: bool,
next_column_id: &mut ColumnId,
) {
let ty = field.data_type();
let is_nullable = ty.is_nullable() || is_nullable;

if let TableDataType::Tuple {
fields_type,
fields_name,
} = ty.remove_nullable()
{
for (name, ty) in fields_name.iter().zip(fields_type) {
collect_in_field(
&TableField::new_from_column_id(
&format!("{}:{}", field.name(), name),
ty.clone(),
*next_column_id,
),
fields,
is_nullable,
next_column_id,
);
}
}

*next_column_id += 1;
let mut field = field.clone();
if is_nullable {
field.data_type = field.data_type.wrap_nullable();
}
fields.push(field)
}

let mut fields = Vec::new();
for field in self.fields() {
let mut next_column_id = field.column_id;
collect_in_field(field, &mut fields, false, &mut next_column_id);
}
fields.sort_by_key(|f| f.column_id());

Self {
fields,
metadata: self.metadata.clone(),
Expand Down Expand Up @@ -786,106 +827,77 @@ impl TableSchema {
/// For example, if the field is `s Tuple(f1 Tuple(f2 Int32))`
/// its name will be `s:f1:f2` instead of `f2`.
pub fn leaf_fields(&self) -> Vec<TableField> {
let mut fields = Vec::new();
for field in self.fields() {
if is_internal_column_id(field.column_id) {
// Skip internal columns
continue;
}
let mut next_column_id = field.column_id;
Self::collect_in_field(field, &mut fields, false, &mut next_column_id, false);
}
fields
}

/// Return all leaf fields and intermediate fields with column id.
///
/// Note: the name of the inner fields is a full-path name.
///
/// For example, if the field is `s Tuple(f1 Tuple(f2 Int32))`
/// its name will be `s:f1:f2` instead of `f2`.
pub fn leaf_and_intermediate_fields(&self) -> Vec<TableField> {
let mut fields = Vec::new();
for field in self.fields() {
if is_internal_column_id(field.column_id) {
// Skip internal columns
continue;
}
let mut next_column_id = field.column_id;
Self::collect_in_field(field, &mut fields, false, &mut next_column_id, true);
}
fields
}

fn collect_in_field(
field: &TableField,
fields: &mut Vec<TableField>,
is_nullable: bool,
next_column_id: &mut ColumnId,
include_intermediate: bool,
) {
let ty = field.data_type();
let is_nullable = ty.is_nullable() || is_nullable;
let mut is_leaf = false;

match ty.remove_nullable() {
TableDataType::Tuple {
fields_type,
fields_name,
} => {
for (name, ty) in fields_name.iter().zip(fields_type) {
Self::collect_in_field(
fn collect_in_field(
field: &TableField,
fields: &mut Vec<TableField>,
is_nullable: bool,
next_column_id: &mut ColumnId,
) {
let ty = field.data_type();
let is_nullable = ty.is_nullable() || is_nullable;
match ty.remove_nullable() {
TableDataType::Tuple {
fields_type,
fields_name,
} => {
for (name, ty) in fields_name.iter().zip(fields_type) {
collect_in_field(
&TableField::new_from_column_id(
&format!("{}:{}", field.name(), name),
ty.clone(),
*next_column_id,
),
fields,
is_nullable,
next_column_id,
);
}
}
TableDataType::Array(ty) => {
collect_in_field(
&TableField::new_from_column_id(
&format!("{}:{}", field.name(), name),
ty.clone(),
&format!("{}:0", field.name()),
ty.as_ref().to_owned(),
*next_column_id,
),
fields,
is_nullable,
next_column_id,
include_intermediate,
);
}
}
TableDataType::Array(ty) => {
Self::collect_in_field(
&TableField::new_from_column_id(
&format!("{}:0", field.name()),
ty.as_ref().to_owned(),
*next_column_id,
),
fields,
is_nullable,
next_column_id,
include_intermediate,
);
}
TableDataType::Map(ty) => {
Self::collect_in_field(
&TableField::new_from_column_id(
field.name(),
ty.as_ref().to_owned(),
*next_column_id,
),
fields,
is_nullable,
next_column_id,
include_intermediate,
);
}
_ => {
is_leaf = true;
TableDataType::Map(ty) => {
collect_in_field(
&TableField::new_from_column_id(
field.name(),
ty.as_ref().to_owned(),
*next_column_id,
),
fields,
is_nullable,
next_column_id,
);
}
_ => {
*next_column_id += 1;
let mut field = field.clone();
if is_nullable {
field.data_type = field.data_type.wrap_nullable();
}
fields.push(field)
}
}
}

if is_leaf || include_intermediate {
*next_column_id += 1;
let mut field = field.clone();
if is_nullable {
field.data_type = field.data_type.wrap_nullable();
let mut fields = Vec::new();
for field in self.fields() {
if is_internal_column_id(field.column_id) {
// Skip internal columns
continue;
}
fields.push(field)
let mut next_column_id = field.column_id;
collect_in_field(field, &mut fields, false, &mut next_column_id);
}
fields
}

/// project will do column pruning.
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/semantic/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl TypeProvider<String> for DataSchema {

impl TypeProvider<String> for TableSchema {
fn get_type(&self, column_name: &String) -> Result<DataType> {
let flatten_schema = self.flatten_leaf_columns();
let flatten_schema = self.flatten_referable_columns();
let column = flatten_schema.field_with_name(column_name)?;
Ok(column.data_type().into())
}
Expand Down

0 comments on commit bb522f5

Please sign in to comment.