Comprehensive refactor
- Use tusd binary rather than roll our own
- Clean up logic, refactor
- Probably still crashes, but will do so with far more aplomb and panache
toresbe committed Jul 26, 2023
1 parent 3d2041e commit 41b13e3
Showing 48 changed files with 677 additions and 768 deletions.
23 changes: 22 additions & 1 deletion docker-compose.yml
@@ -8,11 +8,32 @@ services:
  redis-gui:
    image: rediscommander/redis-commander
    environment:
      - REDIS_HOSTS=local:connection:6379
      - REDIS_HOSTS=local:redis:6379
    ports:
      - "8180:8081"

  s3:
    image: scireum/s3-ninja
    ports:
      - "9000:9000"

  tusd:
    image: tusproject/tusd
    volumes:
      - ./tmp-upload:/srv/tusd-data/data
    entrypoint: ["tusd"]
    command: [
      "-hooks-http", "http://localhost:8001/tusd-hooks",
      "-hooks-http-forward-headers", "Authorization",
      "-behind-proxy",
      "-hooks-enabled-events", "pre-finish",
    ]
    network_mode: host

  nginx:
    build:
      context: nginxKludge
      dockerfile: Dockerfile.nginx
    depends_on:
      - tusd
    network_mode: host
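
The tusd service above posts its pre-finish hook to http://localhost:8001/tusd-hooks, forwarding the Authorization header, and src/server.ts further down mounts uploadHookRouter under that same prefix. That router is not among the files shown in this commit, so the following is only a minimal sketch of what a Koa receiver for the hook could look like; the Hook-Name header and the payload field names are assumptions about tusd's HTTP hook format, not something this diff confirms.

import Router from "@koa/router"

// Hypothetical sketch of a tusd HTTP hook receiver (not the actual
// src/tusHook/router from this commit).
export const uploadHookRouter = new Router()

uploadHookRouter.post("/", async (ctx) => {
  // Assumption: tusd names the event in a Hook-Name header and sends the
  // upload description as the JSON body (parsed by koa-bodyparser).
  const hookName = ctx.get("Hook-Name")
  const upload = ctx.request.body as {
    ID?: string
    Size?: number
    MetaData?: Record<string, string>
  }

  if (hookName !== "pre-finish") {
    ctx.status = 200
    return
  }

  // Returning a non-2xx status here would make tusd reject the finished upload.
  ctx.status = 200
  ctx.body = { accepted: upload.ID }
})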
2 changes: 2 additions & 0 deletions nginxKludge/Dockerfile.nginx
@@ -0,0 +1,2 @@
FROM nginx:alpine
COPY nginx.conf /etc/nginx/conf.d/default.conf
14 changes: 14 additions & 0 deletions nginxKludge/nginx.conf
@@ -0,0 +1,14 @@
server {
    listen *:8002;
    client_max_body_size 100M;

    location /files {

        add_header "Access-Control-Allow-Credentials" "true";
        proxy_hide_header Access-Control-Allow-Origin;
        add_header Access-Control-Allow-Origin $http_origin;
        add_header "Access-Control-Allow-Headers" "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, x-csrf-token";
        add_header "Access-Control-Allow-Methods" "POST, HEAD, PATCH, OPTIONS, GET, DELETE";
        proxy_pass http://localhost:1080/files;
    }
}
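
The nginx config proxies /files on port 8002 to tusd on port 1080 and widens the CORS headers that tus clients need. The repository's frontend is not part of this diff, so purely as an illustration (tus-js-client, the metadata keys, and the URL are assumptions), a browser upload against the proxied endpoint might look like this:

import * as tus from "tus-js-client"

// Illustrative only: point a tus client at the nginx-proxied endpoint.
const startUpload = (file: File) =>
  new Promise<string | null>((resolve, reject) => {
    const upload = new tus.Upload(file, {
      endpoint: "http://localhost:8002/files",
      metadata: { filename: file.name, filetype: file.type },
      onError: reject,
      onSuccess: () => resolve(upload.url),
    })
    upload.start()
  })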
3 changes: 2 additions & 1 deletion package.json
@@ -26,11 +26,12 @@
"@koa/router": "^12.0.0",
"aws-sdk": "^2.1421.0",
"axios": "^1.4.0",
"bull": "^4.10.1",
"bullmq": "^4.6.0",
"dotenv": "^16.3.1",
"ffmpeg": "^0.0.4",
"fluent-ffmpeg": "^2.1.2",
"http-event-stream": "^0.2.0",
"ioredis": "^5.3.2",
"jest": "^29.6.1",
"kafkajs": "^2.2.2",
"koa": "^2.13.4",
34 changes: 34 additions & 0 deletions src/config.ts
@@ -0,0 +1,34 @@
export const IS_PROD = process.env["NODE_ENV"] === "production"
export const TEMP_UPLOAD_FOLDER = "tmp-upload"
export const SECRET_KEY_HEADER = "X-Api-Key"
export const FK_API_KEY = process.env["FK_API_KEY"]!
export const FK_API = process.env["FK_API"]!

if (!FK_API) {
  throw new Error(
    "FK_API is not set! Please set it to the base url of the api to communicate with.",
  )
}

if (!FK_API_KEY) {
  throw new Error(
    "FK_API_KEY is not set, this means internal endpoints will be unreachable",
  )
}

export const MEDIA_BUCKET = "media"
const getRedisURL = () => {
  const { REDIS_SERVICE_HOST, REDIS_SERVICE_PORT, REDIS_URL } = process.env

  if (REDIS_SERVICE_HOST?.length && REDIS_SERVICE_PORT?.length)
    return `redis://${REDIS_SERVICE_HOST}:${REDIS_SERVICE_PORT}`

  if (!REDIS_URL?.length) {
    throw new Error(
      "REDIS_SERVICE_HOST && REDIS_SERVICE_PORT or REDIS_URL is not set!",
    )
  }

  return REDIS_URL
}
export const REDIS_URL = getRedisURL()
17 changes: 0 additions & 17 deletions src/constants.ts

This file was deleted.

4 changes: 2 additions & 2 deletions src/log.ts
@@ -1,6 +1,6 @@
import { Logger } from "tslog"
import type { ILogObj } from "tslog"
import { IS_PROD } from "./constants.js"
import { IS_PROD } from "./config"
import type { Middleware } from "koa"

const LogLevels = [
@@ -14,7 +14,7 @@ const LogLevels = [
] as const

export const getLogLevel = (): number => {
const LOG_LEVEL = process.env["LOG_LEVEL"] as
const LOG_LEVEL = process.env["LOG_LEVEL"]?.toUpperCase() as
| (typeof LogLevels)[number]
| undefined

41 changes: 30 additions & 11 deletions src/metrics.ts
@@ -1,18 +1,35 @@
import Queue from "bull"
import { Queue } from "bullmq"
import type { VideoJobData } from "./video/types.js"
import { url } from "./upload/redis/connection.js"
import { Gauge, register } from "prom-client"
import type { Middleware } from "koa"
import { log } from "./log.js"
import IORedis from "ioredis"
import { REDIS_URL } from "./config"
const connection = new IORedis(REDIS_URL)

const queue = new Queue<VideoJobData>("video-processing", url)
const queue = new Queue<VideoJobData>("video-processing", { connection })

const getCounts = async () => {
const { waiting, active, delayed, failed } = await queue.getJobCounts()
// A count of 0 is valid, so only treat a missing count as an error.
if ([waiting, active, delayed, failed].some((n) => typeof n !== "number")) {
throw new Error(`Failed to get job counts`)
}
const size = waiting + active + delayed + failed

return {
size,
waiting,
active,
delayed,
failed,
}
}

const processQueueSizeTotal = new Gauge({
name: "process_queue_size_total",
help: "Total number of jobs in the processing queue",
collect: async () => {
const counts = await queue.getJobCounts()
const size = counts.waiting + counts.active + counts.delayed
const { size } = await getCounts()
processQueueSizeTotal.set(size)
},
})
@@ -21,26 +21,38 @@ const processQueuePending = new Gauge({
name: "process_queue_pending",
help: "Number of pending jobs in the processing queue",
collect: async () => {
const counts = await queue.getJobCounts()
processQueuePending.set(counts.waiting + counts.delayed)
const { waiting, delayed } = await getCounts()

processQueuePending.set(waiting + delayed)
},
})

const processQueueActive = new Gauge({
name: "process_queue_active",
help: "Number of active jobs in the processing queue",
collect: async () => {
const counts = await queue.getJobCounts()
processQueueActive.set(counts.active)
const { active } = await getCounts()

processQueueActive.set(active)
},
})

const processQueueFailed = new Gauge({
name: "process_queue_failed",
help: "Number of failed jobs in the processing queue",
collect: async () => {
const counts = await queue.getJobCounts()
processQueueFailed.set(counts.failed)
const { failed } = await getCounts()
processQueueFailed.set(failed)
},
})
export const showMetrics: Middleware = async (ctx, next) => {
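
The body of showMetrics is collapsed in this view. For orientation only, a typical prom-client exposition handler looks roughly like the sketch below; it is not necessarily what this file actually contains.

import { register } from "prom-client"
import type { Middleware } from "koa"

// Rough sketch of a metrics endpoint; the real showMetrics body is not shown
// in this diff, so treat the path check and response shape as assumptions.
export const showMetricsSketch: Middleware = async (ctx, next) => {
  if (ctx.path !== "/metrics") return next()
  ctx.type = register.contentType
  ctx.body = await register.metrics()
}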
9 changes: 6 additions & 3 deletions src/middleware/authenticate.ts
@@ -1,6 +1,7 @@
import type { Middleware } from "koa"
import axios from "axios"
import { FK_API, FK_API_KEY, SECRET_KEY_HEADER } from "../constants.js"
import { FK_API, FK_API_KEY, SECRET_KEY_HEADER } from "../config"
import { log } from "../log"

export type AuthUser = {
id: number
@@ -16,15 +17,16 @@ export type AuthState = {

export type Options = {
required?: true
cookieGetter?: (ctx: any) => string
}

// TODO: Some caching might be in order here, so that we don't hammer the API
// with a user profile load every time we upload a chunk of data
export const authenticate =
(options: Options): Middleware<AuthState> =>
async (ctx, next) => {
const { required } = options
const cookie = ctx.get("Cookie")
const { required, cookieGetter } = options
const cookie = cookieGetter ? cookieGetter(ctx) : ctx.get("Cookie")

const { data, status } = await axios.get<AuthData>("/auth/user", {
baseURL: FK_API,
@@ -39,6 +41,7 @@ export const authenticate =
if (status !== 200) return ctx.throw(500, "auth_server_error")

if (!data.user) {
log.info(data.user)
if (required) return ctx.throw(401)
} else {
ctx.state.user = data.user
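
The new cookieGetter option lets a caller decide where the credential is read from instead of always using the Cookie header. How the tusd hook route wires this up is not visible in this commit; one hypothetical use, with the header name as an assumption, would be:

import { authenticate } from "./authenticate"

// Hypothetical: authenticate a tusd hook request using the forwarded
// Authorization header rather than the Cookie header.
export const authenticateForwarded = authenticate({
  required: true,
  cookieGetter: (ctx) => ctx.get("Authorization"),
})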
26 changes: 3 additions & 23 deletions src/middleware/sendCORSDev.ts
@@ -1,36 +1,16 @@
import type { Middleware } from "koa"

export const sendCORSDev = (): Middleware => (context, next) => {
const { method } = context

context.set("Access-Control-Allow-Origin", "http://localhost:3000")
context.set("Access-Control-Allow-Credentials", "true")
context.set(
"Access-Control-Allow-Methods",
"GET,PUT,POST,PATCH,DELETE,OPTIONS,HEAD"
)

context.set(
"Access-Control-Allow-Headers",
[
"Content-Type",
"Tus-Resumable",
"Upload-Offset",
"Upload-Length",
"Upload-Metadata",
"X-CSRF-Token",
"Location",
].join(", ")
"GET,PUT,POST,PATCH,DELETE,OPTIONS,HEAD",
)

context.set(
"Access-Control-Expose-Headers",
["Location", "Upload-Offset", "Upload-Length"].join(", ")
)
context.set("Access-Control-Allow-Headers", "X-CSRF-Token")

if (method === "OPTIONS") {
context.status = 200
}
if (context.method === "OPTIONS") context.status = 200

return next()
}
15 changes: 8 additions & 7 deletions src/server.ts
@@ -1,17 +1,16 @@
import "dotenv/config"

import { connection } from "./upload/redis/connection.js"
import Koa from "koa"
import bodyParser from "koa-bodyparser"
import { handleError } from "./middleware/handleError.js"
//import { sendCORSDev } from "./middleware/sendCORSDev.js"
import { videoRouter } from "./upload/router.js"
import { log, requestLogger } from "./log.js"
import { FK_API, FK_API_KEY, IS_PROD, SECRET_KEY_HEADER } from "./constants.js"
import { FK_API, FK_API_KEY, IS_PROD, SECRET_KEY_HEADER } from "./config"
import { OpenAPI } from "./generated/index.js"
import { statusUpdate } from "./status/router.js"

import { showMetrics } from "./metrics.js"
import { sendCORSDev } from "./middleware/sendCORSDev"
import { uploadHookRouter } from "./tusHook/router"

OpenAPI.BASE = FK_API

@@ -30,13 +29,15 @@ log.info({ IS_PROD })
app.use(requestLogger())
app.use(handleError)
app.use(bodyParser())
//if (!IS_PROD) app.use(sendCORSDev())
if (!IS_PROD) {
log.warn("Dev mode: Enabling CORS for localhost")
app.use(sendCORSDev())
}
app.use(showMetrics)
app.use(videoRouter.prefix("/upload/video").routes())
app.use(uploadHookRouter.prefix("/tusd-hooks").routes())
app.use(statusUpdate("/upload/status"))

async function main() {
await connection.connect()
app.listen(port, () => log.info(`media-processor listening on port ${port}`))
}

31 changes: 15 additions & 16 deletions src/status/router.ts
@@ -1,5 +1,4 @@
import type { Context, Next } from "koa"
import { videoQueue } from "../video/queue.js"
import { PassThrough } from "stream"

const streamEventsMode = async (ctx: Context) => {
@@ -21,6 +20,7 @@ const streamEventsMode = async (ctx: Context) => {
* HTTP Event Stream showing the progress of the encoding job
*
* @param prefix URL prefix
* @deprecated The code as it stands is vestigial and should be replaced
*/
export const statusUpdate =
(prefix: string) => async (ctx: Context, next: Next) => {
@@ -31,21 +31,20 @@ export const statusUpdate =

await streamEventsMode(ctx)

const responseStream = new PassThrough()
ctx.response.body = responseStream

const job = await videoQueue.getJob(jobId)
if (job === null) return ctx.throw(404, "job_does_not_exist")

const response = {
jobId,
progress: job.progress(),
}

responseStream.write("event: name\n")
responseStream.write(`data: ${JSON.stringify(response)}\n\n`)

if (await job.finished()) responseStream.end()
ctx.response.body = new PassThrough()

//const job = await videoQueue.getJob(`${jobId}`)
//if (!job) return ctx.throw(404, "job_does_not_exist")
//
//const response = {
// jobId,
// progress: job.progress(),
//}
//
//responseStream.write("event: name\n")
//responseStream.write(`data: ${JSON.stringify(response)}\n\n`)
//
//if (await job) responseStream.end()

return next()
}
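
The old Bull-based progress lookup is commented out above and the handler is marked deprecated, so the response stream currently stays empty. If this is ever revived on BullMQ, one possible shape would be to feed the PassThrough from QueueEvents. The queue name and Redis connection below mirror src/metrics.ts; everything else is an assumption.

import { QueueEvents } from "bullmq"
import IORedis from "ioredis"
import { PassThrough } from "stream"
import { REDIS_URL } from "../config"

// Sketch only: stream BullMQ progress/completion events for one job as SSE frames.
const connection = new IORedis(REDIS_URL, { maxRetriesPerRequest: null })
const queueEvents = new QueueEvents("video-processing", { connection })

export const streamJobProgress = (jobId: string) => {
  const stream = new PassThrough()

  const onProgress = (args: { jobId: string; data: unknown }) => {
    if (args.jobId !== jobId) return
    stream.write("event: progress\n")
    stream.write(`data: ${JSON.stringify({ jobId, progress: args.data })}\n\n`)
  }
  const onCompleted = (args: { jobId: string }) => {
    if (args.jobId === jobId) stream.end()
  }

  queueEvents.on("progress", onProgress)
  queueEvents.on("completed", onCompleted)
  stream.on("close", () => {
    queueEvents.off("progress", onProgress)
    queueEvents.off("completed", onCompleted)
  })

  return stream
}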