diff --git a/config/jest.development.js b/config/jest.development.js index f6d3c178..44bf635f 100644 --- a/config/jest.development.js +++ b/config/jest.development.js @@ -4,7 +4,7 @@ module.exports = { tsconfig: 'tsconfig-cjs.json' } }, - roots: ['/src'], + roots: ['/../src'], transform: { '^.+\\.tsx?$': 'ts-jest', }, diff --git a/eslint-local-rules/examples/ex21.ts b/eslint-local-rules/examples/ex21.ts index f6d0d3b4..177fb6a2 100644 --- a/eslint-local-rules/examples/ex21.ts +++ b/eslint-local-rules/examples/ex21.ts @@ -1,6 +1,7 @@ /* eslint local-rules/context: "error" */ // local -rules/context: trace, no-root +import { ContextWithLogger } from '../../src/context-with-logger'; class A { // local-rules/context: trace @@ -21,9 +22,9 @@ class A { // super(); const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:...eslint-local-rules.examples.A.constructor', ''); - console.info(2000, await (() => { + console.info(2000, ctx.doSync(() => await(() => { F(); - })()); + }))()); // const WW = () => { // // local-rules/context: root trace diff --git a/eslint-local-rules/examples/ex22.ts b/eslint-local-rules/examples/ex22.ts new file mode 100644 index 00000000..590f0d67 --- /dev/null +++ b/eslint-local-rules/examples/ex22.ts @@ -0,0 +1,28 @@ +/* eslint local-rules/context: "error" */ + +export class SessionService extends AuthenticatedService { + // public endpoint: Endpoint; + // private readonly logger: Logger; + + // eslint-disable-next-line max-len + constructor(endpoint: Endpoint, database: string, authService: IAuthService, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionService.constructor', logger); + const host = ctx.doSync(() => endpoint.toString()); + // + // super(host, database, 'Ydb.Table.V1.TableService', TableService, authService, sslCredentials, clientOptions); + // this.endpoint = endpoint; + // this.logger = logger; + } + + // @retryable() + // @pessimizable + // async create(): Promise { + // // const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionService.create', this); + // // const response = await ctx.do(() => this.api.createSession(CreateSessionRequest.create())); + // // const payload = ctx.doSync(() => getOperationPayload(response)); + // // const { sessionId } = ctx.doSync(() => CreateSessionResult.decode(payload)); + // + // // eslint-disable-next-line @typescript-eslint/no-use-before-define + // return new Session(this.api, this.endpoint, sessionId, ctx.logger, ctx.doSync(() => this.getResponseMetadata.bind(this))); + // } +} diff --git a/eslint-local-rules/rules/context.js b/eslint-local-rules/rules/context.js index e98bc9e1..0d757e62 100644 --- a/eslint-local-rules/rules/context.js +++ b/eslint-local-rules/rules/context.js @@ -1,6 +1,8 @@ -// TODO: Ensure it's after 'super()' // TODO: Would optimize an result, if we will trace that object being created from IGNORED package // TODO: do not wrap .bind() +// TODO: wrap whole body of sync func +// TODO: span directives +// TODO: wrapper for async func should have async const path = require('path'); const _ = require('lodash'); @@ -147,12 +149,13 @@ module.exports = { programNode = node; const opts = {}; // console.info(5000, context.sourceCode.getCommentsBefore(node)) - getAnnotations(node, opts, context.sourceCode.getCommentsBefore(node)); + getAnnotations(node, opts, node, false); pushToStack(STATE_PROGRAM, null, opts); }, 'Program:exit'(node) { debug('Program:exit'); - // fixImportContext(context, node); + fixImportContext(context, node); + popFromStack(); }, 'ImportDeclaration'(node) { debug('ImportDeclaration'); @@ -190,7 +193,6 @@ module.exports = { // insert const ctx = ... only after super(), since it's not allowed use this.before if (CTX_AFTER_SUPER && node.callee.type === 'Super') { state.ctxInsertPoint = node.range[1]; - return; } // context initialization @@ -216,8 +218,8 @@ module.exports = { 'ClassDeclaration'(node) { debug('ClassDeclaration'); const opts = {}; - getAnnotations(node, opts, context.sourceCode.getCommentsAfter(context.sourceCode.getFirstToken(node.body /* ClassBody */))); - pushToStack(STATE_CLASS, node.id.name, {}); + getAnnotations(node, opts, context.sourceCode.getFirstToken(node.body /* ClassBody */), true); + pushToStack(STATE_CLASS, node.id.name, opts); }, 'ClassDeclaration:exit'(node) { debug('ClassDeclaration:exit'); @@ -228,15 +230,16 @@ module.exports = { 'ArrowFunctionExpression'(node) { debug('ArrowFunctionExpression'); - if (node.parent.type === 'CallExpression' && node.parent.callee.object?.name === 'ctx') return; // it's arrow function in ctx.do(() => ...) - const opts = {async: node.async}; opts.block = node.body.type === 'BlockStatement'; getAnnotations(node, opts, opts.block - ? context.sourceCode.getCommentsAfter(context.sourceCode.getFirstToken(node.body /* BlockStatement */)) - : context.sourceCode.getCommentsBefore(node.body /* Literal, BinaryExpression, CallExpression ... */)); - if (node.parent.type === 'PropertyDefinition') { + ? context.sourceCode.getFirstToken(node.body /* BlockStatement */) + : node.body /* Literal, BinaryExpression, CallExpression ... */, opts.block); + + if (node.parent.type === 'CallExpression' && node.parent.callee.object?.name === 'ctx') { + pushToStack(STATE_FUNC, null, {...opts, skip: true}); // it's arrow function in ctx.do(() => ...) + } else if (node.parent.type === 'PropertyDefinition') { if ('accessibility' in node.parent) opts.accessibility = node.parent.accessibility; // TODO: Convert to key opts pushToStack(node.parent.static && !opts.decorator ? STATE_FUNC : STATE_METHOD, node.parent.key.name, opts); } else if (node.parent.type === 'VariableDeclarator' && node.parent.parent.type === 'VariableDeclaration' && node.parent.parent.kind === 'const') { @@ -248,13 +251,13 @@ module.exports = { }, 'ArrowFunctionExpression:exit'(node) { debug('ArrowFunctionExpression:exit'); - fixGetContext(node); + if (!state.skip) fixGetContext(node); popFromStack(); }, 'FunctionDeclaration'(node) { debug('FunctionDeclaration'); const opts = {async: node.async}; - getAnnotations(node, opts, context.sourceCode.getCommentsAfter(context.sourceCode.getFirstToken(node.body /* BlockStatement */))); + getAnnotations(node, opts, context.sourceCode.getFirstToken(node.body /* BlockStatement */), true); pushToStack(STATE_FUNC, node.id.name, opts); }, 'FunctionDeclaration:exit'(node) { @@ -265,7 +268,7 @@ module.exports = { 'FunctionExpression'(node) { debug('FunctionExpression'); const opts = {async: node.async}; - getAnnotations(node, opts, context.sourceCode.getCommentsAfter(context.sourceCode.getFirstToken(node.body /* BlockStatement */))); + getAnnotations(node, opts, context.sourceCode.getFirstToken(node.body /* BlockStatement */), true); if (node.parent.type === 'MethodDefinition') { // method of a class if ('accessibility' in node.parent) opts.accessibility = node.parent.accessibility; pushToStack(node.parent.static && !opts.decorator ? STATE_FUNC : STATE_METHOD, @@ -367,7 +370,6 @@ module.exports = { function fixGetContext(node) { debug('fixGetContext()'); - console.info(1000, rootFuncState !== state, !state.trace, state) if (rootFuncState !== state || !state.trace) { // this level is not suppose to has ctx declaration if (state.ctxNode) { context.report({ @@ -428,65 +430,6 @@ module.exports = { } } - function fixSetTimeoutAndSetInterval(node) { - debug('fixSetTimeoutAndSetInterval()'); - const fnOrAwaitNode = node.arguments[0]; - - rootFuncState.hasCtx = true; - - const fnNode = fnOrAwaitNode.type === 'AwaitExpression' ? fnOrAwaitNode.argument : fnOrAwaitNode; - - const isAlreadyWrapped = fnNode.type === 'CallExpression' - && (fnNode.callee.name === 'ctx' || fnNode.callee.object?.name === 'ctx'); - - const wrappedArgumentNode = isAlreadyWrapped ? fnNode.arguments[0] : undefined; - - const callWithoutWrapInParentheses = !!(wrappedArgumentNode - || fnNode.type === 'Identifier' - || fnOrAwaitNode.type === 'CallExpression'); - - const before = wrappedArgumentNode - ? 'ctx.doHandleError(' - : callWithoutWrapInParentheses - ? 'ctx.doHandleError(() => ' - : 'ctx.doHandleError(() => ('; - const after = wrappedArgumentNode ? ')' : callWithoutWrapInParentheses ? '())' : ')())'; - - // if (debug.enabled) { - // console.info(200, 'fnOrAwaitNode', context.sourceCode.getText(fnOrAwaitNode)) - // console.info(210, 'fnNode', context.sourceCode.getText(fnNode)) - // console.info(220, 'isAlreadyWrapped', isAlreadyWrapped) - // console.info(230, 'wrappedArgumentNode', wrappedArgumentNode ? context.sourceCode.getText(wrappedArgumentNode) : false) - // console.info(240, 'callWithoutWrapInParentheses', callWithoutWrapInParentheses) - // console.info(250, 'before', before) - // console.info(260, 'after', after) - // } - - let fnText = context.sourceCode.getText(fnOrAwaitNode); - if (fnText.endsWith(';')) fnText = fnText.substring(0, fnText.length - 1); - if (fnText.startsWith(before) && fnText.endsWith(after)) return; // already correctly wrapped - - // if (debug.enabled) { - // console.info(280, 'fix', context.sourceCode.getText(fnOrAwaitNode)); - // console.info(290, 'by', `${before}${wrappedArgumentNode - // ? context.sourceCode.getText(wrappedArgumentNode) - // : context.sourceCode.getText(fnOrAwaitNode)}${after}`); - // } - - context.report({ - node: fnOrAwaitNode, - message: wrappedArgumentNode ? 'Fix' : `Add {{before}}...{{after}}`, - data: { - before, - after, - }, - fix: (fixer) => fixer.replaceText(fnOrAwaitNode, - `${before}${wrappedArgumentNode - ? context.sourceCode.getText(wrappedArgumentNode) - : context.sourceCode.getText(fnOrAwaitNode)}${after}`), - }); - } - /** * Adds, removes or updates import CONTEXT_CLASS from 'CONTEXT_CLASS_PATH'; depending on anyContextInFile variable. */ @@ -502,11 +445,7 @@ module.exports = { const importContext = `import { ${CONTEXT_CLASS} } from '${classPath}'`; - // if (debug.enabled) { - // console.info(700, 'anyContextInFile', anyContextInFile) - // console.info(710, 'importContext', importContext) - // console.info(720, 'importContextNode', !!importContextNode) - // } + debug('importContextNode: %s, anyContextInFile: %s', !!importContextNode, !!anyContextInFile); if (importContextNode) { // remove import @@ -550,6 +489,48 @@ module.exports = { } } + function fixSetTimeoutAndSetInterval(node) { + debug('fixSetTimeoutAndSetInterval()'); + const fnOrAwaitNode = node.arguments[0]; + + rootFuncState.hasCtx = true; + + const fnNode = fnOrAwaitNode.type === 'AwaitExpression' ? fnOrAwaitNode.argument : fnOrAwaitNode; + + const isAlreadyWrapped = fnNode.type === 'CallExpression' + && (fnNode.callee.name === 'ctx' || fnNode.callee.object?.name === 'ctx'); + + const wrappedArgumentNode = isAlreadyWrapped ? fnNode.arguments[0] : undefined; + + const callWithoutWrapInParentheses = !!(wrappedArgumentNode + || fnNode.type === 'Identifier' + || fnOrAwaitNode.type === 'CallExpression'); + + const before = wrappedArgumentNode + ? 'ctx.doHandleError(' + : callWithoutWrapInParentheses + ? 'ctx.doHandleError(() => ' + : 'ctx.doHandleError(() => ('; + const after = wrappedArgumentNode ? ')' : callWithoutWrapInParentheses ? '())' : ')())'; + + let fnText = context.sourceCode.getText(fnOrAwaitNode); + if (fnText.endsWith(';')) fnText = fnText.substring(0, fnText.length - 1); + if (fnText.startsWith(before) && fnText.endsWith(after)) return; // already correctly wrapped + + context.report({ + node: fnOrAwaitNode, + message: wrappedArgumentNode ? 'Fix' : `Add {{before}}...{{after}}`, + data: { + before, + after, + }, + fix: (fixer) => fixer.replaceText(fnOrAwaitNode, + `${before}${wrappedArgumentNode + ? context.sourceCode.getText(wrappedArgumentNode) + : context.sourceCode.getText(fnOrAwaitNode)}${after}`), + }); + } + function fixThisLogger(node) { debug('fixThisLogger()'); @@ -607,16 +588,17 @@ module.exports = { if (node.type === 'MemberExpression') return getLeftmostName(node.object); } - function getAnnotations(node, opts, comments) { + function getAnnotations(node, opts, commentsNode, commentsAfter) { + const comments = context[commentsAfter ? 'getCommentsAfter' : 'getCommentsBefore'](commentsNode); for (const comment of comments) { const r = /^\s*local-rules\/context:(.*)$/gm.exec(comment.value); if (r) { - for (const attr of [...r[1] - .matchAll(/[\s,]*([\w\-_]*)/g)] + for (const attr of [...r[1].matchAll(/[\s,]*([\w\-_]*)/g)] .reduce((a, v) => { if (v[1]) a.push(v[1]); return a; }, [])) { const itsNo = attr.startsWith('no-'); const val = _.camelCase(_.kebabCase(itsNo ? attr.substring(3) : attr)); + if (!~OPTS_KEYS.indexOf(val)) { context.report({ node, @@ -629,7 +611,7 @@ module.exports = { } } - opts.ctxInsertPoint = (comments.length > 0 ? comments[comments.length - 1] : node).range[1]; + opts.ctxInsertPoint = (comments.length > 0 ? comments[comments.length - 1] : commentsNode).range[1]; } }, }; diff --git a/src/table.ts b/src/table.ts index b21e6e3a..78ec18bf 100644 --- a/src/table.ts +++ b/src/table.ts @@ -1,4 +1,4 @@ -// eslint local-rules/context: "error" +// /* eslint local-rules/context: "error" */ // eslint-disable-next-line max-classes-per-file import _ from 'lodash'; @@ -31,6 +31,7 @@ import { YdbError, MissingStatus, } from './errors'; +import { ContextWithLogger } from './context-with-logger'; import TableService = Ydb.Table.V1.TableService; import CreateSessionRequest = Ydb.Table.CreateSessionRequest; @@ -66,11 +67,13 @@ interface PartialResponse { export class SessionService extends AuthenticatedService { public endpoint: Endpoint; + // @ts-ignore private readonly logger: Logger; // eslint-disable-next-line max-len constructor(endpoint: Endpoint, database: string, authService: IAuthService, logger: Logger, sslCredentials?: ISslCredentials, clientOptions?: ClientOptions) { - const host = endpoint.toString(); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionService.constructor', logger); + const host = ctx.doSync(() => endpoint.toString()); super(host, database, 'Ydb.Table.V1.TableService', TableService, authService, sslCredentials, clientOptions); this.endpoint = endpoint; @@ -80,12 +83,13 @@ export class SessionService extends AuthenticatedService { @retryable() @pessimizable async create(): Promise { - const response = await this.api.createSession(CreateSessionRequest.create()); - const payload = getOperationPayload(response); - const { sessionId } = CreateSessionResult.decode(payload); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionService.create', this); + const response = await ctx.do(() => this.api.createSession(ctx.doSync(() => CreateSessionRequest.create()))); + const payload = ctx.doSync(() => getOperationPayload(response)); + const { sessionId } = ctx.doSync(() => CreateSessionResult.decode(payload)); // eslint-disable-next-line @typescript-eslint/no-use-before-define - return new Session(this.api, this.endpoint, sessionId, this.logger, this.getResponseMetadata.bind(this)); + return new Session(this.api, this.endpoint, sessionId, ctx.logger, ctx.doSync(() => this.getResponseMetadata.bind(this))); } } @@ -191,6 +195,8 @@ interface IDropTableSettings { muteNonExistingTableErrors: boolean; } export class DropTableSettings extends OperationParamsSettings { + // local-rules/context: no-trace + muteNonExistingTableErrors: boolean; constructor({ muteNonExistingTableErrors = true } = {} as IDropTableSettings) { @@ -359,51 +365,66 @@ export class Session extends EventEmitter implements ICreateSessionResult { private api: TableService, public endpoint: Endpoint, public sessionId: string, + // @ts-ignore private logger: Logger, private getResponseMetadata: (request: object) => grpc.Metadata | undefined, ) { super(); + ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.constructor', this); } acquire() { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.acquire', this); + this.free = false; - this.logger.debug(`Acquired session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); + ctx.logger.debug(`Acquired session ${this.sessionId} on endpoint ${ctx.doSync(() => this.endpoint.toString())}.`); return this; } release() { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.release', this); + this.free = true; - this.logger.debug(`Released session ${this.sessionId} on endpoint ${this.endpoint.toString()}.`); - this.emit(SessionEvent.SESSION_RELEASE, this); + ctx.logger.debug(`Released session ${this.sessionId} on endpoint ${ctx.doSync(() => this.endpoint.toString())}.`); + ctx.doSync(() => this.emit(SessionEvent.SESSION_RELEASE, this)); } public isFree() { - return this.free && !this.isDeleted(); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.isFree', this); + + return this.free && !ctx.doSync(() => this.isDeleted()); } public isClosing() { + ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.isClosing', this); + return this.closing; } public isDeleted() { + ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.isDeleted', this); + return this.beingDeleted; } @retryable() @pessimizable public async delete(): Promise { - if (this.isDeleted()) { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.delete', this); + + if (ctx.doSync(() => this.isDeleted())) { return; } this.beingDeleted = true; - ensureOperationSucceeded(await this.api.deleteSession({ sessionId: this.sessionId })); + await ctx.do(async () => ensureOperationSucceeded(await ctx.do(() => this.api.deleteSession({ sessionId: this.sessionId })))); } @retryable() @pessimizable public async keepAlive(): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.keepAlive', this); const request = { sessionId: this.sessionId }; - const response = await this.api.keepAlive(request); + const response = await ctx.do(() => this.api.keepAlive(request)); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); + ctx.doSync(() => ensureOperationSucceeded(ctx.doSync(() => this.processResponseMetadata(request, response)))); } @retryable() @@ -413,6 +434,7 @@ export class Session extends EventEmitter implements ICreateSessionResult { description: TableDescription, settings?: CreateTableSettings, ): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.createTable', this); const request: Ydb.Table.ICreateTableRequest = { ...description, sessionId: this.sessionId, @@ -422,9 +444,9 @@ export class Session extends EventEmitter implements ICreateSessionResult { if (settings) { request.operationParams = settings.operationParams; } - const response = await this.api.createTable(request); + const response = await ctx.do(() => this.api.createTable(request)); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); + ctx.doSync(() => ensureOperationSucceeded(ctx.doSync(() => this.processResponseMetadata(request, response)))); } @retryable() @@ -434,6 +456,7 @@ export class Session extends EventEmitter implements ICreateSessionResult { description: AlterTableDescription, settings?: AlterTableSettings, ): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.alterTable', this); const request: Ydb.Table.IAlterTableRequest = { ...description, sessionId: this.sessionId, @@ -444,10 +467,10 @@ export class Session extends EventEmitter implements ICreateSessionResult { request.operationParams = settings.operationParams; } - const response = await this.api.alterTable(request); + const response = await ctx.do(() => this.api.alterTable(request)); try { - ensureOperationSucceeded(this.processResponseMetadata(request, response)); + ctx.doSync(() => ensureOperationSucceeded(ctx.doSync(() => this.processResponseMetadata(request, response)))); } catch (error) { // !! does not returns response status if async operation mode if (request.operationParams?.operationMode !== OperationMode.SYNC && error instanceof MissingStatus) return; @@ -456,12 +479,13 @@ export class Session extends EventEmitter implements ICreateSessionResult { } /* - Drop table located at `tablePath` in the current database. By default dropping non-existent tables does not - throw an error, to throw an error pass `new DropTableSettings({muteNonExistingTableErrors: true})` as 2nd argument. - */ + Drop table located at `tablePath` in the current database. By default dropping non-existent tables does not + throw an error, to throw an error pass `new DropTableSettings({muteNonExistingTableErrors: true})` as 2nd argument. + */ @retryable() @pessimizable public async dropTable(tablePath: string, settings?: DropTableSettings): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.dropTable', this); const request: Ydb.Table.IDropTableRequest = { sessionId: this.sessionId, path: `${this.endpoint.database}/${tablePath}`, @@ -473,14 +497,15 @@ export class Session extends EventEmitter implements ICreateSessionResult { // eslint-disable-next-line no-param-reassign settings = settings || new DropTableSettings(); const suppressedErrors = settings?.muteNonExistingTableErrors ? [SchemeError.status] : []; - const response = await this.api.dropTable(request); + const response = await ctx.do(() => this.api.dropTable(request)); - ensureOperationSucceeded(this.processResponseMetadata(request, response), suppressedErrors); + ctx.doSync(() => ensureOperationSucceeded(ctx.doSync(() => this.processResponseMetadata(request, response)), suppressedErrors)); } @retryable() @pessimizable public async describeTable(tablePath: string, settings?: DescribeTableSettings): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.describeTable', this); const request: Ydb.Table.IDescribeTableRequest = { sessionId: this.sessionId, path: `${this.endpoint.database}/${tablePath}`, @@ -494,10 +519,10 @@ export class Session extends EventEmitter implements ICreateSessionResult { request.operationParams = settings.operationParams; } - const response = await this.api.describeTable(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); + const response = await ctx.do(() => this.api.describeTable(request)); + const payload = ctx.doSync(() => getOperationPayload(ctx.doSync(() => this.processResponseMetadata(request, response)))); - return DescribeTableResult.decode(payload); + return ctx.doSync(() => DescribeTableResult.decode(payload)); } @retryable() @@ -505,14 +530,15 @@ export class Session extends EventEmitter implements ICreateSessionResult { public async describeTableOptions( settings?: DescribeTableSettings, ): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.describeTableOptions', this); const request: Ydb.Table.IDescribeTableOptionsRequest = { operationParams: settings?.operationParams, }; - const response = await this.api.describeTableOptions(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); + const response = await ctx.do(() => this.api.describeTableOptions(request)); + const payload = ctx.doSync(() => getOperationPayload(ctx.doSync(() => this.processResponseMetadata(request, response)))); - return Ydb.Table.DescribeTableOptionsResult.decode(payload); + return ctx.doSync(() => Ydb.Table.DescribeTableOptionsResult.decode(payload)); } @retryable() @@ -521,6 +547,7 @@ export class Session extends EventEmitter implements ICreateSessionResult { txSettings: ITransactionSettings, settings?: BeginTransactionSettings, ): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.beginTransaction', this); const request: Ydb.Table.IBeginTransactionRequest = { sessionId: this.sessionId, txSettings, @@ -529,9 +556,9 @@ export class Session extends EventEmitter implements ICreateSessionResult { if (settings) { request.operationParams = settings.operationParams; } - const response = await this.api.beginTransaction(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); - const { txMeta } = BeginTransactionResult.decode(payload); + const response = await ctx.do(() => this.api.beginTransaction(request)); + const payload = ctx.doSync(() => getOperationPayload(ctx.doSync(() => this.processResponseMetadata(request, response)))); + const { txMeta } = ctx.doSync(() => BeginTransactionResult.decode(payload)); if (txMeta) { return txMeta; @@ -542,6 +569,7 @@ export class Session extends EventEmitter implements ICreateSessionResult { @retryable() @pessimizable public async commitTransaction(txControl: IExistingTransaction, settings?: CommitTransactionSettings): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.commitTransaction', this); const request: Ydb.Table.ICommitTransactionRequest = { sessionId: this.sessionId, txId: txControl.txId, @@ -551,14 +579,15 @@ export class Session extends EventEmitter implements ICreateSessionResult { request.operationParams = settings.operationParams; request.collectStats = settings.collectStats; } - const response = await this.api.commitTransaction(request); + const response = await ctx.do(() => this.api.commitTransaction(request)); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); + ctx.doSync(() => ensureOperationSucceeded(ctx.doSync(() => this.processResponseMetadata(request, response)))); } @retryable() @pessimizable public async rollbackTransaction(txControl: IExistingTransaction, settings?: RollbackTransactionSettings): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.rollbackTransaction', this); const request: Ydb.Table.IRollbackTransactionRequest = { sessionId: this.sessionId, txId: txControl.txId, @@ -567,14 +596,15 @@ export class Session extends EventEmitter implements ICreateSessionResult { if (settings) { request.operationParams = settings.operationParams; } - const response = await this.api.rollbackTransaction(request); + const response = await ctx.do(() => this.api.rollbackTransaction(request)); - ensureOperationSucceeded(this.processResponseMetadata(request, response)); + ctx.doSync(() => ensureOperationSucceeded(ctx.doSync(() => this.processResponseMetadata(request, response)))); } @retryable() @pessimizable public async prepareQuery(queryText: string, settings?: PrepareQuerySettings): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.prepareQuery', this); const request: Ydb.Table.IPrepareDataQueryRequest = { sessionId: this.sessionId, yqlText: queryText, @@ -583,10 +613,10 @@ export class Session extends EventEmitter implements ICreateSessionResult { if (settings) { request.operationParams = settings.operationParams; } - const response = await this.api.prepareDataQuery(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); + const response = await ctx.do(() => this.api.prepareDataQuery(request)); + const payload = ctx.doSync(() => getOperationPayload(ctx.doSync(() => this.processResponseMetadata(request, response)))); - return PrepareQueryResult.decode(payload); + return ctx.doSync(() => PrepareQueryResult.decode(payload)); } @pessimizable @@ -596,8 +626,10 @@ export class Session extends EventEmitter implements ICreateSessionResult { txControl: IExistingTransaction | INewTransaction = AUTO_TX, settings?: ExecuteQuerySettings, ): Promise { - this.logger.trace('preparedQuery %o', query); - this.logger.trace('parameters %o', params); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.executeQuery', this); + + ctx.logger.trace('preparedQuery %o', query); + ctx.logger.trace('parameters %o', params); let queryToExecute: IQuery; let keepInCache = false; @@ -627,10 +659,10 @@ export class Session extends EventEmitter implements ICreateSessionResult { if (keepInCache) { request.queryCachePolicy = { keepInCache }; } - const response = await this.api.executeDataQuery(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response, settings?.onResponseMetadata)); + const response = await ctx.do(() => this.api.executeDataQuery(request)); + const payload = ctx.doSync(() => getOperationPayload(ctx.doSync(() => this.processResponseMetadata(request, response, settings?.onResponseMetadata)))); - return ExecuteQueryResult.decode(payload); + return ctx.doSync(() => ExecuteQueryResult.decode(payload)); } private processResponseMetadata( @@ -638,15 +670,16 @@ export class Session extends EventEmitter implements ICreateSessionResult { response: AsyncResponse, onResponseMetadata?: (metadata: grpc.Metadata) => void, ) { - const metadata = this.getResponseMetadata(request); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.processResponseMetadata', this); + const metadata = ctx.doSync(() => this.getResponseMetadata(request)); if (metadata) { - const serverHints = metadata.get(ResponseMetadataKeys.ServerHints) || []; + const serverHints = ctx.doSync(() => metadata.get(ResponseMetadataKeys.ServerHints)) || []; - if (serverHints.includes('session-close')) { + if (ctx.doSync(() => serverHints.includes('session-close'))) { this.closing = true; } - onResponseMetadata?.(metadata); + ctx.doSync(() => ctx.doSync(() => onResponseMetadata?.(metadata))); } return response; @@ -654,6 +687,7 @@ export class Session extends EventEmitter implements ICreateSessionResult { @pessimizable public async bulkUpsert(tablePath: string, rows: TypedValue, settings?: BulkUpsertSettings) { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.bulkUpsert', this); const request: Ydb.Table.IBulkUpsertRequest = { table: `${this.endpoint.database}/${tablePath}`, rows, @@ -662,10 +696,10 @@ export class Session extends EventEmitter implements ICreateSessionResult { if (settings) { request.operationParams = settings.operationParams; } - const response = await this.api.bulkUpsert(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); + const response = await ctx.do(() => this.api.bulkUpsert(request)); + const payload = ctx.doSync(() => getOperationPayload(ctx.doSync(() => this.processResponseMetadata(request, response)))); - return BulkUpsertResult.decode(payload); + return ctx.doSync(() => BulkUpsertResult.decode(payload)); } @pessimizable @@ -674,6 +708,7 @@ export class Session extends EventEmitter implements ICreateSessionResult { consumer: (result: Ydb.Table.ReadTableResult) => void, settings?: ReadTableSettings, ): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.streamReadTable', this); const request: Ydb.Table.IReadTableRequest = { sessionId: this.sessionId, path: `${this.endpoint.database}/${tablePath}`, @@ -686,12 +721,12 @@ export class Session extends EventEmitter implements ICreateSessionResult { request.keyRange = settings.keyRange; } - return this.executeStreamRequest( + return ctx.doSync(() => this.executeStreamRequest( request, - this.api.streamReadTable.bind(this.api), + ctx.doSync(() => this.api.streamReadTable.bind(this.api)), Ydb.Table.ReadTableResult.create, consumer, - ); + )); } @pessimizable @@ -701,6 +736,7 @@ export class Session extends EventEmitter implements ICreateSessionResult { params: IQueryParams = {}, settings?: ExecuteScanQuerySettings, ): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.streamExecuteScanQuery', this); let queryToExecute: IQuery; // eslint-disable-next-line prefer-const @@ -720,12 +756,12 @@ export class Session extends EventEmitter implements ICreateSessionResult { request.collectStats = settings.collectStats; } - return this.executeStreamRequest( + return ctx.doSync(() => this.executeStreamRequest( request, - this.api.streamExecuteScanQuery.bind(this.api), + ctx.doSync(() => this.api.streamExecuteScanQuery.bind(this.api)), ExecuteScanQueryPartialResult.create, consumer, - ); + )); } private executeStreamRequest, IRes, Res>( @@ -735,14 +771,16 @@ export class Session extends EventEmitter implements ICreateSessionResult { consumer: (result: Res) => void, ) : Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.executeStreamRequest', this); + return new Promise((resolve, reject) => { - apiStreamMethod(request, (error, response) => { + ctx.doSync(() => apiStreamMethod(request, (error, response) => { try { if (error) { if (error instanceof StreamEnd) { - resolve(); + ctx.doSync(() => resolve()); } else { - reject(error); + ctx.doSync(() => reject(error)); } } else if (response) { const operation = { @@ -750,39 +788,40 @@ export class Session extends EventEmitter implements ICreateSessionResult { issues: response.issues, } as Ydb.Operations.IOperation; - YdbError.checkStatus(operation); + ctx.doSync(() => YdbError.checkStatus(operation)); if (!response.result) { - reject(new MissingValue('Missing result value!')); + ctx.doSync(() => reject(new MissingValue('Missing result value!'))); return; } - const result = transformer(response.result); + const result = ctx.doSync(() => transformer(response.result!)); - consumer(result); + ctx.doSync(() => consumer(result)); } } catch (error_) { - reject(error_); + ctx.doSync(() => reject(error_)); // TODO: doSync in doSync } - }); + })); }); } public async explainQuery(query: string, operationParams?: Ydb.Operations.IOperationParams): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:Session.explainQuery', this); const request: Ydb.Table.IExplainDataQueryRequest = { sessionId: this.sessionId, yqlText: query, operationParams, }; - const response = await this.api.explainDataQuery(request); - const payload = getOperationPayload(this.processResponseMetadata(request, response)); + const response = await ctx.do(() => this.api.explainDataQuery(request)); + const payload = ctx.doSync(() => getOperationPayload(ctx.doSync(() => this.processResponseMetadata(request, response)))); - return ExplainQueryResult.decode(payload); + return ctx.doSync(() => ExplainQueryResult.decode(payload)); } } -type SessionCallback = (session: Session) => Promise; + type SessionCallback = (session: Session) => Promise; interface ITableClientSettings { database: string; @@ -808,6 +847,7 @@ export class SessionPool extends EventEmitter { private newSessionsRequested: number; private sessionsBeingDeleted: number; private readonly sessionKeepAliveId: NodeJS.Timeout; + // @ts-ignore private readonly logger: Logger; private readonly waiters: ((session: Session) => void)[] = []; @@ -815,6 +855,8 @@ export class SessionPool extends EventEmitter { private static SESSION_MAX_LIMIT = 20; constructor(settings: ITableClientSettings) { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.constructor', settings.logger); + super(); this.database = settings.database; this.authService = settings.authService; @@ -828,56 +870,65 @@ export class SessionPool extends EventEmitter { this.sessions = new Set(); this.newSessionsRequested = 0; this.sessionsBeingDeleted = 0; - this.sessionKeepAliveId = this.initListeners(poolSettings?.keepAlivePeriod || SESSION_KEEPALIVE_PERIOD); + this.sessionKeepAliveId = ctx.doSync(() => this.initListeners(poolSettings?.keepAlivePeriod || SESSION_KEEPALIVE_PERIOD)); this.sessionCreators = new Map(); this.discoveryService = discoveryService; - this.discoveryService.on(Events.ENDPOINT_REMOVED, (endpoint: Endpoint) => { - this.sessionCreators.delete(endpoint); - }); - this.prepopulateSessions(); + ctx.doSync(() => this.discoveryService.on(Events.ENDPOINT_REMOVED, (endpoint: Endpoint) => { + ctx.doSync(() => this.sessionCreators.delete(endpoint)); + })); + ctx.doSync(() => this.prepopulateSessions()); } public async destroy(): Promise { - this.logger.debug('Destroying pool...'); - clearInterval(this.sessionKeepAliveId); - await Promise.all(_.map([...this.sessions], (session: Session) => this.deleteSession(session))); - this.logger.debug('Pool has been destroyed.'); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.destroy', this); + + ctx.logger.debug('Destroying pool...'); + ctx.doSync(() => clearInterval(this.sessionKeepAliveId)); + await Promise.all(ctx.doSync(() => _.map([...this.sessions], (session: Session) => ctx.doSync(() => this.deleteSession(session))))); + ctx.logger.debug('Pool has been destroyed.'); } private initListeners(keepAlivePeriod: number) { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.initListeners', this); + return setInterval(async () => Promise.all( - _.map([...this.sessions], (session: Session) => session.keepAlive() - // delete session if error - .catch(() => this.deleteSession(session)) - // ignore errors to avoid UnhandledPromiseRejectionWarning - .catch(() => {})), + ctx.doSync(() => _.map([...this.sessions], (session: Session) => ctx.doSync(() => ctx.doSync(() => ctx.doSync(() => session.keepAlive()) + // delete session if error + .catch(() => ctx.doSync(() => this.deleteSession(session)))) + // ignore errors to avoid UnhandledPromiseRejectionWarning + .catch(() => {})))), ), keepAlivePeriod); } private prepopulateSessions() { // eslint-disable-next-line unicorn/no-array-for-each - _.forEach(_.range(this.minLimit), () => this.createSession()); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.prepopulateSessions', this); + + ctx.doSync(() => _.forEach(ctx.doSync(() => _.range(this.minLimit)), () => ctx.doSync(() => this.createSession()))); } private async getSessionCreator(): Promise { - const endpoint = await this.discoveryService.getEndpoint(); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.getSessionCreator', this); + const endpoint = await ctx.do(() => this.discoveryService.getEndpoint()); - if (!this.sessionCreators.has(endpoint)) { + if (!ctx.doSync(() => this.sessionCreators.has(endpoint))) { // eslint-disable-next-line max-len - const sessionService = new SessionService(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions); + const sessionService = new SessionService(endpoint, this.database, this.authService, ctx.logger, this.sslCredentials, this.clientOptions); - this.sessionCreators.set(endpoint, sessionService); + ctx.doSync(() => this.sessionCreators.set(endpoint, sessionService)); } - return this.sessionCreators.get(endpoint) as SessionService; + return ctx.doSync(() => this.sessionCreators.get(endpoint)) as SessionService; } private maybeUseSession(session: Session) { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.maybeUseSession', this); + if (this.waiters.length > 0) { - const waiter = this.waiters.shift(); + const waiter = ctx.doSync(() => this.waiters.shift()); if (typeof waiter === 'function') { - waiter(session); + ctx.doSync(() => waiter(session)); return true; } @@ -887,26 +938,29 @@ export class SessionPool extends EventEmitter { } private async createSession(): Promise { - const sessionCreator = await this.getSessionCreator(); - const session = await sessionCreator.create(); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.createSession', this); + const sessionCreator = await ctx.do(() => this.getSessionCreator()); + const session = await ctx.do(() => sessionCreator.create()); - session.on(SessionEvent.SESSION_RELEASE, async () => { - if (session.isClosing()) { - await this.deleteSession(session); + ctx.doSync(() => session.on(SessionEvent.SESSION_RELEASE, async () => { + if (ctx.doSync(() => session.isClosing())) { + await ctx.do(() => this.deleteSession(session)); } else { - this.maybeUseSession(session); + ctx.doSync(() => this.maybeUseSession(session)); } - }); - session.on(SessionEvent.SESSION_BROKEN, async () => { - await this.deleteSession(session); - }); - this.sessions.add(session); + })); + ctx.doSync(() => session.on(SessionEvent.SESSION_BROKEN, async () => { + await ctx.do(() => this.deleteSession(session)); + })); + ctx.doSync(() => this.sessions.add(session)); return session; } private deleteSession(session: Session): Promise { - if (session.isDeleted()) { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.deleteSession', this); + + if (ctx.doSync(() => session.isDeleted())) { return Promise.resolve(); } @@ -914,126 +968,140 @@ export class SessionPool extends EventEmitter { // acquire new session as soon one of existing ones is deleted if (this.waiters.length > 0) { // eslint-disable-next-line @typescript-eslint/no-shadow - this.acquire().then((session) => { - if (!this.maybeUseSession(session)) { - session.release(); + ctx.doSync(() => ctx.doSync(() => this.acquire()).then((session) => { + if (!ctx.doSync(() => this.maybeUseSession(session))) { + ctx.doSync(() => session.release()); } - }); + })); } - return session.delete() - // delete session in any case + return ctx.doSync(() => ctx.doSync(() => session.delete()) + // delete session in any case .finally(() => { - this.sessions.delete(session); + ctx.doSync(() => this.sessions.delete(session)); this.sessionsBeingDeleted--; - }); + })); } private acquire(timeout = 0): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.acquire', this); + for (const session of this.sessions) { - if (session.isFree()) { - return Promise.resolve(session.acquire()); + if (ctx.doSync(() => session.isFree())) { + return Promise.resolve(ctx.doSync(() => session.acquire())); } } if (this.sessions.size + this.newSessionsRequested - this.sessionsBeingDeleted <= this.maxLimit) { this.newSessionsRequested++; - return this.createSession() - .then((session) => session.acquire()) + return ctx.doSync(() => ctx.doSync(() => ctx.doSync(() => this.createSession()) + .then((session) => ctx.doSync(() => session.acquire()))) .finally(() => { this.newSessionsRequested--; - }); + })); } return new Promise((resolve, reject) => { let timeoutId: NodeJS.Timeout; const waiter = (session: Session) => { - clearTimeout(timeoutId); - resolve(session.acquire()); + ctx.doSync(() => clearTimeout(timeoutId)); + ctx.doSync(() => resolve(ctx.doSync(() => session.acquire()))); }; if (timeout) { timeoutId = setTimeout(() => { - this.waiters.splice(this.waiters.indexOf(waiter), 1); - reject( + ctx.doSync(() => this.waiters.splice(ctx.doSync(() => this.waiters.indexOf(waiter)), 1)); + ctx.doSync(() => reject( new SessionPoolEmpty(`No session became available within timeout of ${timeout} ms`), - ); + )); }, timeout); } - this.waiters.push(waiter); + ctx.doSync(() => this.waiters.push(waiter)); }); } private async _withSession(session: Session, callback: SessionCallback, maxRetries = 0): Promise { + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool._withSession', this); + try { - const result = await callback(session); + const result = await ctx.do(() => callback(session)); - session.release(); + ctx.doSync(() => session.release()); return result; } catch (error) { if (error instanceof BadSession || error instanceof SessionBusy) { - this.logger.debug('Encountered bad or busy session, re-creating the session'); - session.emit(SessionEvent.SESSION_BROKEN); + ctx.logger.debug('Encountered bad or busy session, re-creating the session'); + ctx.doSync(() => session.emit(SessionEvent.SESSION_BROKEN)); // eslint-disable-next-line no-param-reassign - session = await this.createSession(); + session = await ctx.do(() => this.createSession()); if (maxRetries > 0) { - this.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); - session.acquire(); + ctx.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); + ctx.doSync(() => session.acquire()); - return this._withSession(session, callback, maxRetries - 1); + return ctx.doSync(() => this._withSession(session, callback, maxRetries - 1)); } } else { - session.release(); + ctx.doSync(() => session.release()); } throw error; } } public async withSession(callback: SessionCallback, timeout = 0): Promise { - const session = await this.acquire(timeout); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.withSession', this); + const session = await ctx.do(() => this.acquire(timeout)); - return this._withSession(session, callback); + return ctx.doSync(() => this._withSession(session, callback)); } public async withSessionRetry(callback: SessionCallback, timeout = 0, maxRetries = 10): Promise { - const session = await this.acquire(timeout); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:SessionPool.withSessionRetry', this); + const session = await ctx.do(() => this.acquire(timeout)); - return this._withSession(session, callback, maxRetries); + return ctx.doSync(() => this._withSession(session, callback, maxRetries)); } } // eslint-disable-next-line unicorn/prefer-event-target export class TableClient extends EventEmitter { - // local-rules/context: no-trace - private pool: SessionPool; constructor(settings: ITableClientSettings) { + ContextWithLogger.getSafe('ydb-nodejs-sdk:TableClient.constructor', settings.logger); super(); this.pool = new SessionPool(settings); } public async withSession(callback: (session: Session) => Promise, timeout = 0): Promise { - return this.pool.withSession(callback, timeout); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:TableClient.withSession', this); + + return ctx.doSync(() => this.pool.withSession(callback, timeout)); } public async withSessionRetry(callback: (session: Session) => Promise, timeout = 0, maxRetries = 10): Promise { - return this.pool.withSessionRetry(callback, timeout, maxRetries); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:TableClient.withSessionRetry', this); + + return ctx.doSync(() => this.pool.withSessionRetry(callback, timeout, maxRetries)); } public async destroy() { - await this.pool.destroy(); + const ctx = ContextWithLogger.getSafe('ydb-nodejs-sdk:TableClient.destroy', this); + + await ctx.do(() => this.pool.destroy()); } } export class Column implements Ydb.Table.IColumnMeta { + // local-rules/context: no-trace + constructor(public name: string, public type: IType, public family?: string) {} } export class StorageSettings implements Ydb.Table.IStoragePool { + // local-rules/context: no-trace constructor(public media: string) {} } @@ -1134,6 +1202,7 @@ export class StoragePolicy implements Ydb.Table.IStoragePolicy { } export class ExplicitPartitions implements Ydb.Table.IExplicitPartitions { + // local-rules/context: no-trace constructor(public splitPoints: ITypedValue[]) {} } @@ -1204,14 +1273,17 @@ export class ReplicationPolicy implements Ydb.Table.IReplicationPolicy { } export class CompactionPolicy implements Ydb.Table.ICompactionPolicy { + // local-rules/context: no-trace constructor(public presetName: string) {} } export class ExecutionPolicy implements Ydb.Table.IExecutionPolicy { + // local-rules/context: no-trace constructor(public presetName: string) {} } export class CachingPolicy implements Ydb.Table.ICachingPolicy { + // local-rules/context: no-trace constructor(public presetName: string) {} }