diff --git a/src/query/service/src/pipelines/builders/builder_compact.rs b/src/query/service/src/pipelines/builders/builder_compact.rs
index 0888f5571715..03fadf872e31 100644
--- a/src/query/service/src/pipelines/builders/builder_compact.rs
+++ b/src/query/service/src/pipelines/builders/builder_compact.rs
@@ -12,15 +12,35 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use databend_common_base::runtime::Runtime;
+use databend_common_catalog::plan::PartInfoType;
+use databend_common_catalog::plan::Partitions;
+use databend_common_catalog::plan::PartitionsShuffleKind;
+use databend_common_catalog::plan::Projection;
+use databend_common_catalog::table::Table;
+use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_pipeline_sources::EmptySource;
-use databend_common_sql::executor::physical_plans::CompactSource;
+use databend_common_pipeline_sources::PrefetchAsyncSourcer;
+use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
+use databend_common_sql::executor::physical_plans::CompactSource as PhysicalCompactSource;
+use databend_common_sql::executor::physical_plans::MutationKind;
+use databend_common_sql::StreamContext;
+use databend_common_storages_fuse::operations::BlockCompactMutator;
+use databend_common_storages_fuse::operations::CompactLazyPartInfo;
+use databend_common_storages_fuse::operations::CompactSource;
+use databend_common_storages_fuse::operations::CompactTransform;
+use databend_common_storages_fuse::operations::TableMutationAggregator;
+use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::FuseTable;
 
 use crate::pipelines::PipelineBuilder;
 
 impl PipelineBuilder {
-    pub(crate) fn build_compact_source(&mut self, compact_block: &CompactSource) -> Result<()> {
+    pub(crate) fn build_compact_source(
+        &mut self,
+        compact_block: &PhysicalCompactSource,
+    ) -> Result<()> {
         let table = self
             .ctx
             .build_table_by_table_info(&compact_block.table_info, None)?;
@@ -30,11 +50,120 @@ impl PipelineBuilder {
             return self.main_pipeline.add_source(EmptySource::create, 1);
         }
 
-        table.build_compact_source(
+        let is_lazy = compact_block.parts.partitions_type() == PartInfoType::LazyLevel;
+        let thresholds = table.get_block_thresholds();
+        let cluster_key_id = table.cluster_key_id();
+        let mut max_threads = self.ctx.get_settings().get_max_threads()? as usize;
+
+        if is_lazy {
+            let query_ctx = self.ctx.clone();
+
+            let lazy_parts = compact_block
+                .parts
+                .partitions
+                .iter()
+                .map(|v| {
+                    v.as_any()
+                        .downcast_ref::<CompactLazyPartInfo>()
+                        .unwrap()
+                        .clone()
+                })
+                .collect::<Vec<_>>();
+
+            let column_ids = compact_block.column_ids.clone();
+            self.main_pipeline.set_on_init(move || {
+                let ctx = query_ctx.clone();
+                let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move {
+                    let partitions = BlockCompactMutator::build_compact_tasks(
+                        ctx.clone(),
+                        column_ids.clone(),
+                        cluster_key_id,
+                        thresholds,
+                        lazy_parts,
+                    )
+                    .await?;
+
+                    Result::<_>::Ok(partitions)
+                })?;
+
+                let partitions = Partitions::create(PartitionsShuffleKind::Mod, partitions);
+                query_ctx.set_partitions(partitions)?;
+                Ok(())
+            });
+        } else {
+            max_threads = max_threads.min(compact_block.parts.len()).max(1);
+            self.ctx.set_partitions(compact_block.parts.clone())?;
+        }
+
+        let block_reader = table.create_block_reader(
+            self.ctx.clone(),
+            Projection::Columns(table.all_column_indices()),
+            false,
+            table.change_tracking_enabled(),
+            false,
+        )?;
+        let stream_ctx = if table.change_tracking_enabled() {
+            Some(StreamContext::try_create(
+                self.ctx.get_function_context()?,
+                table.schema_with_stream(),
+                table.get_table_info().ident.seq,
+                false,
+                false,
+            )?)
+        } else {
+            None
+        };
+        // Add source pipe.
+        self.main_pipeline.add_source(
+            |output| {
+                let source = CompactSource::create(self.ctx.clone(), block_reader.clone(), 1);
+                PrefetchAsyncSourcer::create(self.ctx.clone(), output, source)
+            },
+            max_threads,
+        )?;
+        let storage_format = table.get_storage_format();
+        self.main_pipeline.add_block_meta_transformer(|| {
+            CompactTransform::create(
+                self.ctx.clone(),
+                block_reader.clone(),
+                storage_format,
+                stream_ctx.clone(),
+            )
+        });
+
+        // sort
+        let cluster_stats_gen = table.cluster_gen_for_append(
             self.ctx.clone(),
-            compact_block.parts.clone(),
-            compact_block.column_ids.clone(),
             &mut self.main_pipeline,
-        )
+            thresholds,
+            None,
+        )?;
+        self.main_pipeline.add_transform(|input, output| {
+            let proc = TransformSerializeBlock::try_create(
+                self.ctx.clone(),
+                input,
+                output,
+                table,
+                cluster_stats_gen.clone(),
+                MutationKind::Compact,
+            )?;
+            proc.into_processor()
+        })?;
+
+        if is_lazy {
+            self.main_pipeline.try_resize(1)?;
+            self.main_pipeline.add_async_accumulating_transformer(|| {
+                TableMutationAggregator::create(
+                    table,
+                    self.ctx.clone(),
+                    vec![],
+                    vec![],
+                    vec![],
+                    Default::default(),
+                    MutationKind::Compact,
+                )
+            });
+        }
+        Ok(())
     }
 }
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index 1895b6ac8bb0..71ef0b77de75 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -461,6 +461,10 @@ impl FuseTable {
         };
         Ok(retention_period)
     }
+
+    pub fn get_storage_format(&self) -> FuseStorageFormat {
+        self.storage_format
+    }
 }
 
 #[async_trait::async_trait]
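Reviewer note: the rebuilt pipeline splits compaction into an async, prefetching read stage (`CompactSource` wrapped in `PrefetchAsyncSourcer`) and a synchronous decode stage (`CompactTransform`), where the old code did both in a single processor. Below is a minimal, dependency-free sketch of that split, using a bounded channel as a stand-in for the prefetch buffer; `RawRead`, `read_stage`, and `decode_stage` are hypothetical names, not Databend APIs.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Stand-in for one raw merge-IO read produced by the source stage.
struct RawRead(Vec<u8>);

// Read stage: does (simulated) I/O; the bounded channel plays the role of
// PrefetchAsyncSource::is_full, capping how far reads run ahead of decoding.
fn read_stage(parts: Vec<u8>, prefetch: usize) -> Receiver<RawRead> {
    let (tx, rx) = sync_channel(prefetch);
    thread::spawn(move || {
        for p in parts {
            // A real implementation would await merge-IO here.
            if tx.send(RawRead(vec![p; 4])).is_err() {
                break;
            }
        }
    });
    rx
}

// Decode stage: CPU-bound work, kept off the I/O path.
fn decode_stage(rx: Receiver<RawRead>) -> Vec<usize> {
    rx.into_iter().map(|r| r.0.len()).collect()
}

fn main() {
    let rx = read_stage(vec![1, 2, 3], 1); // prefetch depth 1, as in the PR
    println!("{:?}", decode_stage(rx));
}
```

The bounded channel gives the same backpressure that `is_full` provides: the reader can run at most `prefetch` blocks ahead of the decoder.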
diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs
index 491c089c382a..d17fe2f72cb6 100644
--- a/src/query/storages/fuse/src/operations/compact.rs
+++ b/src/query/storages/fuse/src/operations/compact.rs
@@ -12,31 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
 use std::sync::Arc;
 
-use databend_common_base::runtime::Runtime;
-use databend_common_catalog::plan::PartInfoType;
 use databend_common_catalog::plan::Partitions;
-use databend_common_catalog::plan::PartitionsShuffleKind;
-use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table::CompactionLimits;
 use databend_common_exception::Result;
-use databend_common_expression::ColumnId;
 use databend_common_expression::ComputedExpr;
 use databend_common_expression::FieldIndex;
-use databend_common_pipeline_core::Pipeline;
-use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
-use databend_common_sql::executor::physical_plans::MutationKind;
-use databend_common_sql::StreamContext;
-use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 
-use crate::operations::common::TableMutationAggregator;
-use crate::operations::common::TransformSerializeBlock;
 use crate::operations::mutation::BlockCompactMutator;
-use crate::operations::mutation::CompactLazyPartInfo;
-use crate::operations::mutation::CompactSource;
 use crate::operations::mutation::SegmentCompactMutator;
 use crate::FuseTable;
 use crate::Table;
@@ -119,125 +104,6 @@ impl FuseTable {
         )))
     }
 
-    pub fn build_compact_source(
-        &self,
-        ctx: Arc<dyn TableContext>,
-        parts: Partitions,
-        column_ids: HashSet<ColumnId>,
-        pipeline: &mut Pipeline,
-    ) -> Result<()> {
-        let is_lazy = parts.partitions_type() == PartInfoType::LazyLevel;
-        let thresholds = self.get_block_thresholds();
-        let cluster_key_id = self.cluster_key_id();
-        let mut max_threads = ctx.get_settings().get_max_threads()? as usize;
-
-        if is_lazy {
-            let query_ctx = ctx.clone();
-
-            let lazy_parts = parts
-                .partitions
-                .into_iter()
-                .map(|v| {
-                    v.as_any()
-                        .downcast_ref::<CompactLazyPartInfo>()
-                        .unwrap()
-                        .clone()
-                })
-                .collect::<Vec<_>>();
-
-            pipeline.set_on_init(move || {
-                let ctx = query_ctx.clone();
-                let column_ids = column_ids.clone();
-                let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move {
-                    let partitions = BlockCompactMutator::build_compact_tasks(
-                        ctx.clone(),
-                        column_ids,
-                        cluster_key_id,
-                        thresholds,
-                        lazy_parts,
-                    )
-                    .await?;
-
-                    Result::<_>::Ok(partitions)
-                })?;
-
-                let partitions = Partitions::create(PartitionsShuffleKind::Mod, partitions);
-                query_ctx.set_partitions(partitions)?;
-                Ok(())
-            });
-        } else {
-            max_threads = max_threads.min(parts.len()).max(1);
-            ctx.set_partitions(parts)?;
-        }
-
-        let all_column_indices = self.all_column_indices();
-        let projection = Projection::Columns(all_column_indices);
-        let block_reader = self.create_block_reader(
-            ctx.clone(),
-            projection,
-            false,
-            self.change_tracking_enabled(),
-            false,
-        )?;
-        let stream_ctx = if self.change_tracking_enabled() {
-            Some(StreamContext::try_create(
-                ctx.get_function_context()?,
-                self.schema_with_stream(),
-                self.get_table_info().ident.seq,
-                false,
-                false,
-            )?)
-        } else {
-            None
-        };
-        // Add source pipe.
-        pipeline.add_source(
-            |output| {
-                CompactSource::try_create(
-                    ctx.clone(),
-                    self.storage_format,
-                    block_reader.clone(),
-                    stream_ctx.clone(),
-                    output,
-                )
-            },
-            max_threads,
-        )?;
-
-        // sort
-        let cluster_stats_gen =
-            self.cluster_gen_for_append(ctx.clone(), pipeline, thresholds, None)?;
-        pipeline.add_transform(
-            |input: Arc<InputPort>, output| {
-                let proc = TransformSerializeBlock::try_create(
-                    ctx.clone(),
-                    input,
-                    output,
-                    self,
-                    cluster_stats_gen.clone(),
-                    MutationKind::Compact,
-                )?;
-                proc.into_processor()
-            },
-        )?;
-
-        if is_lazy {
-            pipeline.try_resize(1)?;
-            pipeline.add_async_accumulating_transformer(|| {
-                TableMutationAggregator::create(
-                    self,
-                    ctx.clone(),
-                    vec![],
-                    vec![],
-                    vec![],
-                    Statistics::default(),
-                    MutationKind::Compact,
-                )
-            });
-        }
-        Ok(())
-    }
-
     async fn compact_options_with_segment_limit(
         &self,
         num_segment_limit: Option<usize>,
diff --git a/src/query/storages/fuse/src/operations/mutation/meta/mod.rs b/src/query/storages/fuse/src/operations/mutation/meta/mod.rs
index 776b7d48979b..0a14e6b7ad28 100644
--- a/src/query/storages/fuse/src/operations/mutation/meta/mod.rs
+++ b/src/query/storages/fuse/src/operations/mutation/meta/mod.rs
@@ -21,6 +21,7 @@ pub use compact_part::CompactExtraInfo;
 pub use compact_part::CompactLazyPartInfo;
 pub use compact_part::CompactTaskInfo;
 pub use mutation_meta::ClusterStatsGenType;
+pub use mutation_meta::CompactSourceMeta;
 pub use mutation_meta::SerializeBlock;
 pub use mutation_meta::SerializeDataMeta;
 pub use mutation_part::DeletedSegmentInfo;
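Reviewer note: `CompactSourceMeta` (added in the next hunk) is a carrier meta. The source emits an empty `DataBlock` whose attached meta holds the raw read results, and the downstream transform downcasts the meta back to the concrete type. The `equals`/`clone_self`/serde stubs stay `unimplemented!` because the meta holds non-serializable read buffers and never leaves the local pipeline. A toy version of the pattern, with hypothetical names in place of `DataBlock` and `BlockMetaInfo`:

```rust
use std::any::Any;

// Toy stand-ins for DataBlock and BlockMetaInfo; names are illustrative.
trait MetaInfo: Any + Send {
    fn as_any(&self) -> &dyn Any;
}

struct Block {
    rows: usize,
    meta: Option<Box<dyn MetaInfo>>,
}

// Payload handed from source to transform, like CompactSourceMeta::Concat.
struct CompactPayload {
    read_bytes: usize,
}

impl MetaInfo for CompactPayload {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    // Source side: an empty block that exists only to carry the payload.
    let block = Block {
        rows: 0,
        meta: Some(Box::new(CompactPayload { read_bytes: 42 })),
    };

    // Transform side: recover the concrete type by downcasting, which is
    // what BlockMetaInfoDowncast does for CompactSourceMeta.
    let payload = block
        .meta
        .as_ref()
        .and_then(|m| m.as_any().downcast_ref::<CompactPayload>())
        .expect("expected CompactPayload");
    println!("rows={}, read_bytes={}", block.rows, payload.read_bytes);
}
```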
diff --git a/src/query/storages/fuse/src/operations/mutation/meta/mutation_meta.rs b/src/query/storages/fuse/src/operations/mutation/meta/mutation_meta.rs
index b98943f6821e..913462add22c 100644
--- a/src/query/storages/fuse/src/operations/mutation/meta/mutation_meta.rs
+++ b/src/query/storages/fuse/src/operations/mutation/meta/mutation_meta.rs
@@ -12,13 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use databend_common_expression::BlockMetaInfo;
 use databend_common_expression::BlockMetaInfoDowncast;
+use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::ClusterStatistics;
 
 use crate::operations::common::BlockMetaIndex;
 use crate::operations::mutation::CompactExtraInfo;
 use crate::operations::mutation::DeletedSegmentInfo;
+use crate::MergeIOReadResult;
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
 pub enum SerializeDataMeta {
@@ -55,3 +59,43 @@ impl SerializeBlock {
         SerializeBlock { index, stats_type }
     }
 }
+
+pub enum CompactSourceMeta {
+    Concat {
+        read_res: Vec<MergeIOReadResult>,
+        metas: Vec<Arc<BlockMeta>>,
+        index: BlockMetaIndex,
+    },
+    Extras(CompactExtraInfo),
+}
+
+#[typetag::serde(name = "compact_data_source")]
+impl BlockMetaInfo for CompactSourceMeta {
+    fn equals(&self, _: &Box<dyn BlockMetaInfo>) -> bool {
+        unimplemented!("Unimplemented equals CompactSourceMeta")
+    }
+
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        unimplemented!("Unimplemented clone CompactSourceMeta")
+    }
+}
+
+impl std::fmt::Debug for CompactSourceMeta {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("CompactSourceMeta").finish()
+    }
+}
+
+impl serde::Serialize for CompactSourceMeta {
+    fn serialize<S>(&self, _: S) -> std::result::Result<S::Ok, S::Error>
+    where S: serde::Serializer {
+        unimplemented!("Unimplemented serialize DataSourceMeta")
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for CompactSourceMeta {
+    fn deserialize<D>(_: D) -> std::result::Result<Self, D::Error>
+    where D: serde::Deserializer<'de> {
+        unimplemented!("Unimplemented deserialize DataSourceMeta")
+    }
+}
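Reviewer note: the rewritten `compact_source.rs` below implements `PrefetchAsyncSource`, whose contract is essentially `generate` (produce the next block, or `None` when partitions are drained) plus `is_full` (cap the prefetch queue). The driver loop below is an assumption about how the executor polls the source, not the real scheduler; `Source` and its fields are hypothetical:

```rust
// Toy model of the PrefetchAsyncSource contract: generate() yields blocks,
// is_full() bounds the prefetch queue.
struct Source {
    next: u32,
    prefetch_num: usize,
}

impl Source {
    fn is_full(&self, prefetched: &[u32]) -> bool {
        prefetched.len() >= self.prefetch_num
    }

    fn generate(&mut self) -> Option<u32> {
        if self.next >= 3 {
            return None; // no partitions left
        }
        self.next += 1;
        Some(self.next)
    }
}

fn main() {
    let mut src = Source { next: 0, prefetch_num: 1 };
    let mut queue = Vec::new();
    loop {
        // Prefetch until the queue is full or the source is drained.
        while !src.is_full(&queue) {
            match src.generate() {
                Some(block) => queue.push(block),
                None => break,
            }
        }
        // Downstream consumes one block per iteration.
        match queue.pop() {
            Some(block) => println!("downstream got block {block}"),
            None => break,
        }
    }
}
```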
diff --git a/src/query/storages/fuse/src/operations/mutation/processors/compact_source.rs b/src/query/storages/fuse/src/operations/mutation/processors/compact_source.rs
index 203a6e73ed3f..0256651a7a3b 100644
--- a/src/query/storages/fuse/src/operations/mutation/processors/compact_source.rs
+++ b/src/query/storages/fuse/src/operations/mutation/processors/compact_source.rs
@@ -12,125 +12,148 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::any::Any;
 use std::sync::Arc;
 use std::time::Instant;
 
 use databend_common_base::base::ProgressValues;
 use databend_common_catalog::plan::gen_mutation_stream_meta;
-use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_catalog::table_context::TableContext;
-use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_metrics::storage::*;
-use databend_common_pipeline_core::processors::Event;
-use databend_common_pipeline_core::processors::OutputPort;
-use databend_common_pipeline_core::processors::Processor;
-use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_sources::PrefetchAsyncSource;
+use databend_common_pipeline_transforms::processors::BlockMetaTransform;
+use databend_common_pipeline_transforms::processors::UnknownMode;
 use databend_common_sql::StreamContext;
-use databend_storages_common_table_meta::meta::BlockMeta;
 
 use crate::io::BlockReader;
 use crate::io::ReadSettings;
-use crate::operations::mutation::ClusterStatsGenType;
-use crate::operations::mutation::CompactBlockPartInfo;
-use crate::operations::mutation::SerializeBlock;
-use crate::operations::mutation::SerializeDataMeta;
-use crate::operations::BlockMetaIndex;
+use crate::operations::ClusterStatsGenType;
+use crate::operations::CompactBlockPartInfo;
+use crate::operations::CompactSourceMeta;
+use crate::operations::SerializeBlock;
+use crate::operations::SerializeDataMeta;
 use crate::FuseStorageFormat;
-use crate::MergeIOReadResult;
-
-enum State {
-    ReadData(Option<PartInfoPtr>),
-    Concat {
-        read_res: Vec<MergeIOReadResult>,
-        metas: Vec<Arc<BlockMeta>>,
-        index: BlockMetaIndex,
-    },
-    Output(Option<PartInfoPtr>, DataBlock),
-    Finish,
-}
 
 pub struct CompactSource {
-    state: State,
     ctx: Arc<dyn TableContext>,
     block_reader: Arc<BlockReader>,
-    storage_format: FuseStorageFormat,
-    output: Arc<OutputPort>,
-    stream_ctx: Option<StreamContext>,
+    prefetch_num: usize,
 }
 
 impl CompactSource {
-    pub fn try_create(
+    pub fn create(
         ctx: Arc<dyn TableContext>,
-        storage_format: FuseStorageFormat,
         block_reader: Arc<BlockReader>,
-        stream_ctx: Option<StreamContext>,
-        output: Arc<OutputPort>,
-    ) -> Result<ProcessorPtr> {
-        Ok(ProcessorPtr::create(Box::new(CompactSource {
-            state: State::ReadData(None),
+        prefetch_num: usize,
+    ) -> Self {
+        Self {
            ctx,
            block_reader,
-            storage_format,
-            output,
-            stream_ctx,
-        })))
+            prefetch_num,
+        }
    }
 }
 
 #[async_trait::async_trait]
-impl Processor for CompactSource {
-    fn name(&self) -> String {
-        "CompactSource".to_string()
-    }
+impl PrefetchAsyncSource for CompactSource {
+    const NAME: &'static str = "CompactSource";
 
-    fn as_any(&mut self) -> &mut dyn Any {
-        self
+    const SKIP_EMPTY_DATA_BLOCK: bool = false;
+
+    fn is_full(&self, prefetched: &[DataBlock]) -> bool {
+        prefetched.len() >= self.prefetch_num
     }
 
-    fn event(&mut self) -> Result<Event> {
-        if matches!(self.state, State::ReadData(None)) {
-            self.state = self
-                .ctx
-                .get_partition()
-                .map_or(State::Finish, |part| State::ReadData(Some(part)));
-        }
+    async fn generate(&mut self) -> Result<Option<DataBlock>> {
+        let part = match self.ctx.get_partition() {
+            Some(part) => part,
+            None => return Ok(None),
+        };
 
-        if self.output.is_finished() {
-            return Ok(Event::Finished);
-        }
+        let part = CompactBlockPartInfo::from_part(&part)?;
+        let meta = match part {
+            CompactBlockPartInfo::CompactTaskInfo(task) => {
+                let mut task_futures = Vec::new();
+                for block in &task.blocks {
+                    let settings = ReadSettings::from_ctx(&self.ctx)?;
+                    let block_reader = self.block_reader.clone();
+                    let block = block.clone();
+                    // read block in parallel.
+                    task_futures.push(async move {
+                        databend_common_base::runtime::spawn(async move {
+                            // Perf
+                            {
+                                metrics_inc_compact_block_read_nums(1);
+                                metrics_inc_compact_block_read_bytes(block.block_size);
+                            }
+
+                            block_reader
+                                .read_columns_data_by_merge_io(
+                                    &settings,
+                                    &block.location.0,
+                                    &block.col_metas,
+                                    &None,
+                                )
+                                .await
+                        })
+                        .await
+                        .unwrap()
+                    });
+                }
 
-        if !self.output.can_push() {
-            return Ok(Event::NeedConsume);
-        }
+                let start = Instant::now();
 
-        match self.state {
-            State::ReadData(_) => Ok(Event::Async),
-            State::Concat { .. } => Ok(Event::Sync),
-            State::Output(_, _) => {
-                if let State::Output(part, data_block) =
-                    std::mem::replace(&mut self.state, State::Finish)
+                let read_res = futures::future::try_join_all(task_futures).await?;
+                // Perf.
                 {
-                    self.state = part.map_or(State::Finish, |part| State::ReadData(Some(part)));
-
-                    self.output.push_data(Ok(data_block));
-                    Ok(Event::NeedConsume)
-                } else {
-                    Err(ErrorCode::Internal("It's a bug."))
+                    metrics_inc_compact_block_read_milliseconds(start.elapsed().as_millis() as u64);
                 }
+                Box::new(CompactSourceMeta::Concat {
+                    read_res,
+                    metas: task.blocks.clone(),
+                    index: task.index.clone(),
+                })
             }
-            State::Finish => {
-                self.output.finish();
-                Ok(Event::Finished)
+            CompactBlockPartInfo::CompactExtraInfo(extra) => {
+                Box::new(CompactSourceMeta::Extras(extra.clone()))
             }
+        };
+        Ok(Some(DataBlock::empty_with_meta(meta)))
+    }
+}
+
+pub struct CompactTransform {
+    ctx: Arc<dyn TableContext>,
+    block_reader: Arc<BlockReader>,
+    storage_format: FuseStorageFormat,
+    stream_ctx: Option<StreamContext>,
+}
+
+impl CompactTransform {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        block_reader: Arc<BlockReader>,
+        storage_format: FuseStorageFormat,
+        stream_ctx: Option<StreamContext>,
+    ) -> Self {
+        Self {
+            ctx,
+            block_reader,
+            storage_format,
+            stream_ctx,
         }
     }
+}
+
+#[async_trait::async_trait]
+impl BlockMetaTransform<CompactSourceMeta> for CompactTransform {
+    const UNKNOWN_MODE: UnknownMode = UnknownMode::Pass;
+    const NAME: &'static str = "CompactTransform";
 
-    fn process(&mut self) -> Result<()> {
-        match std::mem::replace(&mut self.state, State::Finish) {
-            State::Concat {
+    fn transform(&mut self, meta: CompactSourceMeta) -> Result<Vec<DataBlock>> {
+        match meta {
+            CompactSourceMeta::Concat {
                 read_res,
                 metas,
                 index,
@@ -171,70 +194,12 @@ impl Processor for CompactSource {
                     bytes: new_block.memory_size(),
                 };
                 self.ctx.get_write_progress().incr(&progress_values);
-
-                self.state = State::Output(self.ctx.get_partition(), new_block);
+                Ok(vec![new_block])
             }
-            _ => return Err(ErrorCode::Internal("It's a bug.")),
-        }
-        Ok(())
-    }
-
-    #[async_backtrace::framed]
-    async fn async_process(&mut self) -> Result<()> {
-        match std::mem::replace(&mut self.state, State::Finish) {
-            State::ReadData(Some(part)) => {
-                let block_reader = self.block_reader.as_ref();
-
-                // block read tasks.
-                let mut task_futures = Vec::new();
-                let part = CompactBlockPartInfo::from_part(&part)?;
-                match part {
-                    CompactBlockPartInfo::CompactExtraInfo(extra) => {
-                        let meta = Box::new(SerializeDataMeta::CompactExtras(extra.clone()));
-                        let block = DataBlock::empty_with_meta(meta);
-                        self.state = State::Output(self.ctx.get_partition(), block);
-                    }
-                    CompactBlockPartInfo::CompactTaskInfo(task) => {
-                        for block in &task.blocks {
-                            let settings = ReadSettings::from_ctx(&self.ctx)?;
-                            // read block in parallel.
-                            task_futures.push(async move {
-                                // Perf
-                                {
-                                    metrics_inc_compact_block_read_nums(1);
-                                    metrics_inc_compact_block_read_bytes(block.block_size);
-                                }
-
-                                block_reader
-                                    .read_columns_data_by_merge_io(
-                                        &settings,
-                                        &block.location.0,
-                                        &block.col_metas,
-                                        &None,
-                                    )
-                                    .await
-                            });
-                        }
-
-                        let start = Instant::now();
-
-                        let read_res = futures::future::try_join_all(task_futures).await?;
-                        // Perf.
-                        {
-                            metrics_inc_compact_block_read_milliseconds(
-                                start.elapsed().as_millis() as u64,
-                            );
-                        }
-                        self.state = State::Concat {
-                            read_res,
-                            metas: task.blocks.clone(),
-                            index: task.index.clone(),
-                        };
-                    }
-                }
-                Ok(())
+            CompactSourceMeta::Extras(extra) => {
+                let meta = Box::new(SerializeDataMeta::CompactExtras(extra));
+                Ok(vec![DataBlock::empty_with_meta(meta)])
             }
-            _ => Err(ErrorCode::Internal("It's a bug.")),
         }
     }
 }
diff --git a/src/query/storages/fuse/src/operations/mutation/processors/mod.rs b/src/query/storages/fuse/src/operations/mutation/processors/mod.rs
index f86df532bad5..ae312d9f438e 100644
--- a/src/query/storages/fuse/src/operations/mutation/processors/mod.rs
+++ b/src/query/storages/fuse/src/operations/mutation/processors/mod.rs
@@ -16,5 +16,6 @@ mod compact_source;
 mod mutation_source;
 
 pub use compact_source::CompactSource;
+pub use compact_source::CompactTransform;
 pub use mutation_source::MutationAction;
 pub use mutation_source::MutationSource;
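Reviewer note on the lazy path the builder keeps: for `PartInfoType::LazyLevel` parts, compact tasks are built once inside `set_on_init` (on a small two-thread runtime) and published via `set_partitions`; source workers then drain them through `get_partition`. A minimal sketch of that deferred-init pattern, with hypothetical types throughout:

```rust
use std::sync::{Arc, Mutex};

// Toy query context; set_partitions/get_partition mirror the TableContext
// calls used in the builder, everything else here is illustrative.
struct QueryCtx {
    partitions: Mutex<Vec<String>>,
}

impl QueryCtx {
    fn set_partitions(&self, parts: Vec<String>) {
        *self.partitions.lock().unwrap() = parts;
    }

    fn get_partition(&self) -> Option<String> {
        self.partitions.lock().unwrap().pop()
    }
}

// Stands in for BlockCompactMutator::build_compact_tasks, which the PR runs
// on a worker runtime inside set_on_init.
fn build_compact_tasks() -> Vec<String> {
    vec!["task-a".into(), "task-b".into()]
}

fn main() {
    let ctx = Arc::new(QueryCtx {
        partitions: Mutex::new(vec![]),
    });

    // on_init: runs once before the pipeline starts executing.
    let init_ctx = ctx.clone();
    let on_init = move || init_ctx.set_partitions(build_compact_tasks());
    on_init();

    // Source workers then drain partitions, as CompactSource::generate does.
    while let Some(task) = ctx.get_partition() {
        println!("compacting {task}");
    }
}
```

Deferring the expensive task planning to `set_on_init` keeps pipeline construction cheap and lets the planned partitions be shuffled (`PartitionsShuffleKind::Mod`) across all source workers.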