From 851e819dfeb5a302dad1db4a9f2f35f1bc16f514 Mon Sep 17 00:00:00 2001 From: Rok Pajk Kosec Date: Wed, 31 Jul 2024 13:59:17 +0000 Subject: [PATCH 1/2] AWS Bedrock embeddings call retry extension --- .../AWSBedrockEmbedding.ts | 57 ++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/packages/components/nodes/embeddings/AWSBedrockEmbedding/AWSBedrockEmbedding.ts b/packages/components/nodes/embeddings/AWSBedrockEmbedding/AWSBedrockEmbedding.ts index 5c5f6352b9..b5c8bd2eaa 100644 --- a/packages/components/nodes/embeddings/AWSBedrockEmbedding/AWSBedrockEmbedding.ts +++ b/packages/components/nodes/embeddings/AWSBedrockEmbedding/AWSBedrockEmbedding.ts @@ -83,6 +83,24 @@ class AWSBedrockEmbedding_Embeddings implements INode { } ], optional: true + }, + { + label: 'Batch Size', + name: 'batchSize', + description: 'Documents batch size to send to AWS API for Titan model embeddings. Used to avoid throttling.', + type: 'number', + optional: true, + default: 50, + additionalParams: true + }, + { + label: 'Max AWS API retries', + name: 'maxRetries', + description: 'This will limit the nubmer of AWS API for Titan model embeddings call retries. Used to avoid throttling.', + type: 'number', + optional: true, + default: 5, + additionalParams: true } ] } @@ -144,7 +162,10 @@ class AWSBedrockEmbedding_Embeddings implements INode { if (iModel.startsWith('cohere')) { return await embedTextCohere(documents, client, iModel, inputType) } else { - return Promise.all(documents.map((document) => embedTextTitan(document, client, iModel))) + // return Promise.all(documents.map((document) => embedTextTitan(document, client, iModel))) + const batchSize = nodeData.inputs?.batchSize as number + const maxRetries = nodeData.inputs?.maxRetries as number + return processInBatches(documents, batchSize, maxRetries, (document) => embedTextTitan(document, client, iModel)) } } return model @@ -195,4 +216,38 @@ const embedTextCohere = async (texts: string[], client: BedrockRuntimeClient, mo } } +const processInBatches = async ( + documents: string[], + batchSize: number, + maxRetries: number, + processFunc: (document: string) => Promise +): Promise => { + let sleepTime = 0 + let retryCounter = 0 + let result: number[][] = [] + for (let i = 0; i < documents.length; i += batchSize) { + let chunk = documents.slice(i, i + batchSize) + try { + let chunkResult = await Promise.all(chunk.map(processFunc)) + result.push(...chunkResult) + retryCounter = 0 + } catch (e) { + if (retryCounter < maxRetries && e.name.includes('ThrottlingException')) { + retryCounter = retryCounter + 1 + i = i - batchSize + sleepTime = sleepTime + 100 + } else { + // Split to distinguish between throttling retry error and other errors in trance + if (e.name.includes('ThrottlingException')) { + throw new Error('AWS Bedrock retry limit reached: ' + e) + } else { + throw new Error(e) + } + } + } + await new Promise((resolve) => setTimeout(resolve, sleepTime)) + } + return result +} + module.exports = { nodeClass: AWSBedrockEmbedding_Embeddings } From 932f25c2f76c0b5579bf118976b643e9c0e655b4 Mon Sep 17 00:00:00 2001 From: Rok Pajk Kosec Date: Fri, 2 Aug 2024 09:37:24 +0000 Subject: [PATCH 2/2] Removed unnecessary commented line --- .../nodes/embeddings/AWSBedrockEmbedding/AWSBedrockEmbedding.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/components/nodes/embeddings/AWSBedrockEmbedding/AWSBedrockEmbedding.ts b/packages/components/nodes/embeddings/AWSBedrockEmbedding/AWSBedrockEmbedding.ts index b5c8bd2eaa..4946fa8bbc 100644 --- a/packages/components/nodes/embeddings/AWSBedrockEmbedding/AWSBedrockEmbedding.ts +++ b/packages/components/nodes/embeddings/AWSBedrockEmbedding/AWSBedrockEmbedding.ts @@ -162,7 +162,6 @@ class AWSBedrockEmbedding_Embeddings implements INode { if (iModel.startsWith('cohere')) { return await embedTextCohere(documents, client, iModel, inputType) } else { - // return Promise.all(documents.map((document) => embedTextTitan(document, client, iModel))) const batchSize = nodeData.inputs?.batchSize as number const maxRetries = nodeData.inputs?.maxRetries as number return processInBatches(documents, batchSize, maxRetries, (document) => embedTextTitan(document, client, iModel))