Skip to content

Commit

Permalink
Merge pull request #19 from Jank1310/feat/18-call-callbackurl-with-re…
Browse files Browse the repository at this point in the history
…sult

#18 feat(app): call callbackurl with result
  • Loading branch information
Flofie committed Jan 25, 2024
2 parents dfb2cd0 + 4a44dec commit 3959d6c
Show file tree
Hide file tree
Showing 16 changed files with 235 additions and 182 deletions.
3 changes: 2 additions & 1 deletion app/.env.local.template
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
MINIO_HOST=localhost
MINIO_ACCESS_KEY=
MINIO_SECRET_KEY=
MINIO_SECRET_KEY=
API_URL=http://localhost:3000
164 changes: 50 additions & 114 deletions app/package-lock.json

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions app/src/app/api/download/[slug]/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { NextRequest } from "next/server";

import minioClient from "../../../../lib/minioClient";

export async function GET(
req: NextRequest,
{ params }: { params: { slug: string } }
) {
const { slug: importerId } = params;
const fileReference = "export.json";

if (!importerId) {
return new Response("importerId missing", { status: 500 });
}

const bucket = importerId;
const bucketExists = await minioClient.bucketExists(bucket);

if (bucketExists === false) {
return new Response("bucket does not exist", { status: 500 });
}

const fileBuffer = await getFile(bucket, fileReference);

return new Response(fileBuffer, { status: 200 });
}

async function getFile(bucket: string, fileReference: string): Promise<Buffer> {
try {
const fileStats = await minioClient.statObject(bucket, fileReference);
if (!fileStats) {
throw new Error();
}
} catch (error) {
throw new Error(`could not find file ${fileReference} in bucket ${bucket}`);
}
const stream = await minioClient.getObject(bucket, fileReference);
const data = await new Promise<Buffer>((resolve, reject) => {
let chunks: Buffer[] = [];
stream.on("error", reject);
stream.on("end", () => resolve(Buffer.concat(chunks)));
stream.on("data", (chunk) => chunks.push(chunk));
});
return data;
}
10 changes: 1 addition & 9 deletions app/src/app/api/upload/route.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
import { getTemporalWorkflowClient } from "@/lib/temporalClient";
import { randomUUID } from "crypto";
import * as env from "env-var";
import * as Minio from "minio";
import { NextRequest } from "next/server";
import { extname } from "path";

const minioClient = new Minio.Client({
endPoint: env.get("MINIO_HOST").required().asString(),
port: 9000,
useSSL: false,
accessKey: env.get("MINIO_ACCESS_KEY").required().asString(),
secretKey: env.get("MINIO_SECRET_KEY").required().asString(),
});
import minioClient from "../../../lib/minioClient";

