diff --git a/src/query/pipeline/core/src/finished_chain.rs b/src/query/pipeline/core/src/finished_chain.rs
new file mode 100644
index 000000000000..690caa09ab64
--- /dev/null
+++ b/src/query/pipeline/core/src/finished_chain.rs
@@ -0,0 +1,409 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::VecDeque;
+use std::fmt::Write;
+use std::panic::Location;
+use std::time::Duration;
+use std::time::Instant;
+
+use databend_common_base::runtime::catch_unwind;
+use databend_common_exception::Result;
+use log::info;
+
+use crate::PlanProfile;
+
+pub struct ExecutionInfo {
+    pub res: Result<()>,
+    pub profiling: Vec<PlanProfile>,
+}
+
+impl ExecutionInfo {
+    pub fn create(res: Result<()>, profiling: Vec<PlanProfile>) -> ExecutionInfo {
+        ExecutionInfo { res, profiling }
+    }
+}
+
+pub trait Callback: Send + Sync + 'static {
+    fn always_call(&self) -> bool {
+        false
+    }
+
+    fn apply(self: Box<Self>, info: &ExecutionInfo) -> Result<()>;
+}
+
+struct ApplyState {
+    is_always: bool,
+    is_interrupt: bool,
+    successfully: bool,
+    elapsed: Duration,
+    location: &'static Location<'static>,
+}
+
+pub struct FinishedCallbackChain {
+    chain: VecDeque<(&'static Location<'static>, Box<dyn Callback>)>,
+}
+
+impl FinishedCallbackChain {
+    pub fn create() -> FinishedCallbackChain {
+        FinishedCallbackChain {
+            chain: VecDeque::new(),
+        }
+    }
+
+    pub fn push_front(&mut self, location: &'static Location<'static>, f: Box<dyn Callback>) {
+        self.chain.push_front((location, f))
+    }
+
+    pub fn push_back(&mut self, location: &'static Location<'static>, f: Box<dyn Callback>) {
+        self.chain.push_back((location, f))
+    }
+
+    pub fn apply(&mut self, mut info: ExecutionInfo) -> Result<()> {
+        let chain = std::mem::take(&mut self.chain);
+
+        let mut states = Vec::with_capacity(chain.len());
+        let mut callbacks = Vec::with_capacity(chain.len());
+        let mut always_callbacks = Vec::with_capacity(chain.len());
+
+        for (location, callback) in chain.into_iter() {
+            if !callback.always_call() {
+                callbacks.push((location, callback));
+            } else {
+                always_callbacks.push((location, callback));
+            }
+        }
+
+        let mut apply_res = Ok(());
+        for (location, callback) in callbacks {
+            if apply_res.is_err() {
+                states.push(ApplyState {
+                    location,
+                    is_always: false,
+                    is_interrupt: true,
+                    successfully: false,
+                    elapsed: Duration::from_secs(0),
+                });
+
+                continue;
+            }
+
+            let instant = Instant::now();
+            if let Err(cause) = callback.apply(&info) {
+                states.push(ApplyState {
+                    location,
+                    is_always: false,
+                    is_interrupt: false,
+                    successfully: false,
+                    elapsed: instant.elapsed(),
+                });
+
+                info.res = Err(cause.clone());
+
+                apply_res = Err(cause);
+                continue;
+            }
+
+            states.push(ApplyState {
+                location,
+                is_always: false,
+                is_interrupt: false,
+                successfully: true,
+                elapsed: instant.elapsed(),
+            });
+        }
+
+        Self::apply_always(info, states, always_callbacks);
+        apply_res
+    }
+
+    fn apply_always(
+        info: ExecutionInfo,
+        mut states: Vec<ApplyState>,
+        always_callbacks: Vec<(&'static Location<'static>, Box<dyn Callback>)>,
+    ) {
+        for (location, always_callback) in always_callbacks {
+            let instant = Instant::now();
+            states.push(match always_callback.apply(&info) {
+                Ok(_) => ApplyState {
+                    location,
+                    is_always: true,
+                    is_interrupt: false,
+                    successfully: true,
+                    elapsed: instant.elapsed(),
+                },
+                Err(_cause) => ApplyState {
+                    location,
+                    is_always: true,
+                    is_interrupt: false,
+                    successfully: false,
+                    elapsed: instant.elapsed(),
+                },
+            });
+        }
+
+        Self::log_states(&states);
+    }
+
+    pub fn extend(&mut self, other: FinishedCallbackChain) {
+        self.chain.extend(other.chain)
+    }
+
+    fn log_states(apply_states: &[ApplyState]) {
+        let mut message = String::new();
+        writeln!(&mut message, "Executor apply finished callback state:").unwrap();
+        for apply_state in apply_states {
+            let execute_state = match apply_state.successfully {
+                true => "\u{2705}",
+                false => "\u{274C}",
+            };
+
+            let always_state = match apply_state.is_always {
+                true => "(always) ",
+                false => match apply_state.is_interrupt {
+                    true => "(interrupt) ",
+                    false => "",
+                },
+            };
+
+            writeln!(
+                &mut message,
+                "β”œβ”€β”€{}:{:?} - {}{}:{}:{}",
+                execute_state,
+                apply_state.elapsed,
+                always_state,
+                apply_state.location.file(),
+                apply_state.location.line(),
+                apply_state.location.column()
+            )
+            .unwrap();
+        }
+
+        info!("{}", message);
+    }
+}
+
+impl<T: FnOnce(&ExecutionInfo) -> Result<()> + Send + Sync + 'static> Callback for T {
+    fn apply(self: Box<Self>, info: &ExecutionInfo) -> Result<()> {
+        match catch_unwind(move || self(info)) {
+            Ok(Ok(_)) => Ok(()),
+            Err(cause) => Err(cause),
+            Ok(Err(cause)) => Err(cause),
+        }
+    }
+}
+
+pub struct AlwaysCallback {
+    inner: Box<dyn Callback>,
+}
+
+impl Callback for AlwaysCallback {
+    fn always_call(&self) -> bool {
+        true
+    }
+
+    fn apply(self: Box<Self>, info: &ExecutionInfo) -> Result<()> {
+        self.inner.apply(info)
+    }
+}
+
+pub fn always_callback<T: Callback>(inner: T) -> AlwaysCallback {
+    AlwaysCallback {
+        inner: Box::new(inner),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::panic::Location;
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::atomic::Ordering;
+    use std::sync::Arc;
+
+    use databend_common_exception::ErrorCode;
+    use databend_common_exception::Result;
+
+    use crate::always_callback;
+    use crate::ExecutionInfo;
+    use crate::FinishedCallbackChain;
+
+    #[test]
+    fn test_callback_order() -> Result<()> {
+        let mut chain = FinishedCallbackChain::create();
+
+        let seq = Arc::new(AtomicUsize::new(0));
+
+        for index in 0..10 {
+            chain.push_back(
+                Location::caller(),
+                Box::new({
+                    let seq = seq.clone();
+                    move |_info: &ExecutionInfo| {
+                        let seq = seq.fetch_add(1, Ordering::SeqCst);
+                        assert_eq!(index, seq);
+                        Ok(())
+                    }
+                }),
+            );
+        }
+
+        chain.apply(ExecutionInfo::create(Ok(()), vec![]))?;
+
+        assert_eq!(seq.load(Ordering::SeqCst), 10);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_callback_order_with_always_callback() -> Result<()> {
+        let mut chain = FinishedCallbackChain::create();
+
+        let seq = Arc::new(AtomicUsize::new(0));
+
+        for index in 0..10 {
+            chain.push_back(
+                Location::caller(),
+                Box::new({
+                    let seq = seq.clone();
+                    move |_info: &ExecutionInfo| {
+                        let seq = seq.fetch_add(1, Ordering::SeqCst);
+                        assert_eq!(index, seq);
+                        Ok(())
+                    }
+                }),
+            );
+        }
+
+        // always-callbacks run after all ordinary callbacks
+        chain.push_front(
+            Location::caller(),
+            Box::new({
+                let seq = seq.clone();
+                always_callback(move |_info: &ExecutionInfo| {
+                    let seq = seq.fetch_add(1, Ordering::SeqCst);
+                    assert_eq!(11, seq);
+                    Ok(())
+                })
+            }),
+        );
+
+        chain.push_front(
+            Location::caller(),
+            Box::new({
+                let seq = seq.clone();
+                always_callback(move |_info: &ExecutionInfo| {
+                    let seq = seq.fetch_add(1, Ordering::SeqCst);
+                    assert_eq!(10, seq);
+                    Ok(())
+                })
+            }),
+        );
+
+        // always-callbacks run after all ordinary callbacks
+        chain.push_back(
+            Location::caller(),
+            Box::new({
+                let seq = seq.clone();
+                always_callback(move |_info: &ExecutionInfo| {
+                    let seq = seq.fetch_add(1, Ordering::SeqCst);
+                    assert_eq!(12, seq);
+                    Ok(())
+                })
+            }),
+        );
+
+        chain.apply(ExecutionInfo::create(Ok(()), vec![]))?;
+
+        assert_eq!(seq.load(Ordering::SeqCst), 13);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_always_callback() -> Result<()> {
+        let mut chain = FinishedCallbackChain::create();
+
+        let seq = Arc::new(AtomicUsize::new(0));
+
+        for index in 0..10 {
+            chain.push_back(
+                Location::caller(),
+                Box::new({
+                    let seq = seq.clone();
+                    move |_info: &ExecutionInfo| {
+                        let seq = seq.fetch_add(1, Ordering::SeqCst);
+                        assert_eq!(index, seq);
+                        Ok(())
+                    }
+                }),
+            );
+        }
+
+        chain.push_back(
+            Location::caller(),
+            Box::new(|_info: &ExecutionInfo| Err(ErrorCode::Internal(""))),
+        );
+
+        for _index in 0..10 {
+            chain.push_back(
+                Location::caller(),
+                Box::new(|_info: &ExecutionInfo| unreachable!("unreachable")),
+            );
+        }
+
+        // always-callbacks run after all ordinary callbacks
+        chain.push_front(
+            Location::caller(),
+            Box::new({
+                let seq = seq.clone();
+                always_callback(move |_info: &ExecutionInfo| {
+                    let seq = seq.fetch_add(1, Ordering::SeqCst);
+                    assert_eq!(11, seq);
+                    Ok(())
+                })
+            }),
+        );
+
+        chain.push_front(
+            Location::caller(),
+            Box::new({
+                let seq = seq.clone();
+                always_callback(move |_info: &ExecutionInfo| {
+                    let seq = seq.fetch_add(1, Ordering::SeqCst);
+                    assert_eq!(10, seq);
+                    Ok(())
+                })
+            }),
+        );
+
+        // always-callbacks run after all ordinary callbacks
+        chain.push_back(
+            Location::caller(),
+            Box::new({
+                let seq = seq.clone();
+                always_callback(move |_info: &ExecutionInfo| {
+                    let seq = seq.fetch_add(1, Ordering::SeqCst);
+                    assert_eq!(12, seq);
+                    Ok(())
+                })
+            }),
+        );
+
+        assert!(chain.apply(ExecutionInfo::create(Ok(()), vec![])).is_err());
+
+        assert_eq!(seq.load(Ordering::SeqCst), 13);
+
+        Ok(())
+    }
+}
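[Reviewer note — illustrative sketch, not part of the patch. A minimal usage example of the chain introduced above, assuming the `databend_common_pipeline_core` re-exports added in lib.rs below. Ordinary callbacks run in registration order and stop at the first error; a callback wrapped in `always_callback` still runs afterwards:]

    use std::panic::Location;

    use databend_common_exception::Result;
    use databend_common_pipeline_core::always_callback;
    use databend_common_pipeline_core::ExecutionInfo;
    use databend_common_pipeline_core::FinishedCallbackChain;

    fn example() -> Result<()> {
        let mut chain = FinishedCallbackChain::create();

        // Runs in push order; a failure interrupts the remaining ordinary callbacks.
        chain.push_back(
            Location::caller(),
            Box::new(|info: &ExecutionInfo| {
                println!("pipeline result ok: {}", info.res.is_ok());
                Ok(())
            }),
        );

        // Wrapped callbacks run even when an earlier callback failed.
        chain.push_back(
            Location::caller(),
            Box::new(always_callback(|_info: &ExecutionInfo| Ok(()))),
        );

        chain.apply(ExecutionInfo::create(Ok(()), vec![]))
    }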
diff --git a/src/query/pipeline/core/src/lib.rs b/src/query/pipeline/core/src/lib.rs
index fb04d186b8dc..f3c0011bff15 100644
--- a/src/query/pipeline/core/src/lib.rs
+++ b/src/query/pipeline/core/src/lib.rs
@@ -19,6 +19,7 @@
 pub mod processors;

+mod finished_chain;
 mod input_error;
 mod lock_guard;
 mod pipe;
@@ -26,6 +27,10 @@ mod pipeline;
 mod pipeline_display;
 mod unsafe_cell_wrap;

+pub use finished_chain::always_callback;
+pub use finished_chain::Callback;
+pub use finished_chain::ExecutionInfo;
+pub use finished_chain::FinishedCallbackChain;
 pub use input_error::InputError;
 pub use lock_guard::LockGuard;
 pub use lock_guard::UnlockApi;
diff --git a/src/query/pipeline/core/src/pipeline.rs b/src/query/pipeline/core/src/pipeline.rs
index 4802acaeff61..57852799f93d 100644
--- a/src/query/pipeline/core/src/pipeline.rs
+++ b/src/query/pipeline/core/src/pipeline.rs
@@ -26,6 +26,9 @@
 use databend_common_exception::Result;
 use log::info;
 use petgraph::matrix_graph::Zero;

+use crate::finished_chain::Callback;
+use crate::finished_chain::ExecutionInfo;
+use crate::finished_chain::FinishedCallbackChain;
 use crate::pipe::Pipe;
 use crate::pipe::PipeItem;
 use crate::processors::DuplicateProcessor;
@@ -37,7 +40,6 @@
 use crate::processors::ProcessorPtr;
 use crate::processors::ResizeProcessor;
 use crate::processors::ShuffleProcessor;
 use crate::LockGuard;
-use crate::PlanProfile;
 use crate::SinkPipeBuilder;
 use crate::SourcePipeBuilder;
 use crate::TransformPipeBuilder;
@@ -64,9 +66,10 @@ pub struct Pipeline {
     max_threads: usize,
     pub pipes: Vec<Pipe>,
     on_init: Option<InitCallback>,
-    on_finished: Option<FinishedCallback>,
     lock_guards: Vec<LockGuard>,
+    on_finished_chain: FinishedCallbackChain,
+
     plans_scope: Vec<PlanScope>,
     scope_size: Arc<AtomicUsize>,
 }
@@ -79,9 +82,6 @@ impl Debug for Pipeline {

 pub type InitCallback = Box<dyn FnOnce() -> Result<()> + Send + Sync + 'static>;

-pub type FinishedCallback =
-    Box<dyn FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static>;
-
 pub type DynTransformBuilder = Box<dyn Fn(Arc<InputPort>, Arc<OutputPort>) -> Result<ProcessorPtr>>;

 impl Pipeline {
@@ -90,9 +90,9 @@ impl Pipeline {
             max_threads: 0,
             pipes: Vec::new(),
             on_init: None,
-            on_finished: None,
             lock_guards: vec![],
             plans_scope: vec![],
+            on_finished_chain: FinishedCallbackChain::create(),
             scope_size: Arc::new(AtomicUsize::new(0)),
         }
     }
@@ -104,8 +104,8 @@ impl Pipeline {
             max_threads: 0,
             pipes: Vec::new(),
             on_init: None,
-            on_finished: None,
             lock_guards: vec![],
+            on_finished_chain: FinishedCallbackChain::create(),
             plans_scope: scope,
         }
     }
@@ -470,66 +470,18 @@ impl Pipeline {
         self.on_init = Some(Box::new(f));
     }

+    // Append the callback to the end of the finished-callback chain.
     #[track_caller]
-    pub fn set_on_finished<
-        F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
-    >(
-        &mut self,
-        f: F,
-    ) {
+    pub fn set_on_finished<F: Callback>(&mut self, f: F) {
         let location = std::panic::Location::caller();
-        if let Some(on_finished) = self.on_finished.take() {
-            self.on_finished = Some(Box::new(move |(profiles, may_error)| {
-                on_finished((profiles, may_error))?;
-                let instants = Instant::now();
-                let _guard = defer(move || {
-                    info!(
-                        "OnFinished callback elapsed: {:?} while in {}:{}:{}",
-                        instants.elapsed(),
-                        location.file(),
-                        location.line(),
-                        location.column()
-                    );
-                });
-
-                f((profiles, may_error))
-            }));
-            return;
-        }
-
-        self.on_finished = Some(Box::new(f));
+        self.on_finished_chain.push_back(location, Box::new(f));
     }

+    // Prepend the callback to the front of the finished-callback chain.
     #[track_caller]
-    pub fn push_front_on_finished_callback<
-        F: FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static,
-    >(
-        &mut self,
-        f: F,
-    ) {
+    pub fn lift_on_finished<F: Callback>(&mut self, f: F) {
         let location = std::panic::Location::caller();
-        if let Some(on_finished) = self.on_finished.take() {
-            self.on_finished = Some(Box::new(move |(profiles, may_error)| {
-                let instants = Instant::now();
-                let guard = defer(move || {
-                    info!(
-                        "OnFinished callback elapsed: {:?} while in {}:{}:{}",
-                        instants.elapsed(),
-                        location.file(),
-                        location.line(),
-                        location.column()
-                    );
-                });
-
-                f((profiles, may_error))?;
-                drop(guard);
-                on_finished((profiles, may_error))
-            }));
-
-            return;
-        }
-
-        self.on_finished = Some(Box::new(f));
+        self.on_finished_chain.push_front(location, Box::new(f));
     }

     pub fn take_on_init(&mut self) -> InitCallback {
@@ -539,11 +491,10 @@ impl Pipeline {
         }
     }

-    pub fn take_on_finished(&mut self) -> FinishedCallback {
-        match self.on_finished.take() {
-            None => Box::new(|_may_error| Ok(())),
-            Some(on_finished) => on_finished,
-        }
+    pub fn take_on_finished(&mut self) -> FinishedCallbackChain {
+        let mut chain = FinishedCallbackChain::create();
+        std::mem::swap(&mut self.on_finished_chain, &mut chain);
+        chain
     }

     pub fn add_plan_scope(&mut self, scope: PlanScope) -> PlanScopeGuard {
@@ -565,13 +516,13 @@ impl Drop for Pipeline {
     fn drop(&mut self) {
         drop_guard(move || {
             // An error may have occurred before the executor was created.
-            if let Some(on_finished) = self.on_finished.take() {
-                let cause = Err(ErrorCode::Internal(
-                    "Pipeline illegal state: not successfully shutdown.",
-                ));
+            let cause = Err(ErrorCode::Internal(
+                "Pipeline illegal state: not successfully shutdown.",
+            ));

-                let _ = (on_finished)((&vec![], &cause));
-            }
+            let _ = self
+                .on_finished_chain
+                .apply(ExecutionInfo::create(cause, vec![]));
         })
     }
 }
diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs
index 68e871770d6e..db4b0e79848d 100644
--- a/src/query/service/src/interpreters/hook/compact_hook.rs
+++ b/src/query/service/src/interpreters/hook/compact_hook.rs
@@ -19,6 +19,7 @@
 use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_catalog::table::CompactionLimits;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::plans::LockTableOption;
@@ -73,7 +74,7 @@ async fn do_hook_compact(
         return Ok(());
     }

-    pipeline.set_on_finished(move |(_profiles, err)| {
+    pipeline.set_on_finished(move |info: &ExecutionInfo| {
         let compaction_limits = match compact_target.mutation_kind {
             MutationKind::Insert => {
                 let compaction_num_block_hint = ctx.get_compaction_num_block_hint();
@@ -101,7 +102,7 @@
         metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);

         let compact_start_at = Instant::now();
-        if err.is_ok() {
+        if info.res.is_ok() {
             info!("execute {op_name} finished successfully. running table optimization job.");
             match GlobalIORuntime::instance().block_on({
                 compact_table(ctx, compact_target, compaction_limits, lock_opt)
diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs
index fea1876cbd0f..0cde7429c7f0 100644
--- a/src/query/service/src/interpreters/hook/refresh_hook.rs
+++ b/src/query/service/src/interpreters/hook/refresh_hook.rs
@@ -24,6 +24,7 @@
 use databend_common_meta_app::schema::IndexMeta;
 use databend_common_meta_app::schema::ListIndexesByIdReq;
 use databend_common_meta_app::schema::ListVirtualColumnsReq;
 use databend_common_meta_types::MetaId;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::Plan;
@@ -64,8 +65,8 @@ pub async fn hook_refresh(
         return;
     }

-    pipeline.set_on_finished(move |(_profiles, err)| {
-        if err.is_ok() {
+    pipeline.set_on_finished(move |info: &ExecutionInfo| {
+        if info.res.is_ok() {
             info!("execute pipeline finished successfully, starting run refresh job.");
             match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc, lock_opt)) {
                 Ok(_) => {
diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs
index 868a53513451..0ad7aa46f2e6 100644
--- a/src/query/service/src/interpreters/interpreter.rs
+++ b/src/query/service/src/interpreters/interpreter.rs
@@ -27,7 +27,9 @@
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::SendableDataBlockStream;
+use databend_common_pipeline_core::always_callback;
 use databend_common_pipeline_core::processors::PlanProfile;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_pipeline_core::SourcePipeBuilder;
 use databend_common_sql::plans::Plan;
 use databend_common_sql::PlanExtras;
@@ -110,11 +112,11 @@ pub trait Interpreter: Sync + Send {
         let query_ctx = ctx.clone();
         build_res
             .main_pipeline
-            .set_on_finished(move |(plans_profile, may_error)| {
+            .set_on_finished(always_callback(move |info: &ExecutionInfo| {
                 let mut has_profiles = false;
                 // Standalone mode or query executed is successfully
-                if query_ctx.get_cluster().is_empty() || may_error.is_ok() {
-                    query_ctx.add_query_profiles(plans_profile);
+                if query_ctx.get_cluster().is_empty() || info.res.is_ok() {
+                    query_ctx.add_query_profiles(&info.profiling);

                     let query_profiles = query_ctx.get_query_profiles();

@@ -141,17 +143,17 @@ pub trait Interpreter: Sync + Send {

                 hook_vacuum_temp_files(&query_ctx)?;

-                let err_opt = match may_error {
+                let err_opt = match &info.res {
                     Ok(_) => None,
                     Err(e) => Some(e.clone()),
                 };
                 log_query_finished(&query_ctx, err_opt, has_profiles);

-                match may_error {
+                match &info.res {
                     Ok(_) => Ok(()),
                     Err(error) => Err(error.clone()),
                 }
-            });
+            }));

         ctx.set_status_info("executing pipeline");
diff --git a/src/query/service/src/interpreters/interpreter_index_refresh.rs b/src/query/service/src/interpreters/interpreter_index_refresh.rs
index 7152212e10d4..1056b52acad0 100644
--- a/src/query/service/src/interpreters/interpreter_index_refresh.rs
+++ b/src/query/service/src/interpreters/interpreter_index_refresh.rs
@@ -31,6 +31,7 @@
 use databend_common_license::license_manager::get_license_manager;
 use databend_common_meta_app::schema::IndexMeta;
 use databend_common_meta_app::schema::UpdateIndexReq;
 use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_sql::evaluator::BlockOperator;
 use databend_common_sql::evaluator::CompoundBlockOperator;
 use databend_common_sql::executor::physical_plans::TableScan;
@@ -375,7 +376,7 @@ impl Interpreter for RefreshIndexInterpreter {

         build_res
             .main_pipeline
-            .set_on_finished(move |(_profiles, may_error)| match may_error {
+            .set_on_finished(move |info: &ExecutionInfo| match &info.res {
                 Ok(_) => GlobalIORuntime::instance()
                     .block_on(async move { modify_last_update(ctx, req).await }),
                 Err(error_code) => Err(error_code.clone()),
diff --git a/src/query/service/src/interpreters/interpreter_select.rs b/src/query/service/src/interpreters/interpreter_select.rs
index 721fa247673b..76d2ea471170 100644
--- a/src/query/service/src/interpreters/interpreter_select.rs
+++ b/src/query/service/src/interpreters/interpreter_select.rs
@@ -29,6 +29,7 @@
 use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
 use databend_common_meta_store::MetaStore;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_pipeline_core::Pipe;
 use databend_common_pipeline_core::PipeItem;
 use databend_common_pipeline_core::Pipeline;
@@ -167,8 +168,9 @@ impl SelectInterpreter {
             let catalog = self.ctx.get_catalog(catalog_name).await?;
             let query_id = self.ctx.get_id();
             let auto_commit = !self.ctx.txn_mgr().lock().is_active();
-            build_res.main_pipeline.set_on_finished(
-                move |(_profiles, may_error)| match may_error {
+            build_res
+                .main_pipeline
+                .set_on_finished(move |info: &ExecutionInfo| match &info.res {
                     Ok(_) => GlobalIORuntime::instance().block_on(async move {
                         info!(
                             "Updating the stream meta to consume data, query_id: {}",
@@ -196,8 +198,7 @@ impl SelectInterpreter {
                         }
                     }),
                     Err(error_code) => Err(error_code.clone()),
-                },
-            );
+                });
         }
         Ok(build_res)
     }
diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs
index 766cbdebdfdb..39f1b0882b31 100644
--- a/src/query/service/src/interpreters/interpreter_table_create.rs
+++ b/src/query/service/src/interpreters/interpreter_table_create.rs
@@ -40,6 +40,7 @@
 use databend_common_meta_app::schema::TableMeta;
 use databend_common_meta_app::schema::TableNameIdent;
 use databend_common_meta_app::schema::TableStatistics;
 use databend_common_meta_types::MatchSeq;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_sql::field_default_value;
 use databend_common_sql::plans::CreateTablePlan;
 use databend_common_sql::BloomIndexColumns;
@@ -274,10 +275,10 @@ impl CreateTableInterpreter {

         pipeline
             .main_pipeline
-            .push_front_on_finished_callback(move |(_profiles, err)| {
+            .lift_on_finished(move |info: &ExecutionInfo| {
                 let qualified_table_name = format!("{}.{}", db_name, table_name);

-                if err.is_ok() {
+                if info.res.is_ok() {
                     info!(
                         "create_table_as_select {} success, commit table meta data by table id {}",
                         qualified_table_name, table_id
diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs
index 5143d8778704..f3e3a5c7e8d5 100644
--- a/src/query/service/src/interpreters/interpreter_table_optimize.rs
+++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs
@@ -25,6 +25,7 @@
 use databend_common_catalog::table::TableExt;
 use databend_common_exception::Result;
 use databend_common_meta_app::schema::CatalogInfo;
 use databend_common_meta_app::schema::TableInfo;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_sql::executor::physical_plans::CommitSink;
 use databend_common_sql::executor::physical_plans::CompactSource;
@@ -248,9 +249,8 @@ impl OptimizeTableInterpreter {
                     let ctx = self.ctx.clone();
                     let plan = self.plan.clone();
                     let start = SystemTime::now();
-                    build_res
-                        .main_pipeline
-                        .set_on_finished(move |(_profiles, may_error)| match may_error {
+                    build_res.main_pipeline.set_on_finished(
+                        move |info: &ExecutionInfo| match &info.res {
                             Ok(_) => InterpreterClusteringHistory::write_log(
                                 &ctx,
                                 start,
@@ -259,7 +259,8 @@
                                 reclustered_block_count,
                             ),
                             Err(error_code) => Err(error_code.clone()),
-                        });
+                        },
+                    );
                 }
             }
         } else {
@@ -274,7 +275,7 @@
             } else {
                 build_res
                     .main_pipeline
-                    .set_on_finished(move |(_profiles, may_error)| match may_error {
+                    .set_on_finished(move |info: &ExecutionInfo| match &info.res {
                         Ok(_) => GlobalIORuntime::instance()
                             .block_on(async move { purge(ctx, catalog, plan, None).await }),
                         Err(error_code) => Err(error_code.clone()),
diff --git a/src/query/service/src/pipelines/builders/builder_on_finished.rs b/src/query/service/src/pipelines/builders/builder_on_finished.rs
index 38144b54cfaf..1c53f44a0556 100644
--- a/src/query/service/src/pipelines/builders/builder_on_finished.rs
+++ b/src/query/service/src/pipelines/builders/builder_on_finished.rs
@@ -20,6 +20,7 @@
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_meta_app::principal::StageInfo;
 use databend_common_metrics::storage::*;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_storages_stage::StageTable;
 use databend_storages_common_io::Files;
@@ -47,8 +48,8 @@ impl PipelineBuilder {
             is_active
         };
         // set on_finished callback.
-        main_pipeline.set_on_finished(move |(_profiles, may_error)| {
-            match may_error {
+        main_pipeline.set_on_finished(move |info: &ExecutionInfo| {
+            match &info.res {
                 Ok(_) => {
                     GlobalIORuntime::instance().block_on(async move {
                         // 1. log on_error mode errors.
diff --git a/src/query/service/src/pipelines/executor/mod.rs b/src/query/service/src/pipelines/executor/mod.rs
index a76ad82086ef..f5b9f1520586 100644
--- a/src/query/service/src/pipelines/executor/mod.rs
+++ b/src/query/service/src/pipelines/executor/mod.rs
@@ -45,5 +45,4 @@
 pub use processor_async_task::ProcessorAsyncTask;
 pub use queries_executor_tasks::QueriesExecutorTasksQueue;
 pub use queries_pipeline_executor::QueriesPipelineExecutor;
 pub use query_executor_tasks::QueryExecutorTasksQueue;
-pub use query_pipeline_executor::FinishedCallback;
 pub use query_pipeline_executor::QueryPipelineExecutor;
diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs
index f34b2310dd8f..95f7eb095988 100644
--- a/src/query/service/src/pipelines/executor/pipeline_executor.rs
+++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs
@@ -26,6 +26,8 @@
 use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_base::runtime::TrySpawn;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_pipeline_core::ExecutionInfo;
+use databend_common_pipeline_core::FinishedCallbackChain;
 use databend_common_pipeline_core::LockGuard;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_pipeline_core::PlanProfile;
@@ -43,14 +45,11 @@ use crate::pipelines::executor::RunningGraph;

 pub type InitCallback = Box<dyn FnOnce() -> Result<()> + Send + Sync + 'static>;

-pub type FinishedCallback =
-    Box<dyn FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static>;
-
 pub struct QueryWrapper {
     graph: Arc<RunningGraph>,
     settings: ExecutorSettings,
     on_init_callback: Mutex<Option<InitCallback>>,
-    on_finished_callback: Mutex<Option<FinishedCallback>>,
+    on_finished_chain: Mutex<FinishedCallbackChain>,
     #[allow(unused)]
     lock_guards: Vec<LockGuard>,
     finish_condvar_wait: Arc<(Mutex<bool>, Condvar)>,
@@ -70,7 +69,7 @@ impl PipelineExecutor {
             ))
         } else {
             let on_init_callback = Some(pipeline.take_on_init());
-            let on_finished_callback = Some(pipeline.take_on_finished());
+            let on_finished_chain = pipeline.take_on_finished();

             let finish_condvar = Arc::new((Mutex::new(false), Condvar::new()));

@@ -87,7 +86,7 @@ impl PipelineExecutor {
                 graph,
                 settings,
                 on_init_callback: Mutex::new(on_init_callback),
-                on_finished_callback: Mutex::new(on_finished_callback),
+                on_finished_chain: Mutex::new(on_finished_chain),
                 lock_guards,
                 finish_condvar_wait: finish_condvar,
                 finished_notify: Arc::new(WatchNotify::new()),
@@ -118,19 +117,11 @@ impl PipelineExecutor {
             })
         };

-        let on_finished_callback = {
-            let pipelines_callback = pipelines
-                .iter_mut()
-                .map(|x| x.take_on_finished())
-                .collect::<Vec<_>>();
+        let mut on_finished_chain = FinishedCallbackChain::create();

-            pipelines_callback.into_iter().reduce(|left, right| {
-                Box::new(move |arg| {
-                    left(arg)?;
-                    right(arg)
-                })
-            })
-        };
+        for pipeline in &mut pipelines {
+            on_finished_chain.extend(pipeline.take_on_finished());
+        }

         let finish_condvar = Arc::new((Mutex::new(false), Condvar::new()));

@@ -150,7 +141,7 @@ impl PipelineExecutor {
             graph,
             settings,
             on_init_callback: Mutex::new(on_init_callback),
-            on_finished_callback: Mutex::new(on_finished_callback),
+            on_finished_chain: Mutex::new(on_finished_chain),
             lock_guards,
             finish_condvar_wait: finish_condvar,
             finished_notify: Arc::new(WatchNotify::new()),
@@ -210,23 +201,15 @@ impl PipelineExecutor {
             let may_error = query_wrapper.graph.get_error();
             return match may_error {
                 None => {
-                    let guard = query_wrapper.on_finished_callback.lock().take();
-                    if let Some(on_finished_callback) = guard {
-                        catch_unwind(move || {
-                            on_finished_callback((&self.get_plans_profile(), &Ok(())))
-                        })??;
-                    }
-                    Ok(())
+                    let mut finished_chain = query_wrapper.on_finished_chain.lock();
+                    let info = ExecutionInfo::create(Ok(()), self.get_plans_profile());
+                    finished_chain.apply(info)
                 }
                 Some(cause) => {
-                    let guard = query_wrapper.on_finished_callback.lock().take();
-                    let cause_clone = cause.clone();
-                    if let Some(on_finished_callback) = guard {
-                        catch_unwind(move || {
-                            on_finished_callback((&self.get_plans_profile(), &Err(cause_clone)))
-                        })??;
-                    }
-                    Err(cause)
+                    let mut finished_chain = query_wrapper.on_finished_chain.lock();
+                    let profiling = self.get_plans_profile();
+                    let info = ExecutionInfo::create(Err(cause.clone()), profiling);
+                    finished_chain.apply(info).and_then(|_| Err(cause))
                 }
             };
         }
diff --git a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs
index a0f2eff040a6..928a71632ceb 100644
--- a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs
+++ b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs
@@ -29,6 +29,7 @@
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline_core::processors::Processor;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_pipeline_sinks::Sink;
 use databend_common_pipeline_sinks::Sinker;
@@ -129,7 +130,7 @@ impl PipelinePullingExecutor {
             )))
         })?;

-        pipeline.set_on_finished(move |_may_error| {
+        pipeline.set_on_finished(move |_info: &ExecutionInfo| {
             drop(tx);
             Ok(())
         });
diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs
index 81a9afe03e3c..05694676e11c 100644
--- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs
+++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs
@@ -31,6 +31,8 @@
 use databend_common_base::runtime::ThreadTracker;
 use databend_common_base::runtime::TrySpawn;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_pipeline_core::ExecutionInfo;
+use databend_common_pipeline_core::FinishedCallbackChain;
 use databend_common_pipeline_core::LockGuard;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_pipeline_core::PlanProfile;
@@ -54,9 +56,6 @@ use crate::pipelines::executor::WorkersCondvar;

 pub type InitCallback = Box<dyn FnOnce() -> Result<()> + Send + Sync + 'static>;

-pub type FinishedCallback =
-    Box<dyn FnOnce((&Vec<PlanProfile>, &Result<()>)) -> Result<()> + Send + Sync + 'static>;
-
 pub struct QueryPipelineExecutor {
     threads_num: usize,
     pub(crate) graph: Arc<RunningGraph>,
     workers_condvar: Arc<WorkersCondvar>,
     pub async_runtime: Arc<Runtime>,
     pub global_tasks_queue: Arc<QueryExecutorTasksQueue>,
     on_init_callback: Mutex<Option<InitCallback>>,
-    on_finished_callback: Mutex<Option<FinishedCallback>>,
+    on_finished_chain: Mutex<FinishedCallbackChain>,
     settings: ExecutorSettings,
     finished_notify: Arc<WatchNotify>,
     finished_error: Mutex<Option<ErrorCode>>,
@@ -86,19 +85,20 @@ impl QueryPipelineExecutor {
         }

         let on_init_callback = pipeline.take_on_init();
-        let on_finished_callback = pipeline.take_on_finished();
+        let mut on_finished_chain = pipeline.take_on_finished();
         let lock_guards = pipeline.take_lock_guards();

         match RunningGraph::create(pipeline, 1, settings.query_id.clone(), None) {
             Err(cause) => {
-                let _ = on_finished_callback((&vec![], &Err(cause.clone())));
+                let _ = on_finished_chain.apply(ExecutionInfo::create(Err(cause.clone()), vec![]));
+
                 Err(cause)
             }
             Ok(running_graph) => Self::try_create(
                 running_graph,
                 threads_num,
                 Mutex::new(Some(on_init_callback)),
-                Mutex::new(Some(on_finished_callback)),
+                Mutex::new(on_finished_chain),
                 settings,
                 lock_guards,
             ),
@@ -140,19 +140,10 @@ impl QueryPipelineExecutor {
             })
         };

-        let on_finished_callback = {
-            let pipelines_callback = pipelines
-                .iter_mut()
-                .map(|x| x.take_on_finished())
-                .collect::<Vec<_>>();
-
-            pipelines_callback.into_iter().reduce(|left, right| {
-                Box::new(move |arg| {
-                    left(arg)?;
-                    right(arg)
-                })
-            })
-        };
+        let mut finished_chain = FinishedCallbackChain::create();
+        for pipeline in &mut pipelines {
+            finished_chain.extend(pipeline.take_on_finished());
+        }

         let lock_guards = pipelines
             .iter_mut()
@@ -161,17 +152,15 @@ impl QueryPipelineExecutor {

         match RunningGraph::from_pipelines(pipelines, 1, settings.query_id.clone(), None) {
             Err(cause) => {
-                if let Some(on_finished_callback) = on_finished_callback {
-                    let _ = on_finished_callback((&vec![], &Err(cause.clone())));
-                }
-
+                let info = ExecutionInfo::create(Err(cause.clone()), vec![]);
+                let _ignore_res = finished_chain.apply(info);
                 Err(cause)
             }
             Ok(running_graph) => Self::try_create(
                 running_graph,
                 threads_num,
                 Mutex::new(on_init_callback),
-                Mutex::new(on_finished_callback),
+                Mutex::new(finished_chain),
                 settings,
                 lock_guards,
             ),
@@ -182,7 +171,7 @@ impl QueryPipelineExecutor {
         graph: Arc<RunningGraph>,
         threads_num: usize,
         on_init_callback: Mutex<Option<InitCallback>>,
-        on_finished_callback: Mutex<Option<FinishedCallback>>,
+        on_finished_chain: Mutex<FinishedCallbackChain>,
         settings: ExecutorSettings,
         lock_guards: Vec<LockGuard>,
     ) -> Result<Arc<QueryPipelineExecutor>> {
@@ -195,7 +184,7 @@ impl QueryPipelineExecutor {
             workers_condvar,
             global_tasks_queue,
             on_init_callback,
-            on_finished_callback,
+            on_finished_chain,
             async_runtime: GlobalIORuntime::instance(),
             settings,
             finished_error: Mutex::new(None),
@@ -204,38 +193,32 @@ impl QueryPipelineExecutor {
         }))
     }

-    fn on_finished(&self, error: (&Vec<PlanProfile>, &Result<()>)) -> Result<()> {
-        let mut guard = self.on_finished_callback.lock();
-        if let Some(on_finished_callback) = guard.take() {
-            drop(guard);
+    fn on_finished(&self, info: ExecutionInfo) -> Result<()> {
+        let mut on_finished_chain = self.on_finished_chain.lock();

-            // untracking for on finished
-            let mut tracking_payload = ThreadTracker::new_tracking_payload();
-            if let Some(mem_stat) = &tracking_payload.mem_stat {
-                tracking_payload.mem_stat = Some(MemStat::create_child(
-                    String::from("Pipeline-on-finished"),
-                    mem_stat.get_parent_memory_stat(),
-                ));
-            }
-
-            catch_unwind(move || {
-                let _guard = ThreadTracker::tracking(tracking_payload);
-                on_finished_callback(error)
-            })??;
+        // untracking for on finished
+        let mut tracking_payload = ThreadTracker::new_tracking_payload();
+        if let Some(mem_stat) = &tracking_payload.mem_stat {
+            tracking_payload.mem_stat = Some(MemStat::create_child(
+                String::from("Pipeline-on-finished"),
+                mem_stat.get_parent_memory_stat(),
+            ));
         }
-        Ok(())
+
+        let _guard = ThreadTracker::tracking(tracking_payload);
+        on_finished_chain.apply(info)
     }

     pub fn finish(&self, cause: Option<ErrorCode>) {
+        let mut finished_error = self.finished_error.lock();
         if let Some(cause) = cause {
-            let mut finished_error = self.finished_error.lock();
-
             // We only save the cause of the first error.
             if finished_error.is_none() {
                 *finished_error = Some(cause);
             }
         }

+        drop(finished_error);
         self.global_tasks_queue.finish(self.workers_condvar.clone());
         self.graph.interrupt_running_nodes();
         self.finished_notify.notify_waiters();
@@ -262,7 +245,8 @@ impl QueryPipelineExecutor {
                     let may_error = error.clone();
                     drop(finished_error_guard);

-                    self.on_finished((&self.get_plans_profile(), &Err(may_error.clone())))?;
+                    let profiling = self.get_plans_profile();
+                    self.on_finished(ExecutionInfo::create(Err(may_error.clone()), profiling))?;
                     return Err(may_error);
                 }
             }
@@ -270,18 +254,20 @@ impl QueryPipelineExecutor {
             // We will ignore the abort query error, because returned by finished_error if abort query.
             if matches!(&thread_res, Err(error) if error.code() != ErrorCode::ABORTED_QUERY) {
                 let may_error = thread_res.unwrap_err();
-                self.on_finished((&self.get_plans_profile(), &Err(may_error.clone())))?;
+                let profiling = self.get_plans_profile();
+                self.on_finished(ExecutionInfo::create(Err(may_error.clone()), profiling))?;
                 return Err(may_error);
             }
         }

         if let Err(error) = self.graph.assert_finished_graph() {
-            self.on_finished((&self.get_plans_profile(), &Err(error.clone())))?;
+            let profiling = self.get_plans_profile();
+            self.on_finished(ExecutionInfo::create(Err(error.clone()), profiling))?;
             return Err(error);
         }

-        // self.settings.query_id
-        self.on_finished((&self.get_plans_profile(), &Ok(())))?;
+        let profiling = self.get_plans_profile();
+        self.on_finished(ExecutionInfo::create(Ok(()), profiling))?;

         Ok(())
     }
@@ -351,16 +337,16 @@ impl QueryPipelineExecutor {
             let max_execute_time_in_seconds = self.settings.max_execute_time_in_seconds;
             let finished_notify = self.finished_notify.clone();
             self.async_runtime.spawn(async move {
-            let finished_future = Box::pin(finished_notify.notified());
-            let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds));
-            if let Either::Left(_) = select(max_execute_future, finished_future).await {
-                if let Some(executor) = this.upgrade() {
-                    executor.finish(Some(ErrorCode::AbortedQuery(
-                        "Aborted query, because the execution time exceeds the maximum execution time limit",
-                    )));
-                }
-            }
-        });
+                let finished_future = Box::pin(finished_notify.notified());
+                let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds));
+                if let Either::Left(_) = select(max_execute_future, finished_future).await {
+                    if let Some(executor) = this.upgrade() {
+                        executor.finish(Some(ErrorCode::AbortedQuery(
+                            "Aborted query, because the execution time exceeds the maximum execution time limit",
+                        )));
+                    }
+                }
+            });
         }

         Ok(())
@@ -522,33 +508,27 @@ impl Drop for QueryPipelineExecutor {
         drop_guard(move || {
             self.finish(None);

-            let mut guard = self.on_finished_callback.lock();
-            if let Some(on_finished_callback) = guard.take() {
-                drop(guard);
-                let cause = match self.finished_error.lock().as_ref() {
-                    Some(cause) => cause.clone(),
-                    None => {
-                        ErrorCode::Internal("Pipeline illegal state: not successfully shutdown.")
-                    }
-                };
+            let cause = match self.finished_error.lock().as_ref() {
+                Some(cause) => cause.clone(),
+                None => ErrorCode::Internal("Pipeline illegal state: not successfully shutdown."),
+            };

-                // untracking for on finished
-                let mut tracking_payload = ThreadTracker::new_tracking_payload();
-                if let Some(mem_stat) = &tracking_payload.mem_stat {
-                    tracking_payload.mem_stat = Some(MemStat::create_child(
-                        String::from("Pipeline-on-finished"),
-                        mem_stat.get_parent_memory_stat(),
-                    ));
-                }
+            let mut on_finished_chain = self.on_finished_chain.lock();

-                if let Err(cause) = catch_unwind(move || {
-                    let _guard = ThreadTracker::tracking(tracking_payload);
-                    on_finished_callback((&self.get_plans_profile(), &Err(cause)))
-                })
-                .flatten()
-                {
-                    warn!("Pipeline executor shutdown failure, {:?}", cause);
-                }
+            // untracking for on finished
+            let mut tracking_payload = ThreadTracker::new_tracking_payload();
+            if let Some(mem_stat) = &tracking_payload.mem_stat {
+                tracking_payload.mem_stat = Some(MemStat::create_child(
+                    String::from("Pipeline-on-finished"),
+                    mem_stat.get_parent_memory_stat(),
+                ));
+            }
+
+            let _guard = ThreadTracker::tracking(tracking_payload);
+            let profiling = self.get_plans_profile();
+            let info = ExecutionInfo::create(Err(cause), profiling);
+            if let Err(cause) = on_finished_chain.apply(info) {
+                warn!("Pipeline executor shutdown failure, {:?}", cause);
             }
         })
     }
diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
index b1574bac241c..dc0dfbd6fcda 100644
--- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
+++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs
@@ -30,6 +30,7 @@
 use databend_common_config::GlobalConfig;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_grpc::ConnectionFactory;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_sql::executor::PhysicalPlan;
 use minitrace::prelude::*;
 use parking_lot::Mutex;
@@ -395,23 +396,24 @@ impl DataExchangeManager {

         let statistics_receiver: Mutex<StatisticsReceiver> = Mutex::new(statistics_receiver);

-        let on_finished = build_res.main_pipeline.take_on_finished();
+        // Interrupt the finished-callback chain if a network error occurs.
         build_res
             .main_pipeline
-            .set_on_finished(move |(profiles, may_error)| {
+            .lift_on_finished(move |info: &ExecutionInfo| {
                 let query_id = ctx.get_id();
                 let mut statistics_receiver = statistics_receiver.lock();
-                statistics_receiver.shutdown(may_error.is_err());
+                statistics_receiver.shutdown(info.res.is_err());
                 ctx.get_exchange_manager().on_finished_query(&query_id);
-                statistics_receiver.wait_shutdown()?;
-
-                on_finished((profiles, may_error))?;
+                statistics_receiver.wait_shutdown()
+            });

-                match may_error {
-                    Ok(_) => Ok(()),
-                    Err(error_code) => Err(error_code.clone()),
-                }
+        // Propagate the error if it was returned by another query node.
+        build_res
+            .main_pipeline
+            .set_on_finished(move |info: &ExecutionInfo| match &info.res {
+                Ok(_) => Ok(()),
+                Err(error_code) => Err(error_code.clone()),
             });

         Ok(build_res)
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index 6639bb80f6c9..05e4acae9264 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -230,12 +230,13 @@ impl QueryContextShared {

     pub fn kill(&self, cause: ErrorCode) {
         self.set_error(cause.clone());
-        self.aborting.store(true, Ordering::Release);

         if let Some(executor) = self.executor.read().upgrade() {
             executor.finish(Some(cause));
         }

+        self.aborting.store(true, Ordering::Release);
+
         // TODO: Wait for the query to be processed (write out the last error)
     }
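[Reviewer note — migration sketch, not part of the patch; `pipeline` is an illustrative name. Call sites not touched above can be updated the same way: the old tuple argument `(&Vec<PlanProfile>, &Result<()>)` is replaced by a single `&ExecutionInfo` carrying both the result and the profiles:]

    use databend_common_pipeline_core::ExecutionInfo;
    use databend_common_pipeline_core::Pipeline;

    fn register(pipeline: &mut Pipeline) {
        // Before: pipeline.set_on_finished(move |(_profiles, may_error)| ...);
        // After: the closure receives &ExecutionInfo instead of a tuple.
        pipeline.set_on_finished(move |info: &ExecutionInfo| match &info.res {
            Ok(_) => Ok(()),
            Err(cause) => Err(cause.clone()),
        });
    }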
diff --git a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs
index e35ba1ef99cf..723e100d1777 100644
--- a/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs
+++ b/src/query/service/tests/it/pipelines/executor/pipeline_executor.rs
@@ -26,6 +26,7 @@
 use databend_common_expression::DataBlock;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_core::ExecutionInfo;
 use databend_common_pipeline_core::Pipe;
 use databend_common_pipeline_core::PipeItem;
 use databend_common_pipeline_core::Pipeline;
@@ -99,7 +100,7 @@ fn create_pipeline() -> (Arc<AtomicBool>, Pipeline) {
     pipeline.set_on_init(|| Err(ErrorCode::Internal("test failure")));
     pipeline.set_on_finished({
         let called_finished = called_finished.clone();
-        move |_may_error| {
+        move |_info: &ExecutionInfo| {
             called_finished.fetch_or(true, Ordering::SeqCst);
             Ok(())
         }
diff --git a/tests/suites/1_stateful/09_http_handler/09_0005_kill.result b/tests/suites/1_stateful/09_http_handler/09_0005_kill.result
index 981240a2d7a0..ca2fad19eb65 100644
--- a/tests/suites/1_stateful/09_http_handler/09_0005_kill.result
+++ b/tests/suites/1_stateful/09_http_handler/09_0005_kill.result
@@ -11,3 +11,6 @@
     "message": "canceled by client",
     "detail": ""
 }
+## query_log
+0	1
+1043	AbortedQuery. Code: 1043, Text = canceled by client.	4
diff --git a/tests/suites/1_stateful/09_http_handler/09_0005_kill.sh b/tests/suites/1_stateful/09_http_handler/09_0005_kill.sh
index 57e9e18e8bfe..89e89fdbe0d6 100755
--- a/tests/suites/1_stateful/09_http_handler/09_0005_kill.sh
+++ b/tests/suites/1_stateful/09_http_handler/09_0005_kill.sh
@@ -5,7 +5,7 @@
 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 QID="my_query_for_kill_${RANDOM}"
 echo "## query"
-curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}" -H 'Content-Type: application/json' -d '{"sql": "select sleep(0.5) from numbers(15000000);", "pagination": { "wait_time_secs": 6}}' | jq ".state"
+curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H "x-databend-query-id:${QID}" -H 'Content-Type: application/json' -d '{"sql": "select sleep(0.5), number from numbers(15000000000);", "pagination": { "wait_time_secs": 6}}' | jq ".state"
 echo "## kill"
 curl -s -u root: -XGET -w "%{http_code}\n" "http://localhost:8000/v1/query/${QID}/kill"
 echo "## page"
@@ -13,10 +13,7 @@
 curl -s -u root: -XGET -w "\n%{http_code}\n" "http://localhost:8000/v1/query/${QID}/page/0" | jq ".error"
 echo "## final"
 curl -s -u root: -XGET -w "\n" "http://localhost:8000/v1/query/${QID}/final" | jq ".error"
+sleep 5
 ## todo: this is flaky on ci, may lost the second row, can not reproduce locally for now
-#echo "## query_log"
-#echo "select exception_code, exception_text, log_type from system.query_log where query_id='${QID}' order by log_type" | $BENDSQL_CLIENT_CONNECT | sed "s/${QID}/QID/g"
-#----
-### query_log
-#0 1
-#1043 AbortedQuery. Code: 1043, Text = canceled by client. 4
+echo "## query_log"
+echo "select exception_code, exception_text, log_type from system.query_log where query_id='${QID}' order by log_type" | $BENDSQL_CLIENT_CONNECT | sed "s/${QID}/QID/g"
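[Reviewer note — a condensed sketch, not part of the patch. For multi-pipeline executors, the per-pipeline chains are merged with `extend`, as `QueryPipelineExecutor::from_pipelines` does above; the `pipelines` slice here is illustrative:]

    use databend_common_pipeline_core::FinishedCallbackChain;
    use databend_common_pipeline_core::Pipeline;

    fn merge_finished_chains(pipelines: &mut [Pipeline]) -> FinishedCallbackChain {
        let mut chain = FinishedCallbackChain::create();
        for pipeline in pipelines.iter_mut() {
            // take_on_finished() leaves an empty chain behind on the pipeline.
            chain.extend(pipeline.take_on_finished());
        }
        chain
    }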