Skip to content

Commit

Permalink
Push limit into aggregation for DISTINCT ... LIMIT queries (#8038)
Browse files Browse the repository at this point in the history
* Push limit into AggregateExec for DISTINCT with GROUP BY

* Soft limit for GroupedHashAggregateStream with no aggregate expressions

* Add datafusion.optimizer.enable_distinct_aggregation_soft_limit setting

* Fix result checking in topk_aggregate benchmark

* Make the topk_aggregate benchmark's make_data function public

* Add benchmark for DISTINCT queries

* Fix doc formatting with prettier

* Minor: Simply early emit logic in GroupByHash

* remove level of indentation

* Use '///' for function comments

* Address review comments

* rename transform_local_limit to transform_limit

* Resolve conflicts

* Update test after merge with main

---------

Co-authored-by: Mark Sirek <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
3 people committed Nov 9, 2023
1 parent 1c17c47 commit 43cc870
Show file tree
Hide file tree
Showing 19 changed files with 1,299 additions and 124 deletions.
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ config_namespace! {
config_namespace! {
/// Options related to query optimization
pub struct OptimizerOptions {
/// When set to true, the optimizer will push a limit operation into
/// grouped aggregations which have no aggregate expressions, as a soft limit,
/// emitting groups once the limit is reached, before all rows in the group are read.
pub enable_distinct_aggregation_soft_limit: bool, default = true

/// When set to true, the physical plan optimizer will try to add round robin
/// repartitioning to increase parallelism to leverage more CPU cores
pub enable_round_robin_repartition: bool, default = true
Expand Down
11 changes: 11 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ pub trait TreeNode: Sized {
after_op.map_children(|node| node.transform_down(op))
}

/// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its
/// children(Preorder Traversal) using a mutable function, `F`.
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_down_mut<F>(self, op: &mut F) -> Result<Self>
where
F: FnMut(Self) -> Result<Transformed<Self>>,
{
let after_op = op(self)?.into();
after_op.map_children(|node| node.transform_down_mut(op))
}

/// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its
/// children and then itself(Postorder Traversal).
/// When the `op` does not apply to a given node, it is left unchanged.
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ nix = { version = "0.27.1", features = ["fs"] }
harness = false
name = "aggregate_query_sql"

[[bench]]
harness = false
name = "distinct_query_sql"

[[bench]]
harness = false
name = "sort_limit_query_sql"
Expand Down
85 changes: 85 additions & 0 deletions datafusion/core/benches/data_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use arrow_array::builder::{Int64Builder, StringBuilder};
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion_common::DataFusionError;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
use rand_distr::Distribution;
use rand_distr::{Normal, Pareto};
use std::fmt::Write;
use std::sync::Arc;

/// create an in-memory table given the partition len, array len, and batch size,
Expand Down Expand Up @@ -156,3 +161,83 @@ pub fn create_record_batches(
})
.collect::<Vec<_>>()
}

/// Create time series data with `partition_cnt` partitions and `sample_cnt` rows per partition
/// in ascending order, if `asc` is true, otherwise randomly sampled using a Pareto distribution
#[allow(dead_code)]
pub(crate) fn make_data(
partition_cnt: i32,
sample_cnt: i32,
asc: bool,
) -> Result<(Arc<Schema>, Vec<Vec<RecordBatch>>), DataFusionError> {
// constants observed from trace data
let simultaneous_group_cnt = 2000;
let fitted_shape = 12f64;
let fitted_scale = 5f64;
let mean = 0.1;
let stddev = 1.1;
let pareto = Pareto::new(fitted_scale, fitted_shape).unwrap();
let normal = Normal::new(mean, stddev).unwrap();
let mut rng = rand::rngs::SmallRng::from_seed([0; 32]);

// populate data
let schema = test_schema();
let mut partitions = vec![];
let mut cur_time = 16909000000000i64;
for _ in 0..partition_cnt {
let mut id_builder = StringBuilder::new();
let mut ts_builder = Int64Builder::new();
let gen_id = |rng: &mut rand::rngs::SmallRng| {
rng.gen::<[u8; 16]>()
.iter()
.fold(String::new(), |mut output, b| {
let _ = write!(output, "{b:02X}");
output
})
};
let gen_sample_cnt =
|mut rng: &mut rand::rngs::SmallRng| pareto.sample(&mut rng).ceil() as u32;
let mut group_ids = (0..simultaneous_group_cnt)
.map(|_| gen_id(&mut rng))
.collect::<Vec<_>>();
let mut group_sample_cnts = (0..simultaneous_group_cnt)
.map(|_| gen_sample_cnt(&mut rng))
.collect::<Vec<_>>();
for _ in 0..sample_cnt {
let random_index = rng.gen_range(0..simultaneous_group_cnt);
let trace_id = &mut group_ids[random_index];
let sample_cnt = &mut group_sample_cnts[random_index];
*sample_cnt -= 1;
if *sample_cnt == 0 {
*trace_id = gen_id(&mut rng);
*sample_cnt = gen_sample_cnt(&mut rng);
}

id_builder.append_value(trace_id);
ts_builder.append_value(cur_time);

if asc {
cur_time += 1;
} else {
let samp: f64 = normal.sample(&mut rng);
let samp = samp.round();
cur_time += samp as i64;
}
}

// convert to MemTable
let id_col = Arc::new(id_builder.finish());
let ts_col = Arc::new(ts_builder.finish());
let batch = RecordBatch::try_new(schema.clone(), vec![id_col, ts_col])?;
partitions.push(vec![batch]);
}
Ok((schema, partitions))
}

/// The Schema used by make_data
fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("trace_id", DataType::Utf8, false),
Field::new("timestamp_ms", DataType::Int64, false),
]))
}
208 changes: 208 additions & 0 deletions datafusion/core/benches/distinct_query_sql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
extern crate arrow;
extern crate datafusion;

