Skip to content

Commit

Permalink
Update fullsync with signals to avoid complete resync
Browse files Browse the repository at this point in the history
  • Loading branch information
tdraier committed Sep 18, 2024
1 parent a309b58 commit 79fde6f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 16 deletions.
10 changes: 7 additions & 3 deletions connectors/src/connectors/microsoft/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
typeAndPathFromInternalId,
} from "@connectors/connectors/microsoft/lib/utils";
import {
getRootNodesToSync,
getRootNodesToSyncFromResources,
populateDeltas,
} from "@connectors/connectors/microsoft/temporal/activities";
import {
Expand Down Expand Up @@ -363,9 +363,13 @@ export class MicrosoftConnectorManager extends BaseConnectorManager<null> {
internalId: id,
}));

await MicrosoftRootResource.batchMakeNew(newResourcesBlobs);
const addedResources =
await MicrosoftRootResource.batchMakeNew(newResourcesBlobs);

const nodesToSync = await getRootNodesToSync(this.connectorId);
const nodesToSync = await getRootNodesToSyncFromResources(
this.connectorId,
addedResources
);

// populates deltas for the nodes so that if incremental sync starts before
// fullsync has populated them, there's no error
Expand Down
24 changes: 18 additions & 6 deletions connectors/src/connectors/microsoft/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ import { concurrentExecutor } from "@connectors/lib/async_utils";
import { updateDocumentParentsField } from "@connectors/lib/data_sources";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import { MicrosoftRootResource } from "@connectors/resources/microsoft_resource";
import {
MicrosoftConfigurationResource,
MicrosoftNodeResource,
MicrosoftRootResource,
} from "@connectors/resources/microsoft_resource";
import type { DataSourceConfig } from "@connectors/types/data_source_config";

Expand All @@ -54,6 +54,16 @@ const DELETE_CONCURRENCY = 5;

/**
 * Resolves the node ids to sync for a connector, starting from every root
 * resource stored for it.
 *
 * Thin wrapper: loads the roots with
 * `MicrosoftRootResource.listRootsByConnectorId` and hands them to
 * `getRootNodesToSyncFromResources`, whose result is returned unchanged.
 *
 * @param connectorId - Id of the connector whose roots are listed.
 * @returns The string ids produced by `getRootNodesToSyncFromResources`.
 */
export async function getRootNodesToSync(
  connectorId: ModelId
): Promise<string[]> {
  return getRootNodesToSyncFromResources(
    connectorId,
    await MicrosoftRootResource.listRootsByConnectorId(connectorId)
  );
}

