diff --git a/docker-compose.yml b/docker-compose.yml
index b03d2fc..9d49f25 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -8,7 +8,7 @@ services:
   redis-gui:
     image: rediscommander/redis-commander
     environment:
-      - REDIS_HOSTS=local:connection:6379
+      - REDIS_HOSTS=local:redis:6379
     ports:
      - "8180:8081"
 
@@ -16,3 +16,24 @@ services:
     image: scireum/s3-ninja
     ports:
       - "9000:9000"
+
+  tusd:
+    image: tusproject/tusd
+    volumes:
+      - ./tmp-upload:/srv/tusd-data/data
+    entrypoint: ["tusd"]
+    command: [
+      "-hooks-http", "http://localhost:8001/tusd-hooks",
+      "-hooks-http-forward-headers", "Authorization",
+      "-behind-proxy",
+      "-hooks-enabled-events", "pre-finish",
+    ]
+    network_mode: host
+
+  nginx:
+    build:
+      context: nginxKludge
+      dockerfile: Dockerfile.nginx
+    depends_on:
+      - tusd
+    network_mode: host
diff --git a/nginxKludge/Dockerfile.nginx b/nginxKludge/Dockerfile.nginx
new file mode 100644
index 0000000..afc6a5d
--- /dev/null
+++ b/nginxKludge/Dockerfile.nginx
@@ -0,0 +1,2 @@
+FROM nginx:alpine
+COPY nginx.conf /etc/nginx/conf.d/default.conf
\ No newline at end of file
diff --git a/nginxKludge/nginx.conf b/nginxKludge/nginx.conf
new file mode 100644
index 0000000..5eb280e
--- /dev/null
+++ b/nginxKludge/nginx.conf
@@ -0,0 +1,14 @@
+server {
+    listen *:8002;
+    client_max_body_size 100M;
+
+    location /files {
+
+        add_header "Access-Control-Allow-Credentials" "true";
+        proxy_hide_header Access-Control-Allow-Origin;
+        add_header Access-Control-Allow-Origin $http_origin;
+        add_header "Access-Control-Allow-Headers" "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, x-csrf-token";
+        add_header "Access-Control-Allow-Methods" "POST, HEAD, PATCH, OPTIONS, GET, DELETE";
+        proxy_pass http://localhost:1080/files;
+    }
+}
\ No newline at end of file
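
(For orientation, not part of the patch: a minimal sketch of what a client-side upload through this proxy could look like. It assumes the nginx kludge above on localhost:8002 and a browser client on localhost:3000; `createUpload` is a hypothetical helper, and the headers come from the tus 1.0 creation extension that tusd implements.)

```ts
// Sketch: create a resumable upload via the proxy defined in nginx.conf.
// tusd listens on its default port 1080, which is what proxy_pass targets.
const createUpload = async (file: File): Promise<string | null> => {
  const response = await fetch("http://localhost:8002/files", {
    method: "POST",
    credentials: "include", // session cookie rides along to the pre-finish hook
    headers: {
      "Tus-Resumable": "1.0.0",
      "Upload-Length": String(file.size),
      // tus metadata is comma-separated "key base64(value)" pairs;
      // the pre-finish hook below requires a "filename" entry
      "Upload-Metadata": `filename ${btoa(file.name)}`,
    },
  })

  // tusd answers 201 Created with the upload URL in the Location header
  return response.headers.get("Location")
}
```
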
diff --git a/package.json b/package.json
index 7d1f736..ab6816b 100644
--- a/package.json
+++ b/package.json
@@ -26,11 +26,12 @@
     "@koa/router": "^12.0.0",
     "aws-sdk": "^2.1421.0",
     "axios": "^1.4.0",
-    "bull": "^4.10.1",
+    "bullmq": "^4.6.0",
     "dotenv": "^16.3.1",
     "ffmpeg": "^0.0.4",
     "fluent-ffmpeg": "^2.1.2",
     "http-event-stream": "^0.2.0",
+    "ioredis": "^5.3.2",
     "jest": "^29.6.1",
     "kafkajs": "^2.2.2",
     "koa": "^2.13.4",
diff --git a/src/config.ts b/src/config.ts
new file mode 100644
index 0000000..e61dd19
--- /dev/null
+++ b/src/config.ts
@@ -0,0 +1,34 @@
+export const IS_PROD = process.env["NODE_ENV"] === "production"
+export const TEMP_UPLOAD_FOLDER = "tmp-upload"
+export const SECRET_KEY_HEADER = "X-Api-Key"
+export const FK_API_KEY = process.env["FK_API_KEY"]!
+export const FK_API = process.env["FK_API"]!
+
+if (!FK_API) {
+  throw new Error(
+    "FK_API is not set! Please set it to the base URL of the API to communicate with.",
+  )
+}
+
+if (!FK_API_KEY) {
+  throw new Error(
+    "FK_API_KEY is not set, this means internal endpoints will be unreachable",
+  )
+}
+
+export const MEDIA_BUCKET = "media"
+const getRedisURL = () => {
+  const { REDIS_SERVICE_HOST, REDIS_SERVICE_PORT, REDIS_URL } = process.env
+
+  if (REDIS_SERVICE_HOST?.length && REDIS_SERVICE_PORT?.length)
+    return `redis://${REDIS_SERVICE_HOST}:${REDIS_SERVICE_PORT}`
+
+  if (!REDIS_URL?.length) {
+    throw new Error(
+      "REDIS_SERVICE_HOST && REDIS_SERVICE_PORT or REDIS_URL is not set!",
+    )
+  }
+
+  return REDIS_URL
+}
+export const REDIS_URL = getRedisURL()
diff --git a/src/constants.ts b/src/constants.ts
deleted file mode 100644
index 568d1d7..0000000
--- a/src/constants.ts
+++ /dev/null
@@ -1,17 +0,0 @@
-export const IS_PROD = process.env["NODE_ENV"] === "production"
-export const TEMP_UPLOAD_FOLDER = "tmp-upload"
-export const SECRET_KEY_HEADER = "X-Api-Key"
-export const FK_API_KEY = process.env["FK_API_KEY"]!
-export const FK_API = process.env["FK_API"]!
-
-if (!FK_API) {
-  throw new Error(
-    "FK_API is not set! Please set it to the base url of the api to communicate with."
-  )
-}
-
-if (!FK_API_KEY) {
-  throw new Error(
-    "FK_API_KEY is not set, this means internal endpoints will be unreachable"
-  )
-}
diff --git a/src/log.ts b/src/log.ts
index 19d5c3a..0d949b9 100644
--- a/src/log.ts
+++ b/src/log.ts
@@ -1,6 +1,6 @@
 import { Logger } from "tslog"
 import type { ILogObj } from "tslog"
-import { IS_PROD } from "./constants.js"
+import { IS_PROD } from "./config"
 import type { Middleware } from "koa"
 
 const LogLevels = [
@@ -14,7 +14,7 @@ const LogLevels = [
 ] as const
 
 export const getLogLevel = (): number => {
-  const LOG_LEVEL = process.env["LOG_LEVEL"] as
+  const LOG_LEVEL = process.env["LOG_LEVEL"]?.toUpperCase() as
     | (typeof LogLevels)[number]
     | undefined
diff --git a/src/metrics.ts b/src/metrics.ts
index 76144da..6979c90 100644
--- a/src/metrics.ts
+++ b/src/metrics.ts
@@ -1,18 +1,35 @@
-import Queue from "bull"
+import { Queue } from "bullmq"
 import type { VideoJobData } from "./video/types.js"
-import { url } from "./upload/redis/connection.js"
 import { Gauge, register } from "prom-client"
 import type { Middleware } from "koa"
 import { log } from "./log.js"
+import IORedis from "ioredis"
+import { REDIS_URL } from "./config"
 
+const connection = new IORedis(REDIS_URL)
 
-const queue = new Queue<VideoJobData>("video-processing", url)
+const queue = new Queue<VideoJobData>("video-processing", { connection })
+
+const getCounts = async () => {
+  const { waiting, active, delayed, failed } = await queue.getJobCounts()
+  // A count of 0 is falsy but valid; only missing values indicate a failure
+  if (![waiting, active, delayed, failed].every((count) => typeof count === "number")) {
+    throw new Error(`Failed to get job counts`)
+  }
+  const size = waiting + active + delayed + failed
+
+  return {
+    size,
+    waiting,
+    active,
+    delayed,
+    failed,
+  }
+}
 
 const processQueueSizeTotal = new Gauge({
   name: "process_queue_size_total",
   help: "Total number of jobs in the processing queue",
   collect: async () => {
-    const counts = await queue.getJobCounts()
-    const size = counts.waiting + counts.active + counts.delayed
+    const { size } = await getCounts()
     processQueueSizeTotal.set(size)
   },
 })
@@ -21,8 +38,9 @@ const processQueuePending = new Gauge({
   name: "process_queue_pending",
   help: "Number of pending jobs in the processing queue",
   collect: async () => {
-    const counts = await queue.getJobCounts()
-    processQueuePending.set(counts.waiting + counts.delayed)
+    const { waiting, delayed } = await getCounts()
+
+    processQueuePending.set(waiting + delayed)
   },
 })
 
@@ -30,8 +48,9 @@ const processQueueActive = new Gauge({
   name: "process_queue_active",
   help: "Number of active jobs in the processing queue",
   collect: async () => {
-    const counts = await queue.getJobCounts()
-    processQueueActive.set(counts.active)
+    const { active } = await getCounts()
+
+    processQueueActive.set(active)
   },
 })
 
@@ -39,8 +58,8 @@ const processQueueFailed = new Gauge({
   name: "process_queue_failed",
   help: "Number of failed jobs in the processing queue",
   collect: async () => {
-    const counts = await queue.getJobCounts()
-    processQueueFailed.set(counts.failed)
+    const { failed } = await getCounts()
+    processQueueFailed.set(failed)
   },
 })
 
 export const showMetrics: Middleware = async (ctx, next) => {
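
(A note on the Gauge pattern above, shown with a hypothetical `scrape` helper: prom-client invokes each metric's `collect()` callback when the registry is rendered, so the queue is only queried when /metrics is actually scraped.)

```ts
import { register } from "prom-client"

// Sketch: rendering the registry runs collect() on every registered metric,
// including the queue gauges defined in metrics.ts above
const scrape = async (): Promise<string> => {
  return register.metrics() // output contains e.g. "process_queue_size_total 3"
}
```
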
diff --git a/src/middleware/authenticate.ts b/src/middleware/authenticate.ts
index a13caca..2970bbf 100644
--- a/src/middleware/authenticate.ts
+++ b/src/middleware/authenticate.ts
@@ -1,6 +1,7 @@
 import type { Middleware } from "koa"
 import axios from "axios"
-import { FK_API, FK_API_KEY, SECRET_KEY_HEADER } from "../constants.js"
+import { FK_API, FK_API_KEY, SECRET_KEY_HEADER } from "../config"
+import { log } from "../log"
 
 export type AuthUser = {
   id: number
@@ -16,6 +17,7 @@ export type AuthState = {
 
 export type Options = {
   required?: true
+  cookieGetter?: (ctx: any) => string
 }
 
 // TODO: Some caching might be in order here, so that we don't hammer the API
@@ -23,8 +25,8 @@ export type Options = {
 export const authenticate =
   (options: Options): Middleware =>
   async (ctx, next) => {
-    const { required } = options
-    const cookie = ctx.get("Cookie")
+    const { required, cookieGetter } = options
+    const cookie = cookieGetter ? cookieGetter(ctx) : ctx.get("Cookie")
 
     const { data, status } = await axios.get("/auth/user", {
       baseURL: FK_API,
@@ -39,6 +41,7 @@ export const authenticate =
     if (status !== 200) return ctx.throw(500, "auth_server_error")
 
     if (!data.user) {
+      log.info(data.user)
       if (required) return ctx.throw(401)
     } else {
       ctx.state.user = data.user
diff --git a/src/middleware/sendCORSDev.ts b/src/middleware/sendCORSDev.ts
index 2bdfe5d..020943c 100644
--- a/src/middleware/sendCORSDev.ts
+++ b/src/middleware/sendCORSDev.ts
@@ -1,36 +1,16 @@
 import type { Middleware } from "koa"
 
 export const sendCORSDev = (): Middleware => (context, next) => {
-  const { method } = context
-
   context.set("Access-Control-Allow-Origin", "http://localhost:3000")
   context.set("Access-Control-Allow-Credentials", "true")
 
   context.set(
     "Access-Control-Allow-Methods",
-    "GET,PUT,POST,PATCH,DELETE,OPTIONS,HEAD"
-  )
-
-  context.set(
-    "Access-Control-Allow-Headers",
-    [
-      "Content-Type",
-      "Tus-Resumable",
-      "Upload-Offset",
-      "Upload-Length",
-      "Upload-Metadata",
-      "X-CSRF-Token",
-      "Location",
-    ].join(", ")
+    "GET,PUT,POST,PATCH,DELETE,OPTIONS,HEAD",
   )
 
-  context.set(
-    "Access-Control-Expose-Headers",
-    ["Location", "Upload-Offset", "Upload-Length"].join(", ")
-  )
+  context.set("Access-Control-Allow-Headers", "X-CSRF-Token")
 
-  if (method === "OPTIONS") {
-    context.status = 200
-  }
+  if (context.method === "OPTIONS") context.status = 200
 
   return next()
 }
diff --git a/src/server.ts b/src/server.ts
index 5f585b7..bd2545f 100644
--- a/src/server.ts
+++ b/src/server.ts
@@ -1,17 +1,16 @@
 import "dotenv/config"
-import { connection } from "./upload/redis/connection.js"
 import Koa from "koa"
 import bodyParser from "koa-bodyparser"
 
 import { handleError } from "./middleware/handleError.js"
-//import { sendCORSDev } from "./middleware/sendCORSDev.js"
-import { videoRouter } from "./upload/router.js"
 import { log, requestLogger } from "./log.js"
-import { FK_API, FK_API_KEY, IS_PROD, SECRET_KEY_HEADER } from "./constants.js"
+import { FK_API, FK_API_KEY, IS_PROD, SECRET_KEY_HEADER } from "./config"
 import { OpenAPI } from "./generated/index.js"
 import { statusUpdate } from "./status/router.js"
 import { showMetrics } from "./metrics.js"
+import { sendCORSDev } from "./middleware/sendCORSDev"
+import { uploadHookRouter } from "./tusHook/router"
 
 OpenAPI.BASE = FK_API
 
@@ -30,13 +29,15 @@ log.info({ IS_PROD })
 app.use(requestLogger())
 app.use(handleError)
 app.use(bodyParser())
-//if (!IS_PROD) app.use(sendCORSDev())
+if (!IS_PROD) {
+  log.warn("Dev mode: Enabling CORS for localhost")
+  app.use(sendCORSDev())
+}
 app.use(showMetrics)
-app.use(videoRouter.prefix("/upload/video").routes())
+app.use(uploadHookRouter.prefix("/tusd-hooks").routes())
 app.use(statusUpdate("/upload/status"))
 
 async function main() {
-  await connection.connect()
   app.listen(port, () => log.info(`media-processor listening on port ${port}`))
 }
diff --git a/src/status/router.ts b/src/status/router.ts
index 8fecda7..d39e928 100644
--- a/src/status/router.ts
+++ b/src/status/router.ts
@@ -1,5 +1,4 @@
 import type { Context, Next } from "koa"
-import { videoQueue } from "../video/queue.js"
 import { PassThrough } from "stream"
 
 const streamEventsMode = async (ctx: Context) => {
@@ -21,6 +20,7 @@ const streamEventsMode = async (ctx: Context) => {
  * HTTP Event Stream showing the progress of the encoding job
  *
  * @param prefix URL prefix
+ * @deprecated The code as it stands is vestigial and should be replaced
  */
 export const statusUpdate =
   (prefix: string) => async (ctx: Context, next: Next) => {
@@ -31,21 +31,20 @@ export const statusUpdate =
 
     await streamEventsMode(ctx)
 
-    const responseStream = new PassThrough()
-    ctx.response.body = responseStream
-
-    const job = await videoQueue.getJob(jobId)
-    if (job === null) return ctx.throw(404, "job_does_not_exist")
-
-    const response = {
-      jobId,
-      progress: job.progress(),
-    }
-
-    responseStream.write("event: name\n")
-    responseStream.write(`data: ${JSON.stringify(response)}\n\n`)
-
-    if (await job.finished()) responseStream.end()
+    ctx.response.body = new PassThrough()
+
+    //const job = await videoQueue.getJob(`${jobId}`)
+    //if (!job) return ctx.throw(404, "job_does_not_exist")
+    //
+    //const response = {
+    //  jobId,
+    //  progress: job.progress(),
+    //}
+    //
+    //responseStream.write("event: name\n")
+    //responseStream.write(`data: ${JSON.stringify(response)}\n\n`)
+    //
+    //if (await job) responseStream.end()
 
     return next()
   }
diff --git a/src/tusHook/handlers/analyze.ts b/src/tusHook/handlers/analyze.ts
new file mode 100644
index 0000000..c761f4f
--- /dev/null
+++ b/src/tusHook/handlers/analyze.ts
@@ -0,0 +1,25 @@
+import type { Middleware } from "koa"
+import type { UploadHookState } from "../types"
+import { getVideoMetadata } from "../../video/getVideoMetadata"
+import { log } from "../../log"
+import { stat } from "fs"
+import { promisify } from "util"
+const statAsync = promisify(stat)
+
+// Analyze the received file to extract metadata
+export const analyze: Middleware<UploadHookState> = async (ctx, next) => {
+  const { Path, uploadId } = ctx.state
+  log.info(`Analyzing ${Path} for metadata`, { uploadId })
+  const stats = await statAsync(Path)
+  if (!stats.isFile()) {
+    log.error(`Rejecting ${Path}`, { uploadId })
+    return ctx.throw(400, "Invalid file")
+  }
+  const metadata = await getVideoMetadata(Path)
+  if (!metadata) {
+    log.error(`Rejecting ${Path}`, { uploadId })
+    return ctx.throw(400, "Invalid file")
+  }
+  ctx.state.metadata = metadata
+  return next()
+}
file") + } + ctx.state.metadata = metadata + return next() +} diff --git a/src/tusHook/handlers/ingest.ts b/src/tusHook/handlers/ingest.ts new file mode 100644 index 0000000..43e212b --- /dev/null +++ b/src/tusHook/handlers/ingest.ts @@ -0,0 +1,24 @@ +import type { Middleware } from "koa" +import type { UploadHookState } from "../types" +import { videoQueue } from "../../video/queue" +import { log } from "../../log" + +// Begin an ingest job for the uploaded file to generate secondary assets +export const ingest: Middleware = async (ctx, next) => { + const { Path, mediaId, metadata, uploadId } = ctx.state + + const { id: jobId } = await videoQueue.add(ctx.state.uploadId, { + pathToVideo: Path, + mediaId, + metadata, + uploadId: uploadId, + }) + + log.info(`Created ingest job ${jobId} for media #${mediaId} ${Path}`, { + uploadId, + }) + + ctx.status = 200 + ctx.body = { message: "OK" } + return next() +} diff --git a/src/tusHook/handlers/register.ts b/src/tusHook/handlers/register.ts new file mode 100644 index 0000000..c8f9515 --- /dev/null +++ b/src/tusHook/handlers/register.ts @@ -0,0 +1,12 @@ +import type { Middleware } from "koa" +import type { UploadHookState } from "../types" +import { log } from "../../log" +import { registerMedia } from "../helpers/registerMedia" + +// Register the received and validated file in the database +export const register: Middleware = async (ctx, next) => { + const { locator, originalFilename, metadata, uploadId } = ctx.state + log.info(`Registering ${locator} in database`, { uploadId }) + ctx.state.mediaId = await registerMedia(locator, originalFilename, metadata) + return next() +} diff --git a/src/tusHook/handlers/store.ts b/src/tusHook/handlers/store.ts new file mode 100644 index 0000000..e62430a --- /dev/null +++ b/src/tusHook/handlers/store.ts @@ -0,0 +1,18 @@ +import type { Middleware } from "koa" +import type { UploadHookState } from "../types" +import { s3Client } from "../../s3Client" +import { MEDIA_BUCKET } from "../../config" +import fs from "fs" + +export const store: Middleware = async (ctx, next) => { + const { uploadId, Path, metadata } = ctx.state + + await s3Client.putObject({ + Bucket: MEDIA_BUCKET, + Key: `${uploadId}`, + Body: await fs.createReadStream(Path), + ContentType: metadata.mime, + }) + + return next() +} diff --git a/src/tusHook/helpers/parseRequest.ts b/src/tusHook/helpers/parseRequest.ts new file mode 100644 index 0000000..f2c3a4d --- /dev/null +++ b/src/tusHook/helpers/parseRequest.ts @@ -0,0 +1,34 @@ +import type { Middleware } from "koa" +import type { UploadHookState } from "../types" +import { UploadObjectSchema } from "../schema" +import { MEDIA_BUCKET, TEMP_UPLOAD_FOLDER } from "../../config" +import path from "path" + +export const parseRequest: Middleware = async (ctx, next) => { + try { + const { + Upload: { + ID, + MetaData, + Storage: { Path }, + }, + HTTPRequest: { Header }, + } = await UploadObjectSchema.validate(ctx.request.body) + + if (!Path) return ctx.throw(500, "Invalid upload storage") + + if (!MetaData["filename"]) + return ctx.throw(500, "missing filename metadata") + + ctx.state.uploadId = ID + ctx.state.Path = `${TEMP_UPLOAD_FOLDER}/${path.basename(Path)}` + ctx.state.originalFilename = MetaData["filename"] + ctx.state.locator = `S3:${MEDIA_BUCKET}:${ctx.state.uploadId}` + ctx.state.cookie = Header["Cookie"] + + return next() + } catch (error) { + ctx.status = 400 + ctx.body = { message: error } + } +} diff --git a/src/tusHook/helpers/registerMedia.ts b/src/tusHook/helpers/registerMedia.ts new file 
diff --git a/src/tusHook/helpers/registerMedia.ts b/src/tusHook/helpers/registerMedia.ts
new file mode 100644
index 0000000..f7323a5
--- /dev/null
+++ b/src/tusHook/helpers/registerMedia.ts
@@ -0,0 +1,21 @@
+import type { VideoMetadataV2 } from "../../video/getVideoMetadata"
+import { log } from "../../log"
+import { MediaService } from "../../generated"
+import { FK_API_KEY } from "../../config"
+
+export async function registerMedia(
+  originalLocator: `S3:${string}` | `CLOUDFLARE:${string}`,
+  filename: string,
+  metadata: VideoMetadataV2,
+) {
+  log.info("Registering file on backend")
+
+  const { id } = await MediaService.postVideosMedia(FK_API_KEY, {
+    locator: originalLocator,
+    fileName: filename,
+    duration: metadata.duration,
+    metadata,
+  })
+
+  return id
+}
diff --git a/src/tusHook/router.ts b/src/tusHook/router.ts
new file mode 100644
index 0000000..89876e1
--- /dev/null
+++ b/src/tusHook/router.ts
@@ -0,0 +1,32 @@
+import Router from "@koa/router"
+import { authenticate } from "../middleware/authenticate"
+import type { UploadHookState } from "./types"
+import { ingest } from "./handlers/ingest"
+import { register } from "./handlers/register"
+import { analyze } from "./handlers/analyze"
+import { parseRequest } from "./helpers/parseRequest"
+import { store } from "./handlers/store"
+import { log } from "../log"
+
+// HTTP hooks for tusd
+const uploadHookRouter = new Router<UploadHookState>()
+
+uploadHookRouter.use(async (ctx, next) => {
+  log.info(`Request: ${ctx.method} ${ctx.path}`)
+  log.info(`Request headers: ${JSON.stringify(ctx.request.headers, null, 2)}`)
+  log.info(`Request body: ${JSON.stringify(ctx.request.body, null, 2)}`)
+
+  await next()
+  log.info(`Response: HTTP ${ctx.status}: ${JSON.stringify(ctx.body, null, 2)}`)
+})
+uploadHookRouter.use(parseRequest)
+uploadHookRouter.use(
+  authenticate({
+    required: true,
+    cookieGetter: (ctx) => ctx.state.cookie,
+  }),
+)
+
+uploadHookRouter.post("/", analyze, store, register, ingest)
+
+export { uploadHookRouter }
diff --git a/src/tusHook/schema.ts b/src/tusHook/schema.ts
new file mode 100644
index 0000000..1503363
--- /dev/null
+++ b/src/tusHook/schema.ts
@@ -0,0 +1,55 @@
+import * as Yup from "yup"
+
+const StorageSchema = Yup.object().shape({
+  Type: Yup.string().required(),
+  Path: Yup.string(),
+  Bucket: Yup.string(),
+  Key: Yup.string(),
+})
+
+const HeaderSchema = Yup.lazy((value: any) =>
+  Yup.object(
+    Object.keys(value).reduce(
+      (schema: Record<string, Yup.AnySchema>, key) => {
+        schema[key] = Yup.array().of(Yup.string()).required()
+        return schema
+      },
+      {} as Record<string, Yup.AnySchema>,
+    ),
+  ),
+)
+
+const MetaDataSchema = Yup.lazy((value: any) =>
+  Yup.object(
+    Object.keys(value).reduce(
+      (schema: Record<string, Yup.AnySchema>, key) => {
+        schema[key] = Yup.string().required()
+        return schema
+      },
+      {} as Record<string, Yup.AnySchema>,
+    ),
+  ),
+)
+
+const UploadSchema = Yup.object().shape({
+  ID: Yup.string().required(),
+  Size: Yup.number().required(),
+  Offset: Yup.number().required(),
+  IsFinal: Yup.boolean().required(),
+  IsPartial: Yup.boolean().required(),
+  PartialUploads: Yup.array().of(Yup.string()).nullable(),
+  MetaData: MetaDataSchema,
+  Storage: StorageSchema.required(),
+})
+
+const HTTPRequestSchema = Yup.object().shape({
+  Method: Yup.string().required(),
+  URI: Yup.string().required(),
+  RemoteAddr: Yup.string().required(),
+  Header: HeaderSchema,
+})
+
+export const UploadObjectSchema = Yup.object().shape({
+  Upload: UploadSchema,
+  HTTPRequest: HTTPRequestSchema,
+})
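
(For reference, an illustrative pre-finish hook body in the shape the schemas above accept; all field values are invented. parseRequest.ts destructures exactly these keys, and HeaderSchema expects header values as arrays of strings.)

```ts
import { UploadObjectSchema } from "./schema"

const examplePayload = {
  Upload: {
    ID: "24e533e02ec3bc40c387f1a0e460e216",
    Size: 1048576,
    Offset: 1048576,
    IsFinal: false,
    IsPartial: false,
    PartialUploads: null,
    MetaData: { filename: "clip.mp4" },
    Storage: {
      Type: "filestore",
      Path: "/srv/tusd-data/data/24e533e02ec3bc40c387f1a0e460e216",
    },
  },
  HTTPRequest: {
    Method: "PATCH",
    URI: "/files/24e533e02ec3bc40c387f1a0e460e216",
    RemoteAddr: "172.17.0.1:54321",
    Header: { Cookie: ["session-id=abc123"] },
  },
}

// Resolves with the validated object; rejects with a Yup ValidationError
UploadObjectSchema.validate(examplePayload).then((body) =>
  console.log(body.Upload.ID),
)
```
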
"../video/getVideoMetadata" +import type { AuthState } from "../middleware/authenticate" + +export type UploadHookState = { + // Local path to received upload + Path: string + // Upload ID from tus + uploadId: string + // Original filename from tus + originalFilename: string + // File metadata + metadata: VideoMetadataV2 + // Media locator (as used in the back-end database) + locator: `S3:${string}` + // All request cookies + cookie: string + mediaId: number +} & AuthState diff --git a/src/upload/redis/RedisNamespace.ts b/src/upload/redis/RedisNamespace.ts deleted file mode 100644 index 622358e..0000000 --- a/src/upload/redis/RedisNamespace.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { connection } from "./connection.js" - -export class RedisNamespace { - constructor(private prefix: string) {} - - public set(key: string, value: T, duration?: number) { - return connection.set(this.getKey(key), JSON.stringify(value), { - EX: duration!, - }) - } - - public async get(key: string) { - const value = await connection.get(this.getKey(key)) - - if (value) { - return JSON.parse(value) as T - } - - return null - } - - public delete(key: string) { - return connection.del(this.getKey(key)) - } - - public async assign(key: string, value: Partial) { - const existing = await this.get(key) - - if (existing) { - const newData = { ...existing, ...value } - await this.set(key, newData) - } - } - - private getKey(key: string) { - return `${this.prefix}:${key}` - } -} diff --git a/src/upload/redis/ResumableUpload.ts b/src/upload/redis/ResumableUpload.ts deleted file mode 100644 index d259b25..0000000 --- a/src/upload/redis/ResumableUpload.ts +++ /dev/null @@ -1,162 +0,0 @@ -import { close, createWriteStream, open, stat, unlink } from "fs" -import { promisify } from "util" -import { randomBytes } from "crypto" -import type { Readable } from "stream" -import { HttpError } from "../../HttpError.js" -import { TEMP_UPLOAD_FOLDER } from "../../constants.js" -import { RedisNamespace } from "./RedisNamespace.js" - -const openAsync = promisify(open) -const closeAsync = promisify(close) -const unlinkAsync = promisify(unlink) -const statAsync = promisify(stat) - -type ResumableUploadCreateOptions = { - user: number - length: number - filename: string - metadata: Record -} - -type ResumableUploadData = { - /** The key used to identify the upload */ - key: string - /** The path to the file */ - path: string - /** Upload offset, how many bytes has been uploaded */ - offset: number - /** Total amount of bytes, the size of the file */ - length: number - /** The name of the file uploaded */ - filename: string - /** The user that creted this upload */ - user: number - /** Metadata used for various reasons */ - metadata: Record -} - -const resumableUploadNamespace = new RedisNamespace( - "resumable-uploads" -) - -// Redis-backed resumable upload -export class ResumableUpload { - constructor(private data: ResumableUploadData) {} - - public static async create(options: ResumableUploadCreateOptions) { - const path = `${TEMP_UPLOAD_FOLDER}/${randomBytes(16).toString("hex")}` - const key = `${randomBytes(16).toString("hex")}-${options.user}` - - const upload = new ResumableUpload({ ...options, offset: 0, path, key }) - - await upload.createFile() - return upload - } - - public static async restore(key: string) { - const data = await resumableUploadNamespace.get(key) - if (!data) return undefined - - const upload = new ResumableUpload(data) - - try { - const { size } = await statAsync(upload.path) - upload.data.offset = size - } catch 
diff --git a/src/upload/redis/ResumableUpload.ts b/src/upload/redis/ResumableUpload.ts
deleted file mode 100644
index d259b25..0000000
--- a/src/upload/redis/ResumableUpload.ts
+++ /dev/null
@@ -1,162 +0,0 @@
-import { close, createWriteStream, open, stat, unlink } from "fs"
-import { promisify } from "util"
-import { randomBytes } from "crypto"
-import type { Readable } from "stream"
-import { HttpError } from "../../HttpError.js"
-import { TEMP_UPLOAD_FOLDER } from "../../constants.js"
-import { RedisNamespace } from "./RedisNamespace.js"
-
-const openAsync = promisify(open)
-const closeAsync = promisify(close)
-const unlinkAsync = promisify(unlink)
-const statAsync = promisify(stat)
-
-type ResumableUploadCreateOptions = {
-  user: number
-  length: number
-  filename: string
-  metadata: Record<string, string>
-}
-
-type ResumableUploadData = {
-  /** The key used to identify the upload */
-  key: string
-  /** The path to the file */
-  path: string
-  /** Upload offset, how many bytes has been uploaded */
-  offset: number
-  /** Total amount of bytes, the size of the file */
-  length: number
-  /** The name of the file uploaded */
-  filename: string
-  /** The user that created this upload */
-  user: number
-  /** Metadata used for various reasons */
-  metadata: Record<string, string>
-}
-
-const resumableUploadNamespace = new RedisNamespace<ResumableUploadData>(
-  "resumable-uploads"
-)
-
-// Redis-backed resumable upload
-export class ResumableUpload {
-  constructor(private data: ResumableUploadData) {}
-
-  public static async create(options: ResumableUploadCreateOptions) {
-    const path = `${TEMP_UPLOAD_FOLDER}/${randomBytes(16).toString("hex")}`
-    const key = `${randomBytes(16).toString("hex")}-${options.user}`
-
-    const upload = new ResumableUpload({ ...options, offset: 0, path, key })
-
-    await upload.createFile()
-    return upload
-  }
-
-  public static async restore(key: string) {
-    const data = await resumableUploadNamespace.get(key)
-    if (!data) return undefined
-
-    const upload = new ResumableUpload(data)
-
-    try {
-      const { size } = await statAsync(upload.path)
-      upload.data.offset = size
-    } catch (error: any) {
-      if (error.code === "ENOENT") {
-        throw new HttpError(410, "upload does not exist yet")
-      }
-
-      throw error
-    }
-
-    return upload
-  }
-
-  private async createFile() {
-    const { path } = this.data
-    const descriptor = await openAsync(path, "w")
-
-    await this.save()
-    await closeAsync(descriptor)
-  }
-
-  public writeToFile(readStream: Readable) {
-    const { offset, path } = this.data
-
-    const writeStream = createWriteStream(path, {
-      flags: "r+",
-      start: offset,
-    })
-
-    return new Promise((resolve, reject) => {
-      writeStream.on("close", async () => {
-        if (this.invalid) {
-          reject(new HttpError(400, "Payload exceeds set length"))
-
-          return
-        }
-
-        await this.save()
-        resolve(undefined)
-      })
-
-      readStream.on("data", (buffer: Buffer) => {
-        this.data.offset += buffer.length
-      })
-
-      readStream.on("error", () => {
-        reject(new HttpError(500, "Stream error"))
-      })
-
-      readStream.pipe(writeStream)
-    })
-  }
-
-  public async delete() {
-    const { key, path } = this.data
-
-    await unlinkAsync(path)
-    await resumableUploadNamespace.delete(key)
-  }
-
-  public async save() {
-    const { key } = this.data
-    await resumableUploadNamespace.set(key, this.data)
-  }
-
-  public get finished() {
-    const { offset, length } = this.data
-
-    return offset === length
-  }
-
-  public get invalid() {
-    const { offset, length } = this.data
-    return offset > length
-  }
-
-  public get user() {
-    return this.data.user
-  }
-
-  public get path() {
-    return this.data.path
-  }
-
-  public get key() {
-    return this.data.key
-  }
-
-  public get length() {
-    return this.data.length
-  }
-
-  public get offset() {
-    return this.data.offset
-  }
-
-  public get filename() {
-    return this.data.filename
-  }
-}
diff --git a/src/upload/redis/connection.ts b/src/upload/redis/connection.ts
deleted file mode 100644
index c8141af..0000000
--- a/src/upload/redis/connection.ts
+++ /dev/null
@@ -1,20 +0,0 @@
-import { createClient } from "redis"
-
-const getRedisURL = () => {
-  const { REDIS_SERVICE_HOST, REDIS_SERVICE_PORT, REDIS_URL } = process.env
-
-  if (REDIS_SERVICE_HOST?.length && REDIS_SERVICE_PORT?.length)
-    return `redis://${REDIS_SERVICE_HOST}:${REDIS_SERVICE_PORT}`
-
-  if (!REDIS_URL?.length) {
-    throw new Error("REDIS_SERVICE_HOST && REDIS_SERVICE_PORT or REDIS_URL is not set!")
-  }
-
-  return REDIS_URL
-}
-
-export const url = getRedisURL()
-
-export const connection = createClient({
-  url,
-})
diff --git a/src/upload/router.ts b/src/upload/router.ts
deleted file mode 100644
index b1289ad..0000000
--- a/src/upload/router.ts
+++ /dev/null
@@ -1,52 +0,0 @@
-import Router from "@koa/router"
-import { ingestVideo } from "../video/ingestVideo.js"
-import { authenticate } from "../middleware/authenticate.js"
-import { uploadCreate } from "./tus/uploadCreate.js"
-import { uploadGet } from "./tus/uploadGet.js"
-import { uploadSend } from "./tus/uploadSend.js"
-import { uploadPatch } from "./tus/uploadPatch.js"
-import type { Middleware } from "koa"
-
-export const TUS_RESUMABLE = "1.0.0"
-export const TUS_EXTENSIONS = ["creation", "expiration"]
-export const TUS_MAX_SIZE = 50 * 1024 * 1024 * 1024 // 50 GB
-
-export const setTusHeaders: Middleware = async (ctx, next) => {
-  ctx.set("Tus-Extension", TUS_EXTENSIONS.join(","))
-  ctx.set("Tus-Version", TUS_RESUMABLE)
-  ctx.set("Tus-Resumable", TUS_RESUMABLE)
-  ctx.set("Tus-Max-Size", String(TUS_MAX_SIZE))
-
-  return next()
-}
-
-const setHTTPStatus =
-  (status: number): Middleware =>
-  async (ctx, next) => {
-    ctx.status = status
-    return next()
-  }
-
-const router = new Router()
-
-router.post("/", authenticate({ required: true }), setTusHeaders, uploadCreate)
-
-router.head(
-  "/:key",
-  authenticate({ required: true }),
-  setTusHeaders,
-  uploadGet,
-  uploadSend,
-)
-
-router.patch(
-  "/:key",
-  authenticate({ required: true }),
-  uploadGet,
-  uploadPatch,
-  ingestVideo,
-)
-
-router.options("/", setTusHeaders, setHTTPStatus(204))
-
-export { router as videoRouter }
diff --git a/src/upload/tus/uploadCreate.ts b/src/upload/tus/uploadCreate.ts
deleted file mode 100644
index 51d984d..0000000
--- a/src/upload/tus/uploadCreate.ts
+++ /dev/null
@@ -1,52 +0,0 @@
-import type { Middleware } from "koa"
-import { ResumableUpload } from "../redis/ResumableUpload.js"
-import type { AuthState } from "../../middleware/authenticate.js"
-import { log } from "../../log.js"
-import { TUS_MAX_SIZE } from "../router.js"
-
-export type CreateUploadState = AuthState & {
-  upload: ResumableUpload
-}
-
-const parseMetadata = (metadata: string): Record<string, string> => {
-  const result: any = {}
-  const rows = metadata.split(",")
-
-  for (const row of rows) {
-    const [key, rawValue] = row.split(" ") as [string, string]
-
-    result[key] = Buffer.from(rawValue, "base64").toString("ascii")
-  }
-
-  return result
-}
-export const uploadCreate: Middleware<CreateUploadState> = async (
-  ctx,
-  next
-) => {
-  const user = ctx.state.user.id
-  const length = Number(ctx.get("Upload-Length"))
-
-  if (length > TUS_MAX_SIZE)
-    return ctx.throw(400, "too_large", { maxSize: TUS_MAX_SIZE })
-  if (!length) return ctx.throw(400, `length_required`)
-
-  const { filename = "unnamed", ...metadata } = parseMetadata(
-    ctx.get("Upload-Metadata")
-  )
-
-  log.info(`Got ${(length / 1048576).toFixed(2)}MiB upload from ${user}`)
-
-  const upload = await ResumableUpload.create({
-    user,
-    metadata,
-    filename,
-    length,
-  })
-
-  ctx.set("Location", `video/${upload.key}`)
-  ctx.status = 201
-  ctx.state.upload = upload
-
-  return next()
-}
diff --git a/src/upload/tus/uploadGet.ts b/src/upload/tus/uploadGet.ts
deleted file mode 100644
index d691f6f..0000000
--- a/src/upload/tus/uploadGet.ts
+++ /dev/null
@@ -1,25 +0,0 @@
-import type { Middleware } from "koa"
-import { ResumableUpload } from "../redis/ResumableUpload.js"
-import type { AuthState } from "../../middleware/authenticate.js"
-
-export type GetUploadState = AuthState & {
-  upload: ResumableUpload
-}
-
-/** Fetches an upload in progress from Redis **/
-export const uploadGet: Middleware<GetUploadState> = async (ctx, next) => {
-  const user = ctx.state.user.id
-  const { key } = ctx["params"]
-
-  if (!key) return ctx.throw(400, "missing_key")
-
-  const upload = await ResumableUpload.restore(key)
-
-  if (!upload) return ctx.throw(410, "no_such_upload")
-
-  if (user !== upload.user) return ctx.throw(403, "not_your_upload")
-
-  ctx.state.upload = upload
-
-  return next()
-}
ctx.set("Upload-Length", String(upload.length)) - - log.info(`upload.finished = ${upload.finished}`) - - if (upload.finished) return next() - - ctx.status = 204 -} diff --git a/src/upload/tus/uploadSend.ts b/src/upload/tus/uploadSend.ts deleted file mode 100644 index 38bba91..0000000 --- a/src/upload/tus/uploadSend.ts +++ /dev/null @@ -1,18 +0,0 @@ -import type { ResumableUpload } from "../redis/ResumableUpload.js" -import type { Middleware } from "koa" - -export type SendUploadState = { - upload: ResumableUpload -} - -export const uploadSend: Middleware = async (ctx, next) => { - const { upload } = ctx.state - const { offset, length } = upload - - ctx.set("Upload-Offset", String(offset)) - ctx.set("Upload-Length", String(length)) - ctx.set("Cache-Control", "no-store") - ctx.status = 200 - - return next() -} diff --git a/src/video/getVideoDescriptors.ts b/src/video/getVideoDescriptors.ts index 2f6dd72..17c2652 100644 --- a/src/video/getVideoDescriptors.ts +++ b/src/video/getVideoDescriptors.ts @@ -1,9 +1,9 @@ -import { toBroadcast } from "../transcode/toBroadcast.js" -import { toTheora } from "../transcode/toTheora.js" -import { toWebM } from "../transcode/toWebM.js" +import { toBroadcast } from "./transcode/toBroadcast.js" +import { toTheora } from "./transcode/toTheora.js" +import { toWebM } from "./transcode/toWebM.js" // import { toHLS } from '../../transcode/toHLS.js' -export type VideoTranscoders = "broadcastable" | "theora" | "webm" | "hls" +export type VideoTranscoder = "broadcastable" | "theora" | "webm" | "hls" export const VideoDescriptors = [ { diff --git a/src/video/getVideoMetadata.ts b/src/video/getVideoMetadata.ts index 88140ed..5a672d4 100644 --- a/src/video/getVideoMetadata.ts +++ b/src/video/getVideoMetadata.ts @@ -1,6 +1,6 @@ import { execSync } from "child_process" import type { FfprobeData, FfprobeStream } from "fluent-ffmpeg" -import { probeVideo } from "./probeVideo.js" +import { probeFile } from "./probeFile" import { log } from "../log.js" export type VideoStats = { @@ -40,7 +40,7 @@ export type VideoMetadata = { const getVideoQuality = ( width: number, height: number, - isInterlaced: boolean + isInterlaced: boolean, ): string => { const progressiveSuffix = isInterlaced ? 
"i" : "p" @@ -90,7 +90,7 @@ const getVideoStats = (streams: FfprobeStream[]): VideoStats => { const videoQuality = getVideoQuality( videoStream.width, videoStream.height, - isInterlaced + isInterlaced, ) return { @@ -144,13 +144,19 @@ const getAudioQuality = (channels: number): string => { } } - export const getVideoMetadata = async ( - path: string + path: string, ): Promise => { log.info(`Running ffprobe on "${path}"`) - const probed = await probeVideo(path) + let probed: FfprobeData + + try { + probed = await probeFile(path) + } catch (err) { + log.error(`ffprobe failed: ${err}`) + return undefined + } const mime = execSync(`file -b --mime-type ${path}`).toString().trim() if (probed.streams.length < 1) { @@ -171,10 +177,9 @@ export const getVideoMetadata = async ( if (probed.format.duration === undefined) { log.error("Duration is not available!") return undefined - } - if (probed.format.duration as unknown as string === "N/A") { + if ((probed.format.duration as unknown as string) === "N/A") { log.error("Duration is not available!") return undefined } diff --git a/src/video/helpers/cleanupFiles.ts b/src/video/helpers/cleanupFiles.ts new file mode 100644 index 0000000..0b48bf9 --- /dev/null +++ b/src/video/helpers/cleanupFiles.ts @@ -0,0 +1,9 @@ +import { unlink } from "fs/promises" + +/** + * Delete files. + * @param {string[]} paths - Array of file paths to clean up. + */ +export const cleanupFiles = async (paths: string[]): Promise => { + for (const path of paths) await unlink(path) +} diff --git a/src/video/helpers/register.ts b/src/video/helpers/register.ts new file mode 100644 index 0000000..2b29682 --- /dev/null +++ b/src/video/helpers/register.ts @@ -0,0 +1,16 @@ +import { MediaService } from "../../generated" +import { FK_API_KEY } from "../../config" +import { getLocator } from "../getLocator" +import { Bucket } from "../process" + +export async function register( + uploadId: string, + outputFormat: "broadcastable" | "theora" | "webm" | "hls", + mediaId: number, +) { + await MediaService.postVideosMediaAssets(mediaId, FK_API_KEY, { + type: outputFormat, + metadata: {}, + locator: getLocator("S3", Bucket, uploadId, outputFormat), + }) +} diff --git a/src/video/helpers/store.ts b/src/video/helpers/store.ts new file mode 100644 index 0000000..1fdb656 --- /dev/null +++ b/src/video/helpers/store.ts @@ -0,0 +1,17 @@ +import type { TranscoderOutputFile } from "../transcode/types" +import { s3Client } from "../../s3Client" +import fs from "fs" +import { Bucket } from "../process" + +export async function store( + uploadId: string, + format: string, + { path, mime }: TranscoderOutputFile, +) { + await s3Client.putObject({ + Bucket, + Key: `${uploadId}/${format}`, + Body: await fs.createReadStream(path), + ContentType: mime, + }) +} diff --git a/src/video/helpers/tempDir.ts b/src/video/helpers/tempDir.ts new file mode 100644 index 0000000..b21b719 --- /dev/null +++ b/src/video/helpers/tempDir.ts @@ -0,0 +1,13 @@ +import fs from "fs" + +/** + * Generate a temporary directory. + * @param {string} jobId - The ID of the job. + * @param {string} assetName - The name of the asset. + * @return {string} - The path to the temporary directory. 
diff --git a/src/video/helpers/tempDir.ts b/src/video/helpers/tempDir.ts
new file mode 100644
index 0000000..b21b719
--- /dev/null
+++ b/src/video/helpers/tempDir.ts
@@ -0,0 +1,13 @@
+import fs from "fs"
+
+/**
+ * Generate a temporary directory.
+ * @param {string} jobId - The ID of the job.
+ * @param {string} assetName - The name of the asset.
+ * @return {string} - The path to the temporary directory.
+ */
+export const tempDir = (jobId: string, assetName: string): string => {
+  const tempDir = `tmp-upload/${jobId}_${assetName}`
+  fs.mkdirSync(tempDir)
+  return tempDir
+}
diff --git a/src/video/ingestVideo.ts b/src/video/ingestVideo.ts
deleted file mode 100644
index b65402a..0000000
--- a/src/video/ingestVideo.ts
+++ /dev/null
@@ -1,101 +0,0 @@
-import { createReadStream } from "fs"
-import type { Middleware } from "koa"
-import { getLocator } from "./getLocator.js"
-import type { PatchUploadState } from "../upload/tus/uploadPatch.js"
-import { getVideoMetadata } from "./getVideoMetadata.js"
-import type { VideoMetadataV2 } from "./getVideoMetadata.js"
-import { videoQueue } from "./queue.js"
-import { FK_API_KEY } from "../constants.js"
-import { MediaService } from "../generated/index.js"
-import { log } from "../log.js"
-import { Upload } from "@aws-sdk/lib-storage"
-import { s3Client } from "../s3Client.js"
-import { randomBytes } from "crypto"
-
-export const makeVideoKey = () => randomBytes(16).toString("hex")
-// TODO: This would probably be better solved by using tusd, storing directly to S3
-// and then calling an endpoint in media-processor to get metadata/acceptance.
-async function uploadVideo(path: string, Key: string, mime: string) {
-  const Bucket = "media"
-
-  log.info(`Copying file to S3 as ${Bucket}/${Key}`)
-
-  const params = {
-    Body: createReadStream(path),
-    Bucket,
-    Key,
-    ContentType: mime,
-  }
-
-  const originalUpload = new Upload({
-    client: s3Client,
-    params,
-  })
-
-  await originalUpload.done()
-
-  log.debug("Upload complete")
-
-  return getLocator("S3", params.Bucket, Key, "original")
-}
-
-async function registerMedia(
-  originalLocator: `S3:${string}` | `CLOUDFLARE:${string}`,
-  filename: string,
-  metadata: VideoMetadataV2
-) {
-  log.info("Registering file on backend")
-
-  const { id } = await MediaService.postVideosMedia(FK_API_KEY, {
-    locator: originalLocator,
-    fileName: filename,
-    duration: metadata.duration,
-    metadata,
-  })
-
-  return id
-}
-
-async function createJob(
-  path: string,
-  mediaId: number,
-  metadata: VideoMetadataV2,
-  key: string
-) {
-  const { id: jobIdRaw } = await videoQueue.add({
-    pathToVideo: path,
-    mediaId,
-    metadata,
-    key,
-  })
-
-  log.debug(`bull jobId = "${jobIdRaw}" (${typeof jobIdRaw})`)
-
-  return typeof jobIdRaw === "string" ? parseInt(jobIdRaw) : jobIdRaw
-}
-
-export const ingestVideo: Middleware<PatchUploadState> = async (ctx, next) => {
-  const { upload } = ctx.state
-  const Key = makeVideoKey()
-
-  log.info("Upload from client complete")
-
-  const metadata = await getVideoMetadata(upload.path)
-  if (!metadata) return ctx.throw(400, "Invalid file")
-
-  const originalLocator = await uploadVideo(upload.path, Key, metadata.mime)
-
-  const mediaId = await registerMedia(
-    originalLocator,
-    upload.filename,
-    metadata
-  )
-
-  const jobId = await createJob(upload.path, mediaId, metadata, Key)
-
-  ctx.body = { mediaId, jobId }
-
-  log.info(`Created job ${jobId} for media ${mediaId}`)
-
-  return next()
-}
diff --git a/src/video/makeThumbnail.ts b/src/video/makeThumbnail.ts
new file mode 100644
index 0000000..f3d7cee
--- /dev/null
+++ b/src/video/makeThumbnail.ts
@@ -0,0 +1,51 @@
+import type { VideoJob } from "./types"
+import fs from "fs"
+import { log } from "../log"
+import { toWebP } from "./transcode/toWebP"
+import { s3Client } from "../s3Client"
+import { MediaService } from "../generated"
+import { FK_API_KEY } from "../config"
+import { getLocator } from "./getLocator"
+import { Bucket } from "./process"
+
+export const makeThumbnail = async (
+  job: VideoJob,
+  thumbType: string,
+  dimensions: { width: number; height: number },
+) => {
+  const { width, height } = dimensions
+  const { uploadId, pathToStill, mediaId } = job.data
+
+  if (!pathToStill) throw new Error("No input still found")
+
+  const outputDir = `tmp-upload/${job.id}_${thumbType}`
+
+  fs.mkdirSync(outputDir)
+
+  log.info(`Generating thumbnail ${thumbType}`)
+
+  const result = await toWebP({
+    onProgress: (progress) => {
+      log.debug(`${progress}%`)
+    },
+    inputPath: pathToStill,
+    outputDir,
+    width,
+    height,
+  })
+
+  await s3Client.putObject({
+    Bucket,
+    Key: `${uploadId}/${thumbType}`,
+    // createReadStream is synchronous; no await needed
+    Body: fs.createReadStream(result.asset.path),
+    ContentType: result.asset.mime,
+  })
+
+  // rmSync is synchronous; no await needed
+  fs.rmSync(outputDir, { recursive: true })
+
+  await MediaService.postVideosMediaAssets(mediaId, FK_API_KEY, {
+    type: thumbType,
+    metadata: dimensions,
+    locator: getLocator("S3", Bucket, uploadId, thumbType),
+  })
+}
diff --git a/src/video/probeVideo.ts b/src/video/probeFile.ts
similarity index 78%
rename from src/video/probeVideo.ts
rename to src/video/probeFile.ts
index a47afc4..67b97ae 100644
--- a/src/video/probeVideo.ts
+++ b/src/video/probeFile.ts
@@ -3,7 +3,7 @@ const { ffprobe } = pkg
 
 import type { FfprobeData } from "fluent-ffmpeg"
 
-export const probeVideo = (path: string): Promise<FfprobeData> =>
+export const probeFile = (path: string): Promise<FfprobeData> =>
   new Promise((resolve, reject) => {
     ffprobe(path, (err, meta) => {
       if (err) return reject(err)
"../transcode/types.js" - -const makeThumbnails = (job: VideoJob) => - Object.entries(ThumbnailDescriptors).map(async ([thumbType, dimensions]) => { - try { - const { width, height } = dimensions - const Bucket = "media" - const { key, pathToStill, mediaId } = job.data - const outputDir = `tmp-upload/${job.id}_${thumbType}` - - fs.mkdirSync(outputDir) - - log.info(`Generating thumbnail ${thumbType}`) - - const result = await toWebP({ - onProgress: (progress) => { - log.debug(`${progress}%`) - }, - inputPath: pathToStill!, - outputDir, - width, - height, - }) - - await s3Client.putObject({ - Bucket, - Key: `${key}/${thumbType}`, - Body: await fs.createReadStream(result.asset.path), - ContentType: result.asset.mime, - }) - - await fs.rmSync(outputDir, { recursive: true }) - - await MediaService.postVideosMediaAssets(mediaId, FK_API_KEY, { - type: thumbType, - metadata: dimensions, - locator: getLocator("S3", Bucket, key, thumbType), - }) - } catch (error) { - log.error(`Error generating thumbnail ${thumbType}:`, error) - throw error - } - }) +import type { Transcoder, TranscoderOutputFile } from "./transcode/types.js" +import { makeThumbnail } from "./makeThumbnail" +import { store } from "./helpers/store" +import { register } from "./helpers/register" +import { tempDir } from "./helpers/tempDir" +import { cleanupFiles } from "./helpers/cleanupFiles" -const uploadAndRegisterAsset = async ( - asset: TranscoderOutputFile, - mediaId: number, - name: string, - bucket: string, - key: string -) => { - await s3Client.putObject({ - Bucket: bucket, - Key: `${key}/${name}`, - Body: await fs.createReadStream(asset.path), - ContentType: asset.mime, - }) - - await MediaService.postVideosMediaAssets(mediaId, FK_API_KEY, { - type: name, - metadata: {}, - locator: getLocator("S3", "media", key, name), - }) -} +export const Bucket = "media" const uploadAndRegisterSubfiles = async ( subfiles: TranscoderOutputFile[], + job: VideoJob, tempDir: string, - mediaId: number, - name: string, - bucket: string, - key: string + type: string, ) => { - const substreams = subfiles.flatMap((segment) => + const { mediaId, uploadId } = job.data + + const substreams = subfiles.flatMap(({ path, mime }) => s3Client.putObject({ - Bucket: bucket, - Key: `${key}/${name}/${segment.path}`, - Body: fs.createReadStream(`${tempDir}/${segment.path}`), - ContentType: segment.mime, - }) + Bucket, + Key: `${uploadId}/${type}/${path}`, + Body: fs.createReadStream(`${tempDir}/${path}`), + ContentType: mime, + }), ) await Promise.all(substreams) await MediaService.postVideosMediaAssets(mediaId, FK_API_KEY, { - type: name, + type, metadata: {}, - locator: getLocator("S3", "media", key, "hls/manifest.m3u8"), + locator: getLocator("S3", Bucket, uploadId, "hls/manifest.m3u8"), }) } +const processAsset = async ( + outputFormat: VideoTranscoder, + transcode: Transcoder, + job: VideoJob, + onProgress: (progress: number) => void, +) => { + if (!job.id) throw new Error("Job has no ID") + + const { pathToVideo: inputPath, mediaId, uploadId } = job.data + const outputDir = tempDir(uploadId, outputFormat) + + log.info(`Generating asset ${outputFormat}`, { uploadId }) + + const { asset, subfiles } = await transcode({ + onProgress, + inputPath, + outputDir, + }) + + await store(uploadId, outputFormat, asset) + await register(uploadId, outputFormat, mediaId) + + if (subfiles) + await uploadAndRegisterSubfiles(subfiles, job, outputDir, outputFormat) + + fs.rmSync(outputDir, { recursive: true }) +} + +const handleProgress = async ( + progress: number, + 
assetIndex: number, + assetProgress: number[], + job: VideoJob, +) => { + // Update the progress of the specific asset + assetProgress[assetIndex] = progress + + // Average progress of all assets + const totalProgress = + assetProgress.reduce((sum, curr) => sum + curr, 0) / assetProgress.length + + // Update the job progress + await job.updateProgress(totalProgress) +} + +const makeThumbnails = async (job: VideoJob) => { + if (!job.data.pathToStill) job.data.pathToStill = await makeStill(job) + + await Promise.all( + Object.entries(ThumbnailDescriptors).map( + async ([thumbType, dimensions]) => { + try { + await makeThumbnail(job, thumbType, dimensions) + } catch (error) { + log.error(`Error generating thumbnail ${thumbType}:`, error) + throw error + } + }, + ), + ) +} export const process = async (job: VideoJob) => { try { - const bucket = "media" - const { key, pathToVideo, mediaId } = job.data + const { pathToStill, pathToVideo, mediaId } = job.data const finished = job.data.finished ?? [] log.info(`Processing start for mediaId ${mediaId}`) - const pathToStill = job.data.pathToStill ?? (await makeStill(job)) - - await Promise.all(makeThumbnails(job)) + await makeThumbnails(job) // Filter out already finished descriptors const pendingAssets = VideoDescriptors.filter( - ({ name }) => !finished.includes(name) + ({ name }) => !finished.includes(name), ) const assetProgress: number[] = new Array(pendingAssets.length).fill(0) - const handleProgress = (progress: number, assetIndex: number) => { - // Update the progress of the specific asset - assetProgress[assetIndex] = progress - - // Calculate the average progress of all assets - const totalProgress = - assetProgress.reduce((sum, curr) => sum + curr, 0) / - pendingAssets.length - - // Update the job progress - job.progress(totalProgress) - } - // Map all transcoding jobs into a concurrent list of promises const transcodingProcesses = pendingAssets.map( async ({ name, transcode }, assetIndex) => { try { - const tempDir = `tmp-upload/${job.id}_${name}` - - fs.mkdirSync(tempDir) - - log.info(`Generating asset ${name}`) - - const { asset, subfiles } = await transcode({ - onProgress: (progress) => handleProgress(progress, assetIndex), - inputPath: pathToVideo, - outputDir: tempDir, - }) - - await uploadAndRegisterAsset(asset, mediaId, name, bucket, key) - - if (subfiles) { - await uploadAndRegisterSubfiles( - subfiles, - tempDir, - mediaId, - name, - bucket, - key - ) - } - - fs.rmSync(tempDir, { recursive: true }) + await processAsset(name, transcode, job, (progress: number) => + handleProgress(progress, assetIndex, assetProgress, job), + ) finished.push(name) - await job.update({ ...job.data, finished }) + await job.updateData({ ...job.data, finished }) } catch (error: any) { log.error(`Error generating asset ${name}:`, error) - await job.update({ + await job.updateData({ ...job.data, error: `Error generating asset ${name}: ${error.message}`, }) throw error } - } + }, ) await Promise.all(transcodingProcesses) log.info(`Processing finished for mediaId ${mediaId}, cleaning up`) - await unlink(pathToVideo) - await unlink(pathToStill) + await cleanupFiles([pathToVideo, pathToStill!]) } catch (error: any) { log.error(`Error processing job for mediaId ${job.data.mediaId}:`, error) - await job.update({ + await job.updateData({ ...job.data, error: `Error processing job for mediaId ${job.data.mediaId}: ${error.message}`, }) @@ -205,8 +169,7 @@ const makeStill = async (job: VideoJob) => { const seek = Math.floor((duration ?? 
diff --git a/src/video/queue.ts b/src/video/queue.ts
index 497a232..5ac2dd8 100644
--- a/src/video/queue.ts
+++ b/src/video/queue.ts
@@ -1,11 +1,25 @@
-import Queue from "bull"
-import { url } from "../upload/redis/connection.js"
+import { Queue, QueueEvents, Worker } from "bullmq"
 import type { VideoJobData } from "./types.js"
 import { process } from "./process.js"
+import { log } from "../log"
+import IORedis from "ioredis"
+import { REDIS_URL } from "../config"
 
-const queue = new Queue<VideoJobData>("video-processing", url)
+// BullMQ requires maxRetriesPerRequest: null for blocking connections
+// (Worker, QueueEvents); the ioredis default is 20
+const connection = new IORedis(REDIS_URL, { maxRetriesPerRequest: null })
 
-queue.process(5, process)
-queue.on("failed", console.log)
+const queue = new Queue<VideoJobData>("video-processing", {
+  connection,
+})
+
+const events = new QueueEvents("video-processing", { connection })
+
+new Worker("video-processing", process, {
+  concurrency: 5,
+  connection,
+})
+
+events.on("failed", ({ jobId, failedReason }) =>
+  log.error(`Job ${jobId} failed`, failedReason),
+)
 
 export { queue as videoQueue }
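
(The bull → bullmq API mapping applied throughout this diff, collected in one sketch; `handler` is a stand-in for the process function and the connection URL is illustrative.)

```ts
import { Queue, QueueEvents, Worker } from "bullmq"
import IORedis from "ioredis"

// bull accepted a Redis URL string; bullmq wants an ioredis connection.
// maxRetriesPerRequest must be null for blocking connections (Worker/QueueEvents).
const connection = new IORedis("redis://localhost:6379", {
  maxRetriesPerRequest: null,
})

const handler = async () => {
  /* transcode one video job */
}

// bull: new Queue("name", url) + queue.process(5, handler)
// bullmq: processing moves out of the Queue into a dedicated Worker
const queue = new Queue("video-processing", { connection })
new Worker("video-processing", handler, { concurrency: 5, connection })

// bull: job.progress(n), job.update(data)
// bullmq: job.updateProgress(n), job.updateData(data)

// bull: queue.on("failed", ...) — bullmq: lifecycle events live on QueueEvents
const events = new QueueEvents("video-processing", { connection })
events.on("failed", ({ jobId, failedReason }) =>
  console.error(`Job ${jobId} failed: ${failedReason}`),
)
```
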
diff --git a/src/transcode/toBroadcast.ts b/src/video/transcode/toBroadcast.ts
similarity index 85%
rename from src/transcode/toBroadcast.ts
rename to src/video/transcode/toBroadcast.ts
index 8a9cc47..967a576 100644
--- a/src/transcode/toBroadcast.ts
+++ b/src/video/transcode/toBroadcast.ts
@@ -1,5 +1,5 @@
 import ffmpeg from "fluent-ffmpeg"
-import type { Transcoder } from "./types.js"
+import type { Transcoder } from "./types"
 
 export const toBroadcast: Transcoder = async (options) => {
   const { inputPath, outputDir, onProgress } = options
@@ -20,9 +20,9 @@ export const toBroadcast: Transcoder = async (options) => {
     .size("1280x720")
     .autopad(true)
 
-  broadcast.on("progress", (progress) => onProgress(progress.percent))
+  broadcast.on("progress", ({ percent }) => onProgress(percent))
   broadcast.on("end", () =>
-    resolve({ asset: { path: outputPath, mime: "application/mxf" } })
+    resolve({ asset: { path: outputPath, mime: "application/mxf" } }),
   )
   broadcast.on("error", (e) => reject(e))
   broadcast.run()
diff --git a/src/transcode/toHLS.ts b/src/video/transcode/toHLS.ts
similarity index 96%
rename from src/transcode/toHLS.ts
rename to src/video/transcode/toHLS.ts
index d502b05..2f733c1 100644
--- a/src/transcode/toHLS.ts
+++ b/src/video/transcode/toHLS.ts
@@ -1,4 +1,4 @@
-import type { Transcoder, TranscoderOutputFile } from "./types.js"
+import type { Transcoder, TranscoderOutputFile } from "./types"
 import { spawn } from "node:child_process"
 // @ts-ignore
 import parse from "shell-quote/parse"
diff --git a/src/transcode/toTheora.ts b/src/video/transcode/toTheora.ts
similarity index 93%
rename from src/transcode/toTheora.ts
rename to src/video/transcode/toTheora.ts
index 4e4bae8..0716d1c 100644
--- a/src/transcode/toTheora.ts
+++ b/src/video/transcode/toTheora.ts
@@ -1,5 +1,5 @@
 import ffmpeg from "fluent-ffmpeg"
-import type { Transcoder } from "./types.js"
+import type { Transcoder } from "./types"
 
 export const toTheora: Transcoder = async (options) => {
   const { inputPath, outputDir, onProgress } = options
@@ -17,7 +17,7 @@ export const toTheora: Transcoder = async (options) => {
 
   theora.on("progress", (progress) => onProgress(progress.percent))
   theora.on("end", () =>
-    resolve({ asset: { path: outputPath, mime: "video/ogg" } })
+    resolve({ asset: { path: outputPath, mime: "video/ogg" } }),
   )
   theora.on("error", (e) => reject(e))
   theora.run()
diff --git a/src/transcode/toWebM.ts b/src/video/transcode/toWebM.ts
similarity index 94%
rename from src/transcode/toWebM.ts
rename to src/video/transcode/toWebM.ts
index 02472ad..621acb4 100644
--- a/src/transcode/toWebM.ts
+++ b/src/video/transcode/toWebM.ts
@@ -1,5 +1,5 @@
 import ffmpeg from "fluent-ffmpeg"
-import type { Transcoder } from "./types.js"
+import type { Transcoder } from "./types"
 
 export const toWebM: Transcoder = async (options) => {
   const { inputPath, outputDir, onProgress } = options
@@ -23,7 +23,7 @@ export const toWebM: Transcoder = async (options) => {
 
   webm.on("progress", (progress) => onProgress(progress.percent))
   webm.on("end", () =>
-    resolve({ asset: { path: outputPath, mime: "video/webm" } })
+    resolve({ asset: { path: outputPath, mime: "video/webm" } }),
   )
   webm.on("error", (e) => reject(e))
diff --git a/src/transcode/toWebP.ts b/src/video/transcode/toWebP.ts
similarity index 77%
rename from src/transcode/toWebP.ts
rename to src/video/transcode/toWebP.ts
index bfa8bf6..b65ca8a 100644
--- a/src/transcode/toWebP.ts
+++ b/src/video/transcode/toWebP.ts
@@ -1,6 +1,6 @@
-import type { Transcoder } from "./types.js"
+import type { Transcoder } from "./types"
 import sharp from "sharp"
-import type { ThumbnailSettings } from "../video/thumbnailDescriptors.js"
+import type { ThumbnailSettings } from "../thumbnailDescriptors"
 
 export const toWebP: Transcoder<ThumbnailSettings> = async (options) => {
   const { inputPath, outputDir, onProgress, width, height } = options
diff --git a/src/transcode/types.ts b/src/video/transcode/types.ts
similarity index 82%
rename from src/transcode/types.ts
rename to src/video/transcode/types.ts
index 894f23e..000edf2 100644
--- a/src/transcode/types.ts
+++ b/src/video/transcode/types.ts
@@ -1,7 +1,7 @@
 export type TranscoderOptions = {
   inputPath: string // Input file spec
   outputDir: string // Base temporary directory
-  onProgress: (value: number) => void
+  onProgress: (percent: number) => void
 }
 
 export type TranscoderOutputFile = {
@@ -15,5 +15,5 @@ export type TranscoderResult = {
 }
 
 export type Transcoder<ExtraOptions = object> = (
-  options: TranscoderOptions & ExtraOptions
+  options: TranscoderOptions & ExtraOptions,
 ) => Promise<TranscoderResult>
diff --git a/src/video/types.ts b/src/video/types.ts
index 45abecd..a96c2ca 100644
--- a/src/video/types.ts
+++ b/src/video/types.ts
@@ -1,14 +1,14 @@
-import type { Job } from "bull"
-import type { VideoTranscoders } from "./getVideoDescriptors.js"
+import type { Job } from "bullmq"
+import type { VideoTranscoder } from "./getVideoDescriptors.js"
 import type { VideoMetadataV2 } from "./getVideoMetadata.js"
 
 export type VideoJobData = {
-  key: string
+  uploadId: string
   pathToVideo: string
   pathToStill?: string
   metadata: VideoMetadataV2
   mediaId: number
-  finished?: VideoTranscoders[]
+  finished?: VideoTranscoder[]
   error?: string
 }
"^1.3.1" ieee754 "^1.1.13" -bull@^4.10.1: - version "4.10.4" - resolved "https://registry.yarnpkg.com/bull/-/bull-4.10.4.tgz#db39ee0c3bfbe3b76f1f35db800501de5bba4f84" - integrity sha512-o9m/7HjS/Or3vqRd59evBlWCXd9Lp+ALppKseoSKHaykK46SmRjAilX98PgmOz1yeVaurt8D5UtvEt4bUjM3eA== +bullmq@^4.6.0: + version "4.6.0" + resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-4.6.0.tgz#13e23ec205e1e7f75ce4c0948d14b4f5bc81a31d" + integrity sha512-RVJSC/DhruJpSoIWYS69NMZ+qTVZpWF4fZ9WKxP9QqY7WHK6Hbm+S5D7W4XN1HAExIGHTmWYZMiqKnBgxEi9XA== dependencies: - cron-parser "^4.2.1" - debuglog "^1.0.0" - get-port "^5.1.1" - ioredis "^5.0.0" + cron-parser "^4.6.0" + glob "^8.0.3" + ioredis "^5.3.2" lodash "^4.17.21" - msgpackr "^1.5.2" - semver "^7.3.2" - uuid "^8.3.0" + msgpackr "^1.6.2" + semver "^7.5.4" + tslib "^2.0.0" + uuid "^9.0.0" bytes@3.0.0: version "3.0.0" @@ -2685,7 +2692,7 @@ create-require@^1.1.0: resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333" integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ== -cron-parser@^4.2.1: +cron-parser@^4.6.0: version "4.8.1" resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.8.1.tgz#47062ea63d21d78c10ddedb08ea4c5b6fc2750fb" integrity sha512-jbokKWGcyU4gl6jAfX97E1gDpY12DJ1cLJZmoDzaAln/shZ+S3KBFBuA2Q6WeUN4gJf/8klnV1EfvhA2lK5IRQ== @@ -2722,11 +2729,6 @@ debug@^4.1.0, debug@^4.1.1, debug@^4.3.2, debug@^4.3.4: dependencies: ms "2.1.2" -debuglog@^1.0.0: - version "1.0.1" - resolved "https://registry.yarnpkg.com/debuglog/-/debuglog-1.0.1.tgz#aa24ffb9ac3df9a2351837cfb2d279360cd78492" - integrity sha512-syBZ+rnAK3EgMsH2aYEOLUW7mZSY9Gb+0wUMCFsZvcmiz+HigA0LOcq/HoQqVuGG+EKykunc7QG2bzrponfaSw== - decompress-response@^6.0.0: version "6.0.0" resolved "https://registry.yarnpkg.com/decompress-response/-/decompress-response-6.0.0.tgz#ca387612ddb7e104bd16d85aab00d5ecf09c66fc" @@ -3101,11 +3103,6 @@ get-package-type@^0.1.0: resolved "https://registry.yarnpkg.com/get-package-type/-/get-package-type-0.1.0.tgz#8de2d803cff44df3bc6c456e6668b36c3926e11a" integrity sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q== -get-port@^5.1.1: - version "5.1.1" - resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193" - integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ== - get-stream@^6.0.0: version "6.0.1" resolved "https://registry.yarnpkg.com/get-stream/-/get-stream-6.0.1.tgz#a262d8eef67aced57c2852ad6167526a43cbf7b7" @@ -3135,6 +3132,17 @@ glob@^7.1.3, glob@^7.1.4: once "^1.3.0" path-is-absolute "^1.0.0" +glob@^8.0.3: + version "8.1.0" + resolved "https://registry.yarnpkg.com/glob/-/glob-8.1.0.tgz#d388f656593ef708ee3e34640fdfb99a9fd1c33e" + integrity sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ== + dependencies: + fs.realpath "^1.0.0" + inflight "^1.0.4" + inherits "2" + minimatch "^5.0.1" + once "^1.3.0" + globals@^11.1.0: version "11.12.0" resolved "https://registry.yarnpkg.com/globals/-/globals-11.12.0.tgz#ab8795338868a0babd8525758018c2a7eb95c42e" @@ -3322,7 +3330,7 @@ ini@~1.3.0: resolved "https://registry.yarnpkg.com/ini/-/ini-1.3.8.tgz#a29da425b48806f34767a4efce397269af28432c" integrity sha512-JV/yugV2uzW5iMRSiZAyDtQd+nxtUnjeLt0acNdw98kKLrvuRVyB80tsREOE7yvGVgalhZ6RNXCmEHkUKBKxew== -ioredis@^5.0.0: +ioredis@^5.3.2: version "5.3.2" resolved 
"https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.2.tgz#9139f596f62fc9c72d873353ac5395bcf05709f7" integrity sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA== @@ -4145,6 +4153,13 @@ minimatch@3.1.2, minimatch@^3.0.4, minimatch@^3.1.1, minimatch@^3.1.2: dependencies: brace-expansion "^1.1.7" +minimatch@^5.0.1: + version "5.1.6" + resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-5.1.6.tgz#1cfcb8cf5522ea69952cd2af95ae09477f122a96" + integrity sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g== + dependencies: + brace-expansion "^2.0.1" + minimist@^1.2.0, minimist@^1.2.3, minimist@^1.2.5: version "1.2.8" resolved "https://registry.yarnpkg.com/minimist/-/minimist-1.2.8.tgz#c1a464e7693302e082a075cee0c057741ac4772c" @@ -4184,7 +4199,7 @@ msgpackr-extract@^3.0.2: "@msgpackr-extract/msgpackr-extract-linux-x64" "3.0.2" "@msgpackr-extract/msgpackr-extract-win32-x64" "3.0.2" -msgpackr@^1.5.2: +msgpackr@^1.6.2: version "1.9.5" resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.9.5.tgz#ac548c5f4546db895e84e46d39d813be961dc527" integrity sha512-/IJ3cFSN6Ci3eG2wLhbFEL6GT63yEaoN/R5My2QkV6zro+OJaVRLPlwvxY7EtHYSmDlQpk8stvOQTL2qJFkDRg== @@ -4795,7 +4810,7 @@ semver@^6.0.0, semver@^6.3.0, semver@^6.3.1: resolved "https://registry.yarnpkg.com/semver/-/semver-6.3.1.tgz#556d2ef8689146e46dcea4bfdd095f3434dffcb4" integrity sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA== -semver@^7.3.2, semver@^7.3.5, semver@^7.5.3, semver@^7.5.4: +semver@^7.3.5, semver@^7.5.3, semver@^7.5.4: version "7.5.4" resolved "https://registry.yarnpkg.com/semver/-/semver-7.5.4.tgz#483986ec4ed38e1c6c48c34894a9182dbff68a6e" integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA== @@ -5227,7 +5242,7 @@ tslib@^1.11.1: resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00" integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg== -tslib@^2.0.3, tslib@^2.3.1, tslib@^2.5.0: +tslib@^2.0.0, tslib@^2.0.3, tslib@^2.3.1, tslib@^2.5.0: version "2.6.1" resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.1.tgz#fd8c9a0ff42590b25703c0acb3de3d3f4ede0410" integrity sha512-t0hLfiEKfMUoqhG+U1oid7Pva4bbDPHYfJNiB7BiIjRkj1pyC++4N3huJfqY6aRH6VTB0rvtzQwjM4K6qpfOig== @@ -5371,11 +5386,16 @@ uuid@8.0.0: resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.0.0.tgz#bc6ccf91b5ff0ac07bbcdbf1c7c4e150db4dbb6c" integrity sha512-jOXGuXZAWdsTH7eZLtyXMqUb9EcWMGZNbL9YcGBJl4MH4nrxHmZJhEHvyLFrkxo+28uLb/NYRcStH48fnD0Vzw== -uuid@^8.3.0, uuid@^8.3.2: +uuid@^8.3.2: version "8.3.2" resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2" integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg== +uuid@^9.0.0: + version "9.0.0" + resolved "https://registry.yarnpkg.com/uuid/-/uuid-9.0.0.tgz#592f550650024a38ceb0c562f2f6aa435761efb5" + integrity sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg== + v8-compile-cache-lib@^3.0.1: version "3.0.1" resolved "https://registry.yarnpkg.com/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz#6336e8d71965cb3d35a1bbb7868445a7c05264bf"