Skip to content

Commit

Permalink
refactor: worker pool (#7553)
Browse files Browse the repository at this point in the history
* feat: add fasterDownloads feature flag for users

* feat: add feature flag for fasterDownloads and timeout fn

* fix: remove worker pool limitation since we have convoy algo

* feat: add datadog monitoring

* fix: weight the latency by the number of submissions

* fix: remove statsdclient and enable flag for devmode

* fix: remove unused fe feature flag

* fix: remove unused variable

* feat: use blob url instead of copying blob

* fix: check correct variable

* fix: add console log for staging test

* fix: remove perf measurements and increase timeout

* fix: change timeout to 50ms
  • Loading branch information
g-tejas committed Aug 8, 2024
1 parent bac86b2 commit 90fbd45
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { useCallback, useEffect, useRef, useState } from 'react'
import { useMutation, UseMutationOptions } from 'react-query'
import { datadogLogs } from '@datadog/browser-logs'
import { useFeature } from '@growthbook/growthbook-react'

import { waitForMs } from '~utils/waitForMs'

Expand All @@ -14,7 +15,10 @@ import {
} from '~features/analytics/AnalyticsService'
import { useUser } from '~features/user/queries'

import { downloadResponseAttachment } from './utils/downloadCsv'
import {
downloadResponseAttachment,
downloadResponseAttachmentURL,
} from './utils/downloadCsv'
import { EncryptedResponseCsvGenerator } from './utils/EncryptedResponseCsvGenerator'
import {
EncryptedResponsesStreamParams,
Expand Down Expand Up @@ -55,6 +59,19 @@ interface UseDecryptionWorkersProps {
>
}

/**
 * Creates a promise that never fulfils and rejects with
 * `new Error(errorMessage)` once `ms` milliseconds have elapsed.
 *
 * @param ms Delay in milliseconds before the rejection fires.
 * @param errorMessage Message of the rejection error.
 * @param signal Optional cancellation signal. Once it aborts, the pending
 *   timer is cleared and the returned promise simply stays pending, so a
 *   caller that no longer needs the deadline does not leave a timer behind.
 * @returns A promise that can only reject (or stay pending when cancelled).
 */
function timeout(
  ms: number,
  errorMessage = 'Operation timed out',
  signal?: AbortSignal,
): Promise<never> {
  return new Promise<never>((_resolve, reject) => {
    // Already cancelled: do not even start the timer.
    if (signal?.aborted) return

    const timer = setTimeout(() => reject(new Error(errorMessage)), ms)
    signal?.addEventListener('abort', () => clearTimeout(timer), {
      once: true,
    })
  })
}

/**
 * Races `promise` against a deadline of `ms` milliseconds.
 *
 * Settles exactly like `promise` if it settles first; otherwise rejects with
 * an `Error` whose message is `'Operation timed out'` (callers match on that
 * message, so it must not change).
 *
 * The deadline timer is cancelled as soon as the race is decided, so repeated
 * polling with this helper does not accumulate pending timers.
 *
 * @param promise The operation to wait for.
 * @param ms Maximum time to wait, in milliseconds.
 */
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  const deadline = new AbortController()
  return Promise.race([
    promise,
    timeout(ms, undefined, deadline.signal),
  ]).finally(() => deadline.abort())
}

const useDecryptionWorkers = ({
onProgress,
mutateProps,
Expand All @@ -65,6 +82,11 @@ const useDecryptionWorkers = ({
const { data: adminForm } = useAdminForm()
const { user } = useUser()

// True when the bundle was built with NODE_ENV set to 'development'.
const isDev = process.env.NODE_ENV === 'development'

// GrowthBook feature flag 'faster-downloads'.
const fasterDownloadsFeature = useFeature('faster-downloads')
// The faster download path is used when the flag is on, and always in
// development builds regardless of the flag.
const fasterDownloads = fasterDownloadsFeature.on || isDev

// The effect itself does nothing; it only registers a cleanup that calls
// killWorkers on the `workers` value captured by this render. React runs that
// cleanup when `workers` changes and when the component unmounts.
useEffect(() => {
return () => killWorkers(workers)
}, [workers])
Expand Down Expand Up @@ -161,13 +183,16 @@ const useDecryptionWorkers = ({
// round-robin scheduling
const { workerApi } =
workerPool[receivedRecordCount % numWorkers]
const decryptResult = await workerApi.decryptIntoCsv({
line: result.value,
secretKey,
downloadAttachments,
formId: adminForm._id,
hostOrigin: window.location.origin,
})
const decryptResult = await workerApi.decryptIntoCsv(
{
line: result.value,
secretKey,
downloadAttachments,
formId: adminForm._id,
hostOrigin: window.location.origin,
},
fasterDownloads,
)
progress += 1
onProgress(progress)

Expand Down Expand Up @@ -348,11 +373,320 @@ const useDecryptionWorkers = ({
})
})
},
[adminForm, onProgress, user?._id, workers],
[adminForm, onProgress, user?._id, workers, fasterDownloads],
)

/**
 * Downloads and decrypts encrypted responses using a pool of decryption
 * workers that are handed one streamed line each as soon as they are idle.
 *
 * Flow:
 *  1. Aborts any previous download and kills the previously stored workers.
 *  2. Creates `hardwareConcurrency` (fallback 4) workers.
 *  3. Reads the response stream line by line, dispatching each line to an
 *     idle worker; when none is idle it polls the in-flight tasks.
 *  4. Once every dispatched task has settled, either reports a decryption
 *     failure (every record errored or was unverified) or generates and
 *     downloads the CSV.
 *
 * The returned promise always settles: it resolves with the final counts, or
 * rejects with the original error when the stream cannot be opened or read.
 *
 * @returns Counts of expected, successful, errored and unverified records.
 */
const downloadEncryptedResponsesFaster = useCallback(
  async ({
    responsesCount,
    downloadAttachments,
    secretKey,
    endDate,
    startDate,
  }: DownloadEncryptedParams) => {
    // Nothing to download without a form or without any responses.
    if (!adminForm || !responsesCount) {
      return {
        expectedCount: 0,
        successCount: 0,
        errorCount: 0,
      }
    }

    console.log('Faster downloads is enabled ⚡')

    // Cancel any download that is still in flight before starting a new one.
    abortControllerRef.current.abort()
    const freshAbortController = new AbortController()
    abortControllerRef.current = freshAbortController

    if (workers.length) killWorkers(workers)

    const numWorkers = window.navigator.hardwareConcurrency || 4
    let errorCount = 0
    let unverifiedCount = 0
    let attachmentErrorCount = 0
    let unknownStatusCount = 0

    const logMeta = {
      action: 'downloadEncryptedReponses',
      formId: adminForm._id,
      formTitle: adminForm.title,
      downloadAttachments: downloadAttachments,
      num_workers: numWorkers,
      // NOTE(review): this logs the metadata row count, not `responsesCount`;
      // left unchanged here, confirm which value is intended.
      expectedNumSubmissions: NUM_OF_METADATA_ROWS,
      adminId: user?._id,
    }
    // Trigger analytics here before starting decryption worker
    trackDownloadResponseStart(adminForm, numWorkers, NUM_OF_METADATA_ROWS)
    datadogLogs.logger.info('Download response start', {
      meta: {
        ...logMeta,
      },
    })

    const workerPool: CleanableDecryptionWorkerApi[] = []
    // Indices into `workerPool` of workers that currently have no task.
    const idleWorkers: number[] = []

    for (let i = 0; i < numWorkers; i++) {
      workerPool.push(makeWorkerApiAndCleanup())
      idleWorkers.push(i)
    }

    setWorkers(workerPool)

    const csvGenerator = new EncryptedResponseCsvGenerator(
      responsesCount,
      NUM_OF_METADATA_ROWS,
    )

    // Opens the response stream. A failure here means the download never
    // started, so it is reported as a network failure before being rethrown.
    const openStream = async () => {
      try {
        return await getEncryptedResponsesStream(
          adminForm._id,
          { downloadAttachments, endDate, startDate },
          freshAbortController,
        )
      } catch (err) {
        const error = err instanceof Error ? err : new Error(String(err))
        datadogLogs.logger.info('Download network failure', {
          meta: {
            ...logMeta,
            error: {
              message: error.message,
              name: error.name,
              stack: error.stack,
            },
          },
        })
        trackDownloadNetworkFailure(adminForm, error)
        killWorkers(workerPool)
        throw err
      }
    }

    const stream = await openStream()

    // Decrypts one streamed line on the given worker and folds the outcome
    // into the counters / CSV. It never rejects: any unexpected failure is
    // counted as an error so the worker index is always handed back to the
    // scheduler instead of being lost.
    const processTask = async (
      value: string,
      workerIdx: number,
    ): Promise<number> => {
      try {
        const { workerApi } = workerPool[workerIdx]

        const decryptResult = await workerApi.decryptIntoCsv(
          {
            line: value,
            secretKey,
            downloadAttachments,
            formId: adminForm._id,
            hostOrigin: window.location.origin,
          },
          fasterDownloads,
        )

        switch (decryptResult.status) {
          case CsvRecordStatus.Ok: {
            try {
              csvGenerator.addRecord(decryptResult.submissionData)
            } catch (e) {
              errorCount++
              console.error('Error in getResponseInstance', e)
            }

            // It's fine to hog on to the worker here while waiting for the browser
            // rate limit to pass. If decryption is fast, we would wait regardless.
            // If decryption is slow, we won't hit rate limits.
            const blobUrl = decryptResult.downloadBlobURL
            if (downloadAttachments && blobUrl) {
              try {
                await downloadResponseAttachmentURL(blobUrl, decryptResult.id)
              } catch (e) {
                // The record itself is already in the CSV, so only the
                // attachment counter is bumped.
                attachmentErrorCount++
                console.error('Error downloading attachment', e)
              } finally {
                // Always release the blob URL, even if the save failed.
                URL.revokeObjectURL(blobUrl)
              }
            }
            break
          }
          case CsvRecordStatus.Unknown:
            unknownStatusCount++
            break
          case CsvRecordStatus.Error:
            errorCount++
            break
          case CsvRecordStatus.AttachmentError:
            errorCount++
            attachmentErrorCount++
            break
          case CsvRecordStatus.Unverified:
            unverifiedCount++
            break
          default: {
            // eslint-disable-next-line @typescript-eslint/no-unused-vars
            const _: never = decryptResult.status
            throw new Error('Invalid decryptResult status encountered.')
          }
        }
      } catch (e) {
        errorCount++
        console.error(`Error in task on worker ${workerIdx}`, e)
      }
      return workerIdx
    }

    // Reads the stream and dispatches each line to an idle worker. Resolves
    // once every dispatched task has settled; rejects if reading fails.
    const readAndQueueTask = async (): Promise<void> => {
      const reader = stream.getReader()
      let progress = 0
      // The worker index is stored next to the task so the worker can be
      // freed no matter how the task settles.
      let pendingTasks: { workerIdx: number; task: Promise<number> }[] = []

      try {
        while (progress < responsesCount) {
          const { done, value } = await reader.read()
          if (done) break

          progress += 1
          onProgress(progress)

          // No idle worker: poll each in-flight task for up to 50ms until at
          // least one worker is free again.
          while (idleWorkers.length === 0) {
            const stillRunning: typeof pendingTasks = []
            for (const pending of pendingTasks) {
              try {
                await withTimeout(pending.task, 50)
                idleWorkers.push(pending.workerIdx)
              } catch (e) {
                if (
                  e instanceof Error &&
                  e.message === 'Operation timed out'
                ) {
                  stillRunning.push(pending)
                  continue
                }
                // The task settled with a failure; its worker is free again.
                console.error(
                  `Error in task on worker ${pending.workerIdx}`,
                  e,
                )
                idleWorkers.push(pending.workerIdx)
              }
            }
            pendingTasks = stillRunning
          }

          const workerIdx = idleWorkers.shift()!
          pendingTasks.push({
            workerIdx,
            task: processTask(value, workerIdx),
          })
        }
        await Promise.all(pendingTasks.map((pending) => pending.task))
      } catch (e) {
        console.error('Error reading stream', e)
        // Propagate so the caller can log the failure and reject, instead of
        // waiting for records that will never arrive.
        throw e
      } finally {
        reader.releaseLock()
      }
    }

    const downloadStartTime = performance.now()

    try {
      await readAndQueueTask()
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err))
      const timeDifference = performance.now() - downloadStartTime

      datadogLogs.logger.info('Download response failure', {
        meta: {
          ...logMeta,
          duration: timeDifference,
          error: {
            message: error.message,
            name: error.name,
            stack: error.stack,
          },
        },
      })

      trackDownloadResponseFailure(
        adminForm,
        numWorkers,
        NUM_OF_METADATA_ROWS,
        timeDifference,
        error,
      )

      console.error('Failed to download data, is there a network issue?', err)
      killWorkers(workerPool)
      throw err
    }

    // Every dispatched task has settled at this point, so the counters are
    // final and the outcome can be decided once, without polling.

    // If all the records could not be decrypted
    if (errorCount + unverifiedCount === responsesCount) {
      const timeDifference = performance.now() - downloadStartTime

      datadogLogs.logger.info('Partial decryption failure', {
        meta: {
          ...logMeta,
          duration: timeDifference,
          error_count: errorCount,
          unverified_count: unverifiedCount,
          attachment_error_count: attachmentErrorCount,
          unknown_status_count: unknownStatusCount,
        },
      })

      trackPartialDecryptionFailure(
        adminForm,
        numWorkers,
        csvGenerator.length(),
        timeDifference,
        errorCount,
        attachmentErrorCount,
      )

      killWorkers(workerPool)
      return {
        expectedCount: responsesCount,
        successCount: csvGenerator.length(),
        errorCount,
        unverifiedCount,
      }
    }

    // At least one record was decrypted (or the stream delivered fewer
    // records than expected): export whatever was collected. A shortfall is
    // visible to the caller as successCount + errors < expectedCount.
    killWorkers(workerPool)
    // Generate first three rows of meta-data before download
    csvGenerator.addMetaDataFromSubmission(errorCount, unverifiedCount)
    csvGenerator.downloadCsv(`${adminForm.title}-${adminForm._id}.csv`)

    const timeDifference = performance.now() - downloadStartTime

    datadogLogs.logger.info('Download response success', {
      meta: {
        ...logMeta,
        duration: timeDifference,
      },
    })

    trackDownloadResponseSuccess(
      adminForm,
      numWorkers,
      NUM_OF_METADATA_ROWS,
      timeDifference,
    )

    return {
      expectedCount: responsesCount,
      successCount: csvGenerator.length(),
      errorCount,
      unverifiedCount,
    }
  },
  [adminForm, onProgress, user?._id, workers, fasterDownloads],
)

const handleExportCsvMutation = useMutation(
(params: DownloadEncryptedParams) => downloadEncryptedResponses(params),
(params: DownloadEncryptedParams) =>
fasterDownloads
? downloadEncryptedResponsesFaster(params)
: downloadEncryptedResponses(params),
mutateProps,
)

Expand Down
Loading

0 comments on commit 90fbd45

Please sign in to comment.