export async function getRootNodesToSyncFromResources(
connectorId: ModelId,
rootResources: MicrosoftRootResource[]
): Promise<string[]> {
const connector = await ConnectorResource.fetchById(connectorId);

Expand All @@ -63,9 +73,6 @@ export async function getRootNodesToSync(

const client = await getClient(connector.connectionId);

const rootResources =
await MicrosoftRootResource.listRootsByConnectorId(connectorId);

// get root folders and drives and drill down site-root and sites to their
// child drives (converted to MicrosoftNode types)
const rootFolderAndDriveNodes = await Promise.all(
Expand Down Expand Up @@ -276,7 +283,8 @@ export async function markNodeAsSeen(connectorId: ModelId, internalId: string) {
);

if (!node) {
throw new Error(`Node ${internalId} not found`);
logger.warn(`Node ${internalId} not found`);
return;
}

// if node was updated more recently than this sync, we don't need to mark it
Expand Down Expand Up @@ -310,7 +318,11 @@ export async function syncFiles({
);

if (!parent) {
throw new Error(`Unexpected: parent node not found: ${parentInternalId}`);
logger.warn(`Unexpected: parent node not found: ${parentInternalId}`);
return {
count: 0,
childNodes: [],
};
}

if (parent.nodeType !== "folder" && parent.nodeType !== "drive") {
Expand Down
19 changes: 16 additions & 3 deletions connectors/src/connectors/microsoft/temporal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import { Err, Ok } from "@dust-tt/types";
import type { WorkflowHandle } from "@temporalio/client";
import { WorkflowNotFoundError } from "@temporalio/common";

import { getRootNodesToSync } from "@connectors/connectors/microsoft/temporal/activities";
import { QUEUE_NAME } from "@connectors/connectors/microsoft/temporal/config";
import type { FolderUpdatesSignal } from "@connectors/connectors/microsoft/temporal/signal";
import { folderUpdatesSignal } from "@connectors/connectors/microsoft/temporal/signal";
import { microsoftGarbageCollectionWorkflow } from "@connectors/connectors/microsoft/temporal/workflows";
import {
fullSyncWorkflow,
Expand All @@ -27,16 +30,24 @@ export async function launchMicrosoftFullSyncWorkflow(
if (!connector) {
return new Err(new Error(`Connector ${connectorId} not found`));
}

const client = await getTemporalClient();

const dataSourceConfig = dataSourceConfigFromConnector(connector);

const workflowId = microsoftFullSyncWorkflowId(connectorId);

if (!nodeIdsToSync) {
nodeIdsToSync = await getRootNodesToSync(connectorId);
}

// Build one "added" update per node id to deliver alongside the workflow
// start. `map` always returns an array (possibly empty), so no nullish
// fallback is needed here.
const signalArgs: FolderUpdatesSignal[] = nodeIdsToSync.map((sId) => ({
  action: "added",
  folderId: sId,
}));

try {
await terminateWorkflow(workflowId);
await client.workflow.start(fullSyncWorkflow, {
await client.workflow.signalWithStart(fullSyncWorkflow, {
args: [{ connectorId, nodeIdsToSync }],
taskQueue: QUEUE_NAME,
workflowId: workflowId,
Expand All @@ -46,6 +57,8 @@ export async function launchMicrosoftFullSyncWorkflow(
memo: {
connectorId: connectorId,
},
signal: folderUpdatesSignal,
signalArgs: [signalArgs],
});
logger.info(
{
Expand Down
9 changes: 9 additions & 0 deletions connectors/src/connectors/microsoft/temporal/signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { defineSignal } from "@temporalio/workflow";

/**
 * A single folder update: an action paired with the folder it applies to.
 * Arrays of these form the payload of `folderUpdatesSignal`.
 */
export interface FolderUpdatesSignal {
// Which change is being reported for the folder.
action: "added" | "removed";
// Identifier of the folder the action applies to.
folderId: string;
}

/**
 * Signal definition whose single argument is an array of
 * `FolderUpdatesSignal` entries.
 *
 * NOTE(review): the registered signal name is "folderUpdateSignal" (singular
 * "Update"), which differs from this constant's name. Senders and handlers
 * are matched on that string, so confirm every user before renaming it.
 */
export const folderUpdatesSignal =
defineSignal<[FolderUpdatesSignal[]]>("folderUpdateSignal");
10 changes: 10 additions & 0 deletions connectors/src/connectors/microsoft/temporal/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Context } from "@temporalio/activity";
import { Worker } from "@temporalio/worker";
import TsconfigPathsPlugin from "tsconfig-paths-webpack-plugin";

import { MicrosoftCastKnownErrorsInterceptor } from "@connectors/connectors/microsoft/temporal/cast_known_errors";
import * as sync_status from "@connectors/lib/sync_status";
Expand Down Expand Up @@ -32,6 +33,15 @@ export async function runMicrosoftWorker() {
() => new MicrosoftCastKnownErrorsInterceptor(),
],
},
bundlerOptions: {
// Update the webpack config to use aliases from our tsconfig.json.
webpackConfigHook: (config) => {
  // `resolve` is optional on the webpack config, so fall back to an empty
  // object instead of asserting it exists: writing through an undefined
  // `resolve` would throw a TypeError while the bundle is being built.
  const resolve = config.resolve ?? {};
  // Keep any resolver plugins already configured and append the
  // tsconfig-paths plugin last.
  resolve.plugins = [...(resolve.plugins ?? []), new TsconfigPathsPlugin({})];
  config.resolve = resolve;
  return config;
},
},
});

await workerFullSync.run();
Expand Down
23 changes: 19 additions & 4 deletions connectors/src/connectors/microsoft/temporal/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ import type { ModelId } from "@dust-tt/types";
import {
continueAsNew,
proxyActivities,
setHandler,
sleep,
workflowInfo,
} from "@temporalio/workflow";

import type { FolderUpdatesSignal } from "@connectors/connectors/google_drive/temporal/signals";
import { folderUpdatesSignal } from "@connectors/connectors/google_drive/temporal/signals";
import type * as activities from "@connectors/connectors/microsoft/temporal/activities";
import type * as sync_status from "@connectors/lib/sync_status";

Expand Down Expand Up @@ -37,7 +40,7 @@ const { reportInitialSyncProgress, syncSucceeded, syncStarted } =
export async function fullSyncWorkflow({
connectorId,
startSyncTs,
nodeIdsToSync,
nodeIdsToSync = [],
totalCount = 0,
}: {
connectorId: ModelId;
Expand All @@ -47,9 +50,21 @@ export async function fullSyncWorkflow({
}) {
await syncStarted(connectorId);

if (nodeIdsToSync === undefined) {
nodeIdsToSync = await getRootNodesToSync(connectorId);
}
// Signal handler: applies each folder update to the in-memory list of node
// ids still to be synced (`nodeIdsToSync`).
setHandler(folderUpdatesSignal, (folderUpdates: FolderUpdatesSignal[]) => {
  for (const { action, folderId } of folderUpdates) {
    switch (action) {
      case "added":
        // Skip ids that are already queued so the same node is not synced
        // twice (e.g. when the same ids arrive both as the workflow start
        // argument and in the accompanying signal).
        if (!nodeIdsToSync.includes(folderId)) {
          nodeIdsToSync.push(folderId);
        }
        break;
      case "removed": {
        // `indexOf` returns -1 when the id is absent, and `splice(-1, 1)`
        // would remove the LAST queued id; only splice on a real match.
        const index = nodeIdsToSync.indexOf(folderId);
        if (index !== -1) {
          nodeIdsToSync.splice(index, 1);
        }
        break;
      }
      default:
      // Unknown action: ignore it rather than failing the workflow.
    }
  }
});

if (startSyncTs === undefined) {
startSyncTs = new Date().getTime();
Expand Down

0 comments on commit 79fde6f

Please sign in to comment.