Skip to content

Commit

Permalink
Merge pull request #370 from ydb-platform/add-opFinished-in-query-ser…
Browse files Browse the repository at this point in the history
…vice-result

Add opFinished in query service result
  • Loading branch information
Zork33 committed Mar 28, 2024
2 parents b7e397d + 65cd6d4 commit 4b4062f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
16 changes: 4 additions & 12 deletions src/__tests__/e2e/query-service/method-execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,16 @@ describe('Query.execute()', () => {
const linesInserted = await insertCupleLinesInTestTable();
const res = await simpleSelect();

let linesCount = 0;
for await (const resultSet of res.resultSets)
for await (const _row of resultSet.rows)
linesCount++;

expect(linesCount).toBe(2 * linesInserted);
});

it('simple select', async () => {
await createTestTable();
const linesInserted = await insertCupleLinesInTestTable();
const res = await simpleSelect();
expect(async () => await simpleSelect()).rejects
.toThrowError(new Error('There\'s another active operation in the session'));

let linesCount = 0;
for await (const resultSet of res.resultSets)
for await (const _row of resultSet.rows)
linesCount++;

await res.opFinished;

expect(linesCount).toBe(2 * linesInserted);
});

Expand Down
16 changes: 16 additions & 0 deletions src/query/query-session-execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ import {resultsetYdbColumns} from "./symbols";
export type IExecuteResult = {
resultSets: AsyncGenerator<ResultSet>,
execStats?: Ydb.TableStats.IQueryStats;
/**
* Gets resolved when all data is received from stream and execute() operation become completed. At that moment
* is allowed to start next operation within session.
*
* Wait for this promise is equivalent to get read all data from all result sets.
*/
opFinished: Promise<void>;
};

export const CANNOT_MANAGE_TRASACTIONS_ERROR = 'Cannot manage transactions at the session level if do() has the txSettings parameter or doTx() is used';
Expand Down Expand Up @@ -108,6 +115,8 @@ export function execute(this: QuerySession, opts: {
let lastRowsIterator: IAsyncQueueIterator<Ydb.IValue>;
let resultResolve: ((data: IExecuteResult) => void) | undefined
let resultReject: ((reason?: any) => void) | undefined;
let finishedResolve: (() => void) | undefined;
let finishedReject: ((reason?: any) => void) | undefined;
let responseStream: ClientReadableStream<Ydb.Query.ExecuteQueryResponsePart> | undefined;
let execStats: Ydb.TableStats.IQueryStats | undefined;

Expand Down Expand Up @@ -138,6 +147,7 @@ export function execute(this: QuerySession, opts: {
iterator.error(reason);
});
}
if (finishedReject) finishedReject(reason);
delete this[symbols.sessionCurrentOperation];
}

Expand Down Expand Up @@ -221,6 +231,10 @@ export function execute(this: QuerySession, opts: {
get execStats() {
return execStats
},
opFinished: new Promise<void>((resolve, reject) => {
finishedResolve = resolve;
finishedReject = reject;
})
});
resultResolve = resultReject = undefined;
}
Expand Down Expand Up @@ -265,10 +279,12 @@ export function execute(this: QuerySession, opts: {
get execStats() {
return execStats
},
opFinished: Promise.resolve()
});
resultResolve = resultReject = undefined;
}

if (finishedResolve) finishedResolve();
delete this[symbols.sessionCurrentOperation];
finished = true;
});
Expand Down

0 comments on commit 4b4062f

Please sign in to comment.