Skip to content

Commit

Permalink
feat: add idempotent option to tableClient session.executeQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
Zork33 committed Sep 27, 2024
1 parent ee14b9f commit 89df57e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 13 deletions.
4 changes: 2 additions & 2 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ export class ClientResourceExhausted extends TransportError {
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Slow, false, true, true);
}

export class ClientCancelled extends TransportError {
export class ClientCancelled extends TransportError { // TODO: "Call cancelled" error appears also when connection string is wrong - would be right to avoid such dead lock retrying
static status = StatusCode.CLIENT_CANCELED;
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.No, false, false, false);
public readonly [RetryPolicySymbol] = retryPolicy(Backoff.Fast, false, true, false);
}

const TRANSPORT_ERROR_CODES = new Map([
Expand Down
2 changes: 1 addition & 1 deletion src/retries_obsoleted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const RETRYABLE_ERRORS_FAST = [
];
const RETRYABLE_ERRORS_SLOW = [errors.Overloaded, errors.ClientResourceExhausted];

class RetryStrategy {
export class RetryStrategy {
// private logger: Logger;
constructor(
public methodName = 'UnknownClass::UnknownMethod',
Expand Down
20 changes: 17 additions & 3 deletions src/table/table-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import * as grpc from "@grpc/grpc-js";
import EventEmitter from "events";
import {ICreateSessionResult, SessionEvent, TableService} from "./table-session-pool";
import {Endpoint} from "../discovery";
import {retryable} from "../retries_obsoleted";
import {retryable, RetryParameters, RetryStrategy} from "../retries_obsoleted";
import {MissingStatus, MissingValue, SchemeError, YdbError} from "../errors";
import {ResponseMetadataKeys} from "../constants";
import {pessimizable} from "../utils";
Expand Down Expand Up @@ -171,15 +171,21 @@ export class PrepareQuerySettings extends OperationParamsSettings {
}

export class ExecuteQuerySettings extends OperationParamsSettings {
keepInCache: boolean = false;
keepInCache?: boolean = false;
collectStats?: Ydb.Table.QueryStatsCollection.Mode;
onResponseMetadata?: (metadata: grpc.Metadata) => void;
idempotent: boolean = false;

withKeepInCache(keepInCache: boolean) {
this.keepInCache = keepInCache;
return this;
}

withIdempotent(idempotent: boolean) {
this.idempotent = idempotent;
return this;
}

withCollectStats(collectStats: Ydb.Table.QueryStatsCollection.Mode) {
this.collectStats = collectStats;
return this;
Expand Down Expand Up @@ -258,6 +264,8 @@ export class ExecuteScanQuerySettings {
}
}

let executeQueryRetryer: RetryStrategy;

export class TableSession extends EventEmitter implements ICreateSessionResult {
private beingDeleted = false;
private free = true;
Expand Down Expand Up @@ -518,7 +526,13 @@ export class TableSession extends EventEmitter implements ICreateSessionResult {
if (keepInCache) {
request.queryCachePolicy = {keepInCache};
}
const response = await this.api.executeDataQuery(request);

if (!executeQueryRetryer) executeQueryRetryer = new RetryStrategy('TableSession:executeQuery', new RetryParameters(), this.logger);

const response =
settings?.idempotent
? await executeQueryRetryer.retry(() => this.api.executeDataQuery(request))
: await this.api.executeDataQuery(request);
const payload = getOperationPayload(this.processResponseMetadata(request, response, settings?.onResponseMetadata));
return ExecuteQueryResult.decode(payload);
}
Expand Down
27 changes: 20 additions & 7 deletions src/utils/test/create-table.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Column, TableDescription, TableSession} from "../../table";
import {withRetries} from "../../retries_obsoleted";
import {AUTO_TX, Column, ExecuteQuerySettings, TableDescription, TableSession} from "../../table";
// import {withRetries} from "../../retries_obsoleted";
import {Types} from "../../types";
import {Row} from "./row";

Expand Down Expand Up @@ -29,10 +29,23 @@ DECLARE $data AS List<Struct<id: Uint64, title: Utf8>>;
REPLACE INTO ${TABLE}
SELECT * FROM AS_TABLE($data);`;

await withRetries(async () => {
const preparedQuery = await session.prepareQuery(query);
await session.executeQuery(preparedQuery, {
// Now we can specify that the operation should be repeated in case of an error by specifying that it is idempotent

// Old code:

// await withRetries(async () => {
// const preparedQuery = await session.prepareQuery(query);
// await session.executeQuery(preparedQuery, {
// '$data': Row.asTypedCollection(rows),
// });
// });

// New code variant:

const preparedQuery = await session.prepareQuery(query);
await session.executeQuery(preparedQuery, {
'$data': Row.asTypedCollection(rows),
});
});
},
AUTO_TX,
new ExecuteQuerySettings().withIdempotent(true));
}

0 comments on commit 89df57e

Please sign in to comment.