Fix: add hard limit on notion db size allowed to be turned into CSV. ContinueAsNew each time #7444

Merged · 1 commit · Sep 17, 2024
54 changes: 48 additions & 6 deletions connectors/src/connectors/notion/temporal/activities.ts
@@ -78,6 +78,7 @@ import type { DataSourceConfig } from "@connectors/types/data_source_config";
 const logger = mainLogger.child({ provider: "notion" });

 const GARBAGE_COLLECTION_INTERVAL_HOURS = 12;
+const DATABASE_TO_CSV_MAX_SIZE = 256 * 1024 * 1024; // 256MB

 export async function fetchDatabaseChildPages({
   connectorId,
@@ -2453,27 +2454,68 @@ export async function upsertDatabaseStructuredDataFromCache({
     return;
   }

-  const pageCacheEntries = await NotionConnectorPageCacheEntry.findAll({
+  const pageCacheEntriesCount = await NotionConnectorPageCacheEntry.count({
     where: {
       parentId: databaseId,
       connectorId,
       workflowId: topLevelWorkflowId,
     },
   });

-  if (!pageCacheEntries.length) {
+  if (!pageCacheEntriesCount) {
     localLogger.info("No pages found in cache (skipping).");
     return;
   }

-  const pagesProperties = pageCacheEntries.map(
-    (p) => JSON.parse(p.pagePropertiesText) as PageObjectProperties
-  );
+  let pagesProperties: PageObjectProperties[] = [];
Contributor:

I think you could also avoid holding onto this entirely by generating smaller CSVs as you go through the chunks.

Contributor (Author):

This is what I was going for initially, but:

  • there is some header management to do if I have several small CSVs
  • in the end, the data would be turned into text (as CSV) in memory anyway, so I wouldn't be saving anything :(

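A minimal sketch of the per-chunk alternative discussed in this thread, purely illustrative (csvCell, renderCsvChunk and renderCsvFromChunks are hypothetical helpers, not part of this PR): the header must be emitted exactly once with a single column order shared by every chunk, which is the bookkeeping the first bullet refers to, and the stitched-together result still holds the full CSV in memory, which is the second bullet's point.

type Row = Record<string, string>;

// Minimal CSV quoting: wrap the value in quotes and double any embedded quotes.
function csvCell(value: string): string {
  return `"${value.replace(/"/g, '""')}"`;
}

// Render one chunk of rows without a header; the header is emitted once by the caller.
function renderCsvChunk(rows: Row[], columns: string[]): string {
  return rows
    .map((row) => columns.map((c) => csvCell(row[c] ?? "")).join(","))
    .join("\n");
}

// Stitch the chunks together under a single header. The final string still
// contains the whole CSV, so chunking the reads does not shrink the output.
function renderCsvFromChunks(chunks: Row[][], columns: string[]): string {
  const header = columns.map(csvCell).join(",");
  return [header, ...chunks.map((rows) => renderCsvChunk(rows, columns))].join(
    "\n"
  );
}
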
+  let dustIdColumn: string[] = [];
+
+  // Loop by chunks of 250 and use raw data to avoid memory issues
+  const chunkSize = 250;
+  let currentSizeInBytes = 0;
+  for (let i = 0; i < pageCacheEntriesCount; i += chunkSize) {
+    const pageCacheEntries: {
+      notionPageId: string;
+      pagePropertiesText: string;
+    }[] = await NotionConnectorPageCacheEntry.findAll({
+      attributes: ["notionPageId", "pagePropertiesText"],
+      raw: true,
+      where: {
+        parentId: databaseId,
+        connectorId,
+        workflowId: topLevelWorkflowId,
+      },
+      limit: chunkSize,
+      offset: i,
+    });
+
+    currentSizeInBytes += pageCacheEntries.reduce(
+      (acc, p) => acc + p.pagePropertiesText.length,
+      0
+    );
+
+    if (currentSizeInBytes > DATABASE_TO_CSV_MAX_SIZE) {
+      localLogger.info(
+        "Database size is too large to upsert, skipping. Action: maybe add a skipReason to avoid even trying."
+      );
+      return;
+    }
+
+    pagesProperties = pagesProperties.concat(
+      pageCacheEntries.map(
+        (p) => JSON.parse(p.pagePropertiesText) as PageObjectProperties
+      )
+    );
+
+    dustIdColumn = dustIdColumn.concat(
+      pageCacheEntries.map((p) => p.notionPageId)
+    );
+  }

   const csv = await renderDatabaseFromPages({
     databaseTitle: null,
     pagesProperties,
-    dustIdColumn: pageCacheEntries.map((p) => `notion-${p.notionPageId}`),
+    dustIdColumn,
     cellSeparator: ",",
     rowBoundary: "",
   });
262 changes: 122 additions & 140 deletions connectors/src/connectors/notion/temporal/workflows.ts
@@ -46,10 +46,6 @@ const { saveSuccessSync, saveStartSync, shouldGarbageCollect } =
     startToCloseTimeout: "1 minute",
   });

-// soft limit on the number of iterations of the loop that should be ran in a single workflow
-// before "continuing as new" to avoid hitting the workflow log size limit
-const MAX_ITERATIONS_PER_WORKFLOW = 32;
-
 // Notion's "last edited" timestamp is precise to the minute
 const SYNC_PERIOD_DURATION_MS = 60_000;

@@ -89,8 +85,6 @@ export async function notionSyncWorkflow({
   forceResync: boolean;
   garbageCollectionMode: NotionGarbageCollectionMode;
 }) {
-  let iterations = 0;
-
   const topLevelWorkflowId = workflowInfo().workflowId;

   let lastSyncedPeriodTs: number | null = startFromTs
@@ -120,164 +114,152 @@

const isInitialSync = !lastSyncedPeriodTs;

do {
Contributor:
thanks 🙏

if (!isGarbageCollectionRun) {
await saveStartSync(connectorId);
}
if (!isGarbageCollectionRun) {
await saveStartSync(connectorId);
}

// clear the connector cache before each sync
await clearWorkflowCache({ connectorId, topLevelWorkflowId });
// clear the connector cache before each sync
await clearWorkflowCache({ connectorId, topLevelWorkflowId });

const runTimestamp = Date.now();
const runTimestamp = Date.now();

let cursors: (string | null)[] = [null, null];
let pageIndex = 0;
const childWorkflowQueue = new PQueue({
concurrency: MAX_CONCURRENT_CHILD_WORKFLOWS,
});
let cursors: (string | null)[] = [null, null];
let pageIndex = 0;
const childWorkflowQueue = new PQueue({
concurrency: MAX_CONCURRENT_CHILD_WORKFLOWS,
});

const promises: Promise<void>[] = [];
const promises: Promise<void>[] = [];

// we go through each result page of the notion search API
do {
// We only want to fetch pages that were updated since the last sync unless it's a garbage
// collection run or a force resync.
const skipUpToDatePages = !isGarbageCollectionRun && !forceResync;
// we go through each result page of the notion search API
do {
// We only want to fetch pages that were updated since the last sync unless it's a garbage
// collection run or a force resync.
const skipUpToDatePages = !isGarbageCollectionRun && !forceResync;

const { pageIds, databaseIds, nextCursor } =
await getPagesAndDatabasesToSync({
connectorId,
// If we're doing a garbage collection run, we want to fetch all pages otherwise, we only
// want to fetch pages that were updated since the last sync.
lastSyncedAt: !isGarbageCollectionRun ? lastSyncedPeriodTs : null,
// Only pass non-null cursors.
cursors: cursors.filter((c) => c !== null) as string[],
excludeUpToDatePages: skipUpToDatePages,
loggerArgs: {
pageIndex,
runType: isGarbageCollectionRun
? "garbageCollection"
: isInitialSync
? "initialSync"
: "incrementalSync",
},
});
const { pageIds, databaseIds, nextCursor } =
await getPagesAndDatabasesToSync({
connectorId,
// If we're doing a garbage collection run, we want to fetch all pages otherwise, we only
// want to fetch pages that were updated since the last sync.
lastSyncedAt: !isGarbageCollectionRun ? lastSyncedPeriodTs : null,
// Only pass non-null cursors.
cursors: cursors.filter((c) => c !== null) as string[],
excludeUpToDatePages: skipUpToDatePages,
loggerArgs: {
pageIndex,
runType: isGarbageCollectionRun
? "garbageCollection"
: isInitialSync
? "initialSync"
: "incrementalSync",
},
});

// Update the cursors array to keep only the last 2 cursors.
cursors = [cursors[1] ?? null, nextCursor];
// Update the cursors array to keep only the last 2 cursors.
cursors = [cursors[1] ?? null, nextCursor];

pageIndex += 1;
pageIndex += 1;

// this function triggers child workflows to process batches of pages and databases.
// the worflow that processes databases will itself trigger child workflows to process
// batches of child pages.
promises.push(
performUpserts({
// this function triggers child workflows to process batches of pages and databases.
// the worflow that processes databases will itself trigger child workflows to process
// batches of child pages.
promises.push(
performUpserts({
connectorId,
pageIds,
databaseIds,
isGarbageCollectionRun: isGarbageCollectionRun,
runTimestamp,
pageIndex,
isBatchSync: isInitialSync,
queue: childWorkflowQueue,
topLevelWorkflowId,
forceResync,
})
);
} while (cursors[1]);

// wait for all child workflows to finish
await Promise.all(promises);

if (isGarbageCollectionRun || isInitialSync) {
// These are resources (pages/DBs) that we didn't get from the search API but that are
// child/parent pages/DBs of other pages that we did get from the search API. We upsert those as
// well.
let discoveredResources: {
pageIds: string[];
databaseIds: string[];
} | null;
do {
discoveredResources = await getDiscoveredResourcesFromCache({
connectorId,
topLevelWorkflowId,
});
if (discoveredResources) {
await performUpserts({
connectorId,
pageIds,
databaseIds,
pageIds: discoveredResources.pageIds,
databaseIds: discoveredResources.databaseIds,
isGarbageCollectionRun: isGarbageCollectionRun,
runTimestamp,
pageIndex,
pageIndex: null,
isBatchSync: isInitialSync,
queue: childWorkflowQueue,
childWorkflowsNameSuffix: "discovered",
topLevelWorkflowId,
forceResync,
})
);
} while (cursors[1]);

// wait for all child workflows to finish
await Promise.all(promises);

if (isGarbageCollectionRun || isInitialSync) {
// These are resources (pages/DBs) that we didn't get from the search API but that are
// child/parent pages/DBs of other pages that we did get from the search API. We upsert those as
// well.
let discoveredResources: {
pageIds: string[];
databaseIds: string[];
} | null;
do {
discoveredResources = await getDiscoveredResourcesFromCache({
connectorId,
topLevelWorkflowId,
});
if (discoveredResources) {
await performUpserts({
connectorId,
pageIds: discoveredResources.pageIds,
databaseIds: discoveredResources.databaseIds,
isGarbageCollectionRun: isGarbageCollectionRun,
runTimestamp,
pageIndex: null,
isBatchSync: isInitialSync,
queue: childWorkflowQueue,
childWorkflowsNameSuffix: "discovered",
topLevelWorkflowId,
forceResync,
});
}
} while (discoveredResources && PROCESS_ALL_DISCOVERED_RESOURCES);
}
}
} while (discoveredResources && PROCESS_ALL_DISCOVERED_RESOURCES);
}

// Compute parents after all documents are added/updated.
// We only do this if it's not a garbage collection run, to prevent race conditions.
if (!isGarbageCollectionRun) {
await updateParentsFields(connectorId);
}
// Compute parents after all documents are added/updated.
// We only do this if it's not a garbage collection run, to prevent race conditions.
if (!isGarbageCollectionRun) {
await updateParentsFields(connectorId);
}

if (!isGarbageCollectionRun) {
await saveSuccessSync(connectorId);
lastSyncedPeriodTs = preProcessTimestampForNotion(runTimestamp);
} else {
// Look at pages and databases that were not visited in this run, check with the notion API if
// they were really deleted and delete them from the database if they were.
// Find the resources not seen in the GC run

// Create batches of resources to check, by chunk of 100
const nbOfBatches =
await createResourcesNotSeenInGarbageCollectionRunBatches({
connectorId,
batchSize: 100,
});
if (!isGarbageCollectionRun) {
await saveSuccessSync(connectorId);
lastSyncedPeriodTs = preProcessTimestampForNotion(runTimestamp);
} else {
// Look at pages and databases that were not visited in this run, check with the notion API if
// they were really deleted and delete them from the database if they were.
// Find the resources not seen in the GC run

// For each chunk, run a garbage collection activity
const queue = new PQueue({
concurrency: MAX_PENDING_GARBAGE_COLLECTION_ACTIVITIES,
// Create batches of resources to check, by chunk of 100
const nbOfBatches =
await createResourcesNotSeenInGarbageCollectionRunBatches({
connectorId,
batchSize: 100,
});
const promises: Promise<void>[] = [];
for (let batchIndex = 0; batchIndex < nbOfBatches; batchIndex++) {
promises.push(
queue.add(async () =>
garbageCollectBatch({
connectorId,
runTimestamp,
batchIndex,
startTs: new Date().getTime(),
})
)
);
}

await Promise.all(promises);

// Once done, clear all the redis keys used for garbage collection
await completeGarbageCollectionRun(connectorId, nbOfBatches);
// For each chunk, run a garbage collection activity
const queue = new PQueue({
concurrency: MAX_PENDING_GARBAGE_COLLECTION_ACTIVITIES,
});
const promises: Promise<void>[] = [];
for (let batchIndex = 0; batchIndex < nbOfBatches; batchIndex++) {
promises.push(
queue.add(async () =>
garbageCollectBatch({
connectorId,
runTimestamp,
batchIndex,
startTs: new Date().getTime(),
})
)
);
}

iterations += 1;
await sleep(INTERVAL_BETWEEN_SYNCS_MS);
} while (
// We run the loop for MAX_ITERATIONS_PER_WORKFLOW iterations to avoid hitting the workflow log
// size limit and "continue as new" to start a new workflow.
// If it's the initial sync, a force resync, or a garbage collection run, we only do one
// iteration.
!isInitialSync &&
!forceResync &&
!isGarbageCollectionRun &&
iterations < MAX_ITERATIONS_PER_WORKFLOW
);
await Promise.all(promises);

// Once done, clear all the redis keys used for garbage collection
await completeGarbageCollectionRun(connectorId, nbOfBatches);
}

await sleep(INTERVAL_BETWEEN_SYNCS_MS);

await continueAsNew<typeof notionSyncWorkflow>({
connectorId,
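
The workflows.ts change above removes the bounded do/while loop (and its MAX_ITERATIONS_PER_WORKFLOW counter) in favor of continuing-as-new after every sync cycle, as the PR title says. A minimal sketch of that pattern with the Temporal TypeScript SDK, using placeholder names (syncWorkflow and runOneSyncCycle stand in for the real notionSyncWorkflow and its activities):

import { continueAsNew, sleep } from "@temporalio/workflow";

// Placeholder for one full sync pass; the real workflow schedules activities here.
async function runOneSyncCycle(connectorId: string): Promise<void> {
  void connectorId;
}

// One cycle per workflow run: do the work, pause, then restart the workflow with
// a fresh event history instead of looping and counting iterations.
export async function syncWorkflow({
  connectorId,
}: {
  connectorId: string;
}): Promise<void> {
  await runOneSyncCycle(connectorId);

  // Stands in for INTERVAL_BETWEEN_SYNCS_MS in the real code (value arbitrary here).
  await sleep(60_000);

  // continueAsNew never returns: Temporal closes this run and starts a new one with
  // the given arguments, so the event history stays bounded to a single cycle.
  await continueAsNew<typeof syncWorkflow>({ connectorId });
}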