diff --git a/src/query/functions/src/aggregates/aggregator_common.rs b/src/query/functions/src/aggregates/aggregator_common.rs
index d68e198125a4..a0422b583c53 100644
--- a/src/query/functions/src/aggregates/aggregator_common.rs
+++ b/src/query/functions/src/aggregates/aggregator_common.rs
@@ -156,6 +156,42 @@ pub fn eval_aggr(
     Ok((builder.build(), data_type))
 }
 
+/// Evaluates a unary aggregation function over multiple discrete chunks.
+/// Each input column is treated as one chunk of the function's single argument;
+/// all chunks are folded into one aggregation state.
+pub fn eval_unary_aggr_for_discrete_chunks(
+    name: &str,
+    params: Vec<Scalar>,
+    columns: &[Column],
+    rows: usize,
+) -> Result<(Column, DataType)> {
+    if columns.is_empty() {
+        return Err(ErrorCode::BadArguments(format!(
+            "No input columns provided for aggregation function '{}'",
+            name
+        )));
+    }
+    if !columns
+        .iter()
+        .all(|x| x.data_type() == columns[0].data_type())
+    {
+        return Err(ErrorCode::BadArguments(
+            "All input columns must have the same data type".to_string(),
+        ));
+    }
+    let factory = AggregateFunctionFactory::instance();
+
+    let func = factory.get(name, params, vec![columns[0].data_type()])?;
+    let data_type = func.return_type()?;
+
+    let eval = EvalAggr::new(func.clone());
+    for col in columns.chunks(1) {
+        func.accumulate(eval.addr, col, None, rows)?;
+    }
+    let mut builder = ColumnBuilder::with_capacity(&data_type, 1024);
+    func.merge_result(eval.addr, &mut builder)?;
+    Ok((builder.build(), data_type))
+}
+
 #[inline]
 pub fn borsh_serialize_state<W: std::io::Write, T: BorshSerialize>(
     writer: &mut W,
diff --git a/src/query/service/src/test_kits/block_writer.rs b/src/query/service/src/test_kits/block_writer.rs
index 2654c246f5c2..0a96b246534e 100644
--- a/src/query/service/src/test_kits/block_writer.rs
+++ b/src/query/service/src/test_kits/block_writer.rs
@@ -113,7 +113,7 @@ impl<'a> BlockWriter<'a> {
         let maybe_bloom_index = BloomIndex::try_create(
             FunctionContext::default(),
             location.1,
-            &[block],
+            &[block.clone()],
             bloom_columns_map,
         )?;
         if let Some(bloom_index) = maybe_bloom_index {
diff --git a/src/query/service/src/test_kits/fuse.rs b/src/query/service/src/test_kits/fuse.rs
index 4bbc4279e6eb..81419290f59d 100644
--- a/src/query/service/src/test_kits/fuse.rs
+++ b/src/query/service/src/test_kits/fuse.rs
@@ -155,7 +155,7 @@ async fn generate_blocks(fuse_table: &FuseTable, num_blocks: usize) -> Result<Vec<DataBlock>>
     let blocks: Vec<DataBlock> = stream.try_collect().await?;
     for block in blocks {
-        let stats = gen_columns_statistics(&block, None, &schema)?;
+        let stats = gen_columns_statistics(&[block.clone()], None, &schema)?;
         let (block_meta, _index_meta) = block_writer
             .write(FuseStorageFormat::Parquet, &schema, block, stats, None)
             .await?;
diff --git a/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs b/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs
index bdb737adff2e..213791fec9b7 100644
--- a/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs
+++ b/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs
@@ -393,7 +393,7 @@ async fn setup() -> databend_common_exception::Result
     let block = DataBlock::new_from_columns(columns);
     let operator = Operator::new(opendal::services::Memory::default())?.finish();
     let loc_generator = TableMetaLocationGenerator::with_prefix("/".to_owned());
-    let col_stats = gen_columns_statistics(&block, None, &schema)?;
+    let col_stats = gen_columns_statistics(&[block.clone()], None, &schema)?;
     let block_writer = BlockWriter::new(&operator, &loc_generator);
     let (_block_meta, thrift_file_meta) = block_writer
         .write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
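A minimal sketch of how the new helper might be called, assuming the `Int32Type::from_data` constructor and `FromData` trait from databend_common_expression; the chunk values and row count are illustrative, and each chunk is assumed to carry the same number of rows, since the `rows` argument is reused for every `accumulate` call:

    use databend_common_expression::types::Int32Type;
    use databend_common_expression::Column;
    use databend_common_expression::FromData;

    // Three 2-row chunks of one logical column feed a single aggregate state,
    // so "min" observes all six values at once.
    let chunks: Vec<Column> = vec![
        Int32Type::from_data(vec![5, 3]),
        Int32Type::from_data(vec![9, 1]),
        Int32Type::from_data(vec![7, 2]),
    ];
    let (result, _ty) = eval_unary_aggr_for_discrete_chunks("min", vec![], &chunks, 2)?;
    assert_eq!(result.len(), 1); // one output row holding the global minimum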
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
index ca06c33db0de..fb02425819df 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
@@ -81,7 +81,7 @@ async fn test_compact() -> Result<()> {
         .get_table(&ctx.get_tenant(), &db_name, &tbl_name)
         .await?;
     let res = do_compact(ctx.clone(), table.clone()).await;
-    assert!(res.is_ok());
+    assert!(res.is_ok(), "{:?}", res);
     assert!(res.unwrap());
 
     // check count
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
index 383743788863..d5fe79aaf4db 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
@@ -729,7 +729,7 @@ impl CompactSegmentTestFixture {
         for block in blocks {
             let block = block?;
-            let col_stats = gen_columns_statistics(&block, None, &schema)?;
+            let col_stats = gen_columns_statistics(&[block.clone()], None, &schema)?;
 
             let cluster_stats = if num_blocks % 5 == 0 {
                 None
diff --git a/src/query/service/tests/it/storages/fuse/statistics.rs b/src/query/service/tests/it/storages/fuse/statistics.rs
index bea0c9224561..09d07843fddb 100644
--- a/src/query/service/tests/it/storages/fuse/statistics.rs
+++ b/src/query/service/tests/it/storages/fuse/statistics.rs
@@ -68,7 +68,7 @@ fn test_ft_stats_block_stats() -> databend_common_exception::Result<()> {
         StringType::from_data(vec!["aa", "aa", "bb"]),
     ]);
 
-    let r = gen_columns_statistics(&block, None, &schema)?;
+    let r = gen_columns_statistics(&[block.clone()], None, &schema)?;
     assert_eq!(2, r.len());
     let col_stats = r.get(&0).unwrap();
     assert_eq!(col_stats.min(), &Scalar::Number(NumberScalar::Int32(1)));
@@ -95,7 +95,7 @@ fn test_ft_stats_block_stats_with_column_distinct_count() -> databend_common_exc
     let mut column_distinct_count = HashMap::new();
     column_distinct_count.insert(0, 3);
     column_distinct_count.insert(1, 2);
-    let r = gen_columns_statistics(&block, Some(column_distinct_count), &schema)?;
+    let r = gen_columns_statistics(&[block.clone()], Some(column_distinct_count), &schema)?;
     assert_eq!(2, r.len());
     let col_stats = r.get(&0).unwrap();
     assert_eq!(col_stats.min(), &Scalar::Number(NumberScalar::Int32(1)));
@@ -129,7 +129,7 @@ fn test_ft_tuple_stats_block_stats() -> databend_common_exception::Result<()> {
     let block = DataBlock::new_from_columns(vec![column]);
 
-    let r = gen_columns_statistics(&block, None, &schema)?;
+    let r = gen_columns_statistics(&[block.clone()], None, &schema)?;
     assert_eq!(2, r.len());
     let col0_stats = r.get(&0).unwrap();
     assert_eq!(col0_stats.min(), &Scalar::Number(NumberScalar::Int32(1)));
@@ -151,7 +151,7 @@ fn test_ft_stats_col_stats_reduce() -> databend_common_exception::Result<()> {
         TestFixture::gen_sample_blocks_ex(num_of_blocks, rows_per_block, val_start_with);
     let col_stats = blocks
         .iter()
-        .map(|b| gen_columns_statistics(&b.clone().unwrap(), None, &schema))
+        .map(|b| gen_columns_statistics(&[b.clone().unwrap()], None, &schema))
        .collect::<Result<Vec<_>>>()?;
     let r = reducers::reduce_block_statistics(&col_stats);
     assert_eq!(3, r.len());
@@ -320,7 +320,7 @@ async fn test_accumulator() -> databend_common_exception::Result<()> {
     let loc_generator = TableMetaLocationGenerator::with_prefix("/".to_owned());
     for item in blocks {
         let block = item?;
-        let col_stats = gen_columns_statistics(&block, None, &schema)?;
+        let col_stats = gen_columns_statistics(&[block.clone()], None, &schema)?;
         let block_writer = BlockWriter::new(&operator, &loc_generator);
         let (block_meta, _index_meta) = block_writer
             .write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
@@ -528,7 +528,7 @@ fn test_ft_stats_block_stats_string_columns_trimming_using_eval()
     let max_expr = max_col.0.index(0).unwrap();
 
     // generate the statistics of column
-    let stats_of_columns = gen_columns_statistics(&block, None, &schema).unwrap();
+    let stats_of_columns = gen_columns_statistics(&[block], None, &schema).unwrap();
 
     // check if the max value (untrimmed) is in degenerated condition:
     // - the length of string value is larger or equal than STRING_PREFIX_LEN
diff --git a/src/query/service/tests/it/storages/statistics/column_statistics.rs b/src/query/service/tests/it/storages/statistics/column_statistics.rs
index 2fd67b01b1a7..1c5b8ce2ec23 100644
--- a/src/query/service/tests/it/storages/statistics/column_statistics.rs
+++ b/src/query/service/tests/it/storages/statistics/column_statistics.rs
@@ -83,7 +83,7 @@ fn gen_sample_block() -> (DataBlock, Vec<Column>, TableSchemaRef) {
 #[test]
 fn test_column_statistic() -> Result<()> {
     let (sample_block, sample_cols, schema) = gen_sample_block();
-    let col_stats = gen_columns_statistics(&sample_block, None, &schema)?;
+    let col_stats = gen_columns_statistics(&[sample_block], None, &schema)?;
 
     assert_eq!(5, col_stats.len());
diff --git a/src/query/storages/common/index/src/bloom_index.rs b/src/query/storages/common/index/src/bloom_index.rs
index 56df6dad9049..65c9ebd78e9b 100644
--- a/src/query/storages/common/index/src/bloom_index.rs
+++ b/src/query/storages/common/index/src/bloom_index.rs
@@ -178,7 +178,7 @@ impl BloomIndex {
     pub fn try_create(
         func_ctx: FunctionContext,
         version: u64,
-        data_blocks_tobe_indexed: &[&DataBlock],
+        data_blocks_tobe_indexed: &[DataBlock],
         bloom_columns_map: BTreeMap<FieldIndex, TableField>,
     ) -> Result<Option<Self>> {
         if data_blocks_tobe_indexed.is_empty() {
diff --git a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
index 3f62729929fd..6f8e02f72a26 100644
--- a/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
+++ b/src/query/storages/common/index/tests/it/filters/bloom_filter.rs
@@ -144,14 +144,13 @@ fn test_bloom_filter() -> Result<()> {
             )),
         ]),
     ];
-    let blocks_ref = blocks.iter().collect::<Vec<_>>();
 
     let bloom_columns = bloom_columns_map(schema.clone(), vec![0, 1, 2, 3]);
     let bloom_fields = bloom_columns.values().cloned().collect::<Vec<_>>();
     let index = BloomIndex::try_create(
         FunctionContext::default(),
         LatestBloom::VERSION,
-        &blocks_ref,
+        &blocks,
         bloom_columns,
     )?
     .unwrap();
@@ -316,14 +315,13 @@ fn test_specify_bloom_filter() -> Result<()> {
         UInt8Type::from_data(vec![1, 2]),
         StringType::from_data(vec!["a", "b"]),
     ])];
-    let blocks_ref = blocks.iter().collect::<Vec<_>>();
 
     let bloom_columns = bloom_columns_map(schema.clone(), vec![0]);
     let fields = bloom_columns.values().cloned().collect::<Vec<_>>();
     let specify_index = BloomIndex::try_create(
         FunctionContext::default(),
         LatestBloom::VERSION,
-        &blocks_ref,
+        &blocks,
         bloom_columns,
     )?
     .unwrap();
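The switch from `&[&DataBlock]` to `&[DataBlock]` is what removes the `blocks_ref` shims in these tests: callers borrow the owned vector directly, and single-block callers clone once. A hedged before/after sketch, with `func_ctx`, `version`, `blocks`, and `bloom_columns` standing in for the values above:

    // before: an extra Vec<&DataBlock> was materialized only to satisfy the signature
    let blocks_ref: Vec<&DataBlock> = blocks.iter().collect();
    let _ = BloomIndex::try_create(func_ctx.clone(), version, &blocks_ref, bloom_columns.clone())?;

    // after: the owned slice is borrowed as-is
    let _ = BloomIndex::try_create(func_ctx, version, &blocks, bloom_columns)?;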
@@ -355,7 +353,6 @@ fn test_string_bloom_filter() -> Result<()> {
         UInt8Type::from_data(vec![1, 2]),
         StringType::from_data(vec![&val, "bc"]),
     ])];
-    let blocks_ref = blocks.iter().collect::<Vec<_>>();
 
     // The average length of the string column exceeds 256 bytes.
     let bloom_columns = bloom_columns_map(schema.clone(), vec![0, 1]);
@@ -363,7 +360,7 @@ fn test_string_bloom_filter() -> Result<()> {
     let index = BloomIndex::try_create(
         FunctionContext::default(),
         LatestBloom::VERSION,
-        &blocks_ref,
+        &blocks,
         bloom_columns,
     )?
     .unwrap();
diff --git a/src/query/storages/fuse/src/io/mod.rs b/src/query/storages/fuse/src/io/mod.rs
index 32c1995672af..638b7de24cf8 100644
--- a/src/query/storages/fuse/src/io/mod.rs
+++ b/src/query/storages/fuse/src/io/mod.rs
@@ -41,6 +41,7 @@ pub(crate) use write::create_index_schema;
 pub(crate) use write::create_inverted_index_builders;
 pub(crate) use write::create_tokenizer_manager;
 pub use write::serialize_block;
+pub use write::serialize_blocks;
 pub use write::write_data;
 pub use write::BlockBuilder;
 pub use write::BlockSerialization;
diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs
index f56d1756293d..5696a9d0754a 100644
--- a/src/query/storages/fuse/src/io/write/block_writer.rs
+++ b/src/query/storages/fuse/src/io/write/block_writer.rs
@@ -21,6 +21,7 @@ use chrono::Utc;
 use databend_common_arrow::arrow::chunk::Chunk as ArrowChunk;
 use databend_common_arrow::native::write::NativeWriter;
 use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
@@ -50,18 +51,25 @@ use crate::statistics::gen_columns_statistics;
 use crate::statistics::ClusterStatsGenerator;
 use crate::FuseStorageFormat;
 
-// TODO rename this, it is serialization, or pass in a writer(if not rename)
 pub fn serialize_block(
     write_settings: &WriteSettings,
     schema: &TableSchemaRef,
     block: DataBlock,
     buf: &mut Vec<u8>,
+) -> Result<HashMap<ColumnId, ColumnMeta>> {
+    serialize_blocks(write_settings, schema, vec![block], buf)
+}
+
+pub fn serialize_blocks(
+    write_settings: &WriteSettings,
+    schema: &TableSchemaRef,
+    blocks: Vec<DataBlock>,
+    buf: &mut Vec<u8>,
 ) -> Result<HashMap<ColumnId, ColumnMeta>> {
     let schema = Arc::new(schema.remove_virtual_computed_fields());
     match write_settings.storage_format {
         FuseStorageFormat::Parquet => {
-            let result =
-                blocks_to_parquet(&schema, vec![block], buf, write_settings.table_compression)?;
+            let result = blocks_to_parquet(&schema, blocks, buf, write_settings.table_compression)?;
             let meta = column_parquet_metas(&result, &schema)?;
             Ok(meta)
         }
@@ -85,10 +93,16 @@ pub fn serialize_block(
                 },
             );
 
-            let batch = ArrowChunk::try_from(block)?;
             writer.start()?;
-            writer.write(&batch)?;
+            assert_eq!(
+                blocks.len(),
+                1,
+                "Native format does not support writing multiple chunks for now"
+            );
+            for block in blocks {
+                let batch = ArrowChunk::try_from(block)?;
+                writer.write(&batch)?;
+            }
             writer.finish()?;
 
             let mut metas = HashMap::with_capacity(writer.metas.len());
@@ -121,7 +135,7 @@ pub struct BloomIndexState {
 impl BloomIndexState {
     pub fn try_create(
         ctx: Arc<dyn TableContext>,
-        block: &DataBlock,
+        blocks: &[DataBlock],
         location: Location,
         bloom_columns_map: BTreeMap<FieldIndex, TableField>,
     ) -> Result<Option<Self>> {
@@ -129,7 +143,7 @@ impl BloomIndexState {
         let maybe_bloom_index = BloomIndex::try_create(
             ctx.get_function_context()?,
             location.1,
-            &[block],
+            blocks,
             bloom_columns_map,
         )?;
         if let Some(bloom_index) = maybe_bloom_index {
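`serialize_block` is now a thin wrapper over `serialize_blocks`. A sketch of a multi-block call under the Parquet format, with `write_settings`, `schema`, `block_a`, and `block_b` assumed to exist; the Native branch would reject this via the assert above:

    let mut buf = Vec::new();
    // Packs both blocks into one Parquet file and returns the per-column metas.
    let col_metas = serialize_blocks(&write_settings, &schema, vec![block_a, block_b], &mut buf)?;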
@@ -206,7 +220,7 @@ pub struct InvertedIndexState {
 impl InvertedIndexState {
     pub fn try_create(
         source_schema: &TableSchemaRef,
-        block: &DataBlock,
+        blocks: &[DataBlock],
         block_location: &Location,
         inverted_index_builder: &InvertedIndexBuilder,
     ) -> Result<Self> {
@@ -215,7 +229,9 @@ impl InvertedIndexState {
             Arc::new(inverted_index_builder.schema.clone()),
             &inverted_index_builder.options,
         )?;
-        writer.add_block(source_schema, block)?;
+        for block in blocks {
+            writer.add_block(source_schema, block)?;
+        }
 
         let data = writer.finalize()?;
         let size = data.len() as u64;
@@ -261,16 +277,29 @@ pub struct BlockBuilder {
 }
 
 impl BlockBuilder {
-    pub fn build<F>(&self, data_block: DataBlock, f: F) -> Result<BlockSerialization>
+    pub fn build<F>(&self, mut data_blocks: Vec<DataBlock>, f: F) -> Result<BlockSerialization>
     where F: Fn(DataBlock, &ClusterStatsGenerator) -> Result<(Option<ClusterStatistics>, DataBlock)> {
-        let (cluster_stats, data_block) = f(data_block, &self.cluster_stats_gen)?;
+        let cluster_stats = if !self.cluster_stats_gen.cluster_key_index.is_empty() {
+            if data_blocks.len() != 1 {
+                return Err(ErrorCode::Internal(format!(
+                    "Invalid data blocks count: {}",
+                    data_blocks.len(),
+                )));
+            }
+            let (cluster_stats, data_block) =
+                f(data_blocks.pop().unwrap(), &self.cluster_stats_gen)?;
+            data_blocks.push(data_block);
+            cluster_stats
+        } else {
+            None
+        };
 
         let (block_location, block_id) = self.meta_locations.gen_block_location();
 
         let bloom_index_location = self.meta_locations.block_bloom_index_location(&block_id);
         let bloom_index_state = BloomIndexState::try_create(
             self.ctx.clone(),
-            &data_block,
+            &data_blocks,
             bloom_index_location,
             self.bloom_columns_map.clone(),
         )?;
@@ -282,23 +311,23 @@ impl BlockBuilder {
         for inverted_index_builder in &self.inverted_index_builders {
             let inverted_index_state = InvertedIndexState::try_create(
                 &self.source_schema,
-                &data_block,
+                &data_blocks,
                 &block_location,
                 inverted_index_builder,
             )?;
             inverted_index_states.push(inverted_index_state);
         }
 
-        let row_count = data_block.num_rows() as u64;
-        let block_size = data_block.memory_size() as u64;
+        let row_count = data_blocks.iter().map(|b| b.num_rows()).sum::<usize>() as u64;
+        let block_size = data_blocks.iter().map(|b| b.memory_size()).sum::<usize>() as u64;
         let col_stats =
-            gen_columns_statistics(&data_block, column_distinct_count, &self.source_schema)?;
+            gen_columns_statistics(&data_blocks, column_distinct_count, &self.source_schema)?;
 
         let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let col_metas = serialize_block(
+        let col_metas = serialize_blocks(
             &self.write_settings,
             &self.source_schema,
-            data_block,
+            data_blocks,
             &mut buffer,
         )?;
         let file_size = buffer.len() as u64;
diff --git a/src/query/storages/fuse/src/io/write/mod.rs b/src/query/storages/fuse/src/io/write/mod.rs
index 1f7ce3db95eb..47992270b886 100644
--- a/src/query/storages/fuse/src/io/write/mod.rs
+++ b/src/query/storages/fuse/src/io/write/mod.rs
@@ -20,6 +20,7 @@ mod write_settings;
 
 pub(crate) use block_writer::create_inverted_index_builders;
 pub use block_writer::serialize_block;
+pub use block_writer::serialize_blocks;
 pub use block_writer::write_data;
 pub use block_writer::BlockBuilder;
 pub use block_writer::BlockSerialization;
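`BlockBuilder::build` now takes a `Vec<DataBlock>`, but the cluster-stats closure is evaluated only when a cluster key is defined, and in that case exactly one block is accepted; multi-block writes are therefore limited to unclustered tables. Call shape, as a sketch with `block_builder` and `chunks` assumed:

    // Unclustered table: several compacted chunks are written as one stored block.
    let serialized = block_builder.build(chunks, |block, generator| {
        // Reached only when a cluster key exists, in which case chunks.len() must be 1.
        generator.gen_stats_for_append(block)
    })?;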
diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs
index 838cb82a2948..941de0194821 100644
--- a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs
+++ b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs
@@ -52,7 +52,7 @@ use crate::FuseTable;
 enum State {
     Consume,
     NeedSerialize {
-        block: DataBlock,
+        blocks: Vec<DataBlock>,
         stats_type: ClusterStatsGenType,
         index: Option<BlockMetaIndex>,
     },
@@ -229,7 +229,7 @@ impl Processor for TransformSerializeBlock {
                     Ok(Event::NeedConsume)
                 }
                 SerializeDataMeta::SerializeBlock(serialize_block) => {
-                    if input_data.is_empty() {
+                    if input_data.is_empty() && serialize_block.lazy_compacted_block.is_none() {
                         // delete a whole block, block level
                         let data_block = Self::mutation_logs(MutationLogEntry::DeletedBlock {
                             index: serialize_block.index,
@@ -238,8 +238,12 @@ impl Processor for TransformSerializeBlock {
                         Ok(Event::NeedConsume)
                     } else {
                         // replace the old block
+                        let blocks = serialize_block.lazy_compacted_block.map_or_else(
+                            || vec![input_data],
+                            |lazy_compacted_block| lazy_compacted_block.0,
+                        );
                         self.state = State::NeedSerialize {
-                            block: input_data,
+                            blocks,
                             stats_type: serialize_block.stats_type,
                             index: Some(serialize_block.index),
                         };
@@ -263,7 +267,7 @@ impl Processor for TransformSerializeBlock {
                 } else {
                     // append block
                     self.state = State::NeedSerialize {
-                        block: input_data,
+                        blocks: vec![input_data],
                         stats_type: ClusterStatsGenType::Generally,
                         index: None,
                     };
@@ -274,16 +278,18 @@ impl Processor for TransformSerializeBlock {
     fn process(&mut self) -> Result<()> {
         match std::mem::replace(&mut self.state, State::Consume) {
             State::NeedSerialize {
-                block,
+                blocks,
                 stats_type,
                 index,
             } => {
                 // Check if the datablock is valid, this is needed to ensure data is correct
-                block.check_valid()?;
+                for block in blocks.iter() {
+                    block.check_valid()?;
+                }
                 let serialized =
                     self.block_builder
-                        .build(block, |block, generator| match &stats_type {
+                        .build(blocks, |block, generator| match &stats_type {
                             ClusterStatsGenType::Generally => generator.gen_stats_for_append(block),
                             ClusterStatsGenType::WithOrigin(origin_stats) => {
                                 let cluster_stats = generator
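The event handling above now distinguishes three cases rather than two; a condensed, non-literal sketch of the dispatch:

    match (input_data.is_empty(), &serialize_block.lazy_compacted_block) {
        (true, None) => { /* whole-block delete: emit a DeletedBlock mutation log */ }
        (_, Some(_)) => { /* lazy compact: the blocks to serialize ride in the meta */ }
        (false, None) => { /* ordinary replace: serialize the single input block */ }
    }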
diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs
index 676c8f55f50f..30500a6790ec 100644
--- a/src/query/storages/fuse/src/operations/compact.rs
+++ b/src/query/storages/fuse/src/operations/compact.rs
@@ -38,6 +38,7 @@ use crate::operations::mutation::BlockCompactMutator;
 use crate::operations::mutation::CompactLazyPartInfo;
 use crate::operations::mutation::CompactSource;
 use crate::operations::mutation::SegmentCompactMutator;
+use crate::FuseStorageFormat;
 use crate::FuseTable;
 use crate::Table;
 use crate::TableContext;
@@ -191,16 +192,26 @@ impl FuseTable {
         } else {
             None
         };
+        let is_lazy_compact = self.cluster_key_meta().is_none()
+            // native format does not support serializing multiple chunks into one file for now.
+            && matches!(self.storage_format, FuseStorageFormat::Parquet);
 
         // Add source pipe.
         pipeline.add_source(
-            |output| {
-                CompactSource::try_create(
+            |output| match is_lazy_compact {
+                false => CompactSource::<false>::try_create(
                     ctx.clone(),
                     self.storage_format,
                     block_reader.clone(),
                     stream_ctx.clone(),
                     output,
-                )
+                ),
+                true => CompactSource::<true>::try_create(
+                    ctx.clone(),
+                    self.storage_format,
+                    block_reader.clone(),
+                    stream_ctx.clone(),
+                    output,
+                ),
             },
             max_threads,
         )?;
diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
index 32bbb4f4cfa0..4090a788978f 100644
--- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
@@ -438,7 +438,7 @@ impl AggregationContext {
 
         let serialized = GlobalIORuntime::instance()
             .spawn(async move {
-                block_builder.build(res_block, |block, generator| {
+                block_builder.build(vec![res_block], |block, generator| {
                     let cluster_stats =
                         generator.gen_with_origin_stats(&block, origin_stats.clone())?;
                     info!(
diff --git a/src/query/storages/fuse/src/operations/mutation/meta/mutation_meta.rs b/src/query/storages/fuse/src/operations/mutation/meta/mutation_meta.rs
index b98943f6821e..6b29e52316c1 100644
--- a/src/query/storages/fuse/src/operations/mutation/meta/mutation_meta.rs
+++ b/src/query/storages/fuse/src/operations/mutation/meta/mutation_meta.rs
@@ -13,14 +13,14 @@
 // limitations under the License.
 
 use databend_common_expression::BlockMetaInfo;
-use databend_common_expression::BlockMetaInfoDowncast;
 use databend_storages_common_table_meta::meta::ClusterStatistics;
 
 use crate::operations::common::BlockMetaIndex;
 use crate::operations::mutation::CompactExtraInfo;
 use crate::operations::mutation::DeletedSegmentInfo;
+use crate::operations::LazyCompactedBlock;
 
-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
 pub enum SerializeDataMeta {
     SerializeBlock(SerializeBlock),
     DeletedSegment(DeletedSegmentInfo),
@@ -29,8 +29,8 @@ pub enum SerializeDataMeta {
 
 #[typetag::serde(name = "serialize_data_meta")]
 impl BlockMetaInfo for SerializeDataMeta {
-    fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
-        SerializeDataMeta::downcast_ref_from(info).is_some_and(|other| self == other)
+    fn equals(&self, _info: &Box<dyn BlockMetaInfo>) -> bool {
+        unreachable!("SerializeDataMeta should not be compared")
     }
 
     fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
@@ -44,14 +44,23 @@ pub enum ClusterStatsGenType {
     WithOrigin(Option<ClusterStatistics>),
 }
 
-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
 pub struct SerializeBlock {
     pub index: BlockMetaIndex,
     pub stats_type: ClusterStatsGenType,
+    pub lazy_compacted_block: Option<LazyCompactedBlock>,
 }
 
 impl SerializeBlock {
-    pub fn create(index: BlockMetaIndex, stats_type: ClusterStatsGenType) -> Self {
-        SerializeBlock { index, stats_type }
+    pub fn create(
+        index: BlockMetaIndex,
+        stats_type: ClusterStatsGenType,
+        lazy_compacted_block: Option<LazyCompactedBlock>,
+    ) -> SerializeBlock {
+        SerializeBlock {
+            index,
+            stats_type,
+            lazy_compacted_block,
+        }
     }
 }
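`SerializeBlock` can now carry an optional in-memory payload; because `LazyCompactedBlock` holds raw `DataBlock`s, the struct loses `PartialEq` and its serde impls are stubs, so the meta must never leave the process. Construction of both variants, as a sketch with `index`, `chunk_a`, and `chunk_b` assumed:

    // Ordinary mutation: the data travels as the DataBlock itself.
    let plain = SerializeBlock::create(index.clone(), ClusterStatsGenType::Generally, None);

    // Lazy compaction: the unmerged chunks travel inside the meta.
    let lazy = SerializeBlock::create(
        index,
        ClusterStatsGenType::Generally,
        Some(LazyCompactedBlock(vec![chunk_a, chunk_b])),
    );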
diff --git a/src/query/storages/fuse/src/operations/mutation/processors/compact_source.rs b/src/query/storages/fuse/src/operations/mutation/processors/compact_source.rs
index 203a6e73ed3f..1be94f49fef2 100644
--- a/src/query/storages/fuse/src/operations/mutation/processors/compact_source.rs
+++ b/src/query/storages/fuse/src/operations/mutation/processors/compact_source.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::any::Any;
+use std::fmt::Debug;
 use std::sync::Arc;
 use std::time::Instant;
@@ -40,7 +41,6 @@ use crate::operations::mutation::SerializeDataMeta;
 use crate::operations::BlockMetaIndex;
 use crate::FuseStorageFormat;
 use crate::MergeIOReadResult;
-
 enum State {
     ReadData(Option<PartInfoPtr>),
     Concat {
         ...
     },
     Finish,
 }
 
-pub struct CompactSource {
+#[derive(Clone)]
+pub struct LazyCompactedBlock(pub Vec<DataBlock>);
+
+impl Debug for LazyCompactedBlock {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "lazy compacted {} blocks", self.0.len())
+    }
+}
+
+impl serde::Serialize for LazyCompactedBlock {
+    fn serialize<S>(&self, _: S) -> Result<S::Ok, S::Error>
+    where S: serde::Serializer {
+        unimplemented!("Unimplemented serialize LazyCompactedBlock")
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for LazyCompactedBlock {
+    fn deserialize<D>(_: D) -> Result<Self, D::Error>
+    where D: serde::Deserializer<'de> {
+        unimplemented!("Unimplemented deserialize LazyCompactedBlock")
+    }
+}
+
+pub struct CompactSource<const IS_LAZY: bool> {
     state: State,
     ctx: Arc<dyn TableContext>,
     block_reader: Arc<BlockReader>,
@@ -61,7 +84,7 @@ pub struct CompactSource {
     stream_ctx: Option<StreamContext>,
 }
 
-impl CompactSource {
+impl<const IS_LAZY: bool> CompactSource<IS_LAZY> {
     pub fn try_create(
         ctx: Arc<dyn TableContext>,
         storage_format: FuseStorageFormat,
@@ -69,7 +92,7 @@ impl CompactSource {
         stream_ctx: Option<StreamContext>,
         output: Arc<OutputPort>,
     ) -> Result<ProcessorPtr> {
-        Ok(ProcessorPtr::create(Box::new(CompactSource {
+        Ok(ProcessorPtr::create(Box::new(Self {
             state: State::ReadData(None),
             ctx,
             block_reader,
@@ -81,7 +104,7 @@ impl CompactSource {
 }
 
 #[async_trait::async_trait]
-impl Processor for CompactSource {
+impl<const IS_LAZY: bool> Processor for CompactSource<IS_LAZY> {
     fn name(&self) -> String {
         "CompactSource".to_string()
     }
@@ -153,19 +176,33 @@ impl Processor for CompactSource {
                     })
                     .collect::<Result<Vec<_>>>()?;
 
-                // concat blocks.
-                let block = if blocks.len() == 1 {
-                    blocks[0].convert_to_full()
-                } else {
-                    DataBlock::concat(&blocks)?
+                let new_block = match IS_LAZY {
+                    true => {
+                        let lazy_block = LazyCompactedBlock(
+                            blocks.into_iter().map(|x| x.convert_to_full()).collect(),
+                        );
+                        let meta =
+                            Box::new(SerializeDataMeta::SerializeBlock(SerializeBlock::create(
+                                index,
+                                ClusterStatsGenType::Generally,
+                                Some(lazy_block),
+                            )));
+                        DataBlock::empty_with_meta(meta)
+                    }
+                    false => {
+                        // concat blocks.
+                        let block = if blocks.len() == 1 {
+                            blocks[0].convert_to_full()
+                        } else {
+                            DataBlock::concat(&blocks)?
+                        };
+                        let meta = Box::new(SerializeDataMeta::SerializeBlock(
+                            SerializeBlock::create(index, ClusterStatsGenType::Generally, None),
+                        ));
+                        block.add_meta(Some(meta))?
+                    }
                 };
-                let meta = Box::new(SerializeDataMeta::SerializeBlock(SerializeBlock::create(
-                    index,
-                    ClusterStatsGenType::Generally,
-                )));
-                let new_block = block.add_meta(Some(meta))?;
-
                 let progress_values = ProgressValues {
                     rows: new_block.num_rows(),
                     bytes: new_block.memory_size(),
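`IS_LAZY` is a const generic, so the `match IS_LAZY` in `process` is resolved at compile time and each pipeline instantiates exactly one variant; this avoids both a runtime flag field and dead branches. A self-contained illustration of the pattern (not databend code):

    // The const parameter is a compile-time value, so unreachable arms are
    // pruned per instantiation.
    struct Source<const IS_LAZY: bool>;

    impl<const IS_LAZY: bool> Source<IS_LAZY> {
        fn emit(&self) -> &'static str {
            match IS_LAZY {
                true => "empty carrier block + LazyCompactedBlock meta",
                false => "concatenated DataBlock + plain meta",
            }
        }
    }

    assert_eq!(Source::<true>.emit(), "empty carrier block + LazyCompactedBlock meta");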
diff --git a/src/query/storages/fuse/src/operations/mutation/processors/mod.rs b/src/query/storages/fuse/src/operations/mutation/processors/mod.rs
index 3fd489fd90ce..72de53d32b08 100644
--- a/src/query/storages/fuse/src/operations/mutation/processors/mod.rs
+++ b/src/query/storages/fuse/src/operations/mutation/processors/mod.rs
@@ -17,6 +17,7 @@ mod mutation_source;
 mod recluster_aggregator;
 
 pub use compact_source::CompactSource;
+pub use compact_source::LazyCompactedBlock;
 pub use mutation_source::MutationAction;
 pub use mutation_source::MutationSource;
 pub use recluster_aggregator::ReclusterAggregator;
diff --git a/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs
index 919878811983..841e33a47796 100644
--- a/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs
+++ b/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs
@@ -250,6 +250,7 @@ impl Processor for MutationSource {
                             SerializeBlock::create(
                                 self.index.clone(),
                                 self.stats_type.clone(),
+                                None,
                             ),
                         ));
                         self.state = State::Output(
@@ -351,7 +352,7 @@ impl Processor for MutationSource {
                         .iter()
                         .try_fold(data_block, |input, op| op.execute(&func_ctx, input))?;
                     let inner_meta = Box::new(SerializeDataMeta::SerializeBlock(
-                        SerializeBlock::create(self.index.clone(), self.stats_type.clone()),
+                        SerializeBlock::create(self.index.clone(), self.stats_type.clone(), None),
                     ));
                     let meta: BlockMetaInfoPtr = if self.block_reader.update_stream_columns() {
                         Box::new(gen_mutation_stream_meta(Some(inner_meta), &path)?)
@@ -407,7 +408,11 @@ impl Processor for MutationSource {
                     };
                     self.ctx.get_write_progress().incr(&progress_values);
                     let meta = Box::new(SerializeDataMeta::SerializeBlock(
-                        SerializeBlock::create(self.index.clone(), self.stats_type.clone()),
+                        SerializeBlock::create(
+                            self.index.clone(),
+                            self.stats_type.clone(),
+                            None,
+                        ),
                     ));
                     self.state = State::Output(
                         self.ctx.get_partition(),
diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs
index 5aac4671ab03..4ca025c27e1d 100644
--- a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs
+++ b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs
@@ -549,7 +549,7 @@ impl AggregationContext {
         let origin_stats = block_meta.cluster_stats.clone();
         let serialized = GlobalIORuntime::instance()
             .spawn(async move {
-                block_builder.build(new_block, |block, generator| {
+                block_builder.build(vec![new_block], |block, generator| {
                     let cluster_stats =
                         generator.gen_with_origin_stats(&block, origin_stats.clone())?;
                     Ok((cluster_stats, block))
diff --git a/src/query/storages/fuse/src/statistics/block_statistics.rs b/src/query/storages/fuse/src/statistics/block_statistics.rs
index cb8f8585cef5..74d73e5a0fd7 100644
--- a/src/query/storages/fuse/src/statistics/block_statistics.rs
+++ b/src/query/storages/fuse/src/statistics/block_statistics.rs
@@ -44,7 +44,7 @@ impl BlockStatistics {
             block_rows_size: data_block.num_rows() as u64,
             block_bytes_size: data_block.memory_size() as u64,
             block_column_statistics: column_statistic::gen_columns_statistics(
-                data_block,
+                &[data_block.clone()],
                 column_distinct_count,
                 schema,
             )?,
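Existing single-block call sites migrate mechanically by wrapping the block in a one-element slice, as block_statistics.rs does above; multi-chunk callers pass all chunks, and min/max, null counts, and distinct values are folded across chunks inside one aggregate state instead of concatenating the columns first. The two call shapes, with `block`, `chunks`, and `schema` assumed:

    // Single block: wrap it in a one-element slice.
    let stats = gen_columns_statistics(&[block.clone()], None, &schema)?;

    // Multiple chunks: pass them directly; no concatenation happens internally.
    let stats = gen_columns_statistics(&chunks, None, &schema)?;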
diff --git a/src/query/storages/fuse/src/statistics/column_statistic.rs b/src/query/storages/fuse/src/statistics/column_statistic.rs
index 718b275e6cc7..3c27d44f87bf 100644
--- a/src/query/storages/fuse/src/statistics/column_statistic.rs
+++ b/src/query/storages/fuse/src/statistics/column_statistic.rs
@@ -25,7 +25,7 @@ use databend_common_expression::FieldIndex;
 use databend_common_expression::Scalar;
 use databend_common_expression::TableSchemaRef;
 use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID;
-use databend_common_functions::aggregates::eval_aggr;
+use databend_common_functions::aggregates::eval_unary_aggr_for_discrete_chunks;
 use databend_storages_common_index::Index;
 use databend_storages_common_index::RangeIndex;
 use databend_storages_common_table_meta::meta::ColumnStatistics;
@@ -35,13 +35,13 @@ use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 // 0.04f--> 10 buckets
 const DISTINCT_ERROR_RATE: f64 = 0.04;
 
-pub fn calc_column_distinct_of_values(column: &Column, rows: usize) -> Result<u64> {
-    let distinct_values = eval_aggr(
+pub fn calc_column_distinct_of_values(columns: &[Column], rows: usize) -> Result<u64> {
+    let distinct_values = eval_unary_aggr_for_discrete_chunks(
         "approx_count_distinct",
         vec![Scalar::Number(NumberScalar::Float64(
            DISTINCT_ERROR_RATE.into(),
        ))],
-        &[column.clone()],
+        columns,
         rows,
     )?;
     let col = NumberType::<u64>::try_downcast_column(&distinct_values.0).unwrap();
@@ -53,17 +53,32 @@ pub fn get_traverse_columns_dfs(data_block: &DataBlock) -> traverse::TraverseRes
 }
 
 pub fn gen_columns_statistics(
-    data_block: &DataBlock,
+    data_blocks: &[DataBlock],
     column_distinct_count: Option<HashMap<FieldIndex, usize>>,
     schema: &TableSchemaRef,
 ) -> Result<StatisticsOfColumns> {
     let mut statistics = StatisticsOfColumns::new();
-    let data_block = data_block.convert_to_full();
-    let rows = data_block.num_rows();
-
-    let leaves = get_traverse_columns_dfs(&data_block)?;
+    if data_blocks.is_empty() {
+        return Ok(statistics);
+    }
+    let rows = data_blocks[0].num_rows();
+    let first_block = data_blocks[0].convert_to_full();
+    let leaves_of_first_block: Vec<(Option<FieldIndex>, Column, DataType)> =
+        get_traverse_columns_dfs(&first_block)?;
+    let mut leaves: Vec<(Option<FieldIndex>, Vec<Column>, DataType)> = leaves_of_first_block
+        .into_iter()
+        .map(|(idx, col, data_type)| (idx, vec![col], data_type))
+        .collect();
+    for block in data_blocks.iter().skip(1) {
+        let block = block.convert_to_full();
+        let leaves_of_block: Vec<(Option<FieldIndex>, Column, DataType)> =
+            get_traverse_columns_dfs(&block)?;
+        for (leave, this_leave) in leaves.iter_mut().zip(leaves_of_block.into_iter()) {
+            leave.1.push(this_leave.1);
+        }
+    }
     let leaf_column_ids = schema.to_leaf_column_ids();
-    for ((col_idx, col, data_type), column_id) in leaves.iter().zip(leaf_column_ids) {
+    for ((col_idx, cols, data_type), column_id) in leaves.iter().zip(leaf_column_ids) {
         // Ignore the range index does not supported type.
         if !RangeIndex::supported_type(data_type) {
             continue;
@@ -78,8 +93,8 @@ pub fn gen_columns_statistics(
         let mut min = Scalar::Null;
         let mut max = Scalar::Null;
 
-        let (mins, _) = eval_aggr("min", vec![], &[col.clone()], rows)?;
-        let (maxs, _) = eval_aggr("max", vec![], &[col.clone()], rows)?;
+        let (mins, _) = eval_unary_aggr_for_discrete_chunks("min", vec![], cols, rows)?;
+        let (maxs, _) = eval_unary_aggr_for_discrete_chunks("max", vec![], cols, rows)?;
 
         if mins.len() > 0 {
             min = if let Some(v) = mins.index(0) {
@@ -105,12 +120,15 @@ pub fn gen_columns_statistics(
             }
         }
 
-        let (is_all_null, bitmap) = col.validity();
-        let unset_bits = match (is_all_null, bitmap) {
-            (true, _) => rows,
-            (false, Some(bitmap)) => bitmap.unset_bits(),
-            (false, None) => 0,
-        };
+        let mut unset_bits = 0;
+        for col in cols {
+            let (is_all_null, bitmap) = col.validity();
+            unset_bits += match (is_all_null, bitmap) {
+                (true, _) => rows,
+                (false, Some(bitmap)) => bitmap.unset_bits(),
+                (false, None) => 0,
+            };
+        }
 
         // use distinct count calculated by the xor hash function to avoid repetitive operation.
         let distinct_of_values = match (col_idx, &column_distinct_count) {
@@ -123,13 +141,13 @@ pub fn gen_columns_statistics(
                     *value as u64
                 }
             } else {
-                calc_column_distinct_of_values(col, rows)?
+                calc_column_distinct_of_values(cols, rows)?
             }
         }
-            (_, _) => calc_column_distinct_of_values(col, rows)?,
+            (_, _) => calc_column_distinct_of_values(cols, rows)?,
         };
 
-        let in_memory_size = col.memory_size() as u64;
+        let in_memory_size = cols.iter().map(|c| c.memory_size()).sum::<usize>() as u64;
         let col_stats = ColumnStatistics::new(
             min,
             max,