Skip to content

Commit

Permalink
Merge branch 'main' into 16-show-data-in-frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Kühnlein committed Jan 24, 2024
2 parents 43e4707 + 04c0125 commit 58de044
Show file tree
Hide file tree
Showing 16 changed files with 1,183 additions and 623 deletions.
928 changes: 321 additions & 607 deletions app/package-lock.json

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
"dotenv": "^16.3.2",
"env-var": "^7.4.1",
"fuse.js": "^6.6.2",
"libphonenumber-js": "^1.10.54",
"immer": "^10.0.3",
"lodash": "^4.17.21",
"lucide-react": "^0.312.0",
"minio": "^7.1.3",
"next": "14.1.0",
"p-limit": "^3.1.0",
"react": "^18",
"react-dom": "^18",
"swr": "^2.2.4",
"tailwind-merge": "^2.2.0",
"tailwindcss-animate": "^1.0.7",
"xlsx": "^0.18.5"
Expand Down
131 changes: 122 additions & 9 deletions app/temporal/src/activities.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { ApplicationFailure } from "@temporalio/workflow";
import csv from "csv";
import { chunk } from "lodash";
import XLSX from "xlsx";
import { ColumnConfig } from "./domain/ColumnConfig";
import { DataAnalyzer, DataMappingRecommendation } from "./domain/DataAnalyzer";
import { ValidatorType } from "./domain/validators";
import { FileStore } from "./infrastructure/FileStore";

