Skip to content

Commit

Permalink
chore: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Zork33 committed Jun 27, 2024
1 parent 3622d03 commit 258f37f
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 39 deletions.
50 changes: 50 additions & 0 deletions src/__tests__/e2e/topic-service/topic-service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import DiscoveryService from "../../../discovery/discovery-service";
import {ENDPOINT_DISCOVERY_PERIOD} from "../../../constants";
import {AnonymousAuthService} from "../../../credentials/anonymous-auth-service";
import {getDefaultLogger} from "../../../logger/get-default-logger";
import {InternalTopicService} from "../../../topic";

const DATABASE = '/local';
const ENDPOINT = 'grpc://localhost:2136';

describe('Query.execute()', () => {
let discoveryService: DiscoveryService;
let topicService: InternalTopicService;

beforeEach(async () => {
await testOnOneSessionWithoutDriver();
});

afterEach(async () => {
discoveryService.destroy();
await topicService.dispose();
});

it('write: simple', async () => {
// topicService.dispose();

});

it('read: simple', async () => {

});

async function testOnOneSessionWithoutDriver() {
const logger = getDefaultLogger();
const authService = new AnonymousAuthService();
discoveryService = new DiscoveryService({
endpoint: ENDPOINT,
database: DATABASE,
authService,
discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD,
logger,
});
await discoveryService.ready(ENDPOINT_DISCOVERY_PERIOD);
topicService = new InternalTopicService(
await discoveryService.getEndpoint(), // TODO: Should be one per endpoint
DATABASE,
authService,
logger,
);
}
});
49 changes: 22 additions & 27 deletions src/topic/internal-topic-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,7 @@ import ICreateTopicResult = Ydb.Topic.ICreateTopicResult;
import {AuthenticatedService, ClientOptions} from "../utils";
import {IAuthService} from "../credentials/i-auth-service";
import {ISslCredentials} from "../utils/ssl-credentials";
import {InternalTopicWrite, InternalTopicWriteOpts} from "./internal-topic-write";

type GrpcTopicService = Ydb.Topic.V1.TopicService;

/**
* Service methods, as they name in GRPC.
*/
export const enum Query_V1 {
// ExecuteQuery = '/Ydb.Query.V1.QueryService/ExecuteQuery',
}

export interface QuerySessionOperation {
cancel(reason: any): void;
}

export const grpcApiSymbol = Symbol('api');
export const implSymbol = Symbol('impl');
export const attachStreamSymbol = Symbol('attachStream');
import {InternalTopicWrite, InternalTopicWriteOpts, STREAM_DISPOSED} from "./internal-topic-write";

// TODO: Ensure required props in args and results
type CommitOffsetArgs = Ydb.Topic.ICommitOffsetRequest & Required<Pick<Ydb.Topic.ICommitOffsetRequest, 'path'>>;
Expand All @@ -32,39 +15,51 @@ type CommitOffsetResult = Ydb.Topic.CommitOffsetResponse;
type UpdateOffsetsInTransactionArgs = Ydb.Topic.IUpdateOffsetsInTransactionRequest;
type UpdateOffsetsInTransactionResult = Ydb.Topic.UpdateOffsetsInTransactionResponse;

type CreateTopicArgs = Ydb.Topic.ICreateTopicRequest & Required<Pick<Ydb.Topic.ICreateTopicRequest, 'path'>>;;
type CreateTopicArgs = Ydb.Topic.ICreateTopicRequest & Required<Pick<Ydb.Topic.ICreateTopicRequest, 'path'>>;
type CreateTopicResult = Ydb.Topic.CreateTopicResponse;

type DescribeTopicArgs = Ydb.Topic.IDescribeTopicRequest & Required<Pick<Ydb.Topic.IDescribeTopicRequest, 'path'>>;;
type DescribeTopicArgs = Ydb.Topic.IDescribeTopicRequest & Required<Pick<Ydb.Topic.IDescribeTopicRequest, 'path'>>;
type DescribeTopicResult = Ydb.Topic.DescribeTopicResponse;

type DescribeConsumerArgs = Ydb.Topic.IDescribeConsumerRequest & Required<Pick<Ydb.Topic.IDescribeConsumerRequest, 'path'>>;;
type DescribeConsumerArgs = Ydb.Topic.IDescribeConsumerRequest & Required<Pick<Ydb.Topic.IDescribeConsumerRequest, 'path'>>;
type DescribeConsumerResult = Ydb.Topic.DescribeConsumerResponse;

type AlterTopicArgs = Ydb.Topic.IAlterTopicRequest & Required<Pick<Ydb.Topic.IAlterTopicRequest, 'path'>>;;
type AlterTopicArgs = Ydb.Topic.IAlterTopicRequest & Required<Pick<Ydb.Topic.IAlterTopicRequest, 'path'>>;
type AlterTopicResult = Ydb.Topic.AlterTopicResponse;

type DropTopicArgs = Ydb.Topic.IDropTopicRequest & Required<Pick<Ydb.Topic.IDropTopicRequest, 'path'>>;;
type DropTopicArgs = Ydb.Topic.IDropTopicRequest & Required<Pick<Ydb.Topic.IDropTopicRequest, 'path'>>;
type DropTopicResult = Ydb.Topic.DropTopicResponse;


