Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ added grpc endpoint into kqprun #7231

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
}
}

if (runnerOptions.YdbSettings.MonitoringEnabled) {
if (runnerOptions.YdbSettings.MonitoringEnabled || runnerOptions.YdbSettings.GrpcEnabled) {
RunAsDaemon();
}

Expand Down Expand Up @@ -452,8 +452,17 @@ class TMain : public TMainClassArgs {
});
options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)")
.RequiredArgument("uint")
.DefaultValue(RunnerOptions.YdbSettings.InFlightLimit)
.StoreResult(&RunnerOptions.YdbSettings.InFlightLimit);
.DefaultValue(RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit)
GrigoriyPA marked this conversation as resolved.
Show resolved Hide resolved
.StoreResult(&RunnerOptions.YdbSettings.AsyncQueriesSettings.InFlightLimit);
TChoices<NKqpRun::TAsyncQueriesSettings::EVerbose> verbose({
{"each-query", NKqpRun::TAsyncQueriesSettings::EVerbose::EachQuery},
{"final", NKqpRun::TAsyncQueriesSettings::EVerbose::Final}
});
options.AddLongOption("async-verbose", "Verbose type for async queries")
.RequiredArgument("type")
.DefaultValue("each-query")
.Choices(verbose.GetChoices())
.StoreMappedResultT<TString>(&RunnerOptions.YdbSettings.AsyncQueriesSettings.Verbose, verbose);

TChoices<NKikimrKqp::EQueryAction> scriptAction({
{"execute", NKikimrKqp::QUERY_ACTION_EXECUTE},
Expand Down Expand Up @@ -510,6 +519,15 @@ class TMain : public TMainClassArgs {
}
});

options.AddLongOption('G', "grpc", "gRPC port (use 0 to start on random free port), if used kqprun will be runs as daemon")
GrigoriyPA marked this conversation as resolved.
Show resolved Hide resolved
.RequiredArgument("uint")
.Handler1([this](const NLastGetopt::TOptsParser* option) {
if (const TString& port = option->CurVal()) {
RunnerOptions.YdbSettings.GrpcEnabled = true;
RunnerOptions.YdbSettings.GrpcPort = FromString(port);
}
});

options.AddLongOption('E', "emulate-yt", "Emulate YT tables (use file gateway instead of native gateway)")
.NoArgument()
.SetFlag(&EmulateYt);
Expand All @@ -529,7 +547,7 @@ class TMain : public TMainClassArgs {
}

int DoRun(NLastGetopt::TOptsParseResult&&) override {
if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled) {
if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled && !RunnerOptions.YdbSettings.GrpcEnabled) {
ythrow yexception() << "Nothing to execute";
}

Expand Down
56 changes: 42 additions & 14 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,17 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
using TBase = NActors::TActor<TAsyncQueryRunnerActor>;

struct TRequestInfo {
TInstant StartTime;
NThreading::TFuture<TQueryResponse> RequestFuture;
};

public:
TAsyncQueryRunnerActor(ui64 inFlightLimit)
TAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings)
: TBase(&TAsyncQueryRunnerActor::StateFunc)
, InFlightLimit_(inFlightLimit)
, Settings_(settings)
{
RunningRequests_.reserve(InFlightLimit_);
RunningRequests_.reserve(Settings_.InFlightLimit);
}

STRICT_STFUNC(StateFunc,
Expand All @@ -123,21 +128,29 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {

void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) {
const ui64 requestId = ev->Get()->RequestId;
RequestsLatency += TInstant::Now() - RunningRequests_[requestId].StartTime;
RunningRequests_.erase(requestId);

const auto& response = ev->Get()->Result.Response->Get()->Record.GetRef();
const auto status = response.GetYdbStatus();

if (status == Ydb::StatusIds::SUCCESS) {
Completed_++;
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
}
} else {
Failed_++;
NYql::TIssues issues;
NYql::IssuesFromMessage(response.GetResponse().GetQueryIssues(), issues);
Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default();
}

if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now() - LastReportTime > TDuration::Seconds(1)) {
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Finished " << Failed_ + Completed_ << " requests. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
LastReportTime = TInstant::Now();
}

StartDelayedRequests();
TryFinalize();
}
Expand All @@ -151,18 +164,23 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {

private:
void StartDelayedRequests() {
while (!DelayedRequests_.empty() && (!InFlightLimit_ || RunningRequests_.size() < InFlightLimit_)) {
while (!DelayedRequests_.empty() && (!Settings_.InFlightLimit || RunningRequests_.size() < Settings_.InFlightLimit)) {
auto request = std::move(DelayedRequests_.front());
DelayedRequests_.pop();

auto promise = NThreading::NewPromise<TQueryResponse>();
Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr));
RunningRequests_[RequestId_] = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) {
Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue())));
});
RunningRequests_[RequestId_] = {
.StartTime = TInstant::Now(),
.RequestFuture = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) {
Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue())));
})
};

MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size());
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
}

RequestId_++;
request->Get()->StartPromise.SetValue();
Expand All @@ -174,28 +192,38 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
return false;
}

if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final) {
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " All async requests finished. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
}

FinalizePromise_->SetValue();
PassAway();
return true;
}

TString GetInfoString() const {
return TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_;
TStringBuilder result = TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_;
if (const auto amountRequests = Completed_ + Failed_) {
result << ", average latency: " << RequestsLatency / amountRequests;
}
return result;
}