import { Mapping } from "./workflows/importer.workflow";
export interface DownloadSourceFileParams {
filename: string;
importerId: string;
Expand All @@ -15,6 +17,11 @@ export interface DownloadSourceFileReturnType {
localFilePath: string;
}

/**
 * For each validator type, the list of columns it should run on.
 * `regex` carries the pattern for pattern-based validators — presumably
 * unused for the other validator kinds (TODO confirm against the
 * validators module).
 */
export type ValidatorColumns = Record<
  ValidatorType,
  Array<{ column: string; regex?: string | undefined }>
>;

export function makeActivities(
fileStore: FileStore,
dataAnalyzer: DataAnalyzer
Expand All @@ -29,15 +36,15 @@ export function makeActivities(
format: string;
formatOptions: { delimiter?: string };
outputFileReference: string;
}) => {
}): Promise<void> => {
const fileData = await fileStore.getFile(
params.bucket,
params.fileReference
);
let jsonData: Buffer;
let json: Record<string, unknown>[];
switch (params.format) {
case "csv":
const rows = await new Promise<Record<string, string>[]>(
json = await new Promise<Record<string, unknown>[]>(
(resolve, reject) => {
csv.parse(
fileData,
Expand All @@ -56,31 +63,70 @@ export function makeActivities(
);
}
);
jsonData = Buffer.from(JSON.stringify(rows));
console.log("received rows", json.length);

break;
case "xlsx":
const workbook = XLSX.read(fileData, { type: "buffer" });
const sheetName = workbook.SheetNames[0];
const worksheet = workbook.Sheets[sheetName];
const json = XLSX.utils.sheet_to_json(worksheet, {
json = XLSX.utils.sheet_to_json<Record<string, unknown>>(worksheet, {
//! this is needed to get the header columns on all rows
raw: true,
defval: "",
});
console.log(json);
jsonData = Buffer.from(JSON.stringify(json));
console.log("received rows", json.length);

break;
default:
throw ApplicationFailure.nonRetryable(
`Unsupported format ${params.format}`
);
}
const jsonWithRowIds = json.map((row, index) => ({
__rowId: index,
...row,
}));

const jsonData = Buffer.from(JSON.stringify(jsonWithRowIds));
await fileStore.putFile(
params.bucket,
params.outputFileReference,
jsonData
);
},
/**
 * Applies the user-confirmed column mapping to the imported rows and writes
 * the result back to the file store in chunks.
 *
 * Reads the JSON rows at `fileReference`, renames each row's source columns
 * to their mapped target columns (keeping `__rowId`), splits the result into
 * chunks of 5000 rows, and stores each chunk as `mapped-<index>.json`.
 *
 * @returns the file references of the written chunks, in chunk order.
 */
applyMappings: async (params: {
  bucket: string;
  fileReference: string;
  dataMapping: Mapping[];
}): Promise<string[]> => {
  const fileData = await fileStore.getFile(
    params.bucket,
    params.fileReference
  );
  const jsonData: Record<string, unknown>[] = JSON.parse(
    fileData.toString()
  );

  const mappedData = jsonData.map((row) => {
    // __rowId is carried through so later activities (validation, error
    // reporting) can refer back to the original row.
    const newRow: Record<string, unknown> = { __rowId: row.__rowId };
    for (const mapping of params.dataMapping) {
      const { targetColumn, sourceColumn } = mapping;
      // Narrow instead of asserting with "!" / "as string": skip mappings
      // without a target, and skip a missing source column. Omitting the
      // key is serialization-equivalent to writing `undefined`, because
      // JSON.stringify drops undefined-valued properties.
      if (!targetColumn || sourceColumn == null) {
        continue;
      }
      newRow[targetColumn] = row[sourceColumn];
    }
    return newRow;
  });

  // Write chunks in parallel; 5000 rows per chunk keeps individual files
  // small enough for downstream per-chunk activities.
  return await Promise.all(
    chunk(mappedData, 5000).map(async (chunkRows, index) => {
      // renamed from "chunktFileReference" (typo)
      const chunkFileReference = `mapped-${index}.json`;
      await fileStore.putFile(
        params.bucket,
        chunkFileReference,
        Buffer.from(JSON.stringify(chunkRows))
      );
      return chunkFileReference;
    })
  );
},
getMappingRecommendations: async (params: {
bucket: string;
fileReference: string;
Expand All @@ -91,13 +137,80 @@ export function makeActivities(
params.fileReference
);
const jsonData = JSON.parse(fileData.toString());
// only the first 10 rows are used to detect the columns
// all rows should have all available headers (see source file processing)
const sourceColumns = Object.keys(jsonData[0]);
return dataAnalyzer.generateMappingRecommendations(
sourceColumns,
params.columnConfig
);
},
/**
 * Runs the configured validators over one mapped chunk and persists the
 * resulting error records.
 *
 * The chunk file is expected to be named like "mapped-<id>.json" (see
 * applyMappings); the "<id>" part is reused so the error file
 * ("errors-<id>.json") pairs up with the chunk it came from.
 *
 * @returns the file reference of the written error file.
 */
processDataValidations: async (params: {
  bucket: string;
  fileReference: string;
  statsFileReference: string;
  validatorColumns: ValidatorColumns;
}) => {
  // "mapped-3.json" -> "3"
  const referenceId = params.fileReference.split("-")[1].split(".")[0];

  const chunkBuffer = await fileStore.getFile(
    params.bucket,
    params.fileReference
  );
  const rows = JSON.parse(chunkBuffer.toString());

  // Stats are precomputed by generateStatsFile; presumably used by
  // uniqueness-style validators — TODO confirm in DataAnalyzer.
  const statsBuffer = await fileStore.getFile(
    params.bucket,
    params.statsFileReference
  );
  const stats = JSON.parse(statsBuffer.toString());

  const validationErrors = dataAnalyzer.processDataValidations(
    rows,
    params.validatorColumns,
    stats
  );

  const errorFileReference = `errors-${referenceId}.json`;
  await fileStore.putFile(
    params.bucket,
    errorFileReference,
    Buffer.from(JSON.stringify(validationErrors))
  );
  return errorFileReference;
},
/**
 * Computes dataset statistics for the given rows and stores them as JSON
 * at `outputFileReference`.
 *
 * The actual computation is delegated to DataAnalyzer.getStats (presumably
 * per-column stats over `uniqueColumns` — TODO confirm); this activity only
 * handles (de)serialization and file-store I/O.
 */
generateStatsFile: async (params: {
  bucket: string;
  fileReference: string;
  outputFileReference: string;
  uniqueColumns: string[];
}): Promise<void> => {
  const rawFile = await fileStore.getFile(
    params.bucket,
    params.fileReference
  );
  const rows: Record<string, unknown>[] = JSON.parse(rawFile.toString());

  const stats = dataAnalyzer.getStats(rows, params.uniqueColumns);

  await fileStore.putFile(
    params.bucket,
    params.outputFileReference,
    Buffer.from(JSON.stringify(stats))
  );
},
/**
 * Concatenates the JSON-array contents of the given chunk files (in the
 * order of `fileReferences`) into a single JSON file at
 * `outputFileReference`.
 */
mergeChunks: async (params: {
  bucket: string;
  fileReferences: string[];
  outputFileReference: string;
}) => {
  // Download chunks in parallel instead of awaiting one at a time;
  // Promise.all resolves in input order, so the merged output preserves
  // the original chunk ordering.
  const chunks = await Promise.all(
    params.fileReferences.map(async (fileReference) => {
      const fileData = await fileStore.getFile(params.bucket, fileReference);
      return JSON.parse(fileData.toString()) as Record<string, unknown>[];
    })
  );
  // flat() instead of push(...spread): spreading a very large chunk into
  // push() can exceed the engine's argument-count limit.
  const allJsonData: Record<string, unknown>[] = chunks.flat();
  await fileStore.putFile(
    params.bucket,
    params.outputFileReference,
    Buffer.from(JSON.stringify(allJsonData))
  );
},
};
}
3 changes: 3 additions & 0 deletions app/temporal/src/domain/ColumnConfig.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { ColumnValidation } from "./ColumnValidation";

export interface ColumnConfig {
key: string;
label: string;
Expand All @@ -6,4 +8,5 @@ export interface ColumnConfig {
*/
keyAlternatives?: string[];
type: "text" | "number" | "date";
validations?: ColumnValidation[];
}
6 changes: 6 additions & 0 deletions app/temporal/src/domain/ColumnValidation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { ValidatorType } from "./validators";

/**
 * A single validation rule attached to a column
 * (see ColumnConfig.validations).
 */
export interface ColumnValidation {
// which validator to run on the column's values
type: ValidatorType;
// pattern for pattern-based validation — presumably only read when `type`
// is the regex validator; TODO confirm against the validators module
regex?: string;
}
Loading

0 comments on commit 58de044

Please sign in to comment.