diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 07314478fd0..3b856d558cd 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -24,7 +24,7 @@ namespace DB F(type_super_batch, {"type", "super_batch"}), F(type_super_batch_cop_dag, {"type", "super_batch_cop_dag"})) \ M(tiflash_coprocessor_executor_count, "Total number of each executor", Counter, F(type_ts, {"type", "table_scan"}), \ F(type_sel, {"type", "selection"}), F(type_agg, {"type", "aggregation"}), F(type_topn, {"type", "top_n"}), \ - F(type_limit, {"type", "limit"})) \ + F(type_limit, {"type", "limit"}), F(type_join, {"type", "join"})) \ M(tiflash_coprocessor_request_duration_seconds, "Bucketed histogram of request duration", Histogram, \ F(type_batch, {{"type", "batch"}}, ExpBuckets{0.0005, 2, 20}), F(type_cop, {{"type", "cop"}}, ExpBuckets{0.0005, 2, 20}), \ F(type_super_batch, {{"type", "super_batch"}}, ExpBuckets{0.0005, 2, 20})) \ diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp index fe5d5a51619..7aea52b4f76 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp @@ -4,6 +4,7 @@ #pragma GCC diagnostic pop #include +#include #include #include @@ -63,7 +64,7 @@ void collectOutPutFieldTypesFromAgg(std::vector & field_type, c /// construct DAGQueryBlock from a tree struct based executors, which is the /// format after supporting join in dag request -DAGQueryBlock::DAGQueryBlock(UInt32 id_, const tipb::Executor & root_) +DAGQueryBlock::DAGQueryBlock(UInt32 id_, const tipb::Executor & root_, TiFlashMetricsPtr metrics) : id(id_), root(&root_), qb_column_prefix("__QB_" + std::to_string(id_) + "_"), qb_join_subquery_alias(qb_column_prefix + "join") { const tipb::Executor * current = root; @@ -72,23 +73,27 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const tipb::Executor & root_) switch (current->tp()) { case tipb::ExecType::TypeSelection: + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_sel).Increment(); assignOrThrowException(&selection, current, SEL_NAME); selection_name = current->executor_id(); current = ¤t->selection().child(); break; case tipb::ExecType::TypeAggregation: case tipb::ExecType::TypeStreamAgg: + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_agg).Increment(); assignOrThrowException(&aggregation, current, AGG_NAME); aggregation_name = current->executor_id(); collectOutPutFieldTypesFromAgg(output_field_types, current->aggregation()); current = ¤t->aggregation().child(); break; case tipb::ExecType::TypeLimit: + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_limit).Increment(); assignOrThrowException(&limitOrTopN, current, LIMIT_NAME); limitOrTopN_name = current->executor_id(); current = ¤t->limit().child(); break; case tipb::ExecType::TypeTopN: + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_topn).Increment(); assignOrThrowException(&limitOrTopN, current, TOPN_NAME); limitOrTopN_name = current->executor_id(); current = ¤t->topn().child(); @@ -109,15 +114,20 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const tipb::Executor & root_) { if (source->join().children_size() != 2) throw TiFlashException("Join executor children size not equal to 2", Errors::Coprocessor::BadRequest); - children.push_back(std::make_shared(id * 2, source->join().children(0))); - children.push_back(std::make_shared(id * 2 + 1, source->join().children(1))); + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_join).Increment(); + children.push_back(std::make_shared(id * 2, source->join().children(0), metrics)); + children.push_back(std::make_shared(id * 2 + 1, source->join().children(1), metrics)); + } + else + { + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_ts).Increment(); } fillOutputFieldTypes(); } /// construct DAGQueryBlock from a list struct based executors, which is the /// format before supporting join in dag request -DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrField & executors) +DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrField & executors, TiFlashMetricsPtr metrics) : id(id_), root(nullptr), qb_column_prefix("__QB_" + std::to_string(id_) + "_"), qb_join_subquery_alias(qb_column_prefix + "join") { for (int i = (int)executors.size() - 1; i >= 0; i--) @@ -125,6 +135,7 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi switch (executors[i].tp()) { case tipb::ExecType::TypeTableScan: + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_ts).Increment(); assignOrThrowException(&source, &executors[i], SOURCE_NAME); /// use index as the prefix for executor name so when we sort by /// the executor name, it will result in the same order as it is @@ -133,25 +144,30 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi source_name = std::to_string(i) + "_tablescan"; break; case tipb::ExecType::TypeSelection: + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_sel).Increment(); assignOrThrowException(&selection, &executors[i], SEL_NAME); selection_name = std::to_string(i) + "_selection"; break; case tipb::ExecType::TypeStreamAgg: case tipb::ExecType::TypeAggregation: + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_agg).Increment(); assignOrThrowException(&aggregation, &executors[i], AGG_NAME); aggregation_name = std::to_string(i) + "_aggregation"; collectOutPutFieldTypesFromAgg(output_field_types, executors[i].aggregation()); break; case tipb::ExecType::TypeTopN: + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_topn).Increment(); assignOrThrowException(&limitOrTopN, &executors[i], TOPN_NAME); limitOrTopN_name = std::to_string(i) + "_limitOrTopN"; break; case tipb::ExecType::TypeLimit: + GET_METRIC(metrics, tiflash_coprocessor_executor_count, type_limit).Increment(); assignOrThrowException(&limitOrTopN, &executors[i], LIMIT_NAME); limitOrTopN_name = std::to_string(i) + "_limitOrTopN"; break; default: - throw TiFlashException("Unsupported executor in DAG request: " + executors[i].DebugString(), Errors::Coprocessor::Unimplemented); + throw TiFlashException( + "Unsupported executor in DAG request: " + executors[i].DebugString(), Errors::Coprocessor::Unimplemented); } } fillOutputFieldTypes(); diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlock.h b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h index e8d7a276f08..520e3252f74 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlock.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h @@ -24,8 +24,8 @@ using TiFlashMetricsPtr = std::shared_ptr; class DAGQueryBlock { public: - DAGQueryBlock(UInt32 id, const tipb::Executor & root); - DAGQueryBlock(UInt32 id, const ::google::protobuf::RepeatedPtrField & executors); + DAGQueryBlock(UInt32 id, const tipb::Executor & root, TiFlashMetricsPtr metrics); + DAGQueryBlock(UInt32 id, const ::google::protobuf::RepeatedPtrField & executors, TiFlashMetricsPtr metrics); /// the xxx_name is added for compatibility issues: before join is supported, executor does not /// has executor name, after join is supported in dag request, every executor has an unique /// name(executor->executor_id()). Since We can not always get the executor name from executor diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index 292fa0f3fe2..ba2eb188c57 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -21,16 +21,15 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, co dag_context(dag_context_), regions(regions_), dag_request(dag_request_), - metrics(context.getTiFlashMetrics()), is_batch_cop(is_batch_cop_) { if (dag_request.has_root_executor()) { - root_query_block = std::make_shared(1, dag_request.root_executor()); + root_query_block = std::make_shared(1, dag_request.root_executor(), context.getTiFlashMetrics()); } else { - root_query_block = std::make_shared(1, dag_request.executors()); + root_query_block = std::make_shared(1, dag_request.executors(), context.getTiFlashMetrics()); } root_query_block->collectAllPossibleChildrenJoinSubqueryAlias(dag_context.qb_id_to_join_alias_map); for (Int32 i : dag_request.output_offsets()) diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.h b/dbms/src/Flash/Coprocessor/DAGQuerySource.h index 9e935fc7798..4a3d1b69b0d 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.h +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.h @@ -78,8 +78,6 @@ class DAGQuerySource : public IQuerySource const tipb::DAGRequest & dag_request; - TiFlashMetricsPtr metrics; - std::vector result_field_types; tipb::EncodeType encode_type; std::shared_ptr root_query_block;