Fix: add hard limit on notion db size allowed to be turned into CSV. ContinueAsNew each time #7444
Merged: Fraggle merged 1 commit into main from 1337-add-hard-limit-for-notion-database-import-size on Sep 17, 2024

Changes from all commits

@@ -46,10 +46,6 @@ const { saveSuccessSync, saveStartSync, shouldGarbageCollect } =
   startToCloseTimeout: "1 minute",
 });
 
-// soft limit on the number of iterations of the loop that should be ran in a single workflow
-// before "continuing as new" to avoid hitting the workflow log size limit
-const MAX_ITERATIONS_PER_WORKFLOW = 32;
-
 // Notion's "last edited" timestamp is precise to the minute
 const SYNC_PERIOD_DURATION_MS = 60_000;

@@ -89,8 +85,6 @@ export async function notionSyncWorkflow({
   forceResync: boolean;
   garbageCollectionMode: NotionGarbageCollectionMode;
 }) {
-  let iterations = 0;
-
   const topLevelWorkflowId = workflowInfo().workflowId;
 
   let lastSyncedPeriodTs: number | null = startFromTs

@@ -120,164 +114,152 @@ export async function notionSyncWorkflow({
 
   const isInitialSync = !lastSyncedPeriodTs;
 
-  do {
   if (!isGarbageCollectionRun) {
     await saveStartSync(connectorId);
   }
 
   // clear the connector cache before each sync
   await clearWorkflowCache({ connectorId, topLevelWorkflowId });
 
   const runTimestamp = Date.now();
 
   let cursors: (string | null)[] = [null, null];
   let pageIndex = 0;
   const childWorkflowQueue = new PQueue({
     concurrency: MAX_CONCURRENT_CHILD_WORKFLOWS,
   });
 
   const promises: Promise<void>[] = [];
 
   // we go through each result page of the notion search API
   do {
     // We only want to fetch pages that were updated since the last sync unless it's a garbage
     // collection run or a force resync.
     const skipUpToDatePages = !isGarbageCollectionRun && !forceResync;
 
     const { pageIds, databaseIds, nextCursor } =
       await getPagesAndDatabasesToSync({
         connectorId,
         // If we're doing a garbage collection run, we want to fetch all pages otherwise, we only
         // want to fetch pages that were updated since the last sync.
         lastSyncedAt: !isGarbageCollectionRun ? lastSyncedPeriodTs : null,
         // Only pass non-null cursors.
         cursors: cursors.filter((c) => c !== null) as string[],
         excludeUpToDatePages: skipUpToDatePages,
         loggerArgs: {
           pageIndex,
           runType: isGarbageCollectionRun
             ? "garbageCollection"
             : isInitialSync
               ? "initialSync"
               : "incrementalSync",
         },
       });
 
     // Update the cursors array to keep only the last 2 cursors.
     cursors = [cursors[1] ?? null, nextCursor];
 
     pageIndex += 1;
 
     // this function triggers child workflows to process batches of pages and databases.
     // the worflow that processes databases will itself trigger child workflows to process
     // batches of child pages.
     promises.push(
       performUpserts({
         connectorId,
         pageIds,
         databaseIds,
         isGarbageCollectionRun: isGarbageCollectionRun,
         runTimestamp,
         pageIndex,
         isBatchSync: isInitialSync,
         queue: childWorkflowQueue,
         topLevelWorkflowId,
         forceResync,
       })
     );
   } while (cursors[1]);
 
   // wait for all child workflows to finish
   await Promise.all(promises);
 
   if (isGarbageCollectionRun || isInitialSync) {
     // These are resources (pages/DBs) that we didn't get from the search API but that are
     // child/parent pages/DBs of other pages that we did get from the search API. We upsert those as
     // well.
     let discoveredResources: {
       pageIds: string[];
       databaseIds: string[];
     } | null;
     do {
       discoveredResources = await getDiscoveredResourcesFromCache({
         connectorId,
         topLevelWorkflowId,
       });
       if (discoveredResources) {
         await performUpserts({
           connectorId,
           pageIds: discoveredResources.pageIds,
           databaseIds: discoveredResources.databaseIds,
           isGarbageCollectionRun: isGarbageCollectionRun,
           runTimestamp,
           pageIndex: null,
           isBatchSync: isInitialSync,
           queue: childWorkflowQueue,
           childWorkflowsNameSuffix: "discovered",
           topLevelWorkflowId,
           forceResync,
         });
       }
     } while (discoveredResources && PROCESS_ALL_DISCOVERED_RESOURCES);
   }
 
   // Compute parents after all documents are added/updated.
   // We only do this if it's not a garbage collection run, to prevent race conditions.
   if (!isGarbageCollectionRun) {
     await updateParentsFields(connectorId);
   }
 
   if (!isGarbageCollectionRun) {
     await saveSuccessSync(connectorId);
     lastSyncedPeriodTs = preProcessTimestampForNotion(runTimestamp);
   } else {
     // Look at pages and databases that were not visited in this run, check with the notion API if
     // they were really deleted and delete them from the database if they were.
     // Find the resources not seen in the GC run
 
     // Create batches of resources to check, by chunk of 100
     const nbOfBatches =
       await createResourcesNotSeenInGarbageCollectionRunBatches({
         connectorId,
         batchSize: 100,
       });
 
     // For each chunk, run a garbage collection activity
     const queue = new PQueue({
       concurrency: MAX_PENDING_GARBAGE_COLLECTION_ACTIVITIES,
     });
     const promises: Promise<void>[] = [];
     for (let batchIndex = 0; batchIndex < nbOfBatches; batchIndex++) {
       promises.push(
         queue.add(async () =>
           garbageCollectBatch({
             connectorId,
             runTimestamp,
             batchIndex,
             startTs: new Date().getTime(),
           })
         )
       );
     }
 
     await Promise.all(promises);
 
     // Once done, clear all the redis keys used for garbage collection
     await completeGarbageCollectionRun(connectorId, nbOfBatches);
   }
 
-    iterations += 1;
   await sleep(INTERVAL_BETWEEN_SYNCS_MS);
-  } while (
-    // We run the loop for MAX_ITERATIONS_PER_WORKFLOW iterations to avoid hitting the workflow log
-    // size limit and "continue as new" to start a new workflow.
-    // If it's the initial sync, a force resync, or a garbage collection run, we only do one
-    // iteration.
-    !isInitialSync &&
-    !forceResync &&
-    !isGarbageCollectionRun &&
-    iterations < MAX_ITERATIONS_PER_WORKFLOW
-  );
 
   await continueAsNew<typeof notionSyncWorkflow>({
     connectorId,
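
For context on the change above: with the iteration counter gone, the workflow follows the usual Temporal continue-as-new shape, doing one sync pass per run, sleeping, and then restarting itself with fresh history. A minimal sketch of that pattern, assuming a Temporal TypeScript workflow; the activity module, constant, and argument names here are illustrative stand-ins, not the connector's actual code:

import { continueAsNew, proxyActivities, sleep } from "@temporalio/workflow";

// Hypothetical activities module, for illustration only.
import type * as activities from "./activities";

const { runOneSyncPass } = proxyActivities<typeof activities>({
  startToCloseTimeout: "10 minutes",
});

// Illustrative interval; the real workflow uses INTERVAL_BETWEEN_SYNCS_MS.
const SLEEP_BETWEEN_SYNCS_MS = 60_000;

export async function exampleSyncWorkflow(args: {
  connectorId: string;
  lastSyncedTs: number | null;
}): Promise<void> {
  // One pass of work per workflow run: no outer do/while, so no iteration cap is needed.
  const newLastSyncedTs = await runOneSyncPass(args.connectorId, args.lastSyncedTs);

  await sleep(SLEEP_BETWEEN_SYNCS_MS);

  // Restart the workflow with updated state; Temporal starts a fresh event history,
  // which is what keeps the history below the size limit.
  await continueAsNew<typeof exampleSyncWorkflow>({
    connectorId: args.connectorId,
    lastSyncedTs: newLastSyncedTs,
  });
}

Because every run ends in continueAsNew, event history never accumulates across passes, which is what the removed MAX_ITERATIONS_PER_WORKFLOW cap was approximating.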
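
The hard limit from the PR title, capping how large a Notion database can be before it is turned into a CSV, is not part of the hunk shown above. As a rough, hypothetical illustration of that kind of guard, with an assumed threshold and helper name rather than the connector's real ones:

// Illustrative only: a hard cap checked before turning a Notion database into a CSV.
const MAX_ROWS_FOR_CSV_EXPORT = 50_000; // assumed threshold, not the actual limit

function assertDatabaseSizeWithinCsvLimit(rowCount: number, databaseId: string): void {
  if (rowCount > MAX_ROWS_FOR_CSV_EXPORT) {
    // Fail fast instead of materializing a huge CSV in workflow memory.
    throw new Error(
      `Notion database ${databaseId} has ${rowCount} rows, above the ` +
        `${MAX_ROWS_FOR_CSV_EXPORT}-row limit for CSV export.`
    );
  }
}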
Review comment: I think you could also avoid holding onto this entirely by generating smaller CSVs as you go through the chunks
Reply: This is what I was going for initially but:
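
A minimal sketch of the suggestion above, assuming a paginated fetch: instead of accumulating every row and writing one large CSV at the end, each fetched chunk is converted and upserted as its own small CSV, so memory stays bounded by the chunk size. The fetch and upsert callbacks are hypothetical stand-ins, not the connector's real API:

type Row = Record<string, string>;

// Stand-ins for a paginated fetch and a per-chunk upload.
type FetchChunk = (
  cursor: string | null
) => Promise<{ rows: Row[]; nextCursor: string | null }>;
type UpsertCsv = (csv: string, chunkIndex: number) => Promise<void>;

function rowsToCsv(rows: Row[], headers: string[]): string {
  const escape = (v: string) => `"${v.replace(/"/g, '""')}"`;
  return [
    headers.map(escape).join(","),
    ...rows.map((r) => headers.map((h) => escape(r[h] ?? "")).join(",")),
  ].join("\n");
}

// Walk the paginated results and upsert one small CSV per chunk, so memory usage
// is bounded by the chunk size rather than by the full table size.
async function exportDatabaseInChunks(
  headers: string[],
  fetchChunk: FetchChunk,
  upsertCsv: UpsertCsv
): Promise<void> {
  let cursor: string | null = null;
  let chunkIndex = 0;
  do {
    const { rows, nextCursor } = await fetchChunk(cursor);
    if (rows.length > 0) {
      await upsertCsv(rowsToCsv(rows, headers), chunkIndex);
      chunkIndex += 1;
    }
    cursor = nextCursor;
  } while (cursor !== null);
}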