
Commit

refactor(app): dont save stats to file / apply patches
Flofie committed Jan 25, 2024
1 parent aaf88aa commit 2626fb7
Showing 4 changed files with 52 additions and 101 deletions.
11 changes: 1 addition & 10 deletions app/src/app/api/download/[slug]/route.ts
@@ -10,24 +10,16 @@ const minioClient = new Minio.Client({
   secretKey: env.get("MINIO_SECRET_KEY").required().asString(),
 });
 
-/*
-TODO: refactor this
-This method is a huge security risk.
-It allows anyone to download any file from any bucket.
-*/
 export async function GET(
   req: NextRequest,
   { params }: { params: { slug: string } }
 ) {
   const { slug: importerId } = params;
-  const fileReference = req.nextUrl.searchParams.get("fileReference");
+  const fileReference = "target.json";
 
   if (!importerId) {
     return new Response("importerId missing", { status: 500 });
   }
-  if (!fileReference) {
-    return new Response("file missing", { status: 500 });
-  }
 
   const bucket = importerId;
   const bucketExists = await minioClient.bucketExists(bucket);
@@ -42,7 +34,6 @@ export async function GET(
 }
 
 async function getFile(bucket: string, fileReference: string): Promise<Buffer> {
-  // TODO: this throws S3Error - not found
   try {
     const fileStats = await minioClient.statObject(bucket, fileReference);
     if (!fileStats) {
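
With the fileReference query parameter gone, this route can only ever serve target.json from the importer's own bucket, which is what resolved the security TODO deleted above. A minimal client-side sketch of the new contract (the host and importer id are hypothetical placeholders):

// Hypothetical usage: the download route now ignores any query string and
// always resolves target.json inside the bucket named by the slug.
const res = await fetch("http://localhost:3000/api/download/imp_123");
if (!res.ok) throw new Error(`download failed: ${res.status}`);
const rows: Record<string, unknown>[] = await res.json(); // parsed target.json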
93 changes: 39 additions & 54 deletions app/temporal/src/activities.ts
@@ -3,7 +3,11 @@ import csv from "csv";
 import { chunk } from "lodash";
 import XLSX from "xlsx";
 import { ColumnConfig } from "./domain/ColumnConfig";
-import { DataAnalyzer, DataMappingRecommendation } from "./domain/DataAnalyzer";
+import {
+  DataAnalyzer,
+  DataMappingRecommendation,
+  Stats,
+} from "./domain/DataAnalyzer";
 import { DataSetPatch } from "./domain/DataSet";
 import { ValidatorType } from "./domain/validators";
 import { FileStore } from "./infrastructure/FileStore";
@@ -148,25 +152,21 @@ export function makeActivities(
     processDataValidations: async (params: {
       bucket: string;
       fileReference: string;
-      statsFileReference: string;
       validatorColumns: ValidatorColumns;
       outputFileReference: string;
+      stats: Stats;
+      patches: DataSetPatch[];
     }): Promise<{ errorFileReference: string; errorCount: number }> => {
       const fileData = await fileStore.getFile(
         params.bucket,
         params.fileReference
       );
       const jsonData = JSON.parse(fileData.toString());
 
-      const statsFileData = await fileStore.getFile(
-        params.bucket,
-        params.statsFileReference
-      );
-      const statsData = JSON.parse(statsFileData.toString());
+      const patchedData = applyPatches(jsonData, params.patches);
       const errorData = dataAnalyzer.processDataValidations(
-        jsonData,
+        patchedData,
         params.validatorColumns,
-        statsData
+        params.stats
       );
       await fileStore.putFile(
         params.bucket,
@@ -178,77 +178,43 @@
         errorCount: errorData.length,
       };
     },
-    generateStatsFile: async (params: {
+    generateStats: async (params: {
       bucket: string;
       fileReference: string;
-      outputFileReference: string;
       uniqueColumns: string[];
-    }): Promise<void> => {
+      patches: DataSetPatch[];
+    }): Promise<Stats> => {
       const fileData = await fileStore.getFile(
         params.bucket,
         params.fileReference
       );
       const jsonData: Record<string, unknown>[] = JSON.parse(
         fileData.toString()
       );
-      const stats = dataAnalyzer.getStats(jsonData, params.uniqueColumns);
-      const statsData = Buffer.from(JSON.stringify(stats));
-      await fileStore.putFile(
-        params.bucket,
-        params.outputFileReference,
-        statsData
-      );
+      const patchedData = applyPatches(jsonData, params.patches);
+      return dataAnalyzer.getStats(patchedData, params.uniqueColumns);
     },
     mergeChunks: async (params: {
       bucket: string;
       fileReferences: string[];
       outputFileReference: string;
+      patches?: DataSetPatch[];
     }) => {
       let allJsonData: Record<string, unknown>[] = [];
       for (const fileReference of params.fileReferences) {
         const fileData = await fileStore.getFile(params.bucket, fileReference);
         allJsonData.push(...JSON.parse(fileData.toString()));
       }
+      const patchedData = applyPatches(allJsonData, params.patches ?? []);
       await fileStore.putFile(
         params.bucket,
         params.outputFileReference,
-        Buffer.from(JSON.stringify(allJsonData))
-      );
-    },
-    applyPatches: async (params: {
-      bucket: string;
-      fileReference: string;
-      outputFileReference: string;
-      patches: DataSetPatch[];
-    }) => {
-      const fileData = await fileStore.getFile(
-        params.bucket,
-        params.fileReference
-      );
-      const allJsonData: Record<string, unknown>[] = JSON.parse(
-        fileData.toString()
-      );
-
-      for (const patch of params.patches) {
-        const indexToUpdate = allJsonData.findIndex(
-          (item) => item.__rowId === patch.row
-        );
-        allJsonData[indexToUpdate][patch.col] = patch.newValue;
-      }
-
-      await fileStore.putFile(
-        params.bucket,
-        params.outputFileReference,
-        Buffer.from(JSON.stringify(allJsonData))
+        Buffer.from(JSON.stringify(patchedData))
      );
     },
-    export: async (params: {
-      bucket: string;
-      fileReference: string;
-      callbackUrl: string;
-    }) => {
+    invokeCallback: async (params: { bucket: string; callbackUrl: string }) => {
       const host = process.env.API_URL ?? "http://localhost:3000";
-      const downloadUrl = `${host}/api/download/${params.bucket}/?fileReference=${params.fileReference}`;
+      const downloadUrl = `${host}/api/download/${params.bucket}`;
       console.log("downloadUrl", downloadUrl);
       fetch(params.callbackUrl, {
         method: "POST",
@@ -257,3 +223,22 @@
     },
   };
 }
+
+function applyPatches(
+  data: Record<string, unknown>[],
+  patches: DataSetPatch[]
+): Record<string, unknown>[] {
+  const newData = data.slice();
+  for (const patch of patches) {
+    const indexToUpdate = newData.findIndex(
+      (item) => item.__rowId === patch.row
+    );
+    if (indexToUpdate !== -1) {
+      newData[indexToUpdate] = {
+        ...newData[indexToUpdate],
+        [patch.col]: patch.newValue,
+      };
+    }
+  }
+  return newData;
+}
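
The extracted applyPatches helper is pure: it copies the row array with slice() and swaps in patched rows via object spread instead of mutating, and its indexToUpdate !== -1 guard skips patches whose __rowId matches no row, a case the deleted activity would have crashed on. A small sketch of the behavior, assuming the helper were exported and using made-up rows:

import { DataSetPatch } from "./domain/DataSet";

// Two rows keyed by __rowId, one patch in the new { row, col, newValue } shape.
const rows = [
  { __rowId: 0, email: "a@example.com" },
  { __rowId: 1, email: "broken" },
];
const patches: DataSetPatch[] = [
  { row: 1, col: "email", newValue: "b@example.com" },
];

const patched = applyPatches(rows, patches);
console.log(patched[1].email); // "b@example.com"
console.log(rows[1].email); // still "broken": the input is never mutated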
2 changes: 1 addition & 1 deletion app/temporal/src/domain/DataSet.ts
@@ -1,5 +1,5 @@
 export interface DataSetPatch {
   row: number;
-  col: number;
+  col: string;
   newValue: string | number | null;
 }
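
With col switched from number to string, a patch names its target column instead of indexing it, which is exactly what lets the new applyPatches write the cell with a computed key ([patch.col]). A hypothetical sketch of what now type-checks:

import { DataSetPatch } from "./DataSet";

// Index-based patches are rejected at compile time:
// const byIndex: DataSetPatch = { row: 3, col: 2, newValue: "x" }; // error: number is not assignable to string
const byName: DataSetPatch = { row: 3, col: "email", newValue: "x" };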
47 changes: 11 additions & 36 deletions app/temporal/src/workflows/importer.workflow.ts
@@ -203,13 +203,11 @@ export async function importer(params: ImporterWorkflowParams) {
       );
     }
 
-    // we dont have any more errors and the patched file includes all patches
-    exportFileReference = "patched.json";
+    // we dont have any more errors and target.json has all patches applied
+    exportFileReference = "target.json";
 
     // we dont need to await the callbackUrl call
-    void acts.export({
+    void acts.invokeCallback({
       bucket: sourceFile!.bucket,
-      fileReference: exportFileReference,
       callbackUrl: params.callbackUrl,
     });
   } catch (err) {
@@ -247,51 +245,27 @@
         }
       }
     }
-    // TODO: we are applying patches twice here, once for the whole file and once for each chunk
-    const patchedFileReference = "patched.json";
-    if (patches.length > 0) {
-      await acts.applyPatches({
-        bucket: sourceFile!.bucket,
-        fileReference: sourceFileReference,
-        outputFileReference: patchedFileReference,
-        patches,
-      });
-    }
 
-    const statsFileReference = "stats.json";
     const startStats = Date.now();
-    await acts.generateStatsFile({
+    const stats = await acts.generateStats({
       bucket: sourceFile!.bucket,
-      fileReference:
-        patches.length > 0 ? patchedFileReference : sourceFileReference,
-      outputFileReference: statsFileReference,
+      fileReference: sourceFileReference,
       uniqueColumns: validatorColumns.unique.map((item) => item.column),
+      patches,
     });
     console.log(`generate stats file took ${Date.now() - startStats}ms`);
 
     const startAllValidations = Date.now();
     const limit = pLimit(100);
     const parallelValidations = chunkedFileReferences.map((fileReference) =>
-      limit(async () => {
+      limit(() => {
         const referenceId = fileReference.split("-")[1].split(".")[0];
-        // TODO: we are applying patches twice here, once for the whole file and once for each chunk
-        const patchedFileReference = `patched-${referenceId}.json`;
-        if (patches.length > 0) {
-          await acts.applyPatches({
-            bucket: sourceFile!.bucket,
-            fileReference: sourceFileReference,
-            outputFileReference: patchedFileReference,
-            patches,
-          });
-        }
         const errorFileReference = `errors-${referenceId}.json`;
         return acts.processDataValidations({
           bucket: sourceFile!.bucket,
-          fileReference:
-            patches.length > 0 ? patchedFileReference : fileReference,
-          statsFileReference,
+          fileReference,
           validatorColumns,
           outputFileReference: errorFileReference,
+          stats,
+          patches,
         });
       })
     );
@@ -306,6 +280,7 @@ export async function importer(params: ImporterWorkflowParams) {
       bucket: sourceFile!.bucket,
       fileReferences: chunkedFileReferences,
       outputFileReference: "target.json",
+      patches,
     });
     console.log(`all validations took ${Date.now() - startAllValidations}ms`);
     isValidating = false;
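
Taken together, the workflow no longer materializes stats.json or any patched*.json intermediates: generateStats returns its result in memory, every validation chunk receives the same stats value and patch list, and the merge writes the patches straight into target.json, the one artifact the download route serves. A condensed, hypothetical sketch of the resulting flow (the parameters stand in for state importer() already holds; the pLimit throttling is omitted):

import { DataSetPatch } from "../domain/DataSet";

// Not the real workflow, just its post-refactor data flow in one place.
async function runValidationsSketch(
  acts: any, // stands in for the workflow's proxyActivities handle
  bucket: string,
  sourceFileReference: string,
  chunkedFileReferences: string[],
  validatorColumns: { unique: { column: string }[] },
  patches: DataSetPatch[]
) {
  // 1. Stats are computed over the patched rows and returned by value.
  const stats = await acts.generateStats({
    bucket,
    fileReference: sourceFileReference,
    uniqueColumns: validatorColumns.unique.map((item) => item.column),
    patches,
  });

  // 2. Each chunk validates against those stats, re-applying the patch
  //    list in memory instead of reading a patched-<id>.json file.
  await Promise.all(
    chunkedFileReferences.map((fileReference) => {
      const referenceId = fileReference.split("-")[1].split(".")[0];
      return acts.processDataValidations({
        bucket,
        fileReference,
        validatorColumns,
        outputFileReference: `errors-${referenceId}.json`,
        stats,
        patches,
      });
    })
  );

  // 3. The merge applies the patches once more while writing target.json.
  await acts.mergeChunks({
    bucket,
    fileReferences: chunkedFileReferences,
    outputFileReference: "target.json",
    patches,
  });
}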
