diff --git a/app/src/app/[locale]/importer/[id]/layout.tsx b/app/src/app/[locale]/importer/[id]/layout.tsx
index 960802f..8339b27 100644
--- a/app/src/app/[locale]/importer/[id]/layout.tsx
+++ b/app/src/app/[locale]/importer/[id]/layout.tsx
@@ -1,8 +1,7 @@
-import { ImporterDto } from "@/app/api/importer/[slug]/ImporterDto";
import TranslationsProvider from "@/components/TranslationsProvider";
import { Toaster } from "@/components/ui/toaster";
-import { fetchWithAuth } from "@/lib/frontendFetch";
-import { getHost, hexToCssHsl } from "@/lib/utils";
+import { getImporterManager } from "@/lib/ImporterManager";
+import { hexToCssHsl } from "@/lib/utils";
import SidebarMenu from "./SidebarMenu";
type PageProps = {
@@ -14,12 +13,8 @@ type PageProps = {
};
export default async function ImporterPage({ params, children }: PageProps) {
- const importerDto = (await fetchWithAuth(
- `${getHost()}/api/importer/${params.id}`,
- {
- cache: "no-cache",
- }
- ).then((res) => res.json())) as ImporterDto;
+ const importerManager = await getImporterManager();
+ const importerDto = await importerManager.getImporterDto(params.id);
const { primaryColor, primaryForegroundColor } =
importerDto.config.design ?? {};
return (
diff --git a/app/src/app/[locale]/importer/[id]/mapping/SelectMappings.tsx b/app/src/app/[locale]/importer/[id]/mapping/SelectMappings.tsx
index a5baed5..de374d5 100644
--- a/app/src/app/[locale]/importer/[id]/mapping/SelectMappings.tsx
+++ b/app/src/app/[locale]/importer/[id]/mapping/SelectMappings.tsx
@@ -26,7 +26,7 @@ import { produce } from "immer";
import { ChevronRightCircleIcon } from "lucide-react";
import { useRouter } from "next/navigation";
-import { fetchWithAuth } from "@/lib/frontendFetch";
+import { useFrontendFetchWithAuth } from "@/lib/frontendFetch";
import { getHost } from "@/lib/utils";
import React from "react";
import { useTranslation } from "react-i18next";
@@ -48,6 +48,7 @@ const SelectMappings = ({
}: Props) => {
const { push } = useRouter();
const { t } = useTranslation();
+ const frontendFetch = useFrontendFetchWithAuth();
const [enablePolling, setEnablePolling] = React.useState(false);
const { importer } = useGetImporter(
initialImporterDto.importerId,
@@ -103,7 +104,7 @@ const SelectMappings = ({
}
setIsSavingMapping(true);
try {
- await fetchWithAuth(
+ await frontendFetch(
`${getHost()}/api/importer/${importer.importerId}/mappings`,
{
method: "PUT",
diff --git a/app/src/app/[locale]/importer/[id]/mapping/page.tsx b/app/src/app/[locale]/importer/[id]/mapping/page.tsx
index 20404b9..8525365 100644
--- a/app/src/app/[locale]/importer/[id]/mapping/page.tsx
+++ b/app/src/app/[locale]/importer/[id]/mapping/page.tsx
@@ -1,9 +1,4 @@
-import {
- DataMappingRecommendation,
- ImporterDto,
-} from "@/app/api/importer/[slug]/ImporterDto";
-import { fetchWithAuth } from "@/lib/frontendFetch";
-import { getHost } from "@/lib/utils";
+import { getImporterManager } from "@/lib/ImporterManager";
import { redirect } from "next/navigation";
import { getPageForState } from "../redirectUtil";
import SelectMappings from "./SelectMappings";
@@ -16,21 +11,10 @@ type Props = {
const MappingPage = async (props: Props) => {
const importerId = props.params.id;
- const initialImporterDtoPromise = fetchWithAuth(
- `${getHost()}/api/importer/${importerId}`,
- {
- cache: "no-cache",
- }
- ).then(async (res) => (await res.json()) as ImporterDto);
- const initialDataMappingsPromise = fetchWithAuth(
- `${getHost()}/api/importer/${importerId}/mappings/recommendations`,
- {
- cache: "no-cache",
- }
- ).then(
- async (res) =>
- (await res.json()).recommendations as DataMappingRecommendation[] | null
- );
+ const importerManager = await getImporterManager();
+ const initialImporterDtoPromise = importerManager.getImporterDto(importerId);
+ const initialDataMappingsPromise =
+ importerManager.getMappingRecommendations(importerId);
const [initialImporterDto, initialDataMappings] = await Promise.all([
initialImporterDtoPromise,
initialDataMappingsPromise,
diff --git a/app/src/app/[locale]/importer/[id]/select-file/SelectFileUploader.tsx b/app/src/app/[locale]/importer/[id]/select-file/SelectFileUploader.tsx
index 6e7658c..e03953f 100644
--- a/app/src/app/[locale]/importer/[id]/select-file/SelectFileUploader.tsx
+++ b/app/src/app/[locale]/importer/[id]/select-file/SelectFileUploader.tsx
@@ -3,7 +3,7 @@ import { ImporterDto } from "@/app/api/importer/[slug]/ImporterDto";
import { useGetImporter } from "@/components/hooks/useGetImporter";
import { LoadingSpinner } from "@/components/ui/loadingSpinner";
import { useToast } from "@/components/ui/use-toast";
-import { fetchWithAuth } from "@/lib/frontendFetch";
+import { useFrontendFetchWithAuth } from "@/lib/frontendFetch";
import { getHost } from "@/lib/utils";
import { useRouter } from "next/navigation";
import React from "react";
@@ -23,7 +23,8 @@ const allowedMimeTypes = [
const SelectFileUploader = ({ importerDto: initialImporterDto }: Props) => {
const { t } = useTranslation();
- const { push, replace } = useRouter();
+ const { push } = useRouter();
+ const frontendFetch = useFrontendFetchWithAuth();
const { toast } = useToast();
const [isUploading, setIsUploading] = React.useState(false);
const [hasUploaded, setHasUploaded] = React.useState(false);
@@ -40,7 +41,7 @@ const SelectFileUploader = ({ importerDto: initialImporterDto }: Props) => {
const formData = new FormData();
formData.append("file", file);
formData.append("importerId", importer.importerId);
- await fetchWithAuth(`${getHost()}/api/upload`, {
+ await frontendFetch(`${getHost()}/api/upload`, {
method: "POST",
body: formData,
});
diff --git a/app/src/app/[locale]/importer/[id]/select-file/page.tsx b/app/src/app/[locale]/importer/[id]/select-file/page.tsx
index 5f095be..d9ce745 100644
--- a/app/src/app/[locale]/importer/[id]/select-file/page.tsx
+++ b/app/src/app/[locale]/importer/[id]/select-file/page.tsx
@@ -1,6 +1,4 @@
-import { ImporterDto } from "@/app/api/importer/[slug]/ImporterDto";
-import { fetchWithAuth } from "@/lib/frontendFetch";
-import { getHost } from "@/lib/utils";
+import { getImporterManager } from "@/lib/ImporterManager";
import { redirect } from "next/navigation";
import { getPageForState } from "../redirectUtil";
import SelectFileUploader from "./SelectFileUploader";
@@ -13,19 +11,15 @@ type Props = {
const SelectFilePage = async (props: Props) => {
const importerId = props.params.id;
- const initialImporterDto = (await fetchWithAuth(
- `${getHost()}/api/importer/${importerId}`,
- {
- cache: "no-cache",
- }
- ).then((res) => res.json())) as ImporterDto;
- const pageForState = getPageForState(initialImporterDto);
+ const importerManager = await getImporterManager();
+ const importerDto = await importerManager.getImporterDto(importerId);
+ const pageForState = getPageForState(importerDto);
if (pageForState !== "select-file") {
return redirect(pageForState);
}
return (
-
+
);
};
diff --git a/app/src/app/[locale]/importer/[id]/validate/Validation.tsx b/app/src/app/[locale]/importer/[id]/validate/Validation.tsx
index 08c7e6f..0b3fdc4 100644
--- a/app/src/app/[locale]/importer/[id]/validate/Validation.tsx
+++ b/app/src/app/[locale]/importer/[id]/validate/Validation.tsx
@@ -7,7 +7,7 @@ import { useGetImporter } from "@/components/hooks/useGetImporter";
import { Button } from "@/components/ui/button";
import { LoadingSpinner } from "@/components/ui/loadingSpinner";
import { useToast } from "@/components/ui/use-toast";
-import { fetchWithAuth } from "@/lib/frontendFetch";
+import { useFrontendFetchWithAuth } from "@/lib/frontendFetch";
import { getHost } from "@/lib/utils";
import { enableMapSet, produce } from "immer";
import { sum } from "lodash";
@@ -30,6 +30,7 @@ const Validation = ({
const { t } = useTranslation();
const { push } = useRouter();
const { toast } = useToast();
+ const frontendFetch = useFrontendFetchWithAuth();
const [enablePolling, setEnablePolling] = React.useState(false);
const [isStartingImport, setIsStartingImport] = React.useState(false);
const [currentValidations, setCurrentValidations] = React.useState<
@@ -154,7 +155,7 @@ const Validation = ({
})
);
try {
- const res = await fetchWithAuth(
+ const res = await frontendFetch(
`${getHost()}/api/importer/${initialImporterDto.importerId}/records`,
{
method: "PATCH",
@@ -182,7 +183,12 @@ const Validation = ({
}
}
},
- [handleRecordUpdate, initialImporterDto.importerId, isMounted]
+ [
+ frontendFetch,
+ handleRecordUpdate,
+ initialImporterDto.importerId,
+ isMounted,
+ ]
);
const handleStartImport = React.useCallback(async () => {
@@ -191,7 +197,7 @@ const Validation = ({
}
setIsStartingImport(true);
try {
- await fetchWithAuth(
+ await frontendFetch(
`${getHost()}/api/importer/${
initialImporterDto.importerId
}/start-import`,
@@ -211,7 +217,15 @@ const Validation = ({
setIsStartingImport(false);
}
}
- }, [initialImporterDto.importerId, isMounted, push, t, toast, totalErrors]);
+ }, [
+ frontendFetch,
+ initialImporterDto.importerId,
+ isMounted,
+ push,
+ t,
+ toast,
+ totalErrors,
+ ]);
const hasErrors = dataStats.totalErrors > 0;
diff --git a/app/src/app/[locale]/importer/[id]/validate/page.tsx b/app/src/app/[locale]/importer/[id]/validate/page.tsx
index f4357ea..cecfc7e 100644
--- a/app/src/app/[locale]/importer/[id]/validate/page.tsx
+++ b/app/src/app/[locale]/importer/[id]/validate/page.tsx
@@ -1,20 +1,13 @@
-import { ImporterDto } from "@/app/api/importer/[slug]/ImporterDto";
-import { fetchRecords } from "@/components/hooks/useFetchRecords";
-import { fetchWithAuth } from "@/lib/frontendFetch";
-import { getHost } from "@/lib/utils";
+import { getImporterManager } from "@/lib/ImporterManager";
import { redirect } from "next/navigation";
import { getPageForState } from "../redirectUtil";
import Validation from "./Validation";
export default async function page(props: { params: { id: string } }) {
const importerId = props.params.id;
- const initialImporterDtoPromise = fetchWithAuth(
- `${getHost()}/api/importer/${importerId}`,
- {
- cache: "no-cache",
- }
- ).then((res) => res.json() as Promise
);
- const initialRecordsPromise = fetchRecords(importerId, 0, 100);
+ const importerManager = await getImporterManager();
+ const initialImporterDtoPromise = importerManager.getImporterDto(importerId);
+ const initialRecordsPromise = importerManager.getRecords(importerId, 0, 100);
const [initialImporterDto, initialRecords] = await Promise.all([
initialImporterDtoPromise,
initialRecordsPromise,
diff --git a/app/src/app/api/importer/[slug]/close/route.ts b/app/src/app/api/importer/[slug]/close/route.ts
index e90bb65..6752225 100644
--- a/app/src/app/api/importer/[slug]/close/route.ts
+++ b/app/src/app/api/importer/[slug]/close/route.ts
@@ -1,4 +1,4 @@
-import { getTemporalWorkflowClient } from "@/lib/temporalClient";
+import { getImporterManager } from "@/lib/ImporterManager";
import { validateServerAuth } from "@/lib/validateAuth";
import { NextRequest, NextResponse } from "next/server";
@@ -10,8 +10,7 @@ export async function POST(
return NextResponse.json("Unauthorized", { status: 401 });
}
const { slug: importerId } = params;
- const client = await getTemporalWorkflowClient();
- const handle = client.getHandle(importerId);
- await handle.signal("importer:close");
+ const importerManager = await getImporterManager();
+ await importerManager.closeImporter(importerId);
return NextResponse.json({});
}
diff --git a/app/src/app/api/importer/[slug]/mappings/recommendations/route.ts b/app/src/app/api/importer/[slug]/mappings/recommendations/route.ts
index ae37ce0..72781ab 100644
--- a/app/src/app/api/importer/[slug]/mappings/recommendations/route.ts
+++ b/app/src/app/api/importer/[slug]/mappings/recommendations/route.ts
@@ -1,7 +1,6 @@
-import { getTemporalWorkflowClient } from "@/lib/temporalClient";
+import { getImporterManager } from "@/lib/ImporterManager";
import { validateAuth } from "@/lib/validateAuth";
import { NextRequest, NextResponse } from "next/server";
-import { ImporterStatus } from "../../ImporterDto";
export async function GET(
req: NextRequest,
@@ -11,14 +10,17 @@ export async function GET(
return NextResponse.json("Unauthorized", { status: 401 });
}
const { slug: importerId } = params;
- const client = await getTemporalWorkflowClient();
- const handle = client.getHandle(importerId);
- const workflowState = await handle.query("importer:status");
- if (workflowState.state === "closed") {
- return NextResponse.json({ error: "Importer is closed" }, { status: 410 });
+ const importerManager = await getImporterManager();
+ try {
+ const recommendations = await importerManager.getMappingRecommendations(
+ importerId
+ );
+ return NextResponse.json({ recommendations });
+ } catch (err) {
+ if ((err as Error).message === "Importer is closed") {
+ return NextResponse.json("Importer is closed", { status: 400 });
+ } else {
+ throw err;
+ }
}
- const recommendations = await handle.query(
- "importer:data-mapping-recommendations"
- );
- return NextResponse.json({ recommendations });
}
diff --git a/app/src/app/api/importer/[slug]/mappings/route.ts b/app/src/app/api/importer/[slug]/mappings/route.ts
index 245125f..47aa70f 100644
--- a/app/src/app/api/importer/[slug]/mappings/route.ts
+++ b/app/src/app/api/importer/[slug]/mappings/route.ts
@@ -1,7 +1,6 @@
-import { getTemporalWorkflowClient } from "@/lib/temporalClient";
+import { getImporterManager } from "@/lib/ImporterManager";
import { validateAuth } from "@/lib/validateAuth";
import { NextRequest, NextResponse } from "next/server";
-import { ImporterStatus } from "../ImporterDto";
export async function PUT(
req: NextRequest,
@@ -11,16 +10,9 @@ export async function PUT(
return NextResponse.json("Unauthorized", { status: 401 });
}
const { slug: importerId } = params;
- const client = await getTemporalWorkflowClient();
- const handle = client.getHandle(importerId);
+ const importerManager = await getImporterManager();
const mappings = await req.json();
- const workflowState = await handle.query("importer:status");
- if (workflowState.state === "closed") {
- return NextResponse.json({ error: "Importer is closed" }, { status: 410 });
- }
- await handle.executeUpdate("importer:update-mapping", {
- args: [{ mappings }],
- });
+ await importerManager.updateMappings(importerId, mappings);
return new NextResponse(undefined, {
status: 201,
});
diff --git a/app/src/app/api/importer/[slug]/records/route.ts b/app/src/app/api/importer/[slug]/records/route.ts
index 627a063..3d84c51 100644
--- a/app/src/app/api/importer/[slug]/records/route.ts
+++ b/app/src/app/api/importer/[slug]/records/route.ts
@@ -1,8 +1,6 @@
-import { getTemporalWorkflowClient } from "@/lib/temporalClient";
+import { getImporterManager } from "@/lib/ImporterManager";
import { validateAuth } from "@/lib/validateAuth";
import { NextRequest, NextResponse } from "next/server";
-import { getDb } from "../../../../../lib/mongoClient";
-import { DataSetPatch, ImporterStatus } from "../ImporterDto";
export async function GET(
req: NextRequest,
@@ -12,23 +10,10 @@ export async function GET(
return NextResponse.json("Unauthorized", { status: 401 });
}
const { slug: importerId } = params;
- const client = await getTemporalWorkflowClient();
- const handle = client.getHandle(importerId);
- const workflowState = await handle.query("importer:status");
- if (workflowState.state === "closed") {
- return NextResponse.json({ error: "Importer is closed" }, { status: 410 });
- }
- const db = await getDb(importerId);
+ const importerManager = await getImporterManager();
const page = parseInt(req.nextUrl.searchParams.get("page") ?? "0");
const size = parseInt(req.nextUrl.searchParams.get("size") ?? "100");
- // TODO validate access or so :)
- const records = await db
- .collection("data")
- .find()
- .sort({ __sourceRowId: 1 })
- .skip(page * size)
- .limit(size)
- .toArray();
+ const records = await importerManager.getRecords(importerId, page, size);
return NextResponse.json({ records });
}
@@ -40,29 +25,15 @@ export async function PATCH(
return NextResponse.json("Unauthorized", { status: 401 });
}
const { slug: importerId } = params;
- const client = await getTemporalWorkflowClient();
- const handle = client.getHandle(importerId);
+ const importerManager = await getImporterManager();
const updateData = await req.json();
- const updateResult = await handle.executeUpdate<
+ const patches = [
{
- changedColumns: string[];
- newMessages: Record;
+ column: updateData.columnId,
+ rowId: updateData._id,
+ newValue: updateData.value,
},
- [{ patches: DataSetPatch[] }]
- >("importer:update-record", {
- args: [
- {
- patches: [
- {
- column: updateData.columnId,
- rowId: updateData._id,
- newValue: updateData.value,
- },
- ],
- },
- ],
- });
- // CALL update for patches
- // TODO return new messages for cell and return if it whole column changed (client can reload pages and stats)
+ ];
+ const updateResult = await importerManager.patchRecords(importerId, patches);
return NextResponse.json(updateResult);
}
diff --git a/app/src/app/api/importer/[slug]/route.ts b/app/src/app/api/importer/[slug]/route.ts
index e2f8f00..d71baa5 100644
--- a/app/src/app/api/importer/[slug]/route.ts
+++ b/app/src/app/api/importer/[slug]/route.ts
@@ -1,7 +1,6 @@
-import { getTemporalWorkflowClient } from "@/lib/temporalClient";
+import { getImporterManager } from "@/lib/ImporterManager";
import { validateAuth } from "@/lib/validateAuth";
import { NextRequest, NextResponse } from "next/server";
-import { ImporterConfig, ImporterDto, ImporterStatus } from "./ImporterDto";
export async function GET(
req: NextRequest,
@@ -11,16 +10,7 @@ export async function GET(
return NextResponse.json("Unauthorized", { status: 401 });
}
const { slug: importerId } = params;
- const client = await getTemporalWorkflowClient();
- const handle = client.getHandle(importerId);
- const [status, config] = await Promise.all([
- (await handle.query("importer:status")) as ImporterStatus,
- (await handle.query("importer:config")) as ImporterConfig,
- ]);
- const importerDto: ImporterDto = {
- importerId,
- config,
- status,
- };
+ const importerManager = await getImporterManager();
+ const importerDto = await importerManager.getImporterDto(importerId);
return NextResponse.json(importerDto);
}
diff --git a/app/src/app/api/importer/[slug]/start-import/route.ts b/app/src/app/api/importer/[slug]/start-import/route.ts
index 32b4c1b..c5093d9 100644
--- a/app/src/app/api/importer/[slug]/start-import/route.ts
+++ b/app/src/app/api/importer/[slug]/start-import/route.ts
@@ -1,7 +1,6 @@
-import { getTemporalWorkflowClient } from "@/lib/temporalClient";
+import { getImporterManager } from "@/lib/ImporterManager";
import { validateAuth } from "@/lib/validateAuth";
import { NextRequest, NextResponse } from "next/server";
-import { ImporterStatus } from "../ImporterDto";
export async function POST(
req: NextRequest,
@@ -11,12 +10,7 @@ export async function POST(
return NextResponse.json("Unauthorized", { status: 401 });
}
const { slug: importerId } = params;
- const client = await getTemporalWorkflowClient();
- const handle = client.getHandle(importerId);
- const workflowState = await handle.query("importer:status");
- if (workflowState.state === "closed") {
- return NextResponse.json({ error: "Importer is closed" }, { status: 410 });
- }
- await handle.executeUpdate("importer:start-import");
+ const importerManager = await getImporterManager();
+ await importerManager.startImport(importerId);
return NextResponse.json({ importerId });
}
diff --git a/app/src/app/api/upload/route.ts b/app/src/app/api/upload/route.ts
index b432159..944b809 100644
--- a/app/src/app/api/upload/route.ts
+++ b/app/src/app/api/upload/route.ts
@@ -1,8 +1,8 @@
-import { getTemporalWorkflowClient } from "@/lib/temporalClient";
import { randomUUID } from "crypto";
import { NextRequest, NextResponse } from "next/server";
import { extname } from "path";
+import { getImporterManager } from "@/lib/ImporterManager";
import { validateAuth } from "@/lib/validateAuth";
import { getMinioClient } from "../../../lib/minioClient";
@@ -10,16 +10,24 @@ export async function POST(req: NextRequest) {
if (validateAuth(req) === false) {
return NextResponse.json("Unauthorized", { status: 401 });
}
+ const { importerId, bucket, destFileName, fileFormat } =
+ await handleFileUpload(req);
+ const importerManager = await getImporterManager();
+ await importerManager.addFile(importerId, destFileName, fileFormat, bucket);
+ return new Response(undefined, { status: 201 });
+}
+
+async function handleFileUpload(req: NextRequest) {
const formData = await req.formData();
const file: File | null = formData.get("file") as unknown as File;
const importerId = formData.get("importerId") as unknown as string;
const fileBuffer = Buffer.from(await file.arrayBuffer());
if (!importerId) {
- return new Response("importerId missing", { status: 500 });
+ throw new Error("importerId missing");
}
if (!fileBuffer) {
- return new Response("file missing", { status: 500 });
+ throw new Error("file missing");
}
const bucket = importerId;
@@ -45,18 +53,14 @@ export async function POST(req: NextRequest) {
);
} catch (error) {
console.error(error);
- return new Response("Failed to upload file", { status: 500 });
+ throw new Error("Error uploading file");
}
- const client = await getTemporalWorkflowClient();
- const handle = client.getHandle(importerId);
- await handle.executeUpdate("importer:add-file", {
- args: [
- {
- fileReference: destFileName,
- fileFormat: extname(file.name) === ".csv" ? "csv" : "xlsx",
- bucket,
- },
- ],
- });
- return new Response("", { status: 201 });
+ const fileFormat = extname(file.name) === ".csv" ? "csv" : "xlsx";
+
+ return {
+ importerId,
+ bucket,
+ destFileName,
+ fileFormat,
+ };
}
diff --git a/app/src/components/hooks/useFetchRecords.tsx b/app/src/components/hooks/useFetchRecords.tsx
index bab1c3f..c59c07e 100644
--- a/app/src/components/hooks/useFetchRecords.tsx
+++ b/app/src/components/hooks/useFetchRecords.tsx
@@ -1,17 +1,19 @@
import { SourceData } from "@/app/api/importer/[slug]/ImporterDto";
-import { fetchWithAuth } from "@/lib/frontendFetch";
+import { useFrontendFetchWithAuth } from "@/lib/frontendFetch";
import { getHost } from "@/lib/utils";
export function useFetchRecords(importerId: string | null) {
+ const frontendFetch = useFrontendFetchWithAuth();
if (!importerId) {
return () => [];
}
return async (page: number, pageSize: number) => {
- return fetchRecords(importerId, page, pageSize);
+ return fetchRecords(frontendFetch, importerId, page, pageSize);
};
}
export async function fetchRecords(
+ frontendFetch: ReturnType,
importerId: string,
page: number,
pageSize: number
@@ -19,7 +21,7 @@ export async function fetchRecords(
const getUrl = new URL(`/api/importer/${importerId}/records`, getHost());
getUrl.searchParams.append("page", page.toFixed());
getUrl.searchParams.append("pageSize", pageSize.toFixed());
- const result = fetchWithAuth(getUrl, {
+ const result = frontendFetch(getUrl, {
method: "GET",
cache: "no-cache",
}).then((res) => res.json() as Promise<{ records: SourceData[] }>);
diff --git a/app/src/components/hooks/useGetImporter.tsx b/app/src/components/hooks/useGetImporter.tsx
index c4f846c..2c5ce62 100644
--- a/app/src/components/hooks/useGetImporter.tsx
+++ b/app/src/components/hooks/useGetImporter.tsx
@@ -1,5 +1,5 @@
import { ImporterDto } from "@/app/api/importer/[slug]/ImporterDto";
-import { fetchWithAuth } from "@/lib/frontendFetch";
+import { useFrontendFetchWithAuth } from "@/lib/frontendFetch";
import { getHost } from "@/lib/utils";
import useSWR from "swr";
@@ -8,9 +8,10 @@ export function useGetImporter(
pollInterval?: number,
fallbackData?: ImporterDto
) {
+ const frontendFetch = useFrontendFetchWithAuth();
const { data, error, isLoading, mutate } = useSWR(
importerId ? [`${getHost()}/api/importer/${importerId}`] : null,
- ([url]) => fetchWithAuth(url).then((res) => res.json()),
+ ([url]) => frontendFetch(url).then((res) => res.json()),
{
refreshInterval: pollInterval,
fallbackData,
diff --git a/app/src/components/hooks/useGetMappingRecommendations.tsx b/app/src/components/hooks/useGetMappingRecommendations.tsx
index 1f8d799..c72b579 100644
--- a/app/src/components/hooks/useGetMappingRecommendations.tsx
+++ b/app/src/components/hooks/useGetMappingRecommendations.tsx
@@ -1,5 +1,5 @@
import { DataMappingRecommendation } from "@/app/api/importer/[slug]/ImporterDto";
-import { fetchWithAuth } from "@/lib/frontendFetch";
+import { useFrontendFetchWithAuth } from "@/lib/frontendFetch";
import { getHost } from "@/lib/utils";
import useSWR from "swr";
@@ -8,11 +8,12 @@ export function useGetMappingRecommendations(
pollInterval?: number,
fallbackData?: { recommendations: DataMappingRecommendation[] | null }
) {
+ const frontendFetch = useFrontendFetchWithAuth();
const { data, error, isLoading } = useSWR(
importerId
? [`${getHost()}/api/importer/${importerId}/mappings/recommendations`]
: null,
- ([url]) => fetchWithAuth(url).then((res) => res.json()),
+ ([url]) => frontendFetch(url).then((res) => res.json()),
{
refreshInterval: pollInterval,
fallbackData,
diff --git a/app/src/lib/ImporterManager.ts b/app/src/lib/ImporterManager.ts
new file mode 100644
index 0000000..ecf4e8c
--- /dev/null
+++ b/app/src/lib/ImporterManager.ts
@@ -0,0 +1,146 @@
+import {
+ DataMapping,
+ DataMappingRecommendation,
+ ImporterConfig,
+ ImporterDto,
+ ImporterStatus,
+ SourceData,
+} from "@/app/api/importer/[slug]/ImporterDto";
+import { WorkflowClient } from "@temporalio/client";
+import { Db, MongoClient } from "mongodb";
+
+import { getMongoClient } from "./mongoClient";
+import { getTemporalWorkflowClient } from "./temporalClient";
+
+export class ImporterManager {
+ constructor(
+ private readonly workflowClient: WorkflowClient,
+ private readonly mongoClient: MongoClient
+ ) {}
+
+ public async getImporterDto(importerId: string) {
+ const handle = this.workflowClient.getHandle(importerId);
+ const [status, config] = await Promise.all([
+ (await handle.query("importer:status")) as ImporterStatus,
+ (await handle.query("importer:config")) as ImporterConfig,
+ ]);
+ const importerDto: ImporterDto = {
+ importerId,
+ config,
+ status,
+ };
+ return importerDto;
+ }
+
+ public async getMappingRecommendations(
+ importerId: string
+ ): Promise {
+ const handle = this.workflowClient.getHandle(importerId);
+ const workflowState = await handle.query("importer:status");
+ if (workflowState.state === "closed") {
+ throw new Error("Importer is closed");
+ }
+ const recommendations = await handle.query(
+ "importer:data-mapping-recommendations"
+ );
+ return recommendations;
+ }
+
+ public async updateMappings(importerId: string, mappings: DataMapping[]) {
+ const handle = this.workflowClient.getHandle(importerId);
+ const workflowState = await handle.query("importer:status");
+ if (workflowState.state === "closed") {
+ throw new Error("Importer is closed");
+ }
+ await handle.executeUpdate("importer:update-mapping", {
+ args: [{ mappings }],
+ });
+ }
+
+ public async closeImporter(importerId: string) {
+ const handle = this.workflowClient.getHandle(importerId);
+ await handle.signal("importer:close");
+ }
+
+ public async getRecords(
+ importerId: string,
+ page: number,
+ size: number
+ ): Promise {
+ const handle = this.workflowClient.getHandle(importerId);
+ const workflowState = await handle.query("importer:status");
+ if (workflowState.state === "closed") {
+ throw new Error("Importer is closed");
+ }
+ const db = this.getDb(importerId);
+ const records = await db
+ .collection("data")
+ .find()
+ .sort({ __sourceRowId: 1 })
+ .skip(page * size)
+ .limit(size)
+ .toArray();
+ return records;
+ }
+
+ public async patchRecords(
+ importerId: string,
+ patches: { column: string; rowId: string; newValue: string }[]
+ ) {
+ const handle = this.workflowClient.getHandle(importerId);
+ const workflowState = await handle.query("importer:status");
+ if (workflowState.state === "closed") {
+ throw new Error("Importer is closed");
+ }
+ const updateResult = await handle.executeUpdate("importer:update-record", {
+ args: [{ patches }],
+ });
+ return updateResult;
+ }
+
+ public async startImport(importerId: string) {
+ const handle = this.workflowClient.getHandle(importerId);
+ const workflowState = await handle.query("importer:status");
+ if (workflowState.state === "closed") {
+ throw new Error("Importer is closed");
+ }
+ await handle.executeUpdate("importer:start-import");
+ }
+
+ public async addFile(
+ importerId: string,
+ fileReference: string,
+ fileFormat: string,
+ bucket: string
+ ) {
+ const handle = this.workflowClient.getHandle(importerId);
+ const workflowState = await handle.query("importer:status");
+ if (workflowState.state === "closed") {
+ throw new Error("Importer is closed");
+ }
+ await handle.executeUpdate("importer:add-file", {
+ args: [
+ {
+ fileReference,
+ fileFormat,
+ bucket,
+ },
+ ],
+ });
+ }
+
+ private getDb(importerId: string): Db {
+ return this.mongoClient.db(importerId);
+ }
+}
+
+let importerManager: ImporterManager;
+
+export async function getImporterManager() {
+ if (!importerManager) {
+ const workflowClient = await getTemporalWorkflowClient();
+ const mongoClient = await getMongoClient();
+ importerManager = new ImporterManager(workflowClient, mongoClient);
+ }
+ return importerManager;
+}
diff --git a/app/src/lib/frontendFetch.ts b/app/src/lib/frontendFetch.ts
index 625667b..e071df5 100644
--- a/app/src/lib/frontendFetch.ts
+++ b/app/src/lib/frontendFetch.ts
@@ -1,9 +1,15 @@
-export async function fetchWithAuth(...args: Parameters) {
- return await fetch(args[0], {
- ...args[1],
- headers: {
- authorization: "Bearer " + process.env.NEXT_PUBLIC_FRONTEND_TOKEN,
- ...args[1]?.headers,
- },
- });
+export function useFrontendFetchWithAuth() {
+ const baseUrl = undefined; // TODO get from context or so
+ return async (...args: Parameters) => {
+ const url = baseUrl
+ ? new URL(args[0] as string, baseUrl)
+ : (args[0] as string);
+ return await fetch(url, {
+ ...args[1],
+ headers: {
+ authorization: "Bearer " + process.env.NEXT_PUBLIC_FRONTEND_TOKEN,
+ ...args[1]?.headers,
+ },
+ });
+ };
}
diff --git a/app/src/lib/mongoClient.ts b/app/src/lib/mongoClient.ts
index 65487e4..f808c3a 100644
--- a/app/src/lib/mongoClient.ts
+++ b/app/src/lib/mongoClient.ts
@@ -4,10 +4,15 @@ import { MongoClient } from "mongodb";
let client: Promise;
export const getDb = async (dbName: string) => {
+ const client = await getMongoClient();
+ return client.db(dbName);
+};
+
+export const getMongoClient = async () => {
if (!client) {
client = new MongoClient(env.get("MONGO_URL").required().asString(), {
appName: "nextjs",
}).connect();
}
- return (await client).db(dbName);
+ return await client;
};
diff --git a/app/src/lib/temporalClient.ts b/app/src/lib/temporalClient.ts
index 952a886..8010a4a 100644
--- a/app/src/lib/temporalClient.ts
+++ b/app/src/lib/temporalClient.ts
@@ -5,7 +5,6 @@ export const DEFAULT_TEMPORAL_QUEUE = "imports";
let connection: Connection;
export async function getTemporalWorkflowClient() {
- // TODO get options from env
if (!connection) {
connection = await Connection.connect({
address: env.get("TEMPORAL_ADDRESS").default("localhost:7233").asString(),
diff --git a/app/src/lib/utils.ts b/app/src/lib/utils.ts
index 9e0be58..be7eb6f 100644
--- a/app/src/lib/utils.ts
+++ b/app/src/lib/utils.ts
@@ -6,10 +6,11 @@ export function cn(...inputs: ClassValue[]) {
}
export function getHost(): string {
- // this is needed because we use the api for ssr to. This is not really a good solution but works for now
- // See https://github.com/vercel/next.js/discussions/48793 for the "correct" way
const isServer = typeof window === "undefined";
- const host = isServer ? "http://127.0.0.1:3000" : "";
+ if (isServer) {
+ throw new Error("getHost() should only be called on the client");
+ }
+ const host = "";
return host;
}