feat: reduce memory copy during the compact process #15598

Draft · wants to merge 5 commits into main
36 changes: 36 additions & 0 deletions src/query/functions/src/aggregates/aggregator_common.rs
@@ -156,6 +156,42 @@ pub fn eval_aggr(
Ok((builder.build(), data_type))
}

/// Evaluates a unary aggregation function over multiple discrete chunks.
/// Each input column is treated as one chunk of the same unary argument, so all
/// columns must share the same data type; the chunks are accumulated into a single state.
pub fn eval_unary_aggr_for_discrete_chunks(
name: &str,
params: Vec<Scalar>,
columns: &[Column],
rows: usize,
) -> Result<(Column, DataType)> {
if columns.is_empty() {
return Err(ErrorCode::BadArguments(format!(
"No input columns provided for aggregation function '{}'",
name
)));
}
if !columns
.iter()
.all(|x| x.data_type() == columns[0].data_type())
{
return Err(ErrorCode::BadArguments(
"All input columns must have the same data type".to_string(),
));
}
let factory = AggregateFunctionFactory::instance();

let func = factory.get(name, params, vec![columns[0].data_type()])?;
let data_type = func.return_type()?;

let eval = EvalAggr::new(func.clone());
// Feed each column to the aggregate state as a single-column chunk.
for col in columns.chunks(1) {
func.accumulate(eval.addr, col, None, rows)?;
}
let mut builder = ColumnBuilder::with_capacity(&data_type, 1024);
func.merge_result(eval.addr, &mut builder)?;
Ok((builder.build(), data_type))
}

#[inline]
pub fn borsh_serialize_state<W: std::io::Write, T: BorshSerialize>(
writer: &mut W,
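As a rough usage sketch (not part of the diff): the new helper aggregates several same-typed columns as discrete chunks of one unary argument. The column constructors and the per-chunk row count below are illustrative assumptions, not values taken from the PR.

use databend_common_expression::types::Int32Type;
use databend_common_expression::FromData;

// Two discrete 3-row chunks of the same Int32 argument.
let chunk_a = Int32Type::from_data(vec![1i32, 2, 3]);
let chunk_b = Int32Type::from_data(vec![4i32, 5, 6]);

// Aggregate "sum" over both chunks without concatenating them first.
let (result, _result_type) =
    eval_unary_aggr_for_discrete_chunks("sum", vec![], &[chunk_a, chunk_b], 3)?;

// The merged result is a single-row column.
assert_eq!(result.len(), 1);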
2 changes: 1 addition & 1 deletion src/query/service/src/test_kits/block_writer.rs
@@ -113,7 +113,7 @@ impl<'a> BlockWriter<'a> {
let maybe_bloom_index = BloomIndex::try_create(
FunctionContext::default(),
location.1,
&[block],
&[block.clone()],
bloom_columns_map,
)?;
if let Some(bloom_index) = maybe_bloom_index {
2 changes: 1 addition & 1 deletion src/query/service/src/test_kits/fuse.rs
@@ -155,7 +155,7 @@ async fn generate_blocks(fuse_table: &FuseTable, num_blocks: usize) -> Result<Ve

let blocks: std::vec::Vec<DataBlock> = stream.try_collect().await?;
for block in blocks {
let stats = gen_columns_statistics(&block, None, &schema)?;
let stats = gen_columns_statistics(&[block.clone()], None, &schema)?;
let (block_meta, _index_meta) = block_writer
.write(FuseStorageFormat::Parquet, &schema, block, stats, None)
.await?;
@@ -393,7 +393,7 @@ async fn setup() -> databend_common_exception::Result<FileMetaData> {
let block = DataBlock::new_from_columns(columns);
let operator = Operator::new(opendal::services::Memory::default())?.finish();
let loc_generator = TableMetaLocationGenerator::with_prefix("/".to_owned());
let col_stats = gen_columns_statistics(&block, None, &schema)?;
let col_stats = gen_columns_statistics(&[block.clone()], None, &schema)?;
let block_writer = BlockWriter::new(&operator, &loc_generator);
let (_block_meta, thrift_file_meta) = block_writer
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
@@ -81,7 +81,7 @@ async fn test_compact() -> Result<()> {
.get_table(&ctx.get_tenant(), &db_name, &tbl_name)
.await?;
let res = do_compact(ctx.clone(), table.clone()).await;
assert!(res.is_ok());
assert!(res.is_ok(), "{:?}", res);
assert!(res.unwrap());

// check count
@@ -729,7 +729,7 @@ impl CompactSegmentTestFixture {
for block in blocks {
let block = block?;

let col_stats = gen_columns_statistics(&block, None, &schema)?;
let col_stats = gen_columns_statistics(&[block.clone()], None, &schema)?;

let cluster_stats = if num_blocks % 5 == 0 {
None
12 changes: 6 additions & 6 deletions src/query/service/tests/it/storages/fuse/statistics.rs
@@ -68,7 +68,7 @@ fn test_ft_stats_block_stats() -> databend_common_exception::Result<()> {
StringType::from_data(vec!["aa", "aa", "bb"]),
]);

let r = gen_columns_statistics(&block, None, &schema)?;
let r = gen_columns_statistics(&[block.clone()], None, &schema)?;
assert_eq!(2, r.len());
let col_stats = r.get(&0).unwrap();
assert_eq!(col_stats.min(), &Scalar::Number(NumberScalar::Int32(1)));
@@ -95,7 +95,7 @@ fn test_ft_stats_block_stats_with_column_distinct_count() -> databend_common_exc
let mut column_distinct_count = HashMap::new();
column_distinct_count.insert(0, 3);
column_distinct_count.insert(1, 2);
let r = gen_columns_statistics(&block, Some(column_distinct_count), &schema)?;
let r = gen_columns_statistics(&[block.clone()], Some(column_distinct_count), &schema)?;
assert_eq!(2, r.len());
let col_stats = r.get(&0).unwrap();
assert_eq!(col_stats.min(), &Scalar::Number(NumberScalar::Int32(1)));
@@ -129,7 +129,7 @@ fn test_ft_tuple_stats_block_stats() -> databend_common_exception::Result<()> {

let block = DataBlock::new_from_columns(vec![column]);

let r = gen_columns_statistics(&block, None, &schema)?;
let r = gen_columns_statistics(&[block.clone()], None, &schema)?;
assert_eq!(2, r.len());
let col0_stats = r.get(&0).unwrap();
assert_eq!(col0_stats.min(), &Scalar::Number(NumberScalar::Int32(1)));
@@ -151,7 +151,7 @@ fn test_ft_stats_col_stats_reduce() -> databend_common_exception::Result<()> {
TestFixture::gen_sample_blocks_ex(num_of_blocks, rows_per_block, val_start_with);
let col_stats = blocks
.iter()
.map(|b| gen_columns_statistics(&b.clone().unwrap(), None, &schema))
.map(|b| gen_columns_statistics(&[b.clone().unwrap()], None, &schema))
.collect::<databend_common_exception::Result<Vec<_>>>()?;
let r = reducers::reduce_block_statistics(&col_stats);
assert_eq!(3, r.len());
@@ -320,7 +320,7 @@ async fn test_accumulator() -> databend_common_exception::Result<()> {
let loc_generator = TableMetaLocationGenerator::with_prefix("/".to_owned());
for item in blocks {
let block = item?;
let col_stats = gen_columns_statistics(&block, None, &schema)?;
let col_stats = gen_columns_statistics(&[block.clone()], None, &schema)?;
let block_writer = BlockWriter::new(&operator, &loc_generator);
let (block_meta, _index_meta) = block_writer
.write(FuseStorageFormat::Parquet, &schema, block, col_stats, None)
@@ -528,7 +528,7 @@ fn test_ft_stats_block_stats_string_columns_trimming_using_eval()
let max_expr = max_col.0.index(0).unwrap();

// generate the statistics of column
let stats_of_columns = gen_columns_statistics(&block, None, &schema).unwrap();
let stats_of_columns = gen_columns_statistics(&[block], None, &schema).unwrap();

// check if the max value (untrimmed) is in degenerated condition:
// - the length of string value is larger or equal than STRING_PREFIX_LEN
@@ -83,7 +83,7 @@ fn gen_sample_block() -> (DataBlock, Vec<Column>, TableSchemaRef) {
#[test]
fn test_column_statistic() -> Result<()> {
let (sample_block, sample_cols, schema) = gen_sample_block();
let col_stats = gen_columns_statistics(&sample_block, None, &schema)?;
let col_stats = gen_columns_statistics(&[sample_block], None, &schema)?;

assert_eq!(5, col_stats.len());

2 changes: 1 addition & 1 deletion src/query/storages/common/index/src/bloom_index.rs
@@ -178,7 +178,7 @@ impl BloomIndex {
pub fn try_create(
func_ctx: FunctionContext,
version: u64,
data_blocks_tobe_indexed: &[&DataBlock],
data_blocks_tobe_indexed: &[DataBlock],
bloom_columns_map: BTreeMap<FieldIndex, TableField>,
) -> Result<Option<Self>> {
if data_blocks_tobe_indexed.is_empty() {
@@ -144,14 +144,13 @@ fn test_bloom_filter() -> Result<()> {
)),
]),
];
let blocks_ref = blocks.iter().collect::<Vec<_>>();

let bloom_columns = bloom_columns_map(schema.clone(), vec![0, 1, 2, 3]);
let bloom_fields = bloom_columns.values().cloned().collect::<Vec<_>>();
let index = BloomIndex::try_create(
FunctionContext::default(),
LatestBloom::VERSION,
&blocks_ref,
&blocks,
bloom_columns,
)?
.unwrap();
@@ -316,14 +315,13 @@ fn test_specify_bloom_filter() -> Result<()> {
UInt8Type::from_data(vec![1, 2]),
StringType::from_data(vec!["a", "b"]),
])];
let blocks_ref = blocks.iter().collect::<Vec<_>>();

let bloom_columns = bloom_columns_map(schema.clone(), vec![0]);
let fields = bloom_columns.values().cloned().collect::<Vec<_>>();
let specify_index = BloomIndex::try_create(
FunctionContext::default(),
LatestBloom::VERSION,
&blocks_ref,
&blocks,
bloom_columns,
)?
.unwrap();
@@ -355,15 +353,14 @@ fn test_string_bloom_filter() -> Result<()> {
UInt8Type::from_data(vec![1, 2]),
StringType::from_data(vec![&val, "bc"]),
])];
let blocks_ref = blocks.iter().collect::<Vec<_>>();

// The average length of the string column exceeds 256 bytes.
let bloom_columns = bloom_columns_map(schema.clone(), vec![0, 1]);
let fields = bloom_columns.values().cloned().collect::<Vec<_>>();
let index = BloomIndex::try_create(
FunctionContext::default(),
LatestBloom::VERSION,
&blocks_ref,
&blocks,
bloom_columns,
)?
.unwrap();
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/mod.rs
@@ -41,6 +41,7 @@ pub(crate) use write::create_index_schema;
pub(crate) use write::create_inverted_index_builders;
pub(crate) use write::create_tokenizer_manager;
pub use write::serialize_block;
pub use write::serialize_blocks;
pub use write::write_data;
pub use write::BlockBuilder;
pub use write::BlockSerialization;
67 changes: 48 additions & 19 deletions src/query/storages/fuse/src/io/write/block_writer.rs
@@ -21,6 +21,7 @@ use chrono::Utc;
use databend_common_arrow::arrow::chunk::Chunk as ArrowChunk;
use databend_common_arrow::native::write::NativeWriter;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::ColumnId;
use databend_common_expression::DataBlock;
@@ -50,18 +51,25 @@ use crate::statistics::gen_columns_statistics;
use crate::statistics::ClusterStatsGenerator;
use crate::FuseStorageFormat;

// TODO: rename this, since it performs serialization, or pass in a writer (if not renamed)
pub fn serialize_block(
write_settings: &WriteSettings,
schema: &TableSchemaRef,
block: DataBlock,
buf: &mut Vec<u8>,
) -> Result<HashMap<ColumnId, ColumnMeta>> {
serialize_blocks(write_settings, schema, vec![block], buf)
}

pub fn serialize_blocks(
write_settings: &WriteSettings,
schema: &TableSchemaRef,
blocks: Vec<DataBlock>,
buf: &mut Vec<u8>,
) -> Result<HashMap<ColumnId, ColumnMeta>> {
let schema = Arc::new(schema.remove_virtual_computed_fields());
match write_settings.storage_format {
FuseStorageFormat::Parquet => {
let result =
blocks_to_parquet(&schema, vec![block], buf, write_settings.table_compression)?;
let result = blocks_to_parquet(&schema, blocks, buf, write_settings.table_compression)?;
let meta = column_parquet_metas(&result, &schema)?;
Ok(meta)
}
Expand All @@ -85,10 +93,16 @@ pub fn serialize_block(
},
);

let batch = ArrowChunk::try_from(block)?;

writer.start()?;
writer.write(&batch)?;
assert_eq!(
blocks.len(),
1,
"Native format does not support write multiple chunk for now"
);
for block in blocks {
let batch = ArrowChunk::try_from(block)?;
writer.write(&batch)?;
}
writer.finish()?;

let mut metas = HashMap::with_capacity(writer.metas.len());
@@ -121,15 +135,15 @@ pub struct BloomIndexState {
impl BloomIndexState {
pub fn try_create(
ctx: Arc<dyn TableContext>,
block: &DataBlock,
blocks: &[DataBlock],
location: Location,
bloom_columns_map: BTreeMap<FieldIndex, TableField>,
) -> Result<Option<Self>> {
// write index
let maybe_bloom_index = BloomIndex::try_create(
ctx.get_function_context()?,
location.1,
&[block],
blocks,
bloom_columns_map,
)?;
if let Some(bloom_index) = maybe_bloom_index {
@@ -206,7 +220,7 @@ pub struct InvertedIndexState {
impl InvertedIndexState {
pub fn try_create(
source_schema: &TableSchemaRef,
block: &DataBlock,
blocks: &[DataBlock],
block_location: &Location,
inverted_index_builder: &InvertedIndexBuilder,
) -> Result<Self> {
@@ -215,7 +229,9 @@ impl InvertedIndexState {
Arc::new(inverted_index_builder.schema.clone()),
&inverted_index_builder.options,
)?;
writer.add_block(source_schema, block)?;
for block in blocks {
writer.add_block(source_schema, block)?;
}
let data = writer.finalize()?;
let size = data.len() as u64;

@@ -261,16 +277,29 @@ pub struct BlockBuilder {
}

impl BlockBuilder {
pub fn build<F>(&self, data_block: DataBlock, f: F) -> Result<BlockSerialization>
pub fn build<F>(&self, mut data_blocks: Vec<DataBlock>, f: F) -> Result<BlockSerialization>
where F: Fn(DataBlock, &ClusterStatsGenerator) -> Result<(Option<ClusterStatistics>, DataBlock)>
{
let (cluster_stats, data_block) = f(data_block, &self.cluster_stats_gen)?;
let cluster_stats = if !self.cluster_stats_gen.cluster_key_index.is_empty() {
if data_blocks.len() != 1 {
return Err(ErrorCode::Internal(format!(
"Invalid data blocks count: {}",
data_blocks.len(),
)));
}
let (cluster_stats, data_block) =
f(data_blocks.pop().unwrap(), &self.cluster_stats_gen)?;
data_blocks.push(data_block);
cluster_stats
} else {
None
};
let (block_location, block_id) = self.meta_locations.gen_block_location();

let bloom_index_location = self.meta_locations.block_bloom_index_location(&block_id);
let bloom_index_state = BloomIndexState::try_create(
self.ctx.clone(),
&data_block,
&data_blocks,
bloom_index_location,
self.bloom_columns_map.clone(),
)?;
@@ -282,23 +311,23 @@ impl BlockBuilder {
for inverted_index_builder in &self.inverted_index_builders {
let inverted_index_state = InvertedIndexState::try_create(
&self.source_schema,
&data_block,
&data_blocks,
&block_location,
inverted_index_builder,
)?;
inverted_index_states.push(inverted_index_state);
}

let row_count = data_block.num_rows() as u64;
let block_size = data_block.memory_size() as u64;
let row_count = data_blocks.iter().map(|b| b.num_rows()).sum::<usize>() as u64;
let block_size = data_blocks.iter().map(|b| b.memory_size()).sum::<usize>() as u64;
let col_stats =
gen_columns_statistics(&data_block, column_distinct_count, &self.source_schema)?;
gen_columns_statistics(&data_blocks, column_distinct_count, &self.source_schema)?;

let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
let col_metas = serialize_block(
let col_metas = serialize_blocks(
&self.write_settings,
&self.source_schema,
data_block,
data_blocks,
&mut buffer,
)?;
let file_size = buffer.len() as u64;
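A minimal sketch (not part of the diff) of how the reworked `BlockBuilder::build` might be driven, assuming a `block_builder: BlockBuilder` and `blocks: Vec<DataBlock>` are already in scope; the closure matches the `F` bound shown above and simply skips cluster statistics here:

// Build one physical block from several in-memory blocks without
// concatenating them into a single DataBlock first.
let serialization: BlockSerialization =
    block_builder.build(blocks, |block, _cluster_stats_gen| Ok((None, block)))?;

// `serialization` carries the serialized buffer, column metas, and the
// bloom/inverted index states, ready to be handed to the block writer.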
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/write/mod.rs
@@ -20,6 +20,7 @@ mod write_settings;

pub(crate) use block_writer::create_inverted_index_builders;
pub use block_writer::serialize_block;
pub use block_writer::serialize_blocks;
pub use block_writer::write_data;
pub use block_writer::BlockBuilder;
pub use block_writer::BlockSerialization;