From 0f7daf43b20e664f860b2bef38f1e91e8315f79e Mon Sep 17 00:00:00 2001 From: Alexey Zorkaltsev Date: Thu, 26 Oct 2023 17:01:23 +0300 Subject: [PATCH] chore: WIP --- src/DriverContext.ts | 66 ------------- .../integration/bytestring-identity.test.ts | 4 +- .../unit/context-with-logger.test.ts | 33 +++++++ src/__tests__/unit/driverContext.test.ts | 54 ----------- src/context-with-logger.ts | 82 ++++++++++++++++ src/driver.ts | 34 ++++--- src/retries/withRetries.ts | 8 +- src/table/sessionPool.ts | 94 ++++++++++++------- src/utils/context.ts | 28 ++++++ 9 files changed, 227 insertions(+), 176 deletions(-) delete mode 100644 src/DriverContext.ts create mode 100644 src/__tests__/unit/context-with-logger.test.ts delete mode 100644 src/__tests__/unit/driverContext.test.ts create mode 100644 src/context-with-logger.ts diff --git a/src/DriverContext.ts b/src/DriverContext.ts deleted file mode 100644 index 3e181141..00000000 --- a/src/DriverContext.ts +++ /dev/null @@ -1,66 +0,0 @@ -import {Context, getContext, NOT_A_CONTEXT} from "./utils/context"; -import Driver from "./driver"; -import {getLoggerFromObject} from "./utils/getLoggerFromObject"; -import {Logger} from "./utils/simple-logger"; - -/** - * Context with reference to the head object - driver. - */ -export class DriverContext extends Context { - - readonly logger: Logger; - - private constructor(context: Context, readonly driver: Driver) { - super(context); - - this.logger = getLoggerFromObject(this.driver); - } - - - /** - * This method should be called in methods that can be called by a client code - if this type of context - * does not already exist, it will be created. It is important to have access to Driver object to build new context. - */ - static getSafe(driver: Driver, methodName: string) { - const ctx = getContext(); - - if (!(driver instanceof Driver)) { - throw Error('Not the Driver! Probably the object does not have such field'); - } - - let context = ctx.findContextByClass(DriverContext); - - if (context === NOT_A_CONTEXT) { - context = new DriverContext(ctx, driver); - } - - context.trace(methodName); - - return context; - } - - /** - * Returns the context of this type. If there is no such context - throws an error. - */ - static get(methodName: string) { - const ctx = getContext(); - - let context = ctx.findContextByClass(DriverContext); - - if (context === NOT_A_CONTEXT) { - throw new Error('RiverContext is not in the context chain. Consider using RiverContext.getSafe()') - } - - context.trace(methodName); - - return context; - } - - /** - * Writes trace to logger and creates span if tracing is enabled. - */ - private trace(methodName: string) { - this.driver.logger.trace(methodName); - // TODO: name span - } -} diff --git a/src/__tests__/integration/bytestring-identity.test.ts b/src/__tests__/integration/bytestring-identity.test.ts index 387e8107..a3c7a543 100644 --- a/src/__tests__/integration/bytestring-identity.test.ts +++ b/src/__tests__/integration/bytestring-identity.test.ts @@ -3,7 +3,7 @@ import {destroyDriver, initDriver, TABLE} from '../../test-utils'; import {Column, Session, TableDescription} from '../../table'; import {declareType, TypedData, Types} from '../../types'; import {withRetries} from '../../retries'; -import {DriverContext} from "../../DriverContext"; +import {ContextWithLogger} from "../../context-with-logger"; async function createTable(session: Session) { await session.dropTable(TABLE); @@ -78,7 +78,7 @@ describe('bytestring identity', () => { beforeAll(async () => { driver = await initDriver(); - await DriverContext.getSafe(driver, 'test.beforeAll').do(() => + await ContextWithLogger.getSafe(driver.logger, 'test.beforeAll').do(() => driver.tableClient.withSession(async (session) => { await createTable(session); await fillTableWithData(session, initialRows); diff --git a/src/__tests__/unit/context-with-logger.test.ts b/src/__tests__/unit/context-with-logger.test.ts new file mode 100644 index 00000000..6b7c7d56 --- /dev/null +++ b/src/__tests__/unit/context-with-logger.test.ts @@ -0,0 +1,33 @@ +import {ContextWithLogger} from '../../context-with-logger'; +import Driver from "./../../driver"; +import {buildTestLogger} from "../../utils/tests/test-logger"; + +describe('driverConext', () => { + + it('getSafe', async () => { + const {testLogger, testLoggerFn} = buildTestLogger() + const ctx1 = ContextWithLogger.getSafe(testLogger, 'method1'); + await ctx1.do(() => { + const ctx2 = ContextWithLogger.getSafe(testLogger, 'method2'); + expect(ctx2).toBe(ctx1); + }); + expect(testLoggerFn.mock.calls).toEqual([['trace', 'method1'], ['trace', 'method2']]); + }); + + it('get - ok', async () => { + const {testLogger, testLoggerFn} = buildTestLogger() + const driver = Object.create(Driver.prototype) as Driver; + (driver as any).logger = testLogger; + + const ctx1 = ContextWithLogger.getSafe(testLogger,'method1'); + await ctx1.do(() => { + const ctx2 = ContextWithLogger.get('method2'); + expect(ctx2).toBe(ctx1); + }); + expect(testLoggerFn.mock.calls).toEqual([['trace', 'method1'], ['trace', 'method2']]); + }); + + it('get - error', async () => { + expect(() => ContextWithLogger.get('method')).toThrow(); + }); +}); diff --git a/src/__tests__/unit/driverContext.test.ts b/src/__tests__/unit/driverContext.test.ts deleted file mode 100644 index 9e356731..00000000 --- a/src/__tests__/unit/driverContext.test.ts +++ /dev/null @@ -1,54 +0,0 @@ -import {DriverContext} from '../../DriverContext'; -import Driver from "./../../driver"; -import {buildTestLogger} from "../../utils/tests/test-logger"; - -describe('driverConext', () => { - - it('getSafe', async () => { - const {testLogger, testLoggerFn} = buildTestLogger() - - // Note: This way does not go for unit test - since Driver components initiates async start - // const driver = new Driver({ - // connectionString: 'http://test.com:1111/?database=test', - // authService: new AnonymousAuthService(), - // logger: testLogger}); - - // testLoggerFn.mockReset(); - - const driver = Object.create(Driver.prototype) as Driver; - (driver as any).logger = testLogger; - - const ctx1 = DriverContext.getSafe(driver, 'method1'); - await ctx1.do(() => { - const ctx2 = DriverContext.getSafe(driver, 'method2'); - expect(ctx2).toBe(ctx1); - }); - expect(testLoggerFn.mock.calls).toEqual([['trace', 'method1'], ['trace', 'method2']]); - }); - - it('get - ok', async () => { - const {testLogger, testLoggerFn} = buildTestLogger() - - // Note: This way does not go for unit test - since Driver components initiates async start - // const driver = new Driver({ - // connectionString: 'http://test.com:1111/?database=test', - // authService: new AnonymousAuthService(), - // logger: testLogger}); - - // testLoggerFn.mockReset(); - - const driver = Object.create(Driver.prototype) as Driver; - (driver as any).logger = testLogger; - - const ctx1 = DriverContext.getSafe(driver,'method1'); - await ctx1.do(() => { - const ctx2 = DriverContext.get('method2'); - expect(ctx2).toBe(ctx1); - }); - expect(testLoggerFn.mock.calls).toEqual([['trace', 'method1'], ['trace', 'method2']]); - }); - - it('get - error', async () => { - expect(() => DriverContext.get('method')).toThrow(); - }); -}); diff --git a/src/context-with-logger.ts b/src/context-with-logger.ts new file mode 100644 index 00000000..0bbb4014 --- /dev/null +++ b/src/context-with-logger.ts @@ -0,0 +1,82 @@ +import {Context, getContext, NOT_A_CONTEXT} from "./utils/context"; +import {Logger} from "./utils/simple-logger"; + +/** + * Context with reference to the head object - driver. + */ +export class ContextWithLogger extends Context { + private constructor(context: Context, readonly logger: Logger) { + super(context); + } + + /** + * This method should be called in methods that can be called by a client code - if this type of context + * does not already exist, it will be created. It is important to have access to Logger object to build new context. + */ + static getSafe(logger: Logger, methodName: string) { + const ctx = getContext(); + + let context = ctx.findContextByClass(ContextWithLogger); + + if (context === NOT_A_CONTEXT) { + context = new ContextWithLogger(ctx, logger); + } + + context.trace(methodName, ctx); + + return context; + } + + /** + * Returns the context of this type. If there is no such context - throws an error. + */ + static get(methodName: string) { + const ctx = getContext(); + + let context = ctx.findContextByClass(ContextWithLogger); + + if (context === NOT_A_CONTEXT) { + throw new Error('ContextWithLogger is not in the context chain. Consider using RiverContext.getSafe()') + } + + context.trace(methodName, ctx); + + return context; + } + + /** + * Guarantees error logging if the code is called from a thread other than + * the main thread, such as setTimeout or setInterval. + * + * An error is NOT thrown after logging. And NO result. + */ + async doHandleError(callback: () => T): Promise { + try { + await super.do(callback); + } catch(error) { + this.logger.error(error); + } + } + + /** + * Guarantees error logging if the code is called from a thread other than + * the main thread, such as setTimeout or setInterval. + * + * An error is NOT thrown after logging. And NO result. + */ + doHandleErrorSync(callback: () => T): void { + try { + super.doSync(callback); + } catch(error) { + this.logger.error(error); + } + } + + /** + * Writes trace to logger and creates span if tracing is enabled. + */ + private trace(methodName: string, ctx: Context) { + this.logger.trace(methodName, ctx); // as parameter goes las ontext in the chain + // TODO: name span + } +} diff --git a/src/driver.ts b/src/driver.ts index 40be3485..5bef7adf 100644 --- a/src/driver.ts +++ b/src/driver.ts @@ -9,6 +9,7 @@ import SchemeClient from './scheme'; import {ClientOptions} from './utils'; import {parseConnectionString} from './parse-connection-string'; import {makeSslCredentials, ISslCredentials} from './ssl-credentials'; +import {ContextWithLogger} from "./context-with-logger"; export interface IPoolSettings { minLimit?: number; @@ -42,9 +43,10 @@ export default class Driver { constructor(settings: IDriverSettings) { this.logger = settings.logger || new SimpleLogger(); + const ctx = ContextWithLogger.getSafe(this.logger, 'ydb_nodejs_sdk.driver.ctor'); if (settings.connectionString) { - const {endpoint, database} = parseConnectionString(settings.connectionString); + const {endpoint, database} = ctx.doSync(() => parseConnectionString(settings.connectionString!)); this.endpoint = endpoint; this.database = database; } else if (!settings.endpoint) { @@ -56,21 +58,21 @@ export default class Driver { this.database = settings.database; } - this.sslCredentials = makeSslCredentials(this.endpoint, this.logger, settings.sslCredentials); + this.sslCredentials = ctx.doSync(() => makeSslCredentials(this.endpoint, this.logger, settings.sslCredentials)); this.authService = settings.authService; this.poolSettings = settings.poolSettings; this.clientOptions = settings.clientOptions; - this.discoveryService = new DiscoveryService({ + this.discoveryService = ctx.doSync(() => new DiscoveryService({ endpoint: this.endpoint, database: this.database, authService: this.authService, sslCredentials: this.sslCredentials, discoveryPeriod: ENDPOINT_DISCOVERY_PERIOD, logger: this.logger, - }); - this.tableClient = new TableClient({ + })); + this.tableClient = ctx.doSync(() => new TableClient({ database: this.database, authService: this.authService, sslCredentials: this.sslCredentials, @@ -78,21 +80,22 @@ export default class Driver { clientOptions: this.clientOptions, discoveryService: this.discoveryService, logger: this.logger, - }); - this.schemeClient = new SchemeClient({ + })); + this.schemeClient = ctx.doSync(() => new SchemeClient({ database: this.database, authService: this.authService, sslCredentials: this.sslCredentials, clientOptions: this.clientOptions, discoveryService: this.discoveryService, logger: this.logger, - }); + })); } public async ready(timeout: number): Promise { + const ctx = ContextWithLogger.getSafe(this.logger, 'ydb_nodejs_sdk.driver.ready'); try { - await this.discoveryService.ready(timeout); - this.logger.debug('Driver is ready!'); + await ctx.do(() => this.discoveryService.ready(timeout)); + ctx.logger.debug('Driver is ready!'); return true; } catch (e) { if (e instanceof TimeoutExpired) { @@ -104,10 +107,11 @@ export default class Driver { } public async destroy(): Promise { - this.logger.debug('Destroying driver...'); - this.discoveryService.destroy(); - await this.tableClient.destroy(); - await this.schemeClient.destroy(); - this.logger.debug('Driver has been destroyed.'); + const ctx = ContextWithLogger.getSafe(this.logger, 'ydb_nodejs_sdk.driver.destroy'); + ctx.logger.debug('Destroying driver...'); + ctx.do(() => this.discoveryService.destroy()); + await ctx.do(() => this.tableClient.destroy()); + await ctx.do(() => this.schemeClient.destroy()); + ctx.logger.debug('Driver has been destroyed.'); } } diff --git a/src/retries/withRetries.ts b/src/retries/withRetries.ts index b79b0211..887e7e76 100644 --- a/src/retries/withRetries.ts +++ b/src/retries/withRetries.ts @@ -1,6 +1,6 @@ import {RetryParameters} from "./RetryParameters"; import {RetryStrategy} from "./RetryStrategy"; -import {DriverContext} from "../DriverContext"; +import {ContextWithLogger} from "../context-with-logger"; import {Trace} from "./consts"; /** @@ -12,12 +12,12 @@ export async function withRetries( originalFunction: () => Promise, strategyParams?: RetryParameters, ) { - const ydbSdkContext = DriverContext.get(Trace.withRetries); + const ctx = ContextWithLogger.get(Trace.withRetries); const wrappedMethodName = originalFunction.name; if (!strategyParams) { strategyParams = new RetryParameters(); } - const strategy = new RetryStrategy(wrappedMethodName, strategyParams, ydbSdkContext.logger); - return await strategy.retry(originalFunction); + const strategy = ctx.doSync(() => new RetryStrategy(wrappedMethodName, strategyParams!, ctx.logger)); + return await ctx.do(() => strategy.retry(originalFunction)); } diff --git a/src/table/sessionPool.ts b/src/table/sessionPool.ts index 9614d7a5..b66d7878 100644 --- a/src/table/sessionPool.ts +++ b/src/table/sessionPool.ts @@ -16,6 +16,7 @@ import { import {SessionEvent} from "./internal/sessionEvent"; import {ITableClientSettings} from "./internal/ITableClientSettings"; import {SessionService} from "./sessionService"; +import {ContextWithLogger} from "../context-with-logger"; export class SessionPool extends EventEmitter { private readonly database: string; @@ -30,6 +31,7 @@ export class SessionPool extends EventEmitter { private newSessionsRequested: number; private sessionsBeingDeleted: number; private readonly sessionKeepAliveId: NodeJS.Timeout; + // @ts-ignore private readonly logger: Logger; private readonly waiters: ((session: Session) => void)[] = []; @@ -37,64 +39,77 @@ export class SessionPool extends EventEmitter { private static SESSION_MAX_LIMIT = 20; constructor(settings: ITableClientSettings) { + const ctx = ContextWithLogger.getSafe(settings.logger, 'ydb_nodejs_sdk.sessionPool.ctor'); + super(); + + this.logger = settings.logger; this.database = settings.database; this.authService = settings.authService; this.sslCredentials = settings.sslCredentials; this.clientOptions = settings.clientOptions; - this.logger = settings.logger; const poolSettings = settings.poolSettings; this.minLimit = poolSettings?.minLimit || SessionPool.SESSION_MIN_LIMIT; this.maxLimit = poolSettings?.maxLimit || SessionPool.SESSION_MAX_LIMIT; this.sessions = new Set(); this.newSessionsRequested = 0; this.sessionsBeingDeleted = 0; - this.sessionKeepAliveId = this.initListeners(poolSettings?.keepAlivePeriod || SESSION_KEEPALIVE_PERIOD); + this.sessionKeepAliveId = ctx.doSync(() => this.initListeners(poolSettings?.keepAlivePeriod || SESSION_KEEPALIVE_PERIOD)); this.sessionCreators = new Map(); this.discoveryService = settings.discoveryService; this.discoveryService.on(Events.ENDPOINT_REMOVED, (endpoint: Endpoint) => { - this.sessionCreators.delete(endpoint); + ctx.doHandleError(() => this.sessionCreators.delete(endpoint)); }); - this.prepopulateSessions(); + ctx.doSync(() => this.prepopulateSessions()); } public async destroy(): Promise { - this.logger.debug('Destroying pool...'); + const ctx = ContextWithLogger.getSafe(this.logger,'ydb_nodejs_sdk.sessionPool.destroy'); + + ctx.logger.debug('Destroying pool...'); clearInterval(this.sessionKeepAliveId); - await Promise.all(_.map([...this.sessions], (session: Session) => this.deleteSession(session))); - this.logger.debug('Pool has been destroyed.'); + await Promise.all(_.map([...this.sessions], (session: Session) => ctx.do(() => this.deleteSession(session)))); + ctx.logger.debug('Pool has been destroyed.'); } private initListeners(keepAlivePeriod: number) { - return setInterval(async () => Promise.all( + const ctx = ContextWithLogger.get('ydb_nodejs_sdk.sessionPool.initListeners'); + + return setInterval(async () => ctx.doHandleError(() => Promise.all( _.map([...this.sessions], (session: Session) => { return session.keepAlive() // delete session if error - .catch(() => this.deleteSession(session)) + .catch(() => ctx.do(() => this.deleteSession(session))) // ignore errors to avoid UnhandledPromiseRejectionWarning .catch(() => Promise.resolve()) }) - ), keepAlivePeriod); + )), keepAlivePeriod); } private prepopulateSessions() { - _.forEach(_.range(this.minLimit), () => this.createSession()); + const ctx = ContextWithLogger.get('ydb_nodejs_sdk.sessionPool.prepopulateSessions'); + + _.forEach(_.range(this.minLimit), () => ctx.do(() => this.createSession())); } private async getSessionCreator(): Promise { - const endpoint = await this.discoveryService.getEndpoint(); + const ctx = ContextWithLogger.get('ydb_nodejs_sdk.sessionPool.getSessionCreator'); + + const endpoint = await ctx.do(() => this.discoveryService.getEndpoint()); if (!this.sessionCreators.has(endpoint)) { - const sessionService = new SessionService(endpoint, this.database, this.authService, this.logger, this.sslCredentials, this.clientOptions); + const sessionService = await ctx.do(() => new SessionService(endpoint, this.database, this.authService, ctx.logger, this.sslCredentials, this.clientOptions)); this.sessionCreators.set(endpoint, sessionService); } return this.sessionCreators.get(endpoint) as SessionService; } private maybeUseSession(session: Session) { + const ctx = ContextWithLogger.get('ydb_nodejs_sdk.sessionPool.maybeUseSession'); + if (this.waiters.length > 0) { const waiter = this.waiters.shift(); if (typeof waiter === "function") { - waiter(session); + ctx.do(() => waiter(session)); return true; } } @@ -102,23 +117,27 @@ export class SessionPool extends EventEmitter { } private async createSession(): Promise { + const ctx = ContextWithLogger.get('ydb_nodejs_sdk.sessionPool.createSession'); + const sessionCreator = await this.getSessionCreator(); - const session = await sessionCreator.create(); + const session = await ctx.do(() => sessionCreator.create()); session.on(SessionEvent.SESSION_RELEASE, async () => { if (session.isClosing()) { - await this.deleteSession(session); + await ctx.do(() => this.deleteSession(session)); } else { - this.maybeUseSession(session); + ctx.do(() => this.maybeUseSession(session)); } }) session.on(SessionEvent.SESSION_BROKEN, async () => { await this.deleteSession(session); }); - this.sessions.add(session); + ctx.do(() => this.sessions.add(session)); return session; } private deleteSession(session: Session): Promise { + const ctx = ContextWithLogger.get('ydb_nodejs_sdk.sessionPool.deleteSession'); + if (session.isDeleted()) { return Promise.resolve(); } @@ -127,8 +146,8 @@ export class SessionPool extends EventEmitter { // acquire new session as soon one of existing ones is deleted if (this.waiters.length > 0) { this.acquire().then((session) => { - if (!this.maybeUseSession(session)) { - session.release(); + if (!ctx.do(() => this.maybeUseSession(session))) { + ctx.do(() => session.release()); } }); } @@ -141,6 +160,8 @@ export class SessionPool extends EventEmitter { } private acquire(timeout: number = 0): Promise { + const ctx = ContextWithLogger.get('ydb_nodejs_sdk.sessionPool.acquire'); + for (const session of this.sessions) { if (session.isFree()) { return Promise.resolve(session.acquire()); @@ -149,13 +170,13 @@ export class SessionPool extends EventEmitter { if (this.sessions.size + this.newSessionsRequested - this.sessionsBeingDeleted <= this.maxLimit) { this.newSessionsRequested++; - return this.createSession() + return ctx.doSync(() => this.createSession() .then((session) => { return session.acquire(); }) .finally(() => { this.newSessionsRequested--; - }); + })); } else { return new Promise((resolve, reject) => { let timeoutId: NodeJS.Timeout; @@ -166,12 +187,12 @@ export class SessionPool extends EventEmitter { } if (timeout) { - timeoutId = setTimeout(() => { + timeoutId = setTimeout(() => ctx.doHandleError(() => { this.waiters.splice(this.waiters.indexOf(waiter), 1); reject( new SessionPoolEmpty(`No session became available within timeout of ${timeout} ms`) ); - }, timeout); + }), timeout); } this.waiters.push(waiter); }); @@ -179,35 +200,38 @@ export class SessionPool extends EventEmitter { } private async _withSession(session: Session, callback: SessionCallback, maxRetries = 0): Promise { + const ctx = ContextWithLogger.get('ydb_nodejs_sdk.sessionPool._withSession'); try { - const result = await callback(session); - session.release(); + const result = await ctx.do(() => callback(session)); + await ctx.do(() => session.release()); return result; } catch (error) { if (error instanceof BadSession || error instanceof SessionBusy) { - this.logger.debug('Encountered bad or busy session, re-creating the session'); + ctx.logger.debug('Encountered bad or busy session, re-creating the session'); session.emit(SessionEvent.SESSION_BROKEN); - session = await this.createSession(); + session = await ctx.do(() => this.createSession()); if (maxRetries > 0) { - this.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); - session.acquire(); + ctx.logger.debug(`Re-running operation in new session, ${maxRetries} left.`); + ctx.do(() => session.acquire()); return this._withSession(session, callback, maxRetries - 1); } } else { - session.release(); + await ctx.do(() => session.release()); } throw error; } } public async withSession(callback: SessionCallback, timeout: number = 0): Promise { - const session = await this.acquire(timeout); - return this._withSession(session, callback); + const ctx = ContextWithLogger.getSafe(this.logger, 'ydb_nodejs_sdk.sessionPool.withSession'); + const session = await ctx.do(() => this.acquire(timeout)); + return ctx.do(() => this._withSession(session, callback)); } public async withSessionRetry(callback: SessionCallback, timeout: number = 0, maxRetries = 10): Promise { - const session = await this.acquire(timeout); - return this._withSession(session, callback, maxRetries); + const ctx = ContextWithLogger.getSafe(this.logger,'ydb_nodejs_sdk.sessionPool.withSessionRetry'); + const session = await ctx.do(() => this.acquire(timeout)); + return ctx.do(() => this._withSession(session, callback, maxRetries)); } } diff --git a/src/utils/context.ts b/src/utils/context.ts index 68dbe4ed..f7d79f24 100644 --- a/src/utils/context.ts +++ b/src/utils/context.ts @@ -62,6 +62,34 @@ export class Context { } } + /** + * Calls the method passed as a callback with pass in the context from which the method was called. + * + * The context can be obtained in the first line of the called function - *const ctx = getContext();*. + * + * Sync version primarily required to call anything within constructors. + */ + doSync(callback: () => T): T { + const prevContext = _context; + let error: any; + try { + _context = this; + return callback(); + } catch (_error) { + error = _error; + throw error; + } finally { + _context = prevContext; + let ctx: Context | undefined = this; + while (ctx) { + if (ctx.done) { + ctx.done(error); + } + ctx = ctx.parent; + } + } + } + /** * Finds the context of the specified class in the context chain. *