mod data_utils;
use crate::criterion::Criterion;
use data_utils::{create_table_provider, make_data};
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::{datasource::MemTable, error::Result};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;

use parking_lot::Mutex;
use std::{sync::Arc, time::Duration};
use tokio::runtime::Runtime;

fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}

fn create_context(
partitions_len: usize,
array_len: usize,
batch_size: usize,
) -> Result<Arc<Mutex<SessionContext>>> {
let ctx = SessionContext::new();
let provider = create_table_provider(partitions_len, array_len, batch_size)?;
ctx.register_table("t", provider)?;
Ok(Arc::new(Mutex::new(ctx)))
}

fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
let partitions_len = 10;
let array_len = 1 << 26; // 64 M
let batch_size = 8192;
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();

let mut group = c.benchmark_group("custom-measurement-time");
group.measurement_time(Duration::from_secs(40));

group.bench_function("distinct_group_by_u64_narrow_limit_10", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10",
)
})
});

group.bench_function("distinct_group_by_u64_narrow_limit_100", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 100",
)
})
});

group.bench_function("distinct_group_by_u64_narrow_limit_1000", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 1000",
)
})
});

group.bench_function("distinct_group_by_u64_narrow_limit_10000", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10000",
)
})
});

group.bench_function("group_by_multiple_columns_limit_10", |b| {
b.iter(|| {
query(
ctx.clone(),
"SELECT u64_narrow, u64_wide, utf8, f64 FROM t GROUP BY 1, 2, 3, 4 LIMIT 10",
)
})
});
group.finish();
}

async fn distinct_with_limit(
plan: Arc<dyn ExecutionPlan>,
ctx: Arc<TaskContext>,
) -> Result<()> {
let batches = collect(plan, ctx).await?;
assert_eq!(batches.len(), 1);
let batch = batches.first().unwrap();
assert_eq!(batch.num_rows(), 10);

Ok(())
}

fn run(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
let rt = Runtime::new().unwrap();
criterion::black_box(
rt.block_on(async { distinct_with_limit(plan.clone(), ctx.clone()).await }),
)
.unwrap();
}

pub async fn create_context_sampled_data(
sql: &str,
partition_cnt: i32,
sample_cnt: i32,
) -> Result<(Arc<dyn ExecutionPlan>, Arc<TaskContext>)> {
let (schema, parts) = make_data(partition_cnt, sample_cnt, false /* asc */).unwrap();
let mem_table = Arc::new(MemTable::try_new(schema, parts).unwrap());

// Create the DataFrame
let cfg = SessionConfig::new();
let ctx = SessionContext::new_with_config(cfg);
let _ = ctx.register_table("traces", mem_table)?;
let df = ctx.sql(sql).await?;
let physical_plan = df.create_physical_plan().await?;
Ok((physical_plan, ctx.task_ctx()))
}

fn criterion_benchmark_limited_distinct_sampled(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

let limit = 10;
let partitions = 100;
let samples = 100_000;
let sql =
format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");

let distinct_trace_id_100_partitions_100_000_samples_limit_100 = rt.block_on(async {
create_context_sampled_data(sql.as_str(), partitions, samples)
.await
.unwrap()
});

c.bench_function(
format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
|b| b.iter(|| run(distinct_trace_id_100_partitions_100_000_samples_limit_100.0.clone(),
distinct_trace_id_100_partitions_100_000_samples_limit_100.1.clone())),
);

let partitions = 10;
let samples = 1_000_000;
let sql =
format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");

let distinct_trace_id_10_partitions_1_000_000_samples_limit_10 = rt.block_on(async {
create_context_sampled_data(sql.as_str(), partitions, samples)
.await
.unwrap()
});

c.bench_function(
format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
|b| b.iter(|| run(distinct_trace_id_10_partitions_1_000_000_samples_limit_10.0.clone(),
distinct_trace_id_10_partitions_1_000_000_samples_limit_10.1.clone())),
);

let partitions = 1;
let samples = 10_000_000;
let sql =
format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");

let rt = Runtime::new().unwrap();
let distinct_trace_id_1_partition_10_000_000_samples_limit_10 = rt.block_on(async {
create_context_sampled_data(sql.as_str(), partitions, samples)
.await
.unwrap()
});

c.bench_function(
format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
|b| b.iter(|| run(distinct_trace_id_1_partition_10_000_000_samples_limit_10.0.clone(),
distinct_trace_id_1_partition_10_000_000_samples_limit_10.1.clone())),
);
}

criterion_group!(
benches,
criterion_benchmark_limited_distinct,
criterion_benchmark_limited_distinct_sampled
);
criterion_main!(benches);
Loading

0 comments on commit 43cc870

Please sign in to comment.