Skip to content

Commit

Permalink
YQ added grpc endpoint into kqprun (#7231)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jul 31, 2024
1 parent 6f27c67 commit aa8cfc7
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 28 deletions.
28 changes: 23 additions & 5 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
}
}

if (runnerOptions.YdbSettings.MonitoringEnabled) {
if (runnerOptions.YdbSettings.MonitoringEnabled || runnerOptions.YdbSettings.GrpcEnabled) {
RunAsDaemon();
}

Expand Down Expand Up @@ -452,8 +452,17 @@ class TMain : public TMainClassArgs {
});
options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)")
.RequiredArgument("uint")
.DefaultValue(RunnerOptions.YdbSettings.InFlightLimit)
.StoreResult(&RunnerOptions.YdbSettings.InFlightLimit);
.DefaultValue(0)
.StoreResult(&RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit);
TChoices<NKqpRun::TAsyncQueriesSettings::EVerbose> verbose({
{"each-query", NKqpRun::TAsyncQueriesSettings::EVerbose::EachQuery},
{"final", NKqpRun::TAsyncQueriesSettings::EVerbose::Final}
});
options.AddLongOption("async-verbose", "Verbose type for async queries")
.RequiredArgument("type")
.DefaultValue("each-query")
.Choices(verbose.GetChoices())
.StoreMappedResultT<TString>(&RunnerOptions.YdbSettings.AsyncQueriesSettings.Verbose, verbose);

TChoices<NKikimrKqp::EQueryAction> scriptAction({
{"execute", NKikimrKqp::QUERY_ACTION_EXECUTE},
Expand Down Expand Up @@ -501,7 +510,7 @@ class TMain : public TMainClassArgs {
return nodeCount;
});

options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used kqprun will be runs as daemon")
options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used kqprun will be run as daemon")
.RequiredArgument("uint")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
if (const TString& port = option->CurVal()) {
Expand All @@ -510,6 +519,15 @@ class TMain : public TMainClassArgs {
}
});

options.AddLongOption('G', "grpc", "gRPC port (use 0 to start on random free port), if used kqprun will be run as daemon")
.RequiredArgument("uint")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
if (const TString& port = option->CurVal()) {
RunnerOptions.YdbSettings.GrpcEnabled = true;
RunnerOptions.YdbSettings.GrpcPort = FromString(port);
}
});

options.AddLongOption('E', "emulate-yt", "Emulate YT tables (use file gateway instead of native gateway)")
.NoArgument()
.SetFlag(&EmulateYt);
Expand All @@ -529,7 +547,7 @@ class TMain : public TMainClassArgs {
}

int DoRun(NLastGetopt::TOptsParseResult&&) override {
if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled) {
if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled && !RunnerOptions.YdbSettings.GrpcEnabled) {
ythrow yexception() << "Nothing to execute";
}

Expand Down
56 changes: 42 additions & 14 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,17 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
using TBase = NActors::TActor<TAsyncQueryRunnerActor>;

struct TRequestInfo {
TInstant StartTime;
NThreading::TFuture<TQueryResponse> RequestFuture;
};

public:
TAsyncQueryRunnerActor(ui64 inFlightLimit)
TAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings)
: TBase(&TAsyncQueryRunnerActor::StateFunc)
, InFlightLimit_(inFlightLimit)
, Settings_(settings)
{
RunningRequests_.reserve(InFlightLimit_);
RunningRequests_.reserve(Settings_.InFlightLimit);
}

STRICT_STFUNC(StateFunc,
Expand All @@ -123,21 +128,29 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {

void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) {
const ui64 requestId = ev->Get()->RequestId;
RequestsLatency_ += TInstant::Now() - RunningRequests_[requestId].StartTime;
RunningRequests_.erase(requestId);

const auto& response = ev->Get()->Result.Response->Get()->Record.GetRef();
const auto status = response.GetYdbStatus();

if (status == Ydb::StatusIds::SUCCESS) {
Completed_++;
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
}
} else {
Failed_++;
NYql::TIssues issues;
NYql::IssuesFromMessage(response.GetResponse().GetQueryIssues(), issues);
Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default();
}

if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now() - LastReportTime_ > TDuration::Seconds(1)) {
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Finished " << Failed_ + Completed_ << " requests. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
LastReportTime_ = TInstant::Now();
}

StartDelayedRequests();
TryFinalize();
}
Expand All @@ -151,18 +164,23 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {

private:
void StartDelayedRequests() {
while (!DelayedRequests_.empty() && (!InFlightLimit_ || RunningRequests_.size() < InFlightLimit_)) {
while (!DelayedRequests_.empty() && (!Settings_.InFlightLimit || RunningRequests_.size() < Settings_.InFlightLimit)) {
auto request = std::move(DelayedRequests_.front());
DelayedRequests_.pop();

auto promise = NThreading::NewPromise<TQueryResponse>();
Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr));
RunningRequests_[RequestId_] = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) {
Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue())));
});
RunningRequests_[RequestId_] = {
.StartTime = TInstant::Now(),
.RequestFuture = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) {
Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue())));
})
};

MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size());
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
}

RequestId_++;
request->Get()->StartPromise.SetValue();
Expand All @@ -174,28 +192,38 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
return false;
}

if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final) {
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " All async requests finished. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
}

FinalizePromise_->SetValue();
PassAway();
return true;
}

