Implement CardinalityAwareRowConverter while doing streaming merge #7401

Merged 61 commits on Sep 18, 2023
Changes from 50 commits
Commits
b80e702
Use CardinalityAwareRow converter
JayjeetAtGithub Aug 24, 2023
d83fae2
Move CardinalityAwareRowConverter in df
JayjeetAtGithub Aug 29, 2023
546ccff
Move CardinalityAwareRowConverter in df
JayjeetAtGithub Aug 29, 2023
e9fb8af
Remove unnecessary clone and make wrapper mod private
JayjeetAtGithub Aug 30, 2023
c5c707a
Use as_any_dictionary_opt
JayjeetAtGithub Aug 30, 2023
22fb159
Remove unnecessary comments
JayjeetAtGithub Aug 30, 2023
65a0209
Remove done
JayjeetAtGithub Aug 30, 2023
bd5faf7
Add test for cardinality aware row converter on high card dict
JayjeetAtGithub Aug 31, 2023
80d3bf2
Add test for cardinality aware row converter on low card dict
JayjeetAtGithub Aug 31, 2023
5eff541
Ignore the test_dict_merge_infinite test
JayjeetAtGithub Aug 31, 2023
2651532
Remove phantom Arc import
JayjeetAtGithub Sep 1, 2023
7da3681
Remove the infinite stream test
JayjeetAtGithub Sep 1, 2023
6cc6468
Update datafusion/core/src/physical_plan/wrapper.rs
JayjeetAtGithub Sep 7, 2023
74aaa59
Update convert_rows signature and add empty_rows
JayjeetAtGithub Sep 8, 2023
0d305ee
Add comments to the test
JayjeetAtGithub Sep 8, 2023
2391f43
Use Some and take() semantics
JayjeetAtGithub Sep 8, 2023
05685a1
Init with a row converter instance instead of none
JayjeetAtGithub Sep 8, 2023
f107f27
Remove unused variable
JayjeetAtGithub Sep 8, 2023
e44e600
Remove unused imports
JayjeetAtGithub Sep 8, 2023
068695e
Remove unused imports
JayjeetAtGithub Sep 8, 2023
c7fa020
Change GroupValues
JayjeetAtGithub Sep 8, 2023
4c054f6
Add comments, run fmt
alamb Sep 8, 2023
c58539b
Init with a empty row converter
JayjeetAtGithub Sep 8, 2023
74533c6
Use the cardinality aware row converter
JayjeetAtGithub Sep 8, 2023
b90f40d
Reconvert the group values
JayjeetAtGithub Sep 9, 2023
4cf0218
Rename wrapper to row_converter
JayjeetAtGithub Sep 9, 2023
eb81191
Recovert the group values
JayjeetAtGithub Sep 9, 2023
d22b645
Convert back to dictionary
JayjeetAtGithub Sep 10, 2023
aa24717
fmt
alamb Sep 11, 2023
dbd66f2
A fmt pass
JayjeetAtGithub Sep 11, 2023
8a42957
fix: fmt
alamb Sep 11, 2023
1a9a58d
Move the reconversion to dict to just consider group by columns
JayjeetAtGithub Sep 11, 2023
acf1cd4
Reconvert only the correct cols
JayjeetAtGithub Sep 11, 2023
6543b9d
Use assert eq
JayjeetAtGithub Sep 11, 2023
c6bf41a
clippy
alamb Sep 11, 2023
08c6f7d
clippy
alamb Sep 11, 2023
befc9b5
Merge branch 'sort-fix' of github.com:JayjeetAtGithub/arrow-datafusio…
alamb Sep 11, 2023
db800c4
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 11, 2023
c965bbe
Add comment about the reconversion to dict
JayjeetAtGithub Sep 11, 2023
8244c83
Fix the merge issues
JayjeetAtGithub Sep 11, 2023
235f3bc
move data type conversion
alamb Sep 12, 2023
f3eb44c
fix
alamb Sep 12, 2023
664e6a0
fix docs
alamb Sep 12, 2023
9a965ca
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 12, 2023
bbc5982
fix bug
alamb Sep 12, 2023
a1f69a7
Improve tests
alamb Sep 12, 2023
76feb4f
simplify
alamb Sep 12, 2023
137d78e
Use cardinality aware row converter in gby order
alamb Sep 12, 2023
65d31cc
clippy
alamb Sep 12, 2023
9b5681d
Adjust memory test
alamb Sep 12, 2023
b2fedac
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 13, 2023
b673c09
Add doc comments about row converter
alamb Sep 13, 2023
25861a7
remove outdated comment
alamb Sep 13, 2023
355ef73
Rework partition size calculation to make test clearer
alamb Sep 13, 2023
f48325a
Increase threshold to 512
JayjeetAtGithub Sep 15, 2023
22a90e7
Update row converter tests according to new threshold
JayjeetAtGithub Sep 15, 2023
819bd09
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 18, 2023
fdb4835
fix clippy
alamb Sep 18, 2023
b49a5bd
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 18, 2023
efca2b2
fix panic
alamb Sep 18, 2023
8498d04
Adjust constant for test
alamb Sep 18, 2023
57 changes: 38 additions & 19 deletions datafusion/core/src/physical_plan/aggregates/group_values/row.rs
@@ -16,8 +16,9 @@
// under the License.

use crate::physical_plan::aggregates::group_values::GroupValues;
use crate::physical_plan::row_converter::CardinalityAwareRowConverter;
use ahash::RandomState;
use arrow::row::{RowConverter, Rows, SortField};
use arrow::row::{Rows, SortField};
use arrow_array::ArrayRef;
use arrow_schema::SchemaRef;
use datafusion_common::Result;
@@ -29,7 +30,7 @@ use hashbrown::raw::RawTable;
/// A [`GroupValues`] making use of [`Rows`]
pub struct GroupValuesRows {
/// Converter for the group values
row_converter: RowConverter,
row_converter: CardinalityAwareRowConverter,

/// Logically maps group values to a group_index in
/// [`Self::group_values`] and in each accumulator
@@ -52,7 +53,7 @@ pub struct GroupValuesRows {
/// important for multi-column group keys.
///
/// [`Row`]: arrow::row::Row
group_values: Rows,
group_values: Option<Rows>,

// buffer to be reused to store hashes
hashes_buffer: Vec<u64>,
@@ -63,7 +64,7 @@ pub struct GroupValuesRows {

impl GroupValuesRows {
pub fn try_new(schema: SchemaRef) -> Result<Self> {
let row_converter = RowConverter::new(
let row_converter = CardinalityAwareRowConverter::new(
schema
.fields()
.iter()
@@ -72,13 +73,12 @@ impl GroupValuesRows {
)?;

let map = RawTable::with_capacity(0);
let group_values = row_converter.empty_rows(0, 0);

Ok(Self {
row_converter,
map,
map_size: 0,
group_values,
group_values: None,
hashes_buffer: Default::default(),
random_state: Default::default(),
})
@@ -92,6 +92,11 @@ impl GroupValues for GroupValuesRows {
let group_rows = self.row_converter.convert_columns(cols)?;
let n_rows = group_rows.num_rows();

let mut group_values = match self.group_values.take() {
Some(group_values) => group_values,
None => self.row_converter.empty_rows(0, 0)?,
};

// tracks to which group each of the input rows belongs
groups.clear();

@@ -106,7 +111,7 @@ impl GroupValues for GroupValuesRows {
// verify that a group that we are inserting with hash is
// actually the same key value as the group in
// existing_idx (aka group_values @ row)
group_rows.row(row) == self.group_values.row(*group_idx)
group_rows.row(row) == group_values.row(*group_idx)
});

let group_idx = match entry {
@@ -115,8 +120,8 @@ impl GroupValues for GroupValuesRows {
// 1.2 Need to create new entry for the group
None => {
// Add new entry to aggr_state and save newly created index
let group_idx = self.group_values.num_rows();
self.group_values.push(group_rows.row(row));
let group_idx = group_values.num_rows();
group_values.push(group_rows.row(row));

// for hasher function, use precomputed hash value
self.map.insert_accounted(
@@ -130,12 +135,14 @@ impl GroupValues for GroupValuesRows {
groups.push(group_idx);
}

self.group_values = Some(group_values);

Ok(())
}

fn size(&self) -> usize {
self.row_converter.size()
+ self.group_values.size()
+ self.group_values.as_ref().map(|v| v.size()).unwrap_or(0)
+ self.map_size
+ self.hashes_buffer.allocated_size()
}
@@ -145,25 +152,34 @@ impl GroupValues for GroupValuesRows {
}

fn len(&self) -> usize {
self.group_values.num_rows()
self.group_values
.as_ref()
.map(|group_values| group_values.num_rows())
.unwrap_or(0)
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
Ok(match emit_to {
let mut group_values = self
.group_values
.take()
.expect("Can not emit from empty rows");

let output = match emit_to {
EmitTo::All => {
// Eventually we may also want to clear the hash table here
self.row_converter.convert_rows(&self.group_values)?
let output = self.row_converter.convert_rows(&group_values)?;
group_values.clear();
output
}
EmitTo::First(n) => {
let groups_rows = self.group_values.iter().take(n);
let groups_rows = group_values.iter().take(n);
let output = self.row_converter.convert_rows(groups_rows)?;
// Clear out first n group keys by copying them to a new Rows.
// TODO file some ticket in arrow-rs to make this more efficient?
let mut new_group_values = self.row_converter.empty_rows(0, 0);
for row in self.group_values.iter().skip(n) {
let mut new_group_values = self.row_converter.empty_rows(0, 0)?;
for row in group_values.iter().skip(n) {
new_group_values.push(row);
}
std::mem::swap(&mut new_group_values, &mut self.group_values);
std::mem::swap(&mut new_group_values, &mut group_values);

// SAFETY: self.map outlives iterator and is not modified concurrently
unsafe {
@@ -179,6 +195,9 @@ impl GroupValues for GroupValuesRows {
}
output
}
})
};

self.group_values = Some(group_values);
Ok(output)
}
}
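
The `Option<Rows>` handling in `intern`, `len`, `size` and `emit` above follows a take, mutate, put back pattern: the buffer is moved out of `self`, created on first use, and stored again before returning. The following is a minimal sketch of that pattern using only the standard library. `GroupsSketch`, `RowBuf` and `emit_first` are hypothetical names for illustration, `Vec<Vec<u8>>` stands in for `arrow::row::Rows`, and a linear scan stands in for the hash table, so this is not the code from the PR.

```rust
/// Hypothetical stand-in for `arrow::row::Rows`: each row is an opaque byte key.
type RowBuf = Vec<Vec<u8>>;

/// Hypothetical, simplified analogue of `GroupValuesRows`.
struct GroupsSketch {
    /// `None` until the first batch arrives, like `group_values: Option<Rows>`.
    values: Option<RowBuf>,
}

impl GroupsSketch {
    fn new() -> Self {
        Self { values: None }
    }

    /// Returns the group index of every key, creating new groups as needed.
    fn intern(&mut self, keys: &[Vec<u8>]) -> Vec<usize> {
        // Move the buffer out of `self`; start from an empty one on first use.
        let mut values = self.values.take().unwrap_or_default();
        let mut groups = Vec::with_capacity(keys.len());
        for key in keys {
            // Linear scan instead of a hash table (assumption, for brevity).
            let idx = match values.iter().position(|v| v == key) {
                Some(existing) => existing,
                None => {
                    values.push(key.clone());
                    values.len() - 1
                }
            };
            groups.push(idx);
        }
        // Put the buffer back so later calls see the accumulated groups.
        self.values = Some(values);
        groups
    }

    /// Number of groups; zero while no buffer exists yet.
    fn len(&self) -> usize {
        self.values.as_ref().map(|v| v.len()).unwrap_or(0)
    }

    /// Bytes held by the keys; must not panic before the first `intern`.
    fn size(&self) -> usize {
        self.values
            .as_ref()
            .map(|v| v.iter().map(|row| row.len()).sum::<usize>())
            .unwrap_or(0)
    }

    /// Emits the first `n` groups and keeps the remainder, which then
    /// starts again at index 0.
    fn emit_first(&mut self, n: usize) -> RowBuf {
        let mut values = self.values.take().unwrap_or_default();
        let n = n.min(values.len());
        // `values` keeps the first `n` rows, `rest` receives the others.
        let rest = values.split_off(n);
        self.values = Some(rest);
        values
    }
}
```

Unlike `emit` in the diff, which calls `expect` on an empty `Option`, this sketch falls back to an empty buffer. After `emit_first(n)` the surviving groups are renumbered from zero, so any index structure kept alongside the buffer (the hash table in the PR) would have to be adjusted accordingly.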
12 changes: 7 additions & 5 deletions datafusion/core/src/physical_plan/aggregates/order/partial.rs
@@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use crate::physical_expr::EmitTo;
use arrow::row::{OwnedRow, RowConverter, Rows, SortField};
use crate::{
physical_expr::EmitTo, physical_plan::row_converter::CardinalityAwareRowConverter,
};
use arrow::row::{OwnedRow, Rows, SortField};
use arrow_array::ArrayRef;
use arrow_schema::Schema;
use datafusion_common::Result;
@@ -70,7 +72,7 @@ pub(crate) struct GroupOrderingPartial {

/// Converter for the sort key (used on the group columns
/// specified in `order_indexes`)
row_converter: RowConverter,
row_converter: CardinalityAwareRowConverter,
}

#[derive(Debug, Default)]
@@ -124,7 +126,7 @@ impl GroupOrderingPartial {
Ok(Self {
state: State::Start,
order_indices: order_indices.to_vec(),
row_converter: RowConverter::new(fields)?,
row_converter: CardinalityAwareRowConverter::new(fields)?,
})
}

@@ -141,7 +143,7 @@ impl GroupOrderingPartial {
.map(|&idx| group_values[idx].clone())
.collect();

Ok(self.row_converter.convert_columns(&sort_values)?)
self.row_converter.convert_columns(&sort_values)
}

/// How many groups can be emitted, or None if no data can be emitted
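
The last change in this hunk drops the `Ok(...?)` wrapper around `convert_columns`. A plausible reading, not confirmed by the diff itself, is that the new wrapper already returns the caller's error type, so `?` no longer has an error conversion to perform. The sketch below illustrates that general Rust idiom with standard-library types only. `LowLevelError`, `HighLevelError` and all function names are hypothetical stand-ins, not types from arrow or DataFusion.

```rust
/// Hypothetical stand-in for a library-level error.
#[derive(Debug, PartialEq)]
struct LowLevelError(String);

/// Hypothetical stand-in for the application-level error.
#[derive(Debug, PartialEq)]
enum HighLevelError {
    LowLevel(String),
}

impl From<LowLevelError> for HighLevelError {
    fn from(e: LowLevelError) -> Self {
        HighLevelError::LowLevel(e.0)
    }
}

/// Library call that fails with the low-level error type.
fn convert_low(input: i32) -> Result<i32, LowLevelError> {
    if input >= 0 {
        Ok(input * 2)
    } else {
        Err(LowLevelError("negative input".to_string()))
    }
}

/// Old shape: `?` converts the error via `From`, then `Ok` rewraps the value.
fn old_style(input: i32) -> Result<i32, HighLevelError> {
    Ok(convert_low(input)?)
}

/// A wrapper that already exposes the high-level error type.
fn convert_wrapped(input: i32) -> Result<i32, HighLevelError> {
    convert_low(input).map_err(HighLevelError::from)
}

/// New shape: the result can be returned directly, with no `Ok(...?)`.
fn new_style(input: i32) -> Result<i32, HighLevelError> {
    convert_wrapped(input)
}
```

Both shapes return the same values and errors. The direct return is the form Clippy's `needless_question_mark` lint prefers once the error types already match.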
1 change: 1 addition & 0 deletions datafusion/core/src/physical_plan/mod.rs
@@ -387,6 +387,7 @@ pub mod memory;
pub mod metrics;
pub mod projection;
pub mod repartition;
mod row_converter;
pub mod sorts;
pub mod stream;
pub mod streaming;
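
The `row_converter` module registered above is not shown in this part of the diff. Going only by the PR title and the commit messages ("Increase threshold to 512", tests for high and low cardinality dictionaries), the idea appears to be to inspect the cardinality of dictionary columns and stop preserving dictionaries once it is too high. The following is a rough sketch of that decision under those assumptions. `CardinalityAwareSketch`, `Encoding`, `observe` and the distinct-value counting are all hypothetical, and the real converter may measure cardinality differently.

```rust
use std::collections::HashSet;

/// Assumed cutoff, taken from the commit message "Increase threshold to 512".
const LOW_CARDINALITY_THRESHOLD: usize = 512;

/// Hypothetical outcome of the cardinality check.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Encoding {
    /// Few distinct values: keep the dictionary encoding.
    PreserveDictionary,
    /// Many distinct values: treat the column as plain values.
    PlainValues,
}

/// Hypothetical wrapper that picks an encoding when it sees its first batch.
struct CardinalityAwareSketch {
    /// `None` until the first batch has been observed.
    encoding: Option<Encoding>,
}

impl CardinalityAwareSketch {
    fn new() -> Self {
        Self { encoding: None }
    }

    /// Decides on the first call and returns the same decision afterwards.
    fn observe(&mut self, dictionary_values: &[&str]) -> Encoding {
        *self.encoding.get_or_insert_with(|| {
            // Count distinct values in the first batch (assumed heuristic).
            let distinct: HashSet<&str> = dictionary_values.iter().copied().collect();
            if distinct.len() > LOW_CARDINALITY_THRESHOLD {
                Encoding::PlainValues
            } else {
                Encoding::PreserveDictionary
            }
        })
    }
}
```

Making the decision once and keeping it fixed means every batch is converted with the same row encoding, which row comparisons across batches depend on. The cost is that a first batch that is unrepresentative of the stream locks in a poor choice.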