chore(query): add more logs on aggregation #16552

Open · wants to merge 7 commits into main
29 changes: 0 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default.

18 changes: 13 additions & 5 deletions Cargo.toml
@@ -351,18 +351,26 @@ useless_format = "allow"
mutable_key_type = "allow"
result_large_err = "allow"

## DON'T DELETE THIS: if we want the best performance we should use this profile, but it takes longer to compile.
## Test SQL:
## select sum(number) from numbers_mt(10000000000); ~ 3x performance
## select max(number) from numbers_mt(10000000000); ~ 3x performance
# [profile.release]
# debug = 1
# lto = "thin"
# overflow-checks = false
# incremental = false
# codegen-units = 1

[profile.release]
debug = 1
lto = "thin"
overflow-checks = false
opt-level = "s" ## the default is 3
incremental = false
opt-level = "s"

# codegen-units = 1 # Reduce the number of codegen units to increase optimizations.

# [profile.release.package]
# arrow2 = { codegen-units = 4 }
# common-functions = { codegen-units = 16 }
# databend-common-arrow = { codegen-units = 16 }
# databend-query = { codegen-units = 4 }
# databend-binaries = { codegen-units = 4 }

2 changes: 2 additions & 0 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
@@ -49,6 +49,7 @@ pub struct AggregateHashTable {
// use for append rows directly during deserialize
pub direct_append: bool,
pub config: HashTableConfig,

current_radix_bits: u64,
entries: Vec<Entry>,
count: usize,
@@ -585,6 +586,7 @@ impl AggregateHashTable {
.iter()
.map(|arena| arena.allocated_bytes())
.sum::<usize>()
+ self.entries.len() * std::mem::size_of::<Entry>()
}
}

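The allocated_bytes change above is a small accounting fix: the reported size now includes the flat entry array itself, not just the arena payloads, so callers tracking the hash table's memory see its full footprint. A minimal sketch of the idea, with illustrative stand-in types rather than the real databend definitions:

// Sketch: total bytes = arena payloads + the entry array backing the table.
struct Entry {
    salt: u16,        // hypothetical layout, used here only for sizing
    page_nr: u16,
    page_offset: u32,
}

fn allocated_bytes(arena_sizes: &[usize], entries: &[Entry]) -> usize {
    let arena_total: usize = arena_sizes.iter().sum();
    arena_total + entries.len() * std::mem::size_of::<Entry>()
}

For a table with 1 << 20 entries and this 8-byte Entry, the old accounting would under-report by 8 MiB.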
12 changes: 12 additions & 0 deletions src/query/expression/src/block.rs
@@ -240,6 +240,18 @@ impl DataBlock {
self.columns().iter().map(|entry| entry.memory_size()).sum()
}

pub fn consume_convert_to_full(self) -> Self {
if self
.columns()
.iter()
.all(|entry| entry.value.as_column().is_some())
{
return self;
}

self.convert_to_full()
}

pub fn convert_to_full(&self) -> Self {
let columns = self
.columns()
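consume_convert_to_full is a by-value companion to convert_to_full: if every entry already holds a full column, the block is returned as-is and no copy happens; only blocks containing scalar (constant) entries fall through to materialization. A hedged sketch of the fast-path shape, using a simplified stand-in type rather than the real DataBlock/BlockEntry API:

// Sketch: move the block through unchanged when nothing needs materializing.
struct Block {
    // Some(..) = full column; None stands in for a scalar entry.
    cols: Vec<Option<Vec<u64>>>,
}

impl Block {
    fn convert_to_full(&self) -> Self {
        // Copying path: expand scalar entries into full columns
        // (elided; here we just clone what exists).
        Block { cols: self.cols.clone() }
    }

    fn consume_convert_to_full(self) -> Self {
        if self.cols.iter().all(|c| c.is_some()) {
            return self; // zero-copy fast path
        }
        self.convert_to_full()
    }
}

The arrow conversion call site below owns its block, so the common all-full case can skip the per-column conversion entirely.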
2 changes: 1 addition & 1 deletion src/query/expression/src/converts/arrow/to.rs
@@ -101,7 +101,7 @@ impl DataBlock {
let arrow_schema = table_schema_to_arrow_schema(table_schema);
let mut arrays = Vec::with_capacity(self.columns().len());
for (entry, arrow_field) in self
.convert_to_full()
.consume_convert_to_full()
.columns()
.iter()
.zip(arrow_schema.fields())
50 changes: 50 additions & 0 deletions src/query/expression/tests/it/sort.rs
@@ -24,6 +24,7 @@ use databend_common_expression::FromData;
use databend_common_expression::SortColumnDescription;

use crate::common::new_block;
use crate::rand_block_for_all_types;

#[test]
fn test_block_sort() -> Result<()> {
@@ -201,3 +202,52 @@ fn test_block_sort() -> Result<()> {

Ok(())
}

#[test]
fn sort_concat() {
// Sort(Sort A || Sort B) = Sort (A || B)
use databend_common_expression::DataBlock;
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::Rng;

let mut rng = rand::thread_rng();
let num_blocks = 100;

for _i in 0..num_blocks {
let block_a = rand_block_for_all_types(rng.gen_range(0..100));
let block_b = rand_block_for_all_types(rng.gen_range(0..100));

let mut sort_index: Vec<usize> = (0..block_a.num_columns()).collect();
sort_index.shuffle(&mut rng);

let sort_desc = sort_index
.iter()
.map(|i| SortColumnDescription {
offset: *i,
asc: rng.gen_bool(0.5),
nulls_first: rng.gen_bool(0.5),
is_nullable: rng.gen_bool(0.5),
})
.collect_vec();

let concat_ab_0 = DataBlock::concat(&[block_a.clone(), block_b.clone()]).unwrap();

let sort_a = DataBlock::sort(&block_a, &sort_desc, None).unwrap();
let sort_b = DataBlock::sort(&block_b, &sort_desc, None).unwrap();
let concat_ab_1 = DataBlock::concat(&[sort_a, sort_b]).unwrap();

let block_1 = DataBlock::sort(&concat_ab_0, &sort_desc, None).unwrap();
let block_2 = DataBlock::sort(&concat_ab_1, &sort_desc, None).unwrap();

assert_eq!(block_1.num_columns(), block_2.num_columns());
assert_eq!(block_1.num_rows(), block_2.num_rows());

let columns_1 = block_1.columns();
let columns_2 = block_2.columns();
for idx in 0..columns_1.len() {
assert_eq!(columns_1[idx].data_type, columns_2[idx].data_type);
assert_eq!(columns_1[idx].value, columns_2[idx].value);
}
}
}
1 change: 0 additions & 1 deletion src/query/functions/Cargo.toml
@@ -47,7 +47,6 @@ lexical-core = "0.8.5"
libm = "0.2.6"
match-template = { workspace = true }
md-5 = "0.10.5"
multiversion = "0.7.4"
naive-cityhash = "0.2.0"
num-traits = "0.2.15"
once_cell = { workspace = true }
31 changes: 31 additions & 0 deletions src/query/functions/src/aggregates/aggregate_min_max_any.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;

use borsh::BorshDeserialize;
use borsh::BorshSerialize;
use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::decimal::*;
@@ -92,6 +93,36 @@
Ok(())
}

fn add_batch(
&mut self,
other: T::Column,
validity: Option<&Bitmap>,
function_data: Option<&dyn FunctionData>,
) -> Result<()> {
let column_len = T::column_len(&other);
if column_len == 0 {
return Ok(());
}

let column_iter = T::iter_column(&other);
if let Some(validity) = validity {
if validity.unset_bits() == column_len {
return Ok(());
}
for (data, valid) in column_iter.zip(validity.iter()) {
if valid {
let _ = self.add(data, function_data);
}
}
} else {
let v = column_iter.reduce(|l, r| if !C::change_if(&l, &r) { l } else { r });
if let Some(v) = v {
let _ = self.add(v, function_data);
}
}
Ok(())
}

fn merge(&mut self, rhs: &Self) -> Result<()> {
if let Some(v) = &rhs.value {
self.add(T::to_scalar_ref(v), None)?;
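The min/max add_batch above makes one pass over the batch: with a validity bitmap it feeds each valid value into add, and with no bitmap it first reduces the whole column to a single winning candidate via C::change_if and calls add once. A standalone sketch of that reduce-then-add shape for a min aggregate over plain i64 values (names are placeholders, not the real generic machinery):

// Sketch: fold the batch to one candidate, then merge it into the state once.
struct MinState {
    value: Option<i64>,
}

impl MinState {
    fn add(&mut self, v: i64) {
        if self.value.map_or(true, |cur| v < cur) {
            self.value = Some(v);
        }
    }

    fn add_batch(&mut self, values: impl Iterator<Item = i64>) {
        // reduce keeps whichever side wins the comparison, mirroring
        // the `if !C::change_if(&l, &r) { l } else { r }` fold in the diff.
        if let Some(best) = values.reduce(|l, r| if r < l { r } else { l }) {
            self.add(best);
        }
    }
}

Calling add once per batch instead of once per row keeps the state-update logic out of the hot loop when there are no nulls.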
42 changes: 29 additions & 13 deletions src/query/functions/src/aggregates/aggregate_sum.rs
@@ -15,6 +15,7 @@
use borsh::BorshDeserialize;
use borsh::BorshSerialize;
use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_arrow::arrow::buffer::Buffer;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::decimal::*;
@@ -80,21 +81,33 @@ where
}
}

#[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))]
fn sum_batch<T, N>(other: T::Column) -> N::Scalar
// #[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))]
#[inline]
pub fn sum_batch<T, TSum>(inner: Buffer<T>, validity: Option<&Bitmap>) -> TSum
where
T: ValueType + Sync + Send,
N: ValueType,
T::Scalar: Number + AsPrimitive<N::Scalar>,
N::Scalar: Number + AsPrimitive<f64> + std::ops::AddAssign,
for<'a> T::ScalarRef<'a>: Number + AsPrimitive<N::Scalar>,
T: Number + AsPrimitive<TSum>,
TSum: Number + std::ops::AddAssign,
{
// use temp variable to hint the compiler to unroll the loop
let mut sum = N::Scalar::default();
for value in T::iter_column(&other) {
sum += value.as_();
match validity {
Some(v) if v.unset_bits() > 0 => {
let mut sum = TSum::default();
inner.iter().zip(v.iter()).for_each(|(t, b)| {
if b {
sum += t.as_();
}
});

sum
}
_ => {
let mut sum = TSum::default();
inner.iter().for_each(|t| {
sum += t.as_();
});

sum
}
}
sum
}

impl<T, N> UnaryState<T, N> for NumberSumState<N>
@@ -117,9 +130,12 @@ where
fn add_batch(
&mut self,
other: T::Column,
validity: Option<&Bitmap>,
_function_data: Option<&dyn FunctionData>,
) -> Result<()> {
self.value += sum_batch::<T, N>(other);
let col = T::upcast_column(other);
let buffer = NumberType::<T::Scalar>::try_downcast_column(&col).unwrap();
self.value += sum_batch::<T::Scalar, N::Scalar>(buffer, validity);
Ok(())
}

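The rewritten sum_batch branches once per batch: if the bitmap is absent or has no unset bits, it runs a plain accumulation loop the compiler can auto-vectorize, and only otherwise pays for per-element validity checks (the multiversion attribute is left commented out in favor of #[inline]). A standalone illustration of the two paths, using plain slices in place of arrow's Buffer and Bitmap:

// Sketch: pay for null handling only when the batch actually contains nulls.
fn sum_batch_sketch(values: &[i32], validity: Option<&[bool]>) -> i64 {
    match validity {
        Some(v) if v.iter().any(|b| !b) => values
            .iter()
            .zip(v.iter())
            .filter(|&(_, &valid)| valid)
            .map(|(&t, _)| t as i64)
            .sum(),
        _ => values.iter().map(|&t| t as i64).sum(), // tight, vectorizable loop
    }
}

For example, sum_batch_sketch(&[1, 2, 3], Some(&[true, false, true])) is 4, while passing None sums all three values through the fast branch.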
30 changes: 16 additions & 14 deletions src/query/functions/src/aggregates/aggregate_unary.rs
@@ -47,10 +47,22 @@ where
fn add_batch(
&mut self,
other: T::Column,
validity: Option<&Bitmap>,
function_data: Option<&dyn FunctionData>,
) -> Result<()> {
for value in T::iter_column(&other) {
self.add(value, function_data)?;
match validity {
Some(validity) => {
for (data, valid) in T::iter_column(&other).zip(validity.iter()) {
if valid {
self.add(data, function_data)?;
}
}
}
None => {
for value in T::iter_column(&other) {
self.add(value, function_data)?;
}
}
}
Ok(())
}
@@ -206,18 +218,8 @@ where
) -> Result<()> {
let column = T::try_downcast_column(&columns[0]).unwrap();
let state: &mut S = place.get::<S>();
match validity {
Some(bitmap) if bitmap.unset_bits() > 0 => {
let column_iter = T::iter_column(&column);
for (value, is_valid) in column_iter.zip(bitmap.iter()) {
if is_valid {
state.add(value, self.function_data.as_deref())?;
}
}
Ok(())
}
_ => state.add_batch(column, self.function_data.as_deref()),
}

state.add_batch(column, validity, self.function_data.as_deref())
}

fn accumulate_row(&self, place: StateAddr, columns: InputColumns, row: usize) -> Result<()> {
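Net effect of this file's change: accumulate no longer peels off the validity bitmap itself; it forwards the bitmap into the state's add_batch, so states with a fast batched path (sum and min/max above) can specialize while the default implementation keeps the generic per-row loop shown in the first hunk. A sketch of the resulting trait shape, with deliberately simplified signatures (the real UnaryState is generic over value types):

// Sketch: validity flows into add_batch; states may override it for speed.
trait UnaryStateSketch {
    fn add(&mut self, value: i64);

    // Default: generic per-row loop that skips invalid rows.
    fn add_batch(&mut self, column: &[i64], validity: Option<&[bool]>) {
        match validity {
            Some(v) => {
                for (&data, &valid) in column.iter().zip(v.iter()) {
                    if valid {
                        self.add(data);
                    }
                }
            }
            None => {
                for &value in column {
                    self.add(value);
                }
            }
        }
    }
}

This mirrors the diff's direction: the dispatch site shrinks to a single state.add_batch(column, validity, ...) call, and each state decides how much of the batch machinery to override.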