TString GetInfoString() const {
return TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_;
TStringBuilder result = TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_;
if (const auto amountRequests = Completed_ + Failed_) {
result << ", average latency: " << RequestsLatency_ / amountRequests;
}
return result;
}

private:
const ui64 InFlightLimit_;
const TAsyncQueriesSettings Settings_;
const TInstant StartTime_ = TInstant::Now();
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);

std::optional<NThreading::TPromise<void>> FinalizePromise_;
std::queue<TEvPrivate::TEvStartAsyncQuery::TPtr> DelayedRequests_;
std::unordered_map<ui64, NThreading::TFuture<TQueryResponse>> RunningRequests_;
std::unordered_map<ui64, TRequestInfo> RunningRequests_;
TInstant LastReportTime_ = TInstant::Now();

ui64 RequestId_ = 1;
ui64 MaxInFlight_ = 0;
ui64 Completed_ = 0;
ui64 Failed_ = 0;
TDuration RequestsLatency_;
};

class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
Expand Down Expand Up @@ -270,8 +298,8 @@ NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPr
return new TRunScriptActorMock(std::move(request), promise, progressCallback);
}

NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit) {
return new TAsyncQueryRunnerActor(inFlightLimit);
NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings) {
return new TAsyncQueryRunnerActor(settings);
}

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "common.h"

#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>

Expand Down Expand Up @@ -72,7 +74,7 @@ using TProgressCallback = std::function<void(const NKikimrKqp::TEvExecuterProgre

NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback);

NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit);
NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings);

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);

Expand Down
17 changes: 15 additions & 2 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,27 @@ namespace NKqpRun {

constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";

struct TAsyncQueriesSettings {
enum class EVerbose {
EachQuery,
Final,
};

ui64 InFlightLimit = 0;
EVerbose Verbose = EVerbose::EachQuery;
};

struct TYdbSetupSettings {
ui32 NodeCount = 1;
TString DomainName = "Root";
TDuration InitializationTimeout = TDuration::Seconds(10);

bool MonitoringEnabled = false;
ui16 MonitoringPortOffset = 0;

bool GrpcEnabled = false;
ui16 GrpcPort = 0;

bool TraceOptEnabled = false;
TString LogOutputFile;

Expand All @@ -29,8 +43,7 @@ struct TYdbSetupSettings {
NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory;
TIntrusivePtr<NYql::IYtGateway> YtGateway;
NKikimrConfig::TAppConfig AppConfig;

ui64 InFlightLimit = 0;
TAsyncQueriesSettings AsyncQueriesSettings;
};


Expand Down
26 changes: 20 additions & 6 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ class TYdbSetup::TImpl {
serverSettings.SetFrFactory(functionRegistryFactory);
}

NKikimr::Tests::TServerSettings GetServerSettings() {
ui32 msgBusPort = PortManager_.GetPort();
NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) {
const ui32 msgBusPort = PortManager_.GetPort();

NKikimr::Tests::TServerSettings serverSettings(msgBusPort, Settings_.AppConfig.GetAuthConfig(), Settings_.AppConfig.GetPQConfig());
serverSettings.SetNodeCount(Settings_.NodeCount);
Expand Down Expand Up @@ -152,15 +152,23 @@ class TYdbSetup::TImpl {
serverSettings.SetNeedStatsCollectors(true);
}

if (Settings_.GrpcEnabled) {
serverSettings.SetGrpcPort(grpcPort);
}

return serverSettings;
}

void InitializeServer() {
NKikimr::Tests::TServerSettings serverSettings = GetServerSettings();
void InitializeServer(ui32 grpcPort) {
NKikimr::Tests::TServerSettings serverSettings = GetServerSettings(grpcPort);

Server_ = MakeHolder<NKikimr::Tests::TServer>(serverSettings);
Server_->GetRuntime()->SetDispatchTimeout(TDuration::Max());

if (Settings_.GrpcEnabled) {
Server_->EnableGRpc(grpcPort);
}

Client_ = MakeHolder<NKikimr::Tests::TClient>(serverSettings);
Client_->InitRootScheme();
}
Expand Down Expand Up @@ -204,15 +212,21 @@ class TYdbSetup::TImpl {
: Settings_(settings)
, CoutColors_(NColorizer::AutoColors(Cout))
{
const ui32 grpcPort = Settings_.GrpcPort ? Settings_.GrpcPort : PortManager_.GetPort();

InitializeYqlLogger();
InitializeServer();
InitializeServer(grpcPort);
WaitResourcesPublishing();

if (Settings_.MonitoringEnabled) {
for (ui32 nodeIndex = 0; nodeIndex < Settings_.NodeCount; ++nodeIndex) {
Cout << CoutColors_.Cyan() << "Monitoring port" << (Settings_.NodeCount > 1 ? TStringBuilder() << " for node " << nodeIndex + 1 : TString()) << ": " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort(nodeIndex) << Endl;
}
}

if (Settings_.GrpcEnabled) {
Cout << CoutColors_.Cyan() << "gRPC port: " << CoutColors_.Default() << grpcPort << Endl;
}
}

NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TRequestOptions& query) const {
Expand Down Expand Up @@ -274,7 +288,7 @@ class TYdbSetup::TImpl {

void QueryRequestAsync(const TRequestOptions& query) {
if (!AsyncQueryRunnerActorId_) {
AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.InFlightLimit));
AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.AsyncQueriesSettings));
}

auto request = GetQueryRequest(query);
Expand Down

0 comments on commit aa8cfc7

Please sign in to comment.