export class InternalTopicService extends AuthenticatedService<GrpcTopicService> implements ICreateTopicResult {
export class InternalTopicService extends AuthenticatedService<Ydb.Topic.V1.TopicService> implements ICreateTopicResult {
public endpoint: Endpoint;
private readonly logger: Logger;
private streams: {dispose(): void}[] = [];

constructor(endpoint: Endpoint, database: string, authService: IAuthService, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) {
const host = endpoint.toString();
super(host, database, 'Ydb.Topic.V1.TopicService', GrpcTopicService, authService, sslCredentials, clientOptions);
super(host, database, 'Ydb.Topic.V1.TopicService', Ydb.Topic.V1.TopicService, authService, sslCredentials, clientOptions);
this.endpoint = endpoint;
this.logger = logger;
}

dispose() {
const streams = this.streams;
this.streams = [];
streams.forEach(s => {s.dispose()});
}

public async streamWrite(opts: InternalTopicWriteOpts) {
await this.updateMetadata();
return new InternalTopicWrite(this, this.logger, opts);
const stream = new InternalTopicWrite(this, this.logger, opts);
this.streams.push(stream);
stream.once(STREAM_DISPOSED, (stream: {dispose: () => {}}) => {
const index = this.streams.findIndex(v => v === stream)
if (index >= 0) this.streams.splice(index, 1);
});
return stream;
}


// public streamWrite(request: Ydb.Topic.StreamWriteMessage.IFromClient): Promise<Ydb.Topic.StreamWriteMessage.FromServer>;
//
// public streamRead(request: Ydb.Topic.StreamReadMessage.IFromClient): Promise<Ydb.Topic.StreamReadMessage.FromServer>;
Expand Down
72 changes: 60 additions & 12 deletions src/topic/internal-topic-write.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,81 @@
import {Logger} from "../logger/simple-logger";
import {Ydb} from "ydb-sdk-proto";
import {InternalTopicService} from "./internal-topic-service";
import StreamWriteMessage = Ydb.Topic.StreamWriteMessage;
import {ClientWritableStream, ServiceError} from "@grpc/grpc-js/src/call";
import FromClient = Ydb.Topic.StreamWriteMessage.FromClient;
import FromServer = Ydb.Topic.StreamWriteMessage.FromServer;
import {ClientWritableStream/*, ServiceError*/} from "@grpc/grpc-js/src/call";
import EventEmitter from "events";

export interface InternalTopicWriteOpts {
}

export class InternalTopicWrite {
type InitArgs =
Ydb.Topic.StreamWriteMessage.IInitRequest
& Required<Pick<Ydb.Topic.StreamWriteMessage.IInitRequest, 'path'>>;

type WriteArgs =
Ydb.Topic.StreamWriteMessage.IWriteRequest
& Required<Pick<Ydb.Topic.StreamWriteMessage.IWriteRequest, 'messages'>>;

type UpdateTokenArgs = Ydb.Topic.UpdateTokenRequest & Required<Pick<Ydb.Topic.UpdateTokenRequest, 'token'>>;

export const STREAM_DISPOSED = 'stream-disposed';

export class InternalTopicWrite extends EventEmitter {
// @ts-ignore
private writeStream: ClientWritableStream<StreamWriteMessage.FromClient>;
private writeStream?: ClientWritableStream<FromClient>;

constructor(
private topicService: InternalTopicService,
// @ts-ignore
private logger: Logger,
// @ts-ignore
private opts: InternalTopicWriteOpts) {
this.writeStream = this.topicService.grpcClient!
.makeClientStreamRequest(
super();
const stream = this.topicService.grpcClient!
.makeClientStreamRequest<FromClient, FromServer>(
'/Ydb.Topic.V1.TopicService/StreamWrite',
(v: StreamWriteMessage.FromClient) => StreamWriteMessage.FromServer.encode(v).finish() as Buffer,
StreamWriteMessage.FromServer.decode,
(v: FromClient) => FromClient.encode(v).finish() as Buffer,
FromServer.decode,
this.topicService.metadata,
(err: ServiceError | null, value?: StreamWriteMessage.FromServer) => {
(_err: any /* ServiceError */, _value?: FromServer) => {
// TODO: process
}
)
console.info(1000, _value);
});
this.writeStream = stream as ClientWritableStream<FromClient>; // 'as' is here as a quick solution of the fact that TS generates error here
}

public init(opts: InitArgs) {
if (this.writeStream)
this.writeStream.write(
FromClient.create({
initRequest: Ydb.Topic.StreamWriteMessage.InitRequest.create(opts),
}));
}

public write(opts: WriteArgs) {
if (this.writeStream)
this.writeStream.write(
FromClient.create({
writeRequest: Ydb.Topic.StreamWriteMessage.WriteRequest.create(opts),
}));
}

public updateToken(opts: UpdateTokenArgs) {
if (this.writeStream)
this.writeStream.write(
FromClient.create({
updateTokenRequest: Ydb.Topic.UpdateTokenRequest.create(opts),
}));
}

public dispose() {
if (this.writeStream) {
this.writeStream.end();
delete this.writeStream;
this.emit(STREAM_DISPOSED, this);
}
}

// TODO: Regular update token
// TODO: Update token when the auth provider returns a new one
}

0 comments on commit 258f37f

Please sign in to comment.