private:
const ui64 InFlightLimit_;
const TAsyncQueriesSettings Settings_;
const TInstant StartTime_ = TInstant::Now();
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);

std::optional<NThreading::TPromise<void>> FinalizePromise_;
std::queue<TEvPrivate::TEvStartAsyncQuery::TPtr> DelayedRequests_;
std::unordered_map<ui64, NThreading::TFuture<TQueryResponse>> RunningRequests_;
std::unordered_map<ui64, TRequestInfo> RunningRequests_;
TInstant LastReportTime = TInstant::Now();

ui64 RequestId_ = 1;
ui64 MaxInFlight_ = 0;
ui64 Completed_ = 0;
ui64 Failed_ = 0;
TDuration RequestsLatency;
GrigoriyPA marked this conversation as resolved.
Show resolved Hide resolved
};

class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
Expand Down Expand Up @@ -270,8 +298,8 @@ NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPr
return new TRunScriptActorMock(std::move(request), promise, progressCallback);
}

NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit) {
return new TAsyncQueryRunnerActor(inFlightLimit);
NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings) {
return new TAsyncQueryRunnerActor(settings);
}

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "common.h"

#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>

Expand Down Expand Up @@ -72,7 +74,7 @@ using TProgressCallback = std::function<void(const NKikimrKqp::TEvExecuterProgre

NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback);

NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit);
NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings);

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);

Expand Down
17 changes: 15 additions & 2 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,27 @@ namespace NKqpRun {

constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";

struct TAsyncQueriesSettings {
enum class EVerbose {
EachQuery,
Final,
};

ui64 InFlightLimit = 0;
EVerbose Verbose = EVerbose::EachQuery;
};

struct TYdbSetupSettings {
ui32 NodeCount = 1;
TString DomainName = "Root";
TDuration InitializationTimeout = TDuration::Seconds(10);

bool MonitoringEnabled = false;
ui16 MonitoringPortOffset = 0;

bool GrpcEnabled = false;
ui16 GrpcPort = 0;

bool TraceOptEnabled = false;
TString LogOutputFile;

Expand All @@ -29,8 +43,7 @@ struct TYdbSetupSettings {
NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory;
TIntrusivePtr<NYql::IYtGateway> YtGateway;
NKikimrConfig::TAppConfig AppConfig;

ui64 InFlightLimit = 0;
TAsyncQueriesSettings AsyncQueriesSettings;
};


Expand Down
26 changes: 20 additions & 6 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ class TYdbSetup::TImpl {
serverSettings.SetFrFactory(functionRegistryFactory);
}

NKikimr::Tests::TServerSettings GetServerSettings() {
ui32 msgBusPort = PortManager_.GetPort();
NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) {
const ui32 msgBusPort = PortManager_.GetPort();

NKikimr::Tests::TServerSettings serverSettings(msgBusPort, Settings_.AppConfig.GetAuthConfig(), Settings_.AppConfig.GetPQConfig());
serverSettings.SetNodeCount(Settings_.NodeCount);
Expand Down Expand Up @@ -152,15 +152,23 @@ class TYdbSetup::TImpl {
serverSettings.SetNeedStatsCollectors(true);
}

if (Settings_.GrpcEnabled) {
serverSettings.SetGrpcPort(grpcPort);
}

return serverSettings;
}

void InitializeServer() {
NKikimr::Tests::TServerSettings serverSettings = GetServerSettings();
void InitializeServer(ui32 grpcPort) {
NKikimr::Tests::TServerSettings serverSettings = GetServerSettings(grpcPort);

Server_ = MakeHolder<NKikimr::Tests::TServer>(serverSettings);
Server_->GetRuntime()->SetDispatchTimeout(TDuration::Max());

if (Settings_.GrpcEnabled) {
Server_->EnableGRpc(grpcPort);
}

Client_ = MakeHolder<NKikimr::Tests::TClient>(serverSettings);
Client_->InitRootScheme();
}
Expand Down Expand Up @@ -204,15 +212,21 @@ class TYdbSetup::TImpl {
: Settings_(settings)
, CoutColors_(NColorizer::AutoColors(Cout))
{
const ui32 grpcPort = Settings_.GrpcPort ? Settings_.GrpcPort : PortManager_.GetPort();

InitializeYqlLogger();
InitializeServer();
InitializeServer(grpcPort);
WaitResourcesPublishing();

if (Settings_.MonitoringEnabled) {
for (ui32 nodeIndex = 0; nodeIndex < Settings_.NodeCount; ++nodeIndex) {
Cout << CoutColors_.Cyan() << "Monitoring port" << (Settings_.NodeCount > 1 ? TStringBuilder() << " for node " << nodeIndex + 1 : TString()) << ": " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort(nodeIndex) << Endl;
}
}

if (Settings_.GrpcEnabled) {
Cout << CoutColors_.Cyan() << "gRPC port: " << CoutColors_.Default() << grpcPort << Endl;
}
}

NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TRequestOptions& query) const {
Expand Down Expand Up @@ -274,7 +288,7 @@ class TYdbSetup::TImpl {

void QueryRequestAsync(const TRequestOptions& query) {
if (!AsyncQueryRunnerActorId_) {
AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.InFlightLimit));
AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.AsyncQueriesSettings));
}

auto request = GetQueryRequest(query);
Expand Down
Loading