diff --git a/connectors/src/connectors/notion/temporal/activities.ts b/connectors/src/connectors/notion/temporal/activities.ts
index 053d71bf1a28..5eefabedf5db 100644
--- a/connectors/src/connectors/notion/temporal/activities.ts
+++ b/connectors/src/connectors/notion/temporal/activities.ts
@@ -78,6 +78,7 @@ import type { DataSourceConfig } from "@connectors/types/data_source_config";
 const logger = mainLogger.child({ provider: "notion" });

 const GARBAGE_COLLECTION_INTERVAL_HOURS = 12;
+const DATABASE_TO_CSV_MAX_SIZE = 256 * 1024 * 1024; // 256MB

 export async function fetchDatabaseChildPages({
   connectorId,
@@ -2453,7 +2454,7 @@ export async function upsertDatabaseStructuredDataFromCache({
     return;
   }

-  const pageCacheEntries = await NotionConnectorPageCacheEntry.findAll({
+  const pageCacheEntriesCount = await NotionConnectorPageCacheEntry.count({
     where: {
       parentId: databaseId,
       connectorId,
@@ -2461,19 +2462,60 @@
     },
   });

-  if (!pageCacheEntries.length) {
+  if (!pageCacheEntriesCount) {
     localLogger.info("No pages found in cache (skipping).");
     return;
   }

-  const pagesProperties = pageCacheEntries.map(
-    (p) => JSON.parse(p.pagePropertiesText) as PageObjectProperties
-  );
+  let pagesProperties: PageObjectProperties[] = [];
+  let dustIdColumn: string[] = [];
+
+  // Loop by chunks of 250 and use raw data to avoid memory issues.
+  const chunkSize = 250;
+  let currentSizeInBytes = 0;
+  for (let i = 0; i < pageCacheEntriesCount; i += chunkSize) {
+    const pageCacheEntries: {
+      notionPageId: string;
+      pagePropertiesText: string;
+    }[] = await NotionConnectorPageCacheEntry.findAll({
+      attributes: ["notionPageId", "pagePropertiesText"],
+      raw: true,
+      where: {
+        parentId: databaseId,
+        connectorId,
+        workflowId: topLevelWorkflowId,
+      },
+      limit: chunkSize,
+      offset: i,
+    });
+
+    currentSizeInBytes += pageCacheEntries.reduce(
+      (acc, p) => acc + p.pagePropertiesText.length,
+      0
+    );
+
+    if (currentSizeInBytes > DATABASE_TO_CSV_MAX_SIZE) {
+      localLogger.info(
+        "Database is too large to upsert, skipping. Consider adding a skipReason so we don't retry oversized databases."
+      );
+      return;
+    }
+
+    pagesProperties = pagesProperties.concat(
+      pageCacheEntries.map(
+        (p) => JSON.parse(p.pagePropertiesText) as PageObjectProperties
+      )
+    );
+
+    dustIdColumn = dustIdColumn.concat(
+      pageCacheEntries.map((p) => p.notionPageId)
+    );
+  }

   const csv = await renderDatabaseFromPages({
     databaseTitle: null,
     pagesProperties,
-    dustIdColumn: pageCacheEntries.map((p) => `notion-${p.notionPageId}`),
+    dustIdColumn,
     cellSeparator: ",",
     rowBoundary: "",
   });
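For reviewers, here is the chunked-read pattern from the hunk above as a standalone sketch. It assumes a Sequelize model shaped like `NotionConnectorPageCacheEntry` (as in the diff); the function name, the import path, the `order` clause, and the return shape are illustrative additions, not code from this PR. Note that `String.prototype.length` counts UTF-16 code units, so the 256MB cap is an approximation of payload size, mirroring the diff.

```ts
// Sketch: read a large cache table in fixed-size chunks while enforcing a size budget,
// instead of materializing every row (and every model instance) at once.
import { NotionConnectorPageCacheEntry } from "@connectors/lib/models/notion"; // assumed path; adjust to where the model actually lives

const CHUNK_SIZE = 250;
const MAX_TOTAL_SIZE = 256 * 1024 * 1024; // same cap as DATABASE_TO_CSV_MAX_SIZE

async function collectPagePropertiesInChunks(
  databaseId: string,
  connectorId: number,
  topLevelWorkflowId: string
): Promise<{ pagesProperties: unknown[]; dustIdColumn: string[] } | null> {
  const total = await NotionConnectorPageCacheEntry.count({
    where: { parentId: databaseId, connectorId, workflowId: topLevelWorkflowId },
  });

  const pagesProperties: unknown[] = [];
  const dustIdColumn: string[] = [];
  let sizeSoFar = 0;

  for (let offset = 0; offset < total; offset += CHUNK_SIZE) {
    const rows: { notionPageId: string; pagePropertiesText: string }[] =
      await NotionConnectorPageCacheEntry.findAll({
        attributes: ["notionPageId", "pagePropertiesText"],
        raw: true, // plain objects instead of model instances keeps memory low
        where: { parentId: databaseId, connectorId, workflowId: topLevelWorkflowId },
        // A stable order keeps limit/offset pagination deterministic across chunks.
        order: [["id", "ASC"]],
        limit: CHUNK_SIZE,
        offset,
      });

    sizeSoFar += rows.reduce((acc, r) => acc + r.pagePropertiesText.length, 0);
    if (sizeSoFar > MAX_TOTAL_SIZE) {
      return null; // caller decides how to skip or flag the oversized database
    }

    for (const r of rows) {
      pagesProperties.push(JSON.parse(r.pagePropertiesText));
      dustIdColumn.push(r.notionPageId);
    }
  }

  return { pagesProperties, dustIdColumn };
}
```

`raw: true` combined with the narrow `attributes` list means each chunk is just plain objects carrying the two columns actually needed, which is what bounds memory alongside the size cap.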
diff --git a/connectors/src/connectors/notion/temporal/workflows.ts b/connectors/src/connectors/notion/temporal/workflows.ts
index 02c55eff92e0..180079af4d10 100644
--- a/connectors/src/connectors/notion/temporal/workflows.ts
+++ b/connectors/src/connectors/notion/temporal/workflows.ts
@@ -46,10 +46,6 @@ const { saveSuccessSync, saveStartSync, shouldGarbageCollect } =
   startToCloseTimeout: "1 minute",
 });

-// soft limit on the number of iterations of the loop that should be ran in a single workflow
-// before "continuing as new" to avoid hitting the workflow log size limit
-const MAX_ITERATIONS_PER_WORKFLOW = 32;
-
 // Notion's "last edited" timestamp is precise to the minute
 const SYNC_PERIOD_DURATION_MS = 60_000;

@@ -89,8 +85,6 @@ export async function notionSyncWorkflow({
   forceResync: boolean;
   garbageCollectionMode: NotionGarbageCollectionMode;
 }) {
-  let iterations = 0;
-
   const topLevelWorkflowId = workflowInfo().workflowId;

   let lastSyncedPeriodTs: number | null = startFromTs
@@ -120,164 +114,152 @@
   const isInitialSync = !lastSyncedPeriodTs;

-  do {
-    if (!isGarbageCollectionRun) {
-      await saveStartSync(connectorId);
-    }
+  if (!isGarbageCollectionRun) {
+    await saveStartSync(connectorId);
+  }

-    // clear the connector cache before each sync
-    await clearWorkflowCache({ connectorId, topLevelWorkflowId });
+  // clear the connector cache before each sync
+  await clearWorkflowCache({ connectorId, topLevelWorkflowId });

-    const runTimestamp = Date.now();
+  const runTimestamp = Date.now();

-    let cursors: (string | null)[] = [null, null];
-    let pageIndex = 0;
-    const childWorkflowQueue = new PQueue({
-      concurrency: MAX_CONCURRENT_CHILD_WORKFLOWS,
-    });
+  let cursors: (string | null)[] = [null, null];
+  let pageIndex = 0;
+  const childWorkflowQueue = new PQueue({
+    concurrency: MAX_CONCURRENT_CHILD_WORKFLOWS,
+  });

-    const promises: Promise<void>[] = [];
+  const promises: Promise<void>[] = [];

-    // we go through each result page of the notion search API
-    do {
-      // We only want to fetch pages that were updated since the last sync unless it's a garbage
-      // collection run or a force resync.
-      const skipUpToDatePages = !isGarbageCollectionRun && !forceResync;
+  // we go through each result page of the notion search API
+  do {
+    // We only want to fetch pages that were updated since the last sync unless it's a garbage
+    // collection run or a force resync.
+    const skipUpToDatePages = !isGarbageCollectionRun && !forceResync;

-      const { pageIds, databaseIds, nextCursor } =
-        await getPagesAndDatabasesToSync({
-          connectorId,
-          // If we're doing a garbage collection run, we want to fetch all pages otherwise, we only
-          // want to fetch pages that were updated since the last sync.
-          lastSyncedAt: !isGarbageCollectionRun ? lastSyncedPeriodTs : null,
-          // Only pass non-null cursors.
-          cursors: cursors.filter((c) => c !== null) as string[],
-          excludeUpToDatePages: skipUpToDatePages,
-          loggerArgs: {
-            pageIndex,
-            runType: isGarbageCollectionRun
-              ? "garbageCollection"
-              : isInitialSync
-              ? "initialSync"
-              : "incrementalSync",
-          },
-        });
+    const { pageIds, databaseIds, nextCursor } =
+      await getPagesAndDatabasesToSync({
+        connectorId,
+        // If we're doing a garbage collection run, we want to fetch all pages otherwise, we only
+        // want to fetch pages that were updated since the last sync.
+        lastSyncedAt: !isGarbageCollectionRun ? lastSyncedPeriodTs : null,
+        // Only pass non-null cursors.
+        cursors: cursors.filter((c) => c !== null) as string[],
+        excludeUpToDatePages: skipUpToDatePages,
+        loggerArgs: {
+          pageIndex,
+          runType: isGarbageCollectionRun
+            ? "garbageCollection"
+            : isInitialSync
+            ? "initialSync"
+            : "incrementalSync",
+        },
+      });

-      // Update the cursors array to keep only the last 2 cursors.
-      cursors = [cursors[1] ?? null, nextCursor];
+    // Update the cursors array to keep only the last 2 cursors.
+    cursors = [cursors[1] ?? null, nextCursor];

-      pageIndex += 1;
+    pageIndex += 1;

-      // this function triggers child workflows to process batches of pages and databases.
-      // the worflow that processes databases will itself trigger child workflows to process
-      // batches of child pages.
-      promises.push(
-        performUpserts({
+    // this function triggers child workflows to process batches of pages and databases.
+    // the workflow that processes databases will itself trigger child workflows to process
+    // batches of child pages.
+    promises.push(
+      performUpserts({
+        connectorId,
+        pageIds,
+        databaseIds,
+        isGarbageCollectionRun: isGarbageCollectionRun,
+        runTimestamp,
+        pageIndex,
+        isBatchSync: isInitialSync,
+        queue: childWorkflowQueue,
+        topLevelWorkflowId,
+        forceResync,
+      })
+    );
+  } while (cursors[1]);
+
+  // wait for all child workflows to finish
+  await Promise.all(promises);
+
+  if (isGarbageCollectionRun || isInitialSync) {
+    // These are resources (pages/DBs) that we didn't get from the search API but that are
+    // child/parent pages/DBs of other pages that we did get from the search API. We upsert those as
+    // well.
+    let discoveredResources: {
+      pageIds: string[];
+      databaseIds: string[];
+    } | null;
+    do {
+      discoveredResources = await getDiscoveredResourcesFromCache({
+        connectorId,
+        topLevelWorkflowId,
+      });
+      if (discoveredResources) {
+        await performUpserts({
           connectorId,
-          pageIds,
-          databaseIds,
+          pageIds: discoveredResources.pageIds,
+          databaseIds: discoveredResources.databaseIds,
           isGarbageCollectionRun: isGarbageCollectionRun,
           runTimestamp,
-          pageIndex,
+          pageIndex: null,
           isBatchSync: isInitialSync,
           queue: childWorkflowQueue,
+          childWorkflowsNameSuffix: "discovered",
           topLevelWorkflowId,
           forceResync,
-        })
-      );
-    } while (cursors[1]);
-
-    // wait for all child workflows to finish
-    await Promise.all(promises);
-
-    if (isGarbageCollectionRun || isInitialSync) {
-      // These are resources (pages/DBs) that we didn't get from the search API but that are
-      // child/parent pages/DBs of other pages that we did get from the search API. We upsert those as
-      // well.
-      let discoveredResources: {
-        pageIds: string[];
-        databaseIds: string[];
-      } | null;
-      do {
-        discoveredResources = await getDiscoveredResourcesFromCache({
-          connectorId,
-          topLevelWorkflowId,
         });
-        if (discoveredResources) {
-          await performUpserts({
-            connectorId,
-            pageIds: discoveredResources.pageIds,
-            databaseIds: discoveredResources.databaseIds,
-            isGarbageCollectionRun: isGarbageCollectionRun,
-            runTimestamp,
-            pageIndex: null,
-            isBatchSync: isInitialSync,
-            queue: childWorkflowQueue,
-            childWorkflowsNameSuffix: "discovered",
-            topLevelWorkflowId,
-            forceResync,
-          });
-        }
-      } while (discoveredResources && PROCESS_ALL_DISCOVERED_RESOURCES);
-    }
+      }
+    } while (discoveredResources && PROCESS_ALL_DISCOVERED_RESOURCES);
+  }

-    // Compute parents after all documents are added/updated.
-    // We only do this if it's not a garbage collection run, to prevent race conditions.
-    if (!isGarbageCollectionRun) {
-      await updateParentsFields(connectorId);
-    }
+  // Compute parents after all documents are added/updated.
+  // We only do this if it's not a garbage collection run, to prevent race conditions.
+  if (!isGarbageCollectionRun) {
+    await updateParentsFields(connectorId);
+  }

-    if (!isGarbageCollectionRun) {
-      await saveSuccessSync(connectorId);
-      lastSyncedPeriodTs = preProcessTimestampForNotion(runTimestamp);
-    } else {
-      // Look at pages and databases that were not visited in this run, check with the notion API if
-      // they were really deleted and delete them from the database if they were.
-      // Find the resources not seen in the GC run
-
-      // Create batches of resources to check, by chunk of 100
-      const nbOfBatches =
-        await createResourcesNotSeenInGarbageCollectionRunBatches({
-          connectorId,
-          batchSize: 100,
-        });
+  if (!isGarbageCollectionRun) {
+    await saveSuccessSync(connectorId);
+    lastSyncedPeriodTs = preProcessTimestampForNotion(runTimestamp);
+  } else {
+    // Look at pages and databases that were not visited in this run, check with the notion API if
+    // they were really deleted and delete them from the database if they were.
+    // Find the resources not seen in the GC run

-      // For each chunk, run a garbage collection activity
-      const queue = new PQueue({
-        concurrency: MAX_PENDING_GARBAGE_COLLECTION_ACTIVITIES,
+    // Create batches of resources to check, by chunk of 100
+    const nbOfBatches =
+      await createResourcesNotSeenInGarbageCollectionRunBatches({
+        connectorId,
+        batchSize: 100,
       });
-      const promises: Promise<void>[] = [];
-      for (let batchIndex = 0; batchIndex < nbOfBatches; batchIndex++) {
-        promises.push(
-          queue.add(async () =>
-            garbageCollectBatch({
-              connectorId,
-              runTimestamp,
-              batchIndex,
-              startTs: new Date().getTime(),
-            })
-          )
-        );
-      }
-      await Promise.all(promises);
-
-      // Once done, clear all the redis keys used for garbage collection
-      await completeGarbageCollectionRun(connectorId, nbOfBatches);
+    // For each chunk, run a garbage collection activity
+    const queue = new PQueue({
+      concurrency: MAX_PENDING_GARBAGE_COLLECTION_ACTIVITIES,
+    });
+    const promises: Promise<void>[] = [];
+    for (let batchIndex = 0; batchIndex < nbOfBatches; batchIndex++) {
+      promises.push(
+        queue.add(async () =>
+          garbageCollectBatch({
+            connectorId,
+            runTimestamp,
+            batchIndex,
+            startTs: new Date().getTime(),
+          })
+        )
+      );
     }
-    iterations += 1;
-    await sleep(INTERVAL_BETWEEN_SYNCS_MS);
-  } while (
-    // We run the loop for MAX_ITERATIONS_PER_WORKFLOW iterations to avoid hitting the workflow log
-    // size limit and "continue as new" to start a new workflow.
-    // If it's the initial sync, a force resync, or a garbage collection run, we only do one
-    // iteration.
-    !isInitialSync &&
-    !forceResync &&
-    !isGarbageCollectionRun &&
-    iterations < MAX_ITERATIONS_PER_WORKFLOW
-  );
+    await Promise.all(promises);
+
+    // Once done, clear all the redis keys used for garbage collection
+    await completeGarbageCollectionRun(connectorId, nbOfBatches);
+  }
+
+  await sleep(INTERVAL_BETWEEN_SYNCS_MS);

   await continueAsNew({
     connectorId,
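With the iteration counter and its soft limit gone, the workflow now performs a single sync pass and always restarts itself, which is Temporal's standard way to keep a long-lived workflow's event history bounded. A minimal sketch of that shape: only `continueAsNew` and `sleep` come from `@temporalio/workflow`; the workflow name, argument shape, and interval value are illustrative, not taken from this PR.

```ts
import { continueAsNew, sleep } from "@temporalio/workflow";

const INTERVAL_BETWEEN_SYNCS_MS = 60_000; // illustrative value

type SyncArgs = {
  connectorId: number;
  startFromTs: number | null;
};

export async function exampleSyncWorkflow({
  connectorId,
  startFromTs,
}: SyncArgs): Promise<void> {
  // ... exactly one sync pass here (search, upserts, parents update or GC) ...
  const lastSyncedPeriodTs = Date.now();

  await sleep(INTERVAL_BETWEEN_SYNCS_MS);

  // Restart with a fresh event history; anything that must survive the restart
  // (here, the last synced timestamp) is passed through the arguments.
  await continueAsNew<typeof exampleSyncWorkflow>({
    connectorId,
    startFromTs: lastSyncedPeriodTs,
  });
}
```

Compared to the removed do/while, per-run state such as `iterations` disappears entirely; the only state carried across runs is whatever is handed to `continueAsNew`.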