diff --git a/src/Documents/BulkInsertOperation.ts b/src/Documents/BulkInsertOperation.ts
index d2ad6fc8..e5dd087d 100644
--- a/src/Documents/BulkInsertOperation.ts
+++ b/src/Documents/BulkInsertOperation.ts
@@ -8,7 +8,6 @@ import { CONSTANTS, HEADERS } from "../Constants";
 import { getError, throwError } from "../Exceptions";
 import { GetOperationStateCommand } from "./Operations/GetOperationStateOperation";
 import { StringUtil } from "../Utility/StringUtil";
-import * as StreamUtil from "../Utility/StreamUtil";
 import { JsonSerializer } from "../Mapping/Json/Serializer";
 import { RequestExecutor } from "../Http/RequestExecutor";
 import { IDocumentStore } from "./IDocumentStore";
@@ -29,6 +28,71 @@ import { TimeSeriesValuesHelper } from "./Session/TimeSeries/TimeSeriesValuesHelper";
 import { Timer } from "../Primitives/Timer";
 import { EventEmitter } from "node:events";
 import { BulkInsertOnProgressEventArgs } from "./Session/SessionEvents";
+import * as semaphore from "semaphore";
+import { acquireSemaphore } from "../Utility/SemaphoreUtil";
+import { Buffer } from "node:buffer";
+
+class BulkInsertStream {
+
+    private readonly _items: Array<string | Buffer> = [];
+    private totalLength = 0;
+
+    public push(data: string | Buffer) {
+        this._items.push(data);
+        this.totalLength += Buffer.isBuffer(data) ? data.length : Buffer.byteLength(data);
+    }
+
+    public toBuffer(): Buffer {
+        const result = Buffer.allocUnsafe(this.totalLength);
+        let idx = 0;
+        for (const inputElement of this._items) {
+            if (Buffer.isBuffer(inputElement)) {
+                inputElement.copy(result, idx);
+                idx += inputElement.length;
+            } else {
+                result.write(inputElement, idx);
+                idx += Buffer.byteLength(inputElement);
+            }
+        }
+
+        return result;
+    }
+
+    public get length() {
+        return this.totalLength;
+    }
+}
+
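+// RequestBodyStream bridges the buffered writers above and the outgoing HTTP request:
+// write() pushes a chunk into the Readable, and when push() reports a full internal
+// buffer (highWaterMark of 1 MB) it arms a promise that _read() resolves once the
+// consumer drains. flush() awaits that promise, giving callers explicit backpressure.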
+class RequestBodyStream extends stream.Readable {
+    constructor() {
+        super({
+            highWaterMark: 1024 * 1024
+        });
+    }
+
+    private _pending: Promise<void>;
+    private _resume: () => void;
+
+    _read(size: number) {
+        this._resume?.();
+    }
+
+    write(data: Buffer | string) {
+        const canConsumeMore = this.push(data);
+        if (!canConsumeMore) {
+            this._pending = new Promise<void>(resolve => {
+                this._resume = () => {
+                    this._resume = null;
+                    resolve();
+                };
+            });
+        }
+    }
+
+    async flush(): Promise<void> {
+        await this._pending;
+    }
+}
 
 export class BulkInsertOperation {
     private _emitter = new EventEmitter();
@@ -39,7 +103,9 @@ export class BulkInsertOperation {
     private readonly _requestExecutor: RequestExecutor;
     private _bulkInsertExecuteTask: Promise<void>;
 
-    private _completedWithError = false;
+    private _bulkInsertExecuteTaskErrored = false;
+
+    private _stream: RequestBodyStream;
 
     private _first: boolean = true;
     private _inProgressCommand: CommandType;
@@ -48,47 +114,48 @@ export class BulkInsertOperation {
     private _operationId = -1;
     private _nodeTag: string;
 
-    private _useCompression: boolean = false;
 
     private readonly _timeSeriesBatchSize: number;
     private _concurrentCheck: number = 0;
     private _isInitialWrite: boolean = true;
 
-    private _bulkInsertAborted: Promise<void>;
-    private _abortReject: (error: Error) => void;
-    private _aborted: boolean;
-    private _currentWriter: stream.Readable;
-    private _requestBodyStream: stream.PassThrough;
-    private _pipelineFinished: Promise<void>;
+    private _useCompression: boolean = false;
 
     private _unsubscribeChanges: IDisposable;
     private _onProgressInitialized = false;
+
     private _timer: Timer;
     private _lastWriteToStream: Date;
+    private readonly _streamLock: semaphore.Semaphore;
     private _heartbeatCheckInterval = 40_000;
+
+    //TODO: private GZipStream _compressedStream;
+    private _requestBodyStream: RequestBodyStream; //TODO: raw readable, or wrapped with compression
+    private _requestBodyStreamFinished: boolean = false;
+    private _currentWriter: BulkInsertStream;
+    private _backgroundWriter: BulkInsertStream;
+    private _asyncWrite: Promise<void> = Promise.resolve();
+    private _asyncWriteDone: boolean = true;
+
+    private static readonly _maxSizeInBuffer = 1024 * 1024;
+
     public constructor(database: string, store: IDocumentStore, options?: BulkInsertOptions) {
+        this._useCompression = options ? options.useCompression : false;
+        this._options = options ?? {};
+        this._database = database;
         this._conventions = store.conventions;
         this._store = store;
         if (StringUtil.isNullOrEmpty(database)) {
             this._throwNoDatabase();
         }
         this._requestExecutor = store.getRequestExecutor(database);
-        this._useCompression = options ? options.useCompression : false;
-
-        this._options = options ?? {};
-        this._database = database;
-
+        this._currentWriter = new BulkInsertStream();
+        this._backgroundWriter = new BulkInsertStream();
 
         this._timeSeriesBatchSize = this._conventions.bulkInsert.timeSeriesBatchSize;
 
         this._generateEntityIdOnTheClient = new GenerateEntityIdOnTheClient(this._requestExecutor.conventions,
             entity => this._requestExecutor.conventions.generateDocumentId(database, entity));
-        this._bulkInsertAborted = new Promise((_, reject) => this._abortReject = reject);
-
-        this._bulkInsertAborted.catch(err => {
-            // we're awaiting it elsewhere
-        });
+        this._streamLock = semaphore(1);
 
         this._lastWriteToStream = new Date();
 
         const timerState: TimerState = {
@@ -111,23 +178,40 @@ export class BulkInsertOperation {
     }
 
     private async sendHeartBeat(): Promise<void> {
-        if (Date.now() - this._lastWriteToStream.getTime() < this._heartbeatCheckInterval) {
+        if (!this.isHeartbeatIntervalExceeded()) {
             return;
         }
 
-        await this._executeBeforeStore();
-        this._endPreviousCommandIfNeeded();
-        if (!BulkInsertOperation._checkServerVersion(this._requestExecutor.lastServerVersion)) {
-            return;
+        const context = acquireSemaphore(this._streamLock, {
+            timeout: 0
+        });
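+        // timeout: 0 turns this into a try-acquire: if the lock is already held,
+        // a store() is writing to the stream right now and the heartbeat is skipped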
+        try {
+            await context.promise;
+        } catch {
+            return; // if locked we are already writing
+        }
 
-        if (!this._first) {
-            this._writeComma();
-        }
+        try {
+            await this._executeBeforeStore();
+            this._endPreviousCommandIfNeeded();
+            if (!BulkInsertOperation._checkServerVersion(this._requestExecutor.lastServerVersion)) {
+                return;
+            }
 
-        this._first = false;
-        this._inProgressCommand = "None";
-        this._currentWriter.push("{\"Type\":\"HeartBeat\"}");
+            if (!this._first) {
+                this._writeComma();
+            }
+
+            this._first = false;
+            this._inProgressCommand = "None";
+            this._currentWriter.push("{\"Type\":\"HeartBeat\"}");
+
+            await this.flushIfNeeded(true);
+        } catch {
+            // ignore a failed heartbeat
+        } finally {
+            context.dispose();
+        }
     }
 
     private static _checkServerVersion(serverVersion: string): boolean {
@@ -165,13 +249,14 @@ export class BulkInsertOperation {
         this._useCompression = value;
     }
 
-    private async _throwBulkInsertAborted(e: Error) {
+    private async _throwBulkInsertAborted(e: Error, flushEx: Error = null) {
         let errorFromServer: Error;
         try {
             errorFromServer = await this._getExceptionFromOperation();
         } catch {
             // server is probably down, will propagate the original exception
         }
+        //TODO: use flushEx variable
 
         if (errorFromServer) {
             throw errorFromServer;
@@ -196,7 +281,7 @@ export class BulkInsertOperation {
         this._nodeTag = bulkInsertGetIdRequest.nodeTag;
 
         if (this._onProgressInitialized && !this._unsubscribeChanges) {
-            const observable = this._store.changes()
+            const observable = this._store.changes(this._database, this._nodeTag)
                 .forOperationId(this._operationId);
 
             const handler = value => {
@@ -241,33 +326,25 @@ export class BulkInsertOperation {
         return { id, metadata, getId };
     }
 
-    public async store(entity: object);
-    public async store(entity: object, id: string);
-    public async store(entity: object, metadata: IMetadataDictionary);
-    public async store(entity: object, id: string, metadata: IMetadataDictionary);
+    public async store(entity: object): Promise<void>;
+    public async store(entity: object, id: string): Promise<void>;
+    public async store(entity: object, metadata: IMetadataDictionary): Promise<void>;
+    public async store(entity: object, id: string, metadata: IMetadataDictionary): Promise<void>;
     public async store(
         entity: object,
         idOrMetadata?: string | IMetadataDictionary,
-        optionalMetadata?: IMetadataDictionary) {
+        optionalMetadata?: IMetadataDictionary): Promise<void> {
 
-        const check = this._concurrencyCheck();
+        const check = await this._concurrencyCheck();
         try {
 
            const opts = BulkInsertOperation._typeCheckStoreArgs(idOrMetadata, optionalMetadata);
            let metadata = opts.metadata;
            const id = opts.getId ? await this._getId(entity) : opts.id;
 
-           this._lastWriteToStream = new Date();
            BulkInsertOperation._verifyValidId(id);
-           if (!this._currentWriter) {
-               await this._waitForId();
-               await this._ensureStream();
-           }
-
-           if (this._completedWithError || this._aborted) {
-               await this._checkIfBulkInsertWasAborted();
-           }
+           await this._executeBeforeStore();
 
            if (!metadata) {
                metadata = createMetadataDictionary({
@@ -292,34 +369,38 @@ export class BulkInsertOperation {
 
            this._endPreviousCommandIfNeeded();
 
-           this._writeToStream(entity, id, metadata, "PUT");
+           await this._writeToStream(entity, id, metadata, "PUT");
        } finally {
            check.dispose();
        }
    }
 
-    private _writeToStream(entity: object, id: string, metadata: IMetadataDictionary, type: CommandType) {
-        if (this._first) {
-            this._first = false;
-        } else {
-            this._writeComma();
-        }
-
-        this._inProgressCommand = "None";
+    private async _writeToStream(entity: object, id: string, metadata: IMetadataDictionary, type: CommandType) {
+        try {
+            if (this._first) {
+                this._first = false;
+            } else {
+                this._writeComma();
+            }
 
-        const documentInfo = new DocumentInfo();
-        documentInfo.metadataInstance = metadata;
-        let json = EntityToJson.convertEntityToJson(entity, this._conventions, documentInfo, true);
+            this._inProgressCommand = "None";
 
-        if (this._conventions.remoteEntityFieldNameConvention) {
-            json = this._conventions.transformObjectKeysToRemoteFieldNameConvention(json);
-        }
+            const documentInfo = new DocumentInfo();
+            documentInfo.metadataInstance = metadata;
+            let json = EntityToJson.convertEntityToJson(entity, this._conventions, documentInfo, true);
 
-        this._currentWriter.push(`{"Id":"`);
-        this._writeString(id);
-        const jsonString = JsonSerializer.getDefault().serialize(json);
-        this._currentWriter.push(`","Type":"PUT","Document":${jsonString}}`);
+            if (this._conventions.remoteEntityFieldNameConvention) {
+                json = this._conventions.transformObjectKeysToRemoteFieldNameConvention(json);
+            }
 
+            this._currentWriter.push(`{"Id":"`);
+            this._writeString(id);
+            const jsonString = JsonSerializer.getDefault().serialize(json);
+            this._currentWriter.push(`","Type":"PUT","Document":${jsonString}}`);
+
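+            // flushIfNeeded() hands the buffered JSON to the request stream once it grows
+            // past _maxSizeInBuffer, so memory stays bounded regardless of document count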
throwError("InvalidOperationException", "Bulk insert error", e); + throwError("InvalidOperationException", "Bulk insert error, Document Id: " + documentId, e); } - private _concurrencyCheck(): IDisposable { + private async _concurrencyCheck(): Promise { if (this._concurrentCheck) { throwError("BulkInsertInvalidOperationException", "Bulk Insert store methods cannot be executed concurrently."); } - this._concurrentCheck = 1; + const context = acquireSemaphore(this._streamLock); + await context.promise; + return { - dispose: () => this._concurrentCheck = 0 + dispose: () => { + context.dispose(); + this._concurrentCheck = 0; + } + } + } + + private async flushIfNeeded(force = false): Promise { + if (this._currentWriter.length > BulkInsertOperation._maxSizeInBuffer || this._asyncWriteDone) { + await this._asyncWrite; + + const tmp = this._currentWriter; + this._currentWriter = this._backgroundWriter; + this._backgroundWriter = tmp; + + this._currentWriter = new BulkInsertStream(); + + const buffer = this._backgroundWriter.toBuffer(); + force = true; // original version: force || this.isHeartbeatIntervalExceeded() || ; in node.js we need to force flush to use backpressure in steams + this._asyncWriteDone = false; + this._asyncWrite = this.writeToRequestBodyStream(buffer, force); + } + } + + private isHeartbeatIntervalExceeded(): boolean { + return Date.now() - this._lastWriteToStream.getTime() >= this._heartbeatCheckInterval; + } + + private async writeToRequestBodyStream(buffer: Buffer, force = false): Promise { + try { + this._requestBodyStream.write(buffer); + if (this._isInitialWrite || force) { + this._isInitialWrite = false; + + await this._requestBodyStream.flush(); + this._lastWriteToStream = new Date(); + } + } finally { + this._asyncWriteDone = true; } } @@ -372,30 +493,16 @@ export class BulkInsertOperation { } private async _executeBeforeStore() { - if (!this._currentWriter) { + if (!this._requestBodyStream) { await this._waitForId(); await this._ensureStream(); } - await this._checkIfBulkInsertWasAborted(); - } - - private async _checkIfBulkInsertWasAborted() { - if (this._completedWithError) { + if (this._bulkInsertExecuteTaskErrored) { try { await this._bulkInsertExecuteTask; } catch (error) { await this._throwBulkInsertAborted(error); - } finally { - this._currentWriter.emit("end"); - } - } - - if (this._aborted) { - try { - await this._bulkInsertAborted; - } finally { - this._currentWriter.emit("end"); } } } @@ -428,26 +535,31 @@ export class BulkInsertOperation { } private async _ensureStream() { - try { - this._currentWriter = new stream.PassThrough(); + //TODO: sync with c# + //TODO if (CompressionLevel != CompressionLevel.NoCompression) + // _streamExposerContent.Headers.ContentEncoding.Add("gzip"); - this._requestBodyStream = new stream.PassThrough(); + try { + this._requestBodyStream = new RequestBodyStream(); + this._stream = this._requestBodyStream; //TODO: const bulkCommand = new BulkInsertCommand(this._operationId, this._requestBodyStream, this._nodeTag, this._options.skipOverwriteIfUnchanged); bulkCommand.useCompression = this._useCompression; - const bulkCommandPromise = this._requestExecutor.execute(bulkCommand); + this._bulkInsertExecuteTask = this._requestExecutor.execute(bulkCommand); - this._pipelineFinished = StreamUtil.pipelineAsync(this._currentWriter, this._requestBodyStream); this._currentWriter.push("["); - this._bulkInsertExecuteTask = Promise.all([ - bulkCommandPromise, - this._pipelineFinished - ]); + /* TODO + if (CompressionLevel != 
+            const tmp = this._currentWriter;
+            this._currentWriter = this._backgroundWriter;
+            this._backgroundWriter = tmp;
+
+            this._currentWriter = new BulkInsertStream();
+
+            const buffer = this._backgroundWriter.toBuffer();
+            force = true; // original version: force || this.isHeartbeatIntervalExceeded() || ...; in Node.js we force the flush to use the streams' backpressure
+            this._asyncWriteDone = false;
+            this._asyncWrite = this.writeToRequestBodyStream(buffer, force);
+        }
+    }
+
+    private isHeartbeatIntervalExceeded(): boolean {
+        return Date.now() - this._lastWriteToStream.getTime() >= this._heartbeatCheckInterval;
+    }
+
+    private async writeToRequestBodyStream(buffer: Buffer, force = false): Promise<void> {
+        try {
+            this._requestBodyStream.write(buffer);
+            if (this._isInitialWrite || force) {
+                this._isInitialWrite = false;
+
+                await this._requestBodyStream.flush();
+                this._lastWriteToStream = new Date();
+            }
+        } finally {
+            this._asyncWriteDone = true;
+        }
     }
@@ -372,30 +493,16 @@ export class BulkInsertOperation {
     }
 
     private async _executeBeforeStore() {
-        if (!this._currentWriter) {
+        if (!this._requestBodyStream) {
             await this._waitForId();
             await this._ensureStream();
         }
 
-        await this._checkIfBulkInsertWasAborted();
-    }
-
-    private async _checkIfBulkInsertWasAborted() {
-        if (this._completedWithError) {
+        if (this._bulkInsertExecuteTaskErrored) {
             try {
                 await this._bulkInsertExecuteTask;
             } catch (error) {
                 await this._throwBulkInsertAborted(error);
-            } finally {
-                this._currentWriter.emit("end");
-            }
-        }
-
-        if (this._aborted) {
-            try {
-                await this._bulkInsertAborted;
-            } finally {
-                this._currentWriter.emit("end");
             }
         }
     }
@@ -428,26 +535,31 @@ export class BulkInsertOperation {
     }
 
     private async _ensureStream() {
-        try {
-            this._currentWriter = new stream.PassThrough();
+        //TODO: sync with c#
+        //TODO if (CompressionLevel != CompressionLevel.NoCompression)
+        //    _streamExposerContent.Headers.ContentEncoding.Add("gzip");
 
-            this._requestBodyStream = new stream.PassThrough();
+        try {
+            this._requestBodyStream = new RequestBodyStream();
+            this._stream = this._requestBodyStream; //TODO:
 
             const bulkCommand = new BulkInsertCommand(this._operationId, this._requestBodyStream, this._nodeTag, this._options.skipOverwriteIfUnchanged);
             bulkCommand.useCompression = this._useCompression;
 
-            const bulkCommandPromise = this._requestExecutor.execute(bulkCommand);
+            this._bulkInsertExecuteTask = this._requestExecutor.execute(bulkCommand);
 
-            this._pipelineFinished = StreamUtil.pipelineAsync(this._currentWriter, this._requestBodyStream);
             this._currentWriter.push("[");
 
-            this._bulkInsertExecuteTask = Promise.all([
-                bulkCommandPromise,
-                this._pipelineFinished
-            ]);
+            /* TODO
+               if (CompressionLevel != CompressionLevel.NoCompression)
+               {
+                   _compressedStream = new GZipStream(_stream, CompressionLevel, leaveOpen: true);
+                   _requestBodyStream = _compressedStream;
+               }
+            */
 
             this._bulkInsertExecuteTask
-                .catch(() => this._completedWithError = true);
+                .catch(() => this._bulkInsertExecuteTaskErrored = true);
         } catch (e) {
             throwError("RavenException", "Unable to open bulk insert stream.", e);
@@ -455,57 +567,67 @@ export class BulkInsertOperation {
     }
 
     public async abort(): Promise<void> {
-        this._aborted = true;
-
         if (this._operationId !== -1) {
             await this._waitForId();
 
             try {
                 await this._requestExecutor.execute(new KillOperationCommand(this._operationId, this._nodeTag));
             } catch (err) {
-                const bulkInsertError = getError("BulkInsertAbortedException",
+                throwError("BulkInsertAbortedException",
                     "Unable to kill bulk insert operation, because it was not found on the server.", err);
-                this._abortReject(bulkInsertError);
-                return;
             }
         }
-
-        this._abortReject(getError(
-            "BulkInsertAbortedException", "Bulk insert was aborted by the user."));
     }
 
     public async finish(): Promise<void> {
-        try {
-            this._endPreviousCommandIfNeeded();
+        if (this._requestBodyStreamFinished) {
+            return;
+        }
 
-            if (this._currentWriter) {
-                this._currentWriter.push("]");
-                this._currentWriter.push(null);
-            }
+        this._timer?.dispose(); // in node.js we dispose the timer in a different place
 
-            if (this._operationId === -1) {
-                // closing without calling a single store.
-                return;
-            }
+        this._endPreviousCommandIfNeeded();
 
-            if (this._completedWithError || this._aborted) {
-                await this._checkIfBulkInsertWasAborted();
-            }
+        let flushEx: Error;
+
+        if (this._stream) {
+            try {
+                const context = acquireSemaphore(this._streamLock);
+                await context.promise;
 
-            if (this._unsubscribeChanges) {
-                this._unsubscribeChanges.dispose();
+                try {
+                    this._currentWriter.push("]");
+                    await this._asyncWrite;
+                    this._requestBodyStream.write(this._currentWriter.toBuffer());
+                    //TODO: _compressedStream?.Dispose();
+                    await this._stream.flush();
+                } finally {
+                    context.dispose();
+                }
+            } catch (e) {
+                flushEx = e;
             }
 
-            await Promise.race(
-                [
-                    this._bulkInsertExecuteTask || Promise.resolve(),
-                    this._bulkInsertAborted || Promise.resolve()
-                ]);
-        } finally {
-            if (this._timer) {
-                this._timer.dispose();
-            }
-        }
+            this._requestBodyStream.push(null);
+            this._requestBodyStreamFinished = true;
+        }
+
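+        // only after the closing "]" is flushed and the stream is ended do we await the
+        // execute task, so a server-side failure surfaces via _throwBulkInsertAborted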
+        if (this._operationId === -1) {
+            // closing without calling a single store.
+            return;
+        }
+
+        if (this._bulkInsertExecuteTask) {
+            try {
+                await this._bulkInsertExecuteTask;
+            } catch (e) {
+                await this._throwBulkInsertAborted(e, flushEx);
+            }
+        }
+
+        if (this._unsubscribeChanges) {
+            this._unsubscribeChanges.dispose();
+        }
     }
 
     private readonly _conventions: DocumentConventions;
@@ -620,7 +742,7 @@ export class BulkInsertOperation {
     public async increment(id: string, name: string);
     public async increment(id: string, name: string, delta: number);
     public async increment(id: string, name: string, delta: number = 1) {
-        const check = this._operation._concurrencyCheck();
+        const check = await this._operation._concurrencyCheck();
 
         try {
@@ -669,6 +791,8 @@ export class BulkInsertOperation {
             this._operation._currentWriter.push(delta.toString());
             this._operation._currentWriter.push("}");
 
+            await this._operation.flushIfNeeded();
+
         } catch (e) {
             this._operation._handleErrors(this._id, e);
         }
@@ -716,7 +840,7 @@ export class BulkInsertOperation {
     }
 
     protected async _appendInternal(timestamp: Date, values: number[], tag: string): Promise<void> {
-        const check = this._operation._concurrencyCheck();
+        const check = await this._operation._concurrencyCheck();
 
         try {
             this._operation._lastWriteToStream = new Date();
@@ -757,7 +881,7 @@ export class BulkInsertOperation {
                 }
 
                 firstValue = false;
-                this._operation._currentWriter.push((value ?? 0).toString());
+                this._operation._currentWriter.push(((value ?? 0).toString()));
             }
 
             if (tag) {
@@ -767,6 +891,8 @@ export class BulkInsertOperation {
             }
 
             this._operation._currentWriter.push("]");
+
+            await this._operation.flushIfNeeded();
         } catch (e) {
             this._operation._handleErrors(this._id, e);
         }
@@ -862,7 +988,7 @@ export class BulkInsertOperation {
     public async store(id: string, name: string, bytes: Buffer): Promise<void>;
     public async store(id: string, name: string, bytes: Buffer, contentType: string): Promise<void>;
    public async store(id: string, name: string, bytes: Buffer, contentType?: string): Promise<void> {
-        const check = this._operation._concurrencyCheck();
+        const check = await this._operation._concurrencyCheck();
 
         try {
             this._operation._lastWriteToStream = new Date();
@@ -889,7 +1015,11 @@ export class BulkInsertOperation {
             this._operation._currentWriter.push(bytes.length.toString());
             this._operation._currentWriter.push("}");
 
-            this._operation._currentWriter.push(bytes);
+            await this._operation.flushIfNeeded();
+
+            this._operation._currentWriter.push(bytes); //TODO: do we want to stream here?
+
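+            // the second flush pushes the binary payload out on its own, so it does not
+            // sit in the string buffer behind later JSON commands (see the TODO above)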
+            await this._operation.flushIfNeeded();
         } catch (e) {
             this._operation._handleErrors(id, e);
         }
@@ -961,7 +1091,6 @@ export class BulkInsertCommand extends RavenCommand<void> {
     public async setResponseAsync(bodyStream: stream.Stream, fromCache: boolean): Promise<string> {
         return throwError("NotImplementedException", "Not implemented");
     }
-
 }
 
 export interface BulkInsertOptions {
diff --git a/src/Utility/StringBuilder.ts b/src/Utility/StringBuilder.ts
index 65215af8..b27c3c3f 100644
--- a/src/Utility/StringBuilder.ts
+++ b/src/Utility/StringBuilder.ts
@@ -1,4 +1,4 @@
-import { Stream } from "node:stream";
+import { Stream } from "readable-stream";
 
 export class StringBuilder {
     private s: any[] = [];
diff --git a/test/Ported/Attachments/AttachmentsSessionTest.ts b/test/Ported/Attachments/AttachmentsSessionTest.ts
index c22cd298..e32ac2f2 100644
--- a/test/Ported/Attachments/AttachmentsSessionTest.ts
+++ b/test/Ported/Attachments/AttachmentsSessionTest.ts
@@ -5,12 +5,12 @@ import {
     AttachmentName,
     IDocumentStore,
     DeleteAttachmentOperation,
-    DeleteCommandData
+    DeleteCommandData, AttachmentData
 } from "../../../src";
 import * as stream from "readable-stream";
 import { User } from "../../Assets/Entities";
 import { CONSTANTS } from "../../../src/Constants";
-import * as StreamUtil from "../../../src/Utility/StreamUtil";
+import { bufferToReadable, finishedAsync } from "../../../src/Utility/StreamUtil";
 
 describe("Attachments Session", function () {
@@ -130,7 +130,15 @@ describe("Attachments Session", function () {
         }
     });
 
-    it("can get & delete attachments", async () => {
+    it("can get & delete attachments - buffer", async () => {
+        await canGetAndDeleteAttachments(x => x);
+    });
+
+    it("can get & delete attachments - readable", async () => {
+        await canGetAndDeleteAttachments(bufferToReadable);
+    });
+
+    async function canGetAndDeleteAttachments(attachmentContentCreator: (buffer: Buffer) => AttachmentData) {
         const stream1 = Buffer.from([1, 2, 3]);
         const stream2 = Buffer.from([1, 2, 3, 4, 5, 6]);
         const stream3 = Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
@@ -143,10 +151,10 @@ describe("Attachments Session", function () {
             user.name = "Fitzchak";
             await session.store(user, "users/1");
 
-            session.advanced.attachments.store(user, "file1", stream1, "image/png");
-            session.advanced.attachments.store(user, "file2", stream2, "image/png");
-            session.advanced.attachments.store(user, "file3", stream3, "image/png");
-            session.advanced.attachments.store(user, "file4", stream4, "image/png");
+            session.advanced.attachments.store(user, "file1", attachmentContentCreator(stream1), "image/png");
+            session.advanced.attachments.store(user, "file2", attachmentContentCreator(stream2), "image/png");
+            session.advanced.attachments.store(user, "file3", attachmentContentCreator(stream3), "image/png");
+            session.advanced.attachments.store(user, "file4", attachmentContentCreator(stream4), "image/png");
             await session.saveChanges();
         }
@@ -196,11 +204,11 @@ describe("Attachments Session", function () {
                 }
             }));
 
-            await StreamUtil.finishedAsync(result.data);
+            await finishedAsync(result.data);
             result.dispose();
 
             assert.ok(Buffer.compare(bufResult, stream1) === 0);
         }
-    });
+    }
 
     it("can delete attachment using command", async () => {
         {
diff --git a/test/Ported/BulkInsert/BulkInsertsTest.ts b/test/Ported/BulkInsert/BulkInsertsTest.ts
index 98b077df..0b7a2aa3 100644
--- a/test/Ported/BulkInsert/BulkInsertsTest.ts
+++ b/test/Ported/BulkInsert/BulkInsertsTest.ts
@@ -72,23 +72,15 @@ describe("bulk insert", function () {
         try {
             await bulkInsert.store(new FooBar());
             await bulkInsert.abort();
-            await bulkInsert.store(new FooBar());
+
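+            // with the buffered writer a single store() can succeed before the abort is
+            // observed, so keep storing until the BulkInsertAbortedException surfaces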
+            for (let i = 0; i < 100; i++) {
+                await bulkInsert.store(new FooBar());
+                await delay(250);
+            }
 
             assert.fail("Should have thrown.");
         } catch (error) {
             assert.strictEqual(error.name, "BulkInsertAbortedException", error.message);
-            const bulkInsertCanceled = /TaskCanceledException/i.test(error.message);
-            const bulkInsertNotRegisteredYet =
-                /Unable to kill bulk insert operation, because it was not found on the server./i.test(error.message);
-            const bulkInsertSuccessfullyKilled =
-                /Bulk insert was aborted by the user./i.test(error.message);
-
-            // this one's racy, so it's one or the other
-            assert.ok(
-                bulkInsertCanceled
-                || bulkInsertNotRegisteredYet
-                || bulkInsertSuccessfullyKilled,
-                "Unexpected error message:" + error.message);
         } finally {
             try {
                 await bulkInsert.finish();
@@ -108,23 +100,14 @@ describe("bulk insert", function () {
             await bulkInsert.store(new FooBar());
             await delay(500);
             await bulkInsert.abort();
-            await bulkInsert.store(new FooBar());
+            for (let i = 0; i < 100; i++) {
+                await bulkInsert.store(new FooBar());
+                await delay(250);
+            }
 
             assert.fail("Should have thrown.");
         } catch (error) {
             assert.strictEqual(error.name, "BulkInsertAbortedException", error.message);
-            const bulkInsertCanceled = /TaskCanceledException/i.test(error.message);
-            const bulkInsertNotRegisteredYet =
-                /Unable to kill bulk insert operation, because it was not found on the server./i.test(error.message);
-            const bulkInsertSuccessfullyKilled =
-                /Bulk insert was aborted by the user./i.test(error.message);
-
-            // this one's racy, so it's one or the other
-            assert.ok(
-                bulkInsertCanceled
-                || bulkInsertNotRegisteredYet
-                || bulkInsertSuccessfullyKilled,
-                "Unexpected error message:" + error.message);
         } finally {
             try {
                 await bulkInsert.finish();