
feat(table_function): table_function inspect_parquet(s) #13214

Merged · 28 commits · Oct 26, 2023
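In short, this PR adds an inspect_parquet table function that reads the footer metadata of a single staged Parquet file and returns it as one summary row. A usage sketch, taken from the sqllogictest at the end of this diff (the @data stage and the tuple.parquet file are fixtures of that test suite):

    -- Returns one row with: created_by, num_columns, num_rows, num_row_groups,
    -- serialized_size, max_row_groups_size_compressed, max_row_groups_size_uncompressed.
    SELECT * FROM inspect_parquet('@data/parquet/tuple.parquet');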
1 change: 1 addition & 0 deletions src/common/storage/src/lib.rs
@@ -54,6 +54,7 @@ pub use parquet2::read_parquet_metas_in_parallel;
pub use parquet2::read_parquet_schema_async;

pub mod parquet_rs;
pub use parquet_rs::read_metadata_async;
pub use parquet_rs::read_parquet_schema_async_rs;

mod stage;
287 changes: 287 additions & 0 deletions src/query/service/src/table_functions/inspect_parquet/inspect_parquet_table.rs
@@ -0,0 +1,287 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::cmp::max;
use std::sync::Arc;

use common_catalog::plan::DataSourcePlan;
use common_catalog::plan::PartStatistics;
use common_catalog::plan::Partitions;
use common_catalog::plan::PushDownInfo;
use common_catalog::table::Table;
use common_catalog::table_args::TableArgs;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::types::Int64Type;
use common_expression::types::NumberDataType;
use common_expression::types::StringType;
use common_expression::types::UInt64Type;
use common_expression::types::ValueType;
use common_expression::BlockEntry;
use common_expression::DataBlock;
use common_expression::TableDataType;
use common_expression::TableField;
use common_expression::TableSchema;
use common_expression::TableSchemaRefExt;
use common_expression::Value;
use common_meta_app::schema::TableIdent;
use common_meta_app::schema::TableInfo;
use common_meta_app::schema::TableMeta;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_sources::AsyncSource;
use common_pipeline_sources::AsyncSourcer;
use common_sql::binder::resolve_stage_location;
use common_storage::init_stage_operator;
use common_storage::read_metadata_async;
use common_storage::StageFilesInfo;
use common_storages_fuse::table_functions::string_literal;

use crate::pipelines::processors::port::OutputPort;
use crate::pipelines::Pipeline;
use crate::sessions::TableContext;
use crate::table_functions::TableFunction;

const INSPECT_PARQUET: &str = "inspect_parquet";

pub struct InspectParquetTable {
    uri: String,
    table_info: TableInfo,
}

impl InspectParquetTable {
    pub fn create(
        database_name: &str,
        table_func_name: &str,
        table_id: u64,
        table_args: TableArgs,
    ) -> Result<Arc<dyn TableFunction>> {
        let args = table_args.expect_all_positioned(table_func_name, Some(1))?;
        let file_path = String::from_utf8(
            args[0]
                .clone()
                .into_string()
                .map_err(|_| ErrorCode::BadArguments("Expected string argument."))?,
        )?;
        if !file_path.starts_with('@') {
            return Err(ErrorCode::BadArguments(format!(
                "stage path must start with @, but got {}",
                file_path
            )));
        }

        let table_info = TableInfo {
            ident: TableIdent::new(table_id, 0),
            desc: format!("'{}'.'{}'", database_name, table_func_name),
            name: table_func_name.to_string(),
            meta: TableMeta {
                schema: Self::schema(),
                engine: INSPECT_PARQUET.to_owned(),
                ..Default::default()
            },
            ..Default::default()
        };

        Ok(Arc::new(Self {
            uri: file_path,
            table_info,
        }))
    }

    pub fn schema() -> Arc<TableSchema> {
        TableSchemaRefExt::create(vec![
            TableField::new("created_by", TableDataType::String),
            TableField::new("num_columns", TableDataType::Number(NumberDataType::UInt64)),
            TableField::new("num_rows", TableDataType::Number(NumberDataType::UInt64)),
            TableField::new(
                "num_row_groups",
                TableDataType::Number(NumberDataType::UInt64),
            ),
            TableField::new(
                "serialized_size",
                TableDataType::Number(NumberDataType::UInt64),
            ),
            TableField::new(
                "max_row_groups_size_compressed",
                TableDataType::Number(NumberDataType::Int64),
            ),
            TableField::new(
                "max_row_groups_size_uncompressed",
                TableDataType::Number(NumberDataType::Int64),
            ),
        ])
    }
}

#[async_trait::async_trait]
impl Table for InspectParquetTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_table_info(&self) -> &TableInfo {
        &self.table_info
    }

    #[async_backtrace::framed]
    async fn read_partitions(
        &self,
        _ctx: Arc<dyn TableContext>,
        _push_downs: Option<PushDownInfo>,
        _dry_run: bool,
    ) -> Result<(PartStatistics, Partitions)> {
        Ok((PartStatistics::default(), Partitions::default()))
    }

    fn table_args(&self) -> Option<TableArgs> {
        Some(TableArgs::new_positioned(vec![string_literal(
            self.uri.as_str(),
        )]))
    }

    fn read_data(
        &self,
        ctx: Arc<dyn TableContext>,
        _plan: &DataSourcePlan,
        pipeline: &mut Pipeline,
        _put_cache: bool,
    ) -> Result<()> {
        pipeline.add_source(
            |output| InspectParquetSource::create(ctx.clone(), output, self.uri.clone()),
            1,
        )?;
        Ok(())
    }
}

impl TableFunction for InspectParquetTable {
    fn function_name(&self) -> &str {
        self.name()
    }

    fn as_table<'a>(self: Arc<Self>) -> Arc<dyn Table + 'a>
    where Self: 'a {
        self
    }
}

struct InspectParquetSource {
    is_finished: bool,
    ctx: Arc<dyn TableContext>,
    uri: String,
}

impl InspectParquetSource {
    pub fn create(
        ctx: Arc<dyn TableContext>,
        output: Arc<OutputPort>,
        uri: String,
    ) -> Result<ProcessorPtr> {
        AsyncSourcer::create(ctx.clone(), output, InspectParquetSource {
            is_finished: false,
            ctx,
            uri,
        })
    }
}
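Note: this follows the usual one-shot AsyncSourcer pattern. AsyncSourcer::create wraps the AsyncSource implemented below and polls generate() until it returns Ok(None), so the is_finished flag ensures the single summary row is emitted exactly once.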

#[async_trait::async_trait]
impl AsyncSource for InspectParquetSource {
    const NAME: &'static str = INSPECT_PARQUET;

    #[async_trait::unboxed_simple]
    #[async_backtrace::framed]
    async fn generate(&mut self) -> Result<Option<DataBlock>> {
        if self.is_finished {
            return Ok(None);
        }
        self.is_finished = true;
        let uri = self.uri.strip_prefix('@').unwrap().to_string();
        let (stage_info, path) = resolve_stage_location(&self.ctx, &uri).await?;

        let operator = init_stage_operator(&stage_info)?;

        let file_info = StageFilesInfo {
            path: path.clone(),
            files: None,
            pattern: None,
        };

        let first_file = file_info.first_file(&operator).await?;

        let parquet_schema =
            read_metadata_async(&first_file.path, &operator, Some(first_file.size)).await?;
        let created = match parquet_schema.file_metadata().created_by() {
            Some(user) => user.to_owned(),
            None => String::from("NULL"),
        };
        let serialized_size: u64 = first_file.size;
        let num_columns: u64 = if parquet_schema.num_row_groups() > 0 {
            parquet_schema.row_group(0).num_columns() as u64
        } else {
            0
        };
        let mut max_compressed: i64 = 0;
        let mut max_uncompressed: i64 = 0;
        for grp in parquet_schema.row_groups().iter() {
            let mut grp_compressed_size: i64 = 0;
            let mut grp_uncompressed_size: i64 = 0;
            for col in grp.columns().iter() {
                grp_compressed_size += col.compressed_size();
                grp_uncompressed_size += col.uncompressed_size();
            }
            max_compressed = max(max_compressed, grp_compressed_size);
            max_uncompressed = max(max_uncompressed, grp_uncompressed_size);
        }
        let block = DataBlock::new(
            vec![
                BlockEntry::new(
                    DataType::String,
                    Value::Scalar(StringType::upcast_scalar(created.into())),
                ),
                BlockEntry::new(
                    DataType::Number(NumberDataType::UInt64),
                    Value::Scalar(UInt64Type::upcast_scalar(num_columns)),
                ),
                BlockEntry::new(
                    DataType::Number(NumberDataType::UInt64),
                    Value::Scalar(UInt64Type::upcast_scalar(
                        parquet_schema.file_metadata().num_rows() as u64,
                    )),
                ),
                BlockEntry::new(
                    DataType::Number(NumberDataType::UInt64),
                    Value::Scalar(UInt64Type::upcast_scalar(
                        parquet_schema.num_row_groups() as u64,
                    )),
                ),
                BlockEntry::new(
                    DataType::Number(NumberDataType::UInt64),
                    Value::Scalar(UInt64Type::upcast_scalar(serialized_size)),
                ),
                BlockEntry::new(
                    DataType::Number(NumberDataType::Int64),
                    Value::Scalar(Int64Type::upcast_scalar(max_compressed)),
                ),
                BlockEntry::new(
                    DataType::Number(NumberDataType::Int64),
                    Value::Scalar(Int64Type::upcast_scalar(max_uncompressed)),
                ),
            ],
            1,
        );
        Ok(Some(block))
    }
}
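Note on the two max_* columns: for every row group, the per-column compressed and uncompressed byte sizes are summed, and the largest per-row-group totals across the file are reported as max_row_groups_size_compressed and max_row_groups_size_uncompressed.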
17 changes: 17 additions & 0 deletions src/query/service/src/table_functions/inspect_parquet/mod.rs
@@ -0,0 +1,17 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod inspect_parquet_table;

pub use inspect_parquet_table::InspectParquetTable;
1 change: 1 addition & 0 deletions src/query/service/src/table_functions/mod.rs
@@ -14,6 +14,7 @@

mod async_crash_me;
mod infer_schema;
mod inspect_parquet;
mod list_stage;
mod numbers;
mod openai;
@@ -37,6 +37,7 @@ use crate::storages::fuse::table_functions::FuseSnapshotTable;
use crate::storages::fuse::table_functions::FuseStatisticTable;
use crate::table_functions::async_crash_me::AsyncCrashMeTable;
use crate::table_functions::infer_schema::InferSchemaTable;
use crate::table_functions::inspect_parquet::InspectParquetTable;
use crate::table_functions::list_stage::ListStageTable;
use crate::table_functions::numbers::NumbersTable;
use crate::table_functions::srf::RangeTable;
@@ -149,6 +150,10 @@ impl TableFunctionFactory {
"infer_schema".to_string(),
(next_id(), Arc::new(InferSchemaTable::create)),
);
creators.insert(
"inspect_parquet".to_string(),
(next_id(), Arc::new(InspectParquetTable::create)),
);

creators.insert(
"list_stage".to_string(),
4 changes: 4 additions & 0 deletions tests/sqllogictests/suites/stage_parquet/inspect_parquet
@@ -0,0 +1,4 @@
query
select * from inspect_parquet('@data/parquet/tuple.parquet')
----
Arrow2 - Native Rust implementation of Arrow 3 3 1 431 109 105
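The single result row maps onto the schema declared above: created_by = "Arrow2 - Native Rust implementation of Arrow", num_columns = 3, num_rows = 3, num_row_groups = 1, serialized_size = 431, max_row_groups_size_compressed = 109 and max_row_groups_size_uncompressed = 105.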