Skip to content

Commit

Permalink
feat: estimate selectivity by table sample (#16362)
Browse files Browse the repository at this point in the history
* feat: estimate selectivity by table sample

* save

* update

* make it work

* save

* fix estimate row

* add time budget

* add test
  • Loading branch information
xudong963 committed Sep 5, 2024
1 parent 111d3f8 commit fb9fc9a
Show file tree
Hide file tree
Showing 21 changed files with 1,065 additions and 152 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::ServiceQueryExecutor;
use crate::servers::http::v1::ClientSessionManager;
use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
Expand Down Expand Up @@ -205,7 +206,10 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
///
/// This function is used to plan the SQL. If an error occurs, we will log the query start and finished.
pub async fn interpreter_plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<(Plan, PlanExtras)> {
let mut planner = Planner::new(ctx.clone());
let mut planner = Planner::new_with_sample_executor(
ctx.clone(),
Arc::new(ServiceQueryExecutor::new(ctx.clone())),
);
let result = planner.plan_sql(sql).await;
let short_sql = short_sql(sql.to_string());
let mut stmt = if let Ok((_, extras)) = &result {
Expand Down
31 changes: 31 additions & 0 deletions src/query/service/src/schedulers/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@

use std::sync::Arc;

use async_trait::async_trait;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_sql::optimizer::QuerySampleExecutor;
use futures_util::TryStreamExt;

use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
use crate::pipelines::PipelineBuilder;
use crate::schedulers::Fragmenter;
Expand All @@ -24,6 +30,7 @@ use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::executor::PhysicalPlan;
use crate::sql::ColumnBinding;
use crate::stream::PullingExecutorStream;

/// Build query pipeline from physical plan.
/// If plan is distributed plan it will build_distributed_pipeline
Expand Down Expand Up @@ -106,3 +113,27 @@ pub async fn build_distributed_pipeline(
build_res.set_max_threads(settings.get_max_threads()? as usize);
Ok(build_res)
}

/// Executes physical plans inside the query service on behalf of the
/// planner's sampling machinery (implements `QuerySampleExecutor` below).
pub struct ServiceQueryExecutor {
    // Session context used to build and run the sampling pipeline.
    ctx: Arc<QueryContext>,
}

impl ServiceQueryExecutor {
    /// Wrap a query context so sampling queries can be executed through it.
    pub fn new(ctx: Arc<QueryContext>) -> Self {
        ServiceQueryExecutor { ctx }
    }
}

#[async_trait]
impl QuerySampleExecutor for ServiceQueryExecutor {
    /// Build a pipeline for `plan`, run it to completion, and collect every
    /// produced `DataBlock` into a vector.
    async fn execute_query(&self, plan: &PhysicalPlan) -> Result<Vec<DataBlock>> {
        // Build the query pipeline without a result-rendering stage.
        let pipeline = build_query_pipeline_without_render_result_set(&self.ctx, plan).await?;
        let exec_settings = ExecutorSettings::try_create(self.ctx.clone())?;
        let executor = PipelinePullingExecutor::from_pipelines(pipeline, exec_settings)?;
        // Make the inner executor visible to the query context.
        self.ctx.set_executor(executor.get_inner())?;

        // Drain the pulling executor as a stream and gather all result blocks.
        let stream = PullingExecutorStream::create(executor)?;
        stream.try_collect::<Vec<DataBlock>>().await
    }
}
8 changes: 7 additions & 1 deletion src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,13 @@ impl DefaultSettings {
desc: "Seed for random function",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
})
}),
("dynamic_sample_time_budget_ms", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Time budget for dynamic sample in milliseconds",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,4 +701,8 @@ impl Settings {
/// Returns `true` when the `random_function_seed` setting equals 1,
/// i.e. the random function should use a fixed seed.
pub fn get_random_function_seed(&self) -> Result<bool> {
    self.try_get_u64("random_function_seed").map(|v| v == 1)
}

/// Time budget, in milliseconds, allowed for dynamic sampling during
/// planning (the `dynamic_sample_time_budget_ms` setting).
pub fn get_dynamic_sample_time_budget_ms(&self) -> Result<u64> {
    Ok(self.try_get_u64("dynamic_sample_time_budget_ms")?)
}
}
1 change: 1 addition & 0 deletions src/query/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ serde = { workspace = true }
sha2 = { workspace = true }
simsearch = "0.2"
time = "0.3.14"
tokio = "1.39.2"
url = "2.3.1"

[lints]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use databend_common_base::base::tokio::time::Instant;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;

use crate::optimizer::dynamic_sample::filter_selectivity_sample::filter_selectivity_sample;
use crate::optimizer::dynamic_sample::join_selectivity_sample::join_selectivity_sample;
use crate::optimizer::QuerySampleExecutor;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::optimizer::StatInfo;
use crate::plans::Operator;
use crate::plans::RelOperator;
use crate::MetadataRef;