export async function POST(req: NextRequest) {
const formData = await req.formData();
Expand Down
100 changes: 67 additions & 33 deletions app/temporal/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import {
ColumnValidators,
DataAnalyzer,
DataMappingRecommendation,
SourceFileStatsPerColumn,
} from "./domain/DataAnalyzer";
import { DataSet, DataSetPatch, DataSetRow } from "./domain/DataSet";
import { FileStore } from "./infrastructure/FileStore";
import { Mapping } from "./workflows/importer.workflow";
export interface DownloadSourceFileParams {
Expand Down Expand Up @@ -79,7 +81,7 @@ export function makeActivities(
`Unsupported format ${params.format}`
);
}
const jsonWithRowIds = json.map((row, index) => ({
const jsonWithRowIds: DataSet = json.map((row, index) => ({
__rowId: index,
...row,
}));
Expand All @@ -100,12 +102,9 @@ export function makeActivities(
params.bucket,
params.fileReference
);
const sourceJsonData: Record<string, unknown>[] = JSON.parse(
fileData.toString()
);
const sourceJsonData: DataSet = JSON.parse(fileData.toString());
const mappedData = sourceJsonData.map((row) => {
const newRow: Record<string, unknown> = {};
newRow.__rowId = row.__rowId;
const newRow: DataSetRow = { __rowId: row.__rowId };
const mappingsWithTargetColumn = params.dataMapping.filter(
(mapping) => mapping.targetColumn
);
Expand Down Expand Up @@ -148,65 +147,58 @@ export function makeActivities(
processDataValidations: async (params: {
bucket: string;
fileReference: string;
statsFileReference: string;
validatorColumns: ColumnValidators;
}) => {
outputFileReference: string;
stats: SourceFileStatsPerColumn;
patches: DataSetPatch[];
}): Promise<{ errorFileReference: string; errorCount: number }> => {
console.time("validations");
const referenceId = params.fileReference.split("-")[1].split(".")[0];
const fileData = await fileStore.getFile(
params.bucket,
params.fileReference
);
const jsonData = JSON.parse(fileData.toString());

const statsFileData = await fileStore.getFile(
params.bucket,
params.statsFileReference
);
const statsData = JSON.parse(statsFileData.toString());
const patchedData = applyPatches(jsonData, params.patches);
const errorData = dataAnalyzer.processDataValidations(
jsonData,
patchedData,
params.validatorColumns,
statsData
params.stats
);
const errorFileReference = `errors-${referenceId}.json`;
await fileStore.putFile(
params.bucket,
errorFileReference,
params.outputFileReference,
Buffer.from(JSON.stringify(errorData))
);
console.timeEnd("validations");
return errorFileReference;
return {
errorFileReference: params.outputFileReference,
errorCount: errorData.length,
};
},
generateStatsFile: async (params: {
generateStats: async (params: {
bucket: string;
fileReference: string;
outputFileReference: string;
uniqueColumns: string[];
}): Promise<void> => {
patches: DataSetPatch[];
}): Promise<SourceFileStatsPerColumn> => {
console.time("generate-stats");
const fileData = await fileStore.getFile(
params.bucket,
params.fileReference
);
const jsonData: Record<string, unknown>[] = JSON.parse(
fileData.toString()
);
const stats = dataAnalyzer.getStats(jsonData, params.uniqueColumns);
const statsData = Buffer.from(JSON.stringify(stats));
await fileStore.putFile(
params.bucket,
params.outputFileReference,
statsData
);
const jsonData: DataSet = JSON.parse(fileData.toString());
const patchedData = applyPatches(jsonData, params.patches);
const stats = dataAnalyzer.getStats(patchedData, params.uniqueColumns);
console.timeEnd("generate-stats");
return stats;
},
mergeChunks: async (params: {
bucket: string;
fileReferences: string[];
outputFileReference: string;
}) => {
let allJsonData: Record<string, unknown>[] = [];
let allJsonData: DataSet = [];
for (const fileReference of params.fileReferences) {
const fileData = await fileStore.getFile(params.bucket, fileReference);
allJsonData.push(...JSON.parse(fileData.toString()));
Expand All @@ -217,5 +209,47 @@ export function makeActivities(
Buffer.from(JSON.stringify(allJsonData))
);
},
export: async (params: {
bucket: string;
fileReference: string;
patches: DataSetPatch[];
callbackUrl: string;
exportFileReference: string;
}): Promise<string> => {
const fileData = await fileStore.getFile(
params.bucket,
params.fileReference
);
const allJsonData: DataSet = JSON.parse(fileData.toString());
const patchedData = applyPatches(allJsonData, params.patches ?? []);
await fileStore.putFile(
params.bucket,
params.exportFileReference,
Buffer.from(JSON.stringify(patchedData))
);
const host = process.env.API_URL ?? "http://localhost:3000";
const downloadUrl = `${host}/api/download/${params.bucket}`;
fetch(params.callbackUrl, {
method: "POST",
body: downloadUrl,
});
return params.exportFileReference;
},
};
}

function applyPatches(data: DataSet, patches: DataSetPatch[]): DataSet {
const newData = data.slice();
for (const patch of patches) {
const indexToUpdate = newData.findIndex(
(item) => item.__rowId === patch.rowId
);
if (indexToUpdate !== -1) {
newData[indexToUpdate] = {
...newData[indexToUpdate],
[patch.column]: patch.newValue,
};
}
}
return newData;
}
6 changes: 5 additions & 1 deletion app/temporal/src/domain/DataAnalyzer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
DataAnalyzer,
SourceFileStatsPerColumn,
} from "./DataAnalyzer";
import { DataSetRow } from "./DataSet";

describe("DataAnalyzer", () => {
const analyzer = new DataAnalyzer();
Expand Down Expand Up @@ -101,7 +102,7 @@ describe("DataAnalyzer", () => {

describe("validation", () => {
it("should validate required columns", () => {
const rowsWithMissingName: Record<string, string | number | null>[] = [
const rowsWithMissingName: DataSetRow[] = [
{
__rowId: 0,
name: "John",
Expand Down Expand Up @@ -570,14 +571,17 @@ describe("DataAnalyzer", () => {
it("should return stats", () => {
const jsonData = [
{
__rowId: 0,
name: "Florian",
id: 1,
},
{
__rowId: 1,
name: "Florian",
id: 2,
},
{
__rowId: 2,
name: "Egon",
id: 1,
},
Expand Down
5 changes: 3 additions & 2 deletions app/temporal/src/domain/DataAnalyzer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Fuse from "fuse.js";
import { ColumnConfig } from "./ColumnConfig";
import { ColumnValidation } from "./ColumnValidation";
import { DataSet } from "./DataSet";
import { ValidationError } from "./ValidationError";
import { ValidatorType, validators } from "./validators";

Expand Down Expand Up @@ -66,7 +67,7 @@ export class DataAnalyzer {
}

public processDataValidations(
data: Record<string, string | number | null>[],
data: DataSet,
validatorColumns: ColumnValidators,
stats: SourceFileStatsPerColumn
): { rowId: number; column: string; errors: ValidationError[] }[] {
Expand Down Expand Up @@ -107,7 +108,7 @@ export class DataAnalyzer {
}

public getStats(
data: Record<string, unknown>[],
data: DataSet,
columnsToVerify: string[]
): SourceFileStatsPerColumn {
// nonunique
Expand Down
9 changes: 8 additions & 1 deletion app/temporal/src/domain/DataSet.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
export type FieldValues = string | number | null;

export type DataSetRow = {
__rowId: number;
} & Record<string, FieldValues>;

export type DataSet = DataSetRow[];
export interface DataSetPatch {
rowId: number;
/**
* target column
*/
column: string;
newValue: string | number | null;
newValue: FieldValues;
}
3 changes: 2 additions & 1 deletion app/temporal/src/domain/validators/EmailValidator.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { Validator } from ".";
import { ColumnValidation } from "../ColumnValidation";
import { DataSetRow } from "../DataSet";
import { ValidationError } from "../ValidationError";

const EMAIL_REGEX =
/^[-!#$%&'*+\/0-9=?A-Z^_a-z`{|}~](\.?[-!#$%&'*+\/0-9=?A-Z^_a-z`{|}~])*@[a-zA-Z0-9](-*\.?[a-zA-Z0-9])*\.[a-zA-Z](-?[a-zA-Z0-9])+$/;

export class EmailValidator implements Validator {
validate(
row: Record<string, unknown>,
row: DataSetRow,
columnConfig: { column: string; config: ColumnValidation }[]
): Record<string, ValidationError> {
const errors: Record<string, ValidationError> = {};
Expand Down
3 changes: 2 additions & 1 deletion app/temporal/src/domain/validators/EnumValidator.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Validator } from ".";
import { EnumerationColumnValidation } from "../ColumnValidation";
import { DataSetRow } from "../DataSet";
import { ValidationError } from "../ValidationError";

export class EnumValidator implements Validator {
public validate(
row: Record<string, string | number | null>,
row: DataSetRow,
columnConfigs: { column: string; config: EnumerationColumnValidation }[]
): Record<string, ValidationError> {
const errors: Record<string, ValidationError> = {};
Expand Down
3 changes: 2 additions & 1 deletion app/temporal/src/domain/validators/PhoneValidator.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { isPossiblePhoneNumber } from "libphonenumber-js";
import { DataSetRow } from "../DataSet";
import { ValidationError } from "../ValidationError";

export class PhoneValidator {
validate(
row: Record<string, unknown>,
row: DataSetRow,
columnConfig: { column: string; regex?: string }[]
): Record<string, ValidationError> {
const errors: Record<string, ValidationError> = {};
Expand Down
3 changes: 2 additions & 1 deletion app/temporal/src/domain/validators/RegexValidator.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Validator } from ".";
import { RegexColumnValidation } from "../ColumnValidation";
import { DataSetRow } from "../DataSet";
import { ValidationError } from "../ValidationError";

export class RegexValidator implements Validator {
validate(
row: Record<string, string | number | null>,
row: DataSetRow,
columnConfig: { column: string; config: RegexColumnValidation }[]
): Record<string, ValidationError> {
const errors: Record<string, ValidationError> = {};
Expand Down
3 changes: 2 additions & 1 deletion app/temporal/src/domain/validators/RequiredValidator.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { DataSetRow } from "../DataSet";
import { ValidationError } from "../ValidationError";

export class RequiredValidator {
validate(
row: Record<string, unknown>,
row: DataSetRow,
columnConfig: { column: string; regex?: string }[]
): Record<string, ValidationError> {
const errors: Record<string, ValidationError> = {};
Expand Down
3 changes: 2 additions & 1 deletion app/temporal/src/domain/validators/UniqueValidator.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { Validator } from ".";
import { ColumnValidation } from "../ColumnValidation";
import { SourceFileStatsPerColumn } from "../DataAnalyzer";
import { DataSetRow } from "../DataSet";
import { ValidationError } from "../ValidationError";

export class UniqueValidator implements Validator {
constructor() {}

validate(
row: Record<string, unknown>,
row: DataSetRow,
columnConfig: { column: string; config: ColumnValidation }[],
stats: SourceFileStatsPerColumn = {}
) {
Expand Down
Loading

0 comments on commit 3959d6c

Please sign in to comment.