Skip to content

Commit

Permalink
fix(executor): fix query log lost for http kill (#15715)
Browse files Browse the repository at this point in the history
* Revert "chore(executor): revert  #15693 (#15711)"

This reverts commit b7c6b08.

* fix(query): uncomment test
  • Loading branch information
zhang2014 committed Jun 3, 2024
1 parent c6df5f3 commit fd7d1da
Show file tree
Hide file tree
Showing 20 changed files with 579 additions and 238 deletions.
409 changes: 409 additions & 0 deletions src/query/pipeline/core/src/finished_chain.rs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/query/pipeline/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@

pub mod processors;

mod finished_chain;
mod input_error;
mod lock_guard;
mod pipe;
mod pipeline;
mod pipeline_display;
mod unsafe_cell_wrap;

pub use finished_chain::always_callback;
pub use finished_chain::Callback;
pub use finished_chain::ExecutionInfo;
pub use finished_chain::FinishedCallbackChain;
pub use input_error::InputError;
pub use lock_guard::LockGuard;
pub use lock_guard::UnlockApi;
Expand Down
95 changes: 23 additions & 72 deletions src/query/pipeline/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use databend_common_exception::Result;
use log::info;
use petgraph::matrix_graph::Zero;

use crate::finished_chain::Callback;
use crate::finished_chain::ExecutionInfo;
use crate::finished_chain::FinishedCallbackChain;
use crate::pipe::Pipe;
use crate::pipe::PipeItem;
use crate::processors::DuplicateProcessor;
Expand All @@ -37,7 +40,6 @@ use crate::processors::ProcessorPtr;
use crate::processors::ResizeProcessor;
use crate::processors::ShuffleProcessor;
use crate::LockGuard;
use crate::PlanProfile;
use crate::SinkPipeBuilder;
use crate::SourcePipeBuilder;
use crate::TransformPipeBuilder;
Expand All @@ -64,9 +66,10 @@ pub struct Pipeline {
max_threads: usize,
pub pipes: Vec<Pipe>,
on_init: Option<InitCallback>,
on_finished: Option<FinishedCallback>,
lock_guards: Vec<LockGuard>,

on_finished_chain: FinishedCallbackChain,

plans_scope: Vec<PlanScope>,
scope_size: Arc<AtomicUsize>,
}
Expand All @@ -79,9 +82,6 @@ impl Debug for Pipeline {

pub type InitCallback = Box<dyn FnOnce() -> Result<()> + Send + Sync + 'static>;

pub type FinishedCallback =
Box<dyn FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static>;

pub type DynTransformBuilder = Box<dyn Fn(Arc<InputPort>, Arc<OutputPort>) -> Result<ProcessorPtr>>;

impl Pipeline {
Expand All @@ -90,9 +90,9 @@ impl Pipeline {
max_threads: 0,
pipes: Vec::new(),
on_init: None,
on_finished: None,
lock_guards: vec![],
plans_scope: vec![],
on_finished_chain: FinishedCallbackChain::create(),
scope_size: Arc::new(AtomicUsize::new(0)),
}
}
Expand All @@ -104,8 +104,8 @@ impl Pipeline {
max_threads: 0,
pipes: Vec::new(),
on_init: None,
on_finished: None,
lock_guards: vec![],
on_finished_chain: FinishedCallbackChain::create(),
plans_scope: scope,
}
}
Expand Down Expand Up @@ -470,66 +470,18 @@ impl Pipeline {
self.on_init = Some(Box::new(f));
}

// Set param into last
#[track_caller]
pub fn set_on_finished<
F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
>(
&mut self,
f: F,
) {
pub fn set_on_finished<F: Callback>(&mut self, f: F) {
let location = std::panic::Location::caller();
if let Some(on_finished) = self.on_finished.take() {
self.on_finished = Some(Box::new(move |(profiles, may_error)| {
on_finished((profiles, may_error))?;
let instants = Instant::now();
let _guard = defer(move || {
info!(
"OnFinished callback elapsed: {:?} while in {}:{}:{}",
instants.elapsed(),
location.file(),
location.line(),
location.column()
);
});

f((profiles, may_error))
}));
return;
}

self.on_finished = Some(Box::new(f));
self.on_finished_chain.push_back(location, Box::new(f));
}

// Lift current and set param into first
#[track_caller]
pub fn push_front_on_finished_callback<
F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
>(
&mut self,
f: F,
) {
pub fn lift_on_finished<F: Callback>(&mut self, f: F) {
let location = std::panic::Location::caller();
if let Some(on_finished) = self.on_finished.take() {
self.on_finished = Some(Box::new(move |(profiles, may_error)| {
let instants = Instant::now();
let guard = defer(move || {
info!(
"OnFinished callback elapsed: {:?} while in {}:{}:{}",
instants.elapsed(),
location.file(),
location.line(),
location.column()
);
});

f((profiles, may_error))?;
drop(guard);
on_finished((profiles, may_error))
}));

return;
}

self.on_finished = Some(Box::new(f));
self.on_finished_chain.push_front(location, Box::new(f));
}

pub fn take_on_init(&mut self) -> InitCallback {
Expand All @@ -539,11 +491,10 @@ impl Pipeline {
}
}

pub fn take_on_finished(&mut self) -> FinishedCallback {
match self.on_finished.take() {
None => Box::new(|_may_error| Ok(())),
Some(on_finished) => on_finished,
}
pub fn take_on_finished(&mut self) -> FinishedCallbackChain {
let mut chain = FinishedCallbackChain::create();
std::mem::swap(&mut self.on_finished_chain, &mut chain);
chain
}

pub fn add_plan_scope(&mut self, scope: PlanScope) -> PlanScopeGuard {
Expand All @@ -565,13 +516,13 @@ impl Drop for Pipeline {
fn drop(&mut self) {
drop_guard(move || {
// An error may have occurred before the executor was created.
if let Some(on_finished) = self.on_finished.take() {
let cause = Err(ErrorCode::Internal(
"Pipeline illegal state: not successfully shutdown.",
));
let cause = Err(ErrorCode::Internal(
"Pipeline illegal state: not successfully shutdown.",
));

let _ = (on_finished)((&vec![], &cause));
}
let _ = self
.on_finished_chain
.apply(ExecutionInfo::create(cause, vec![]));
})
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_base::runtime::GlobalIORuntime;
use databend_common_catalog::table::CompactionLimits;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::plans::LockTableOption;
Expand Down Expand Up @@ -73,7 +74,7 @@ async fn do_hook_compact(
return Ok(());
}

pipeline.set_on_finished(move |(_profiles, err)| {
pipeline.set_on_finished(move |info: &ExecutionInfo| {
let compaction_limits = match compact_target.mutation_kind {
MutationKind::Insert => {
let compaction_num_block_hint = ctx.get_compaction_num_block_hint();
Expand Down Expand Up @@ -101,7 +102,7 @@ async fn do_hook_compact(
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);

let compact_start_at = Instant::now();
if err.is_ok() {
if info.res.is_ok() {
info!("execute {op_name} finished successfully. running table optimization job.");
match GlobalIORuntime::instance().block_on({
compact_table(ctx, compact_target, compaction_limits, lock_opt)
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_meta_app::schema::IndexMeta;
use databend_common_meta_app::schema::ListIndexesByIdReq;
use databend_common_meta_app::schema::ListVirtualColumnsReq;
use databend_common_meta_types::MetaId;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::plans::LockTableOption;
use databend_common_sql::plans::Plan;
Expand Down Expand Up @@ -64,8 +65,8 @@ pub async fn hook_refresh(
return;
}

pipeline.set_on_finished(move |(_profiles, err)| {
if err.is_ok() {
pipeline.set_on_finished(move |info: &ExecutionInfo| {
if info.res.is_ok() {
info!("execute pipeline finished successfully, starting run refresh job.");
match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc, lock_opt)) {
Ok(_) => {
Expand Down
14 changes: 8 additions & 6 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::SendableDataBlockStream;
use databend_common_pipeline_core::always_callback;
use databend_common_pipeline_core::processors::PlanProfile;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::SourcePipeBuilder;
use databend_common_sql::plans::Plan;
use databend_common_sql::PlanExtras;
Expand Down Expand Up @@ -110,11 +112,11 @@ pub trait Interpreter: Sync + Send {
let query_ctx = ctx.clone();
build_res
.main_pipeline
.set_on_finished(move |(plans_profile, may_error)| {
.set_on_finished(always_callback(move |info: &ExecutionInfo| {
let mut has_profiles = false;
// Standalone mode or query executed is successfully
if query_ctx.get_cluster().is_empty() || may_error.is_ok() {
query_ctx.add_query_profiles(plans_profile);
if query_ctx.get_cluster().is_empty() || info.res.is_ok() {
query_ctx.add_query_profiles(&info.profiling);

let query_profiles = query_ctx.get_query_profiles();

Expand All @@ -141,17 +143,17 @@ pub trait Interpreter: Sync + Send {

hook_vacuum_temp_files(&query_ctx)?;

let err_opt = match may_error {
let err_opt = match &info.res {
Ok(_) => None,
Err(e) => Some(e.clone()),
};

log_query_finished(&query_ctx, err_opt, has_profiles);
match may_error {
match &info.res {
Ok(_) => Ok(()),
Err(error) => Err(error.clone()),
}
});
}));

ctx.set_status_info("executing pipeline");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use databend_common_license::license_manager::get_license_manager;
use databend_common_meta_app::schema::IndexMeta;
use databend_common_meta_app::schema::UpdateIndexReq;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_sql::evaluator::BlockOperator;
use databend_common_sql::evaluator::CompoundBlockOperator;
use databend_common_sql::executor::physical_plans::TableScan;
Expand Down Expand Up @@ -375,7 +376,7 @@ impl Interpreter for RefreshIndexInterpreter {

build_res
.main_pipeline
.set_on_finished(move |(_profiles, may_error)| match may_error {
.set_on_finished(move |info: &ExecutionInfo| match &info.res {
Ok(_) => GlobalIORuntime::instance()
.block_on(async move { modify_last_update(ctx, req).await }),
Err(error_code) => Err(error_code.clone()),
Expand Down
9 changes: 5 additions & 4 deletions src/query/service/src/interpreters/interpreter_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
use databend_common_meta_store::MetaStore;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipe;
use databend_common_pipeline_core::PipeItem;
use databend_common_pipeline_core::Pipeline;
Expand Down Expand Up @@ -167,8 +168,9 @@ impl SelectInterpreter {
let catalog = self.ctx.get_catalog(catalog_name).await?;
let query_id = self.ctx.get_id();
let auto_commit = !self.ctx.txn_mgr().lock().is_active();
build_res.main_pipeline.set_on_finished(
move |(_profiles, may_error)| match may_error {
build_res
.main_pipeline
.set_on_finished(move |info: &ExecutionInfo| match &info.res {
Ok(_) => GlobalIORuntime::instance().block_on(async move {
info!(
"Updating the stream meta to consume data, query_id: {}",
Expand Down Expand Up @@ -196,8 +198,7 @@ impl SelectInterpreter {
}
}),
Err(error_code) => Err(error_code.clone()),
},
);
});
}
Ok(build_res)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::schema::TableNameIdent;
use databend_common_meta_app::schema::TableStatistics;
use databend_common_meta_types::MatchSeq;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_sql::field_default_value;
use databend_common_sql::plans::CreateTablePlan;
use databend_common_sql::BloomIndexColumns;
Expand Down Expand Up @@ -274,10 +275,10 @@ impl CreateTableInterpreter {

pipeline
.main_pipeline
.push_front_on_finished_callback(move |(_profiles, err)| {
.lift_on_finished(move |info: &ExecutionInfo| {
let qualified_table_name = format!("{}.{}", db_name, table_name);

if err.is_ok() {
if info.res.is_ok() {
info!(
"create_table_as_select {} success, commit table meta data by table id {}",
qualified_table_name, table_id
Expand Down
11 changes: 6 additions & 5 deletions src/query/service/src/interpreters/interpreter_table_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use databend_common_catalog::table::TableExt;
use databend_common_exception::Result;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::TableInfo;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::executor::physical_plans::CommitSink;
use databend_common_sql::executor::physical_plans::CompactSource;
Expand Down Expand Up @@ -248,9 +249,8 @@ impl OptimizeTableInterpreter {
let ctx = self.ctx.clone();
let plan = self.plan.clone();
let start = SystemTime::now();
build_res
.main_pipeline
.set_on_finished(move |(_profiles, may_error)| match may_error {
build_res.main_pipeline.set_on_finished(
move |info: &ExecutionInfo| match &info.res {
Ok(_) => InterpreterClusteringHistory::write_log(
&ctx,
start,
Expand All @@ -259,7 +259,8 @@ impl OptimizeTableInterpreter {
reclustered_block_count,
),
Err(error_code) => Err(error_code.clone()),
});
},
);
}
}
} else {
Expand All @@ -274,7 +275,7 @@ impl OptimizeTableInterpreter {
} else {
build_res
.main_pipeline
.set_on_finished(move |(_profiles, may_error)| match may_error {
.set_on_finished(move |info: &ExecutionInfo| match &info.res {
Ok(_) => GlobalIORuntime::instance()
.block_on(async move { purge(ctx, catalog, plan, None).await }),
Err(error_code) => Err(error_code.clone()),
Expand Down
Loading

0 comments on commit fd7d1da

Please sign in to comment.