/// Derive statistics for `s_expr`, using table sampling where supported and
/// spending at most the `dynamic_sample_time_budget_ms` setting overall.
///
/// * `Filter`: estimated by sampling, falling back to the static cardinality
///   derivation when the budget is 0, exhausted, times out, or sampling errors.
/// * `Join`: recurses into both children via `join_selectivity_sample`.
/// * `Scan`: uses the operator's statically derived stats.
/// * Anything else: static cardinality derivation.
#[async_recursion::async_recursion(#[recursive::recursive])]
pub async fn dynamic_sample(
    ctx: Arc<dyn TableContext>,
    metadata: MetadataRef,
    s_expr: &SExpr,
    sample_executor: Arc<dyn QuerySampleExecutor>,
) -> Result<Arc<StatInfo>> {
    let time_budget =
        Duration::from_millis(ctx.get_settings().get_dynamic_sample_time_budget_ms()?);
    let start_time = Instant::now();

    // Run `sample_fn` within the remaining time budget. Use `fallback` when
    // the budget is disabled (0), already exhausted, the sample times out,
    // or the sample itself returns an error.
    async fn sample_with_budget<F, Fut>(
        start_time: Instant,
        time_budget: Duration,
        fallback: F,
        sample_fn: impl FnOnce() -> Fut,
    ) -> Result<Arc<StatInfo>>
    where
        F: FnOnce() -> Result<Arc<StatInfo>>,
        Fut: std::future::Future<Output = Result<Arc<StatInfo>>>,
    {
        if time_budget.as_millis() == 0 || start_time.elapsed() > time_budget {
            fallback()
        } else {
            // `saturating_sub` instead of `-`: `Duration` subtraction panics
            // on underflow, and `elapsed()` may have crossed the budget
            // between the check above and this line.
            let remaining_time = time_budget.saturating_sub(start_time.elapsed());
            match tokio::time::timeout(remaining_time, sample_fn()).await {
                Ok(Ok(result)) => Ok(result),
                // The error contains the timeout error or the error from the sample_fn
                Ok(Err(_)) | Err(_) => fallback(),
            }
        }
    }

    match s_expr.plan() {
        RelOperator::Filter(_) => {
            sample_with_budget(
                start_time,
                time_budget,
                || {
                    // Fallback: static derivation without sampling.
                    let rel_expr = RelExpr::with_s_expr(s_expr);
                    rel_expr.derive_cardinality()
                },
                || filter_selectivity_sample(ctx, metadata, s_expr, sample_executor),
            )
            .await
        }
        RelOperator::Join(_) => {
            join_selectivity_sample(ctx, metadata, s_expr, sample_executor).await
        }
        RelOperator::Scan(_) => s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr)),
        // Todo: add more operators here, and support more query patterns.
        _ => {
            let rel_expr = RelExpr::with_s_expr(s_expr);
            rel_expr.derive_cardinality()
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use databend_common_ast::ast::Sample;
use databend_common_ast::ast::SampleConfig;
use databend_common_ast::ast::SampleLevel;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use num_traits::ToPrimitive;

use crate::executor::PhysicalPlanBuilder;
use crate::optimizer::statistics::CollectStatisticsOptimizer;
use crate::optimizer::QuerySampleExecutor;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::optimizer::SelectivityEstimator;
use crate::optimizer::StatInfo;
use crate::plans::Aggregate;
use crate::plans::AggregateFunction;
use crate::plans::AggregateMode;
use crate::plans::RelOperator;
use crate::plans::ScalarItem;
use crate::MetadataRef;
use crate::ScalarExpr;

/// Estimate the selectivity of the filter in `s_expr` by running a
/// `count()` over a row sample of the underlying table scan and scaling
/// the match count back up to the full table.
///
/// Filter cardinality by sample will be called in `dphyp`, so we can ensure
/// the filter is in a complex query (contains not only one table), because
/// it's meaningless to sample filter cardinality in a single-table query.
pub async fn filter_selectivity_sample(
    ctx: Arc<dyn TableContext>,
    metadata: MetadataRef,
    s_expr: &SExpr,
    sample_executor: Arc<dyn QuerySampleExecutor>,
) -> Result<Arc<StatInfo>> {
    let child = s_expr.child(0)?;
    let child_rel_expr = RelExpr::with_s_expr(child);
    if let RelOperator::Scan(mut scan) = child.plan().clone() {
        let num_rows = scan
            .statistics
            .table_stats
            .as_ref()
            .and_then(|s| s.num_rows)
            .unwrap_or(0);

        // Calculate sample size (0.2% of total data).
        let sample_size = (num_rows as f64 * 0.002).ceil();
        let mut new_s_expr = s_expr.clone();
        // Rows the count() below will actually see; this is the denominator
        // of the selectivity. Defaults to the full table when sampling is
        // skipped (previously the code always divided by `sample_size`,
        // which inflated the selectivity ~500x for small tables and gave
        // 0/0 = NaN for empty ones).
        let mut scanned_rows = num_rows as f64;
        // If the table is too small, we don't need to sample.
        if sample_size >= 10.0 {
            scanned_rows = sample_size;
            scan.sample = Some(Sample {
                sample_level: SampleLevel::ROW,
                sample_conf: SampleConfig::RowsNum(sample_size),
            });
            let new_child = SExpr::create_leaf(Arc::new(RelOperator::Scan(scan)));
            new_s_expr = s_expr.replace_children(vec![Arc::new(new_child)]);
            let collect_statistics_optimizer =
                CollectStatisticsOptimizer::new(ctx.clone(), metadata.clone());
            new_s_expr = collect_statistics_optimizer.run(&new_s_expr).await?;
        }

        // Wrap the (possibly sampled) filter in partial + final count()
        // aggregations so the executed query returns a single row count.
        new_s_expr = SExpr::create_unary(
            Arc::new(create_count_aggregate(AggregateMode::Partial).into()),
            Arc::new(new_s_expr),
        );
        new_s_expr = SExpr::create_unary(
            Arc::new(create_count_aggregate(AggregateMode::Final).into()),
            Arc::new(new_s_expr),
        );

        let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx.clone(), false);
        let mut required = HashSet::new();
        required.insert(0);
        let plan = builder.build(&new_s_expr, required).await?;

        let result = sample_executor.execute_query(&plan).await?;
        if let Some(block) = result.first() {
            if let Some(count) = block.get_last_column().as_number() {
                if let Some(number_scalar) = count.index(0) {
                    // Surface a conversion failure instead of panicking.
                    let matched = number_scalar.to_f64().to_f64().ok_or_else(|| {
                        ErrorCode::Internal("Failed to convert sampled row count to f64")
                    })?;
                    // Guard the empty-table case and clamp to [0, 1]:
                    // row-number sampling is approximate, so the sample may
                    // contain slightly more rows than requested.
                    let selectivity = if scanned_rows > 0.0 {
                        (matched / scanned_rows).min(1.0)
                    } else {
                        0.0
                    };
                    let mut statistics = child_rel_expr.derive_cardinality()?.statistics.clone();
                    let mut sb = SelectivityEstimator::new(&mut statistics, HashSet::new());
                    sb.update_other_statistic_by_selectivity(selectivity);
                    let stat_info = Arc::new(StatInfo {
                        cardinality: (selectivity * num_rows as f64).ceil(),
                        statistics,
                    });
                    // Cache the sampled stats on the expression for later reuse.
                    *s_expr.stat_info.lock().unwrap() = Some(stat_info.clone());
                    return Ok(stat_info);
                }
            }
        }
    }
    Err(ErrorCode::Internal(
        "Failed to calculate filter selectivity by sample".to_string(),
    ))
}

