From 4f97a792cdcc166f24cbcdec4aaca662b91c7b2d Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Tue, 30 Jul 2024 06:53:38 +0000 Subject: [PATCH 1/2] Added grpc --- ydb/tests/tools/kqprun/kqprun.cpp | 26 +++++++++-- ydb/tests/tools/kqprun/src/actors.cpp | 56 ++++++++++++++++++------ ydb/tests/tools/kqprun/src/actors.h | 4 +- ydb/tests/tools/kqprun/src/common.h | 17 ++++++- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 26 ++++++++--- 5 files changed, 102 insertions(+), 27 deletions(-) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index fe30413d12d5..a9989f28b84d 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -210,7 +210,7 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner } } - if (runnerOptions.YdbSettings.MonitoringEnabled) { + if (runnerOptions.YdbSettings.MonitoringEnabled || runnerOptions.YdbSettings.GrpcEnabled) { RunAsDaemon(); } @@ -452,8 +452,17 @@ class TMain : public TMainClassArgs { }); options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)") .RequiredArgument("uint") - .DefaultValue(RunnerOptions.YdbSettings.InFlightLimit) - .StoreResult(&RunnerOptions.YdbSettings.InFlightLimit); + .DefaultValue(RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit) + .StoreResult(&RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit); + TChoices verbose({ + {"each-query", NKqpRun::TAsyncQueriesSettings::EVerbose::EachQuery}, + {"final", NKqpRun::TAsyncQueriesSettings::EVerbose::Final} + }); + options.AddLongOption("async-verbose", "Verbose type for async queries") + .RequiredArgument("type") + .DefaultValue("each-query") + .Choices(verbose.GetChoices()) + .StoreMappedResultT(&RunnerOptions.YdbSettings.AsyncQueriesSettings.Verbose, verbose); TChoices scriptAction({ {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, @@ -510,6 +519,15 @@ class TMain : public TMainClassArgs { } }); + options.AddLongOption('G', "grpc", "gRPC port (use 0 to start on random free port), if used kqprun will be runs as daemon") + .RequiredArgument("uint") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + if (const TString& port = option->CurVal()) { + RunnerOptions.YdbSettings.GrpcEnabled = true; + RunnerOptions.YdbSettings.GrpcPort = FromString(port); + } + }); + options.AddLongOption('E', "emulate-yt", "Emulate YT tables (use file gateway instead of native gateway)") .NoArgument() .SetFlag(&EmulateYt); @@ -529,7 +547,7 @@ class TMain : public TMainClassArgs { } int DoRun(NLastGetopt::TOptsParseResult&&) override { - if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled) { + if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled && !RunnerOptions.YdbSettings.GrpcEnabled) { ythrow yexception() << "Nothing to execute"; } diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 5e962c59157e..90e8a78282c7 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -102,12 +102,17 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped { using TBase = NActors::TActor; + struct TRequestInfo { + TInstant StartTime; + NThreading::TFuture RequestFuture; + }; + public: - TAsyncQueryRunnerActor(ui64 inFlightLimit) + TAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings) : TBase(&TAsyncQueryRunnerActor::StateFunc) - , InFlightLimit_(inFlightLimit) + , Settings_(settings) { - RunningRequests_.reserve(InFlightLimit_); + RunningRequests_.reserve(Settings_.InFlightLimit); } STRICT_STFUNC(StateFunc, @@ -123,6 +128,7 @@ class TAsyncQueryRunnerActor : public NActors::TActor { void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) { const ui64 requestId = ev->Get()->RequestId; + RequestsLatency += TInstant::Now() - RunningRequests_[requestId].StartTime; RunningRequests_.erase(requestId); const auto& response = ev->Get()->Result.Response->Get()->Record.GetRef(); @@ -130,7 +136,9 @@ class TAsyncQueryRunnerActor : public NActors::TActor { if (status == Ydb::StatusIds::SUCCESS) { Completed_++; - Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; + if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) { + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; + } } else { Failed_++; NYql::TIssues issues; @@ -138,6 +146,11 @@ class TAsyncQueryRunnerActor : public NActors::TActor { Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default(); } + if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now() - LastReportTime > TDuration::Seconds(1)) { + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Finished " << Failed_ + Completed_ << " requests. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; + LastReportTime = TInstant::Now(); + } + StartDelayedRequests(); TryFinalize(); } @@ -151,18 +164,23 @@ class TAsyncQueryRunnerActor : public NActors::TActor { private: void StartDelayedRequests() { - while (!DelayedRequests_.empty() && (!InFlightLimit_ || RunningRequests_.size() < InFlightLimit_)) { + while (!DelayedRequests_.empty() && (!Settings_.InFlightLimit || RunningRequests_.size() < Settings_.InFlightLimit)) { auto request = std::move(DelayedRequests_.front()); DelayedRequests_.pop(); auto promise = NThreading::NewPromise(); Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr)); - RunningRequests_[RequestId_] = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture& f) { - Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue()))); - }); + RunningRequests_[RequestId_] = { + .StartTime = TInstant::Now(), + .RequestFuture = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture& f) { + Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue()))); + }) + }; MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size()); - Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; + if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) { + Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; + } RequestId_++; request->Get()->StartPromise.SetValue(); @@ -174,28 +192,38 @@ class TAsyncQueryRunnerActor : public NActors::TActor { return false; } + if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final) { + Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " All async requests finished. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; + } + FinalizePromise_->SetValue(); PassAway(); return true; } TString GetInfoString() const { - return TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_; + TStringBuilder result = TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_; + if (const auto amountRequests = Completed_ + Failed_) { + result << ", average latency: " << RequestsLatency / amountRequests; + } + return result; } private: - const ui64 InFlightLimit_; + const TAsyncQueriesSettings Settings_; const TInstant StartTime_ = TInstant::Now(); const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout); std::optional> FinalizePromise_; std::queue DelayedRequests_; - std::unordered_map> RunningRequests_; + std::unordered_map RunningRequests_; + TInstant LastReportTime = TInstant::Now(); ui64 RequestId_ = 1; ui64 MaxInFlight_ = 0; ui64 Completed_ = 0; ui64 Failed_ = 0; + TDuration RequestsLatency; }; class TResourcesWaiterActor : public NActors::TActorBootstrapped { @@ -270,8 +298,8 @@ NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPr return new TRunScriptActorMock(std::move(request), promise, progressCallback); } -NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit) { - return new TAsyncQueryRunnerActor(inFlightLimit); +NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings) { + return new TAsyncQueryRunnerActor(settings); } NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount) { diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h index 8dc4e731a4ea..23e15be157e2 100644 --- a/ydb/tests/tools/kqprun/src/actors.h +++ b/ydb/tests/tools/kqprun/src/actors.h @@ -1,5 +1,7 @@ #pragma once +#include "common.h" + #include #include @@ -72,7 +74,7 @@ using TProgressCallback = std::function promise, TProgressCallback progressCallback); -NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit); +NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings); NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index 561f40e38293..5966d2785818 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -14,6 +14,16 @@ namespace NKqpRun { constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; +struct TAsyncQueriesSettings { + enum class EVerbose { + EachQuery, + Final, + }; + + ui64 InFlightLimit = 0; + EVerbose Verbose = EVerbose::EachQuery; +}; + struct TYdbSetupSettings { ui32 NodeCount = 1; TString DomainName = "Root"; @@ -21,6 +31,10 @@ struct TYdbSetupSettings { bool MonitoringEnabled = false; ui16 MonitoringPortOffset = 0; + + bool GrpcEnabled = false; + ui16 GrpcPort = 0; + bool TraceOptEnabled = false; TString LogOutputFile; @@ -29,8 +43,7 @@ struct TYdbSetupSettings { NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory; TIntrusivePtr YtGateway; NKikimrConfig::TAppConfig AppConfig; - - ui64 InFlightLimit = 0; + TAsyncQueriesSettings AsyncQueriesSettings; }; diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 9026a0fb40c0..0be1665fe665 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -120,8 +120,8 @@ class TYdbSetup::TImpl { serverSettings.SetFrFactory(functionRegistryFactory); } - NKikimr::Tests::TServerSettings GetServerSettings() { - ui32 msgBusPort = PortManager_.GetPort(); + NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) { + const ui32 msgBusPort = PortManager_.GetPort(); NKikimr::Tests::TServerSettings serverSettings(msgBusPort, Settings_.AppConfig.GetAuthConfig(), Settings_.AppConfig.GetPQConfig()); serverSettings.SetNodeCount(Settings_.NodeCount); @@ -152,15 +152,23 @@ class TYdbSetup::TImpl { serverSettings.SetNeedStatsCollectors(true); } + if (Settings_.GrpcEnabled) { + serverSettings.SetGrpcPort(grpcPort); + } + return serverSettings; } - void InitializeServer() { - NKikimr::Tests::TServerSettings serverSettings = GetServerSettings(); + void InitializeServer(ui32 grpcPort) { + NKikimr::Tests::TServerSettings serverSettings = GetServerSettings(grpcPort); Server_ = MakeHolder(serverSettings); Server_->GetRuntime()->SetDispatchTimeout(TDuration::Max()); + if (Settings_.GrpcEnabled) { + Server_->EnableGRpc(grpcPort); + } + Client_ = MakeHolder(serverSettings); Client_->InitRootScheme(); } @@ -204,8 +212,10 @@ class TYdbSetup::TImpl { : Settings_(settings) , CoutColors_(NColorizer::AutoColors(Cout)) { + const ui32 grpcPort = Settings_.GrpcPort ? Settings_.GrpcPort : PortManager_.GetPort(); + InitializeYqlLogger(); - InitializeServer(); + InitializeServer(grpcPort); WaitResourcesPublishing(); if (Settings_.MonitoringEnabled) { @@ -213,6 +223,10 @@ class TYdbSetup::TImpl { Cout << CoutColors_.Cyan() << "Monitoring port" << (Settings_.NodeCount > 1 ? TStringBuilder() << " for node " << nodeIndex + 1 : TString()) << ": " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort(nodeIndex) << Endl; } } + + if (Settings_.GrpcEnabled) { + Cout << CoutColors_.Cyan() << "gRPC port: " << CoutColors_.Default() << grpcPort << Endl; + } } NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TRequestOptions& query) const { @@ -274,7 +288,7 @@ class TYdbSetup::TImpl { void QueryRequestAsync(const TRequestOptions& query) { if (!AsyncQueryRunnerActorId_) { - AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.InFlightLimit)); + AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.AsyncQueriesSettings)); } auto request = GetQueryRequest(query); From 4770fa96965e33bb67f1f719bd4cb64d5281f636 Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Wed, 31 Jul 2024 06:14:24 +0000 Subject: [PATCH 2/2] Fixed issues --- ydb/tests/tools/kqprun/kqprun.cpp | 6 +++--- ydb/tests/tools/kqprun/src/actors.cpp | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index a9989f28b84d..52769286de32 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -452,7 +452,7 @@ class TMain : public TMainClassArgs { }); options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)") .RequiredArgument("uint") - .DefaultValue(RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit) + .DefaultValue(0) .StoreResult(&RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit); TChoices verbose({ {"each-query", NKqpRun::TAsyncQueriesSettings::EVerbose::EachQuery}, @@ -510,7 +510,7 @@ class TMain : public TMainClassArgs { return nodeCount; }); - options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used kqprun will be runs as daemon") + options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used kqprun will be run as daemon") .RequiredArgument("uint") .Handler1([this](const NLastGetopt::TOptsParser* option) { if (const TString& port = option->CurVal()) { @@ -519,7 +519,7 @@ class TMain : public TMainClassArgs { } }); - options.AddLongOption('G', "grpc", "gRPC port (use 0 to start on random free port), if used kqprun will be runs as daemon") + options.AddLongOption('G', "grpc", "gRPC port (use 0 to start on random free port), if used kqprun will be run as daemon") .RequiredArgument("uint") .Handler1([this](const NLastGetopt::TOptsParser* option) { if (const TString& port = option->CurVal()) { diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 90e8a78282c7..5955d6fbb2b8 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -128,7 +128,7 @@ class TAsyncQueryRunnerActor : public NActors::TActor { void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) { const ui64 requestId = ev->Get()->RequestId; - RequestsLatency += TInstant::Now() - RunningRequests_[requestId].StartTime; + RequestsLatency_ += TInstant::Now() - RunningRequests_[requestId].StartTime; RunningRequests_.erase(requestId); const auto& response = ev->Get()->Result.Response->Get()->Record.GetRef(); @@ -146,9 +146,9 @@ class TAsyncQueryRunnerActor : public NActors::TActor { Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default(); } - if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now() - LastReportTime > TDuration::Seconds(1)) { + if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now() - LastReportTime_ > TDuration::Seconds(1)) { Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Finished " << Failed_ + Completed_ << " requests. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; - LastReportTime = TInstant::Now(); + LastReportTime_ = TInstant::Now(); } StartDelayedRequests(); @@ -204,7 +204,7 @@ class TAsyncQueryRunnerActor : public NActors::TActor { TString GetInfoString() const { TStringBuilder result = TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_; if (const auto amountRequests = Completed_ + Failed_) { - result << ", average latency: " << RequestsLatency / amountRequests; + result << ", average latency: " << RequestsLatency_ / amountRequests; } return result; } @@ -217,13 +217,13 @@ class TAsyncQueryRunnerActor : public NActors::TActor { std::optional> FinalizePromise_; std::queue DelayedRequests_; std::unordered_map RunningRequests_; - TInstant LastReportTime = TInstant::Now(); + TInstant LastReportTime_ = TInstant::Now(); ui64 RequestId_ = 1; ui64 MaxInFlight_ = 0; ui64 Completed_ = 0; ui64 Failed_ = 0; - TDuration RequestsLatency; + TDuration RequestsLatency_; }; class TResourcesWaiterActor : public NActors::TActorBootstrapped {