/// Build a `count()` aggregate operator (no grouping, no limit) in the
/// given mode, with its single output bound to index 0.
fn create_count_aggregate(mode: AggregateMode) -> Aggregate {
    let count_func = AggregateFunction {
        func_name: "count".to_string(),
        distinct: false,
        params: vec![],
        args: vec![],
        return_type: Box::new(DataType::Number(NumberDataType::UInt64)),
        display_name: "".to_string(),
    };
    let count_item = ScalarItem {
        scalar: ScalarExpr::AggregateFunction(count_func),
        index: 0,
    };
    Aggregate {
        mode,
        group_items: vec![],
        aggregate_functions: vec![count_item],
        from_distinct: false,
        limit: None,
        grouping_sets: None,
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;

use crate::optimizer::dynamic_sample::dynamic_sample;
use crate::optimizer::QuerySampleExecutor;
use crate::optimizer::SExpr;
use crate::optimizer::StatInfo;
use crate::plans::Join;
use crate::MetadataRef;

/// Derive join statistics by recursively sampling both join inputs and
/// combining their stat infos via `Join::derive_join_stats`.
pub async fn join_selectivity_sample(
    ctx: Arc<dyn TableContext>,
    metadata: MetadataRef,
    s_expr: &SExpr,
    sample_executor: Arc<dyn QuerySampleExecutor>,
) -> Result<Arc<StatInfo>> {
    // Stats for the left child.
    let left_stats = dynamic_sample(
        ctx.clone(),
        metadata.clone(),
        s_expr.child(0)?,
        sample_executor.clone(),
    )
    .await?;
    // Stats for the right child; last use, so the Arcs move without cloning.
    let right_stats = dynamic_sample(ctx, metadata, s_expr.child(1)?, sample_executor).await?;
    Join::try_from(s_expr.plan().clone())?.derive_join_stats(left_stats, right_stats)
}
Loading

0 comments on commit fb9fc9a

Please sign in to comment.