From 7eef477b0f406db0bc154dc6e6b0c9a1795e6e65 Mon Sep 17 00:00:00 2001 From: Atul Morchhlay <96740143+atul24112001@users.noreply.github.com> Date: Fri, 13 Sep 2024 21:10:34 +0530 Subject: [PATCH] Fix/web socket (#89) Added error and loading screen and improved web socket security --- next-app/app/creator/[creatorId]/page.tsx | 26 ++++++--- next-app/app/dashboard/page.tsx | 19 ++++-- next-app/components/ErrorScreen.tsx | 9 +++ next-app/components/LoadingScreen.tsx | 14 +++++ next-app/context/socket-context.tsx | 30 +++++++--- ws/package.json | 5 +- ws/src/StramManager.ts | 22 ++++--- ws/src/app.ts | 71 +++++++++++++++-------- 8 files changed, 142 insertions(+), 54 deletions(-) create mode 100644 next-app/components/ErrorScreen.tsx create mode 100644 next-app/components/LoadingScreen.tsx diff --git a/next-app/app/creator/[creatorId]/page.tsx b/next-app/app/creator/[creatorId]/page.tsx index a02541a..c922abc 100644 --- a/next-app/app/creator/[creatorId]/page.tsx +++ b/next-app/app/creator/[creatorId]/page.tsx @@ -5,6 +5,8 @@ import { useSocket } from "@/context/socket-context"; import { useSession } from "next-auth/react"; import { useEffect } from "react"; import jwt from "jsonwebtoken"; +import ErrorScreen from "@/components/ErrorScreen"; +import LoadingScreen from "@/components/LoadingScreen"; export default function Creator({ params: { creatorId }, @@ -13,18 +15,17 @@ export default function Creator({ creatorId: string; }; }) { - const { socket, user } = useSocket(); - const session = useSession(); + const { socket, user, connectionError, loading, setUser } = useSocket(); useRedirect(); useEffect(() => { - if (user) { + if (user && !user.token) { const token = jwt.sign( { creatorId: creatorId, userId: user.id, }, - process.env.NEXT_PUBLIC_SECRET ?? "secret", + process.env.NEXT_PUBLIC_SECRET ?? "secret" ); socket?.send( @@ -33,14 +34,25 @@ export default function Creator({ data: { token, }, - }), + }) ); + + setUser({ ...user, token }); } }, [user]); - if (!session.data) { - return

Please Log in....

; + if (connectionError) { + return Cannot connect to socket server; + } + + if (loading) { + return ; } + + if (!user) { + return Please Log in....; + } + return ; } diff --git a/next-app/app/dashboard/page.tsx b/next-app/app/dashboard/page.tsx index 7736bfd..5b095c6 100644 --- a/next-app/app/dashboard/page.tsx +++ b/next-app/app/dashboard/page.tsx @@ -4,13 +4,15 @@ import { useSocket } from "@/context/socket-context"; import useRedirect from "../../hooks/useRedirect"; import jwt from "jsonwebtoken"; import StreamView from "../../components/StreamView"; +import ErrorScreen from "@/components/ErrorScreen"; +import LoadingScreen from "@/components/LoadingScreen"; export default function Component() { - const { socket, user, connectionError } = useSocket(); + const { socket, user, loading, setUser, connectionError } = useSocket(); useRedirect(); useEffect(() => { - if (user) { + if (user && !user.token) { const token = jwt.sign( { creatorId: user?.id, @@ -19,7 +21,7 @@ export default function Component() { process.env.NEXT_PUBLIC_SECRET || "", { expiresIn: "24h", - }, + } ); socket?.send( @@ -28,17 +30,22 @@ export default function Component() { data: { token, }, - }), + }) ); + setUser({ ...user, token }); } }, [user]); if (connectionError) { - return

Cannot connect to socket server

; + return Cannot connect to socket server; + } + + if (loading) { + return ; } if (!user) { - return

Please Log in....

; + return Please Log in....; } return ; diff --git a/next-app/components/ErrorScreen.tsx b/next-app/components/ErrorScreen.tsx new file mode 100644 index 0000000..9260fdb --- /dev/null +++ b/next-app/components/ErrorScreen.tsx @@ -0,0 +1,9 @@ +import React, { PropsWithChildren } from "react"; + +export default function ErrorScreen({ children }: PropsWithChildren) { + return ( +
+ {children} +
+ ); +} diff --git a/next-app/components/LoadingScreen.tsx b/next-app/components/LoadingScreen.tsx new file mode 100644 index 0000000..4694af5 --- /dev/null +++ b/next-app/components/LoadingScreen.tsx @@ -0,0 +1,14 @@ +import React from "react"; + +export default function LoadingScreen() { + return ( +
+
+ Loading... +
+
+
+
+
+ ); +} diff --git a/next-app/context/socket-context.tsx b/next-app/context/socket-context.tsx index c578cd5..9d844c9 100644 --- a/next-app/context/socket-context.tsx +++ b/next-app/context/socket-context.tsx @@ -1,6 +1,8 @@ import { useSession } from "next-auth/react"; import { + Dispatch, PropsWithChildren, + SetStateAction, createContext, useContext, useEffect, @@ -9,20 +11,25 @@ import { type SocketContextType = { socket: null | WebSocket; - user: null | { id: string }; + user: null | { id: string; token?: string }; connectionError: boolean; + setUser: Dispatch>; + loading: boolean; }; const SocketContext = createContext({ socket: null, user: null, connectionError: false, + setUser: () => {}, + loading: true, }); export const SocketContextProvider = ({ children }: PropsWithChildren) => { const [socket, setSocket] = useState(null); - const [user, setUser] = useState<{ id: string } | null>(null); + const [user, setUser] = useState<{ id: string; token?: string } | null>(null); const [connectionError, setConnectionError] = useState(false); + const [loading, setLoading] = useState(true); const session = useSession(); useEffect(() => { @@ -30,16 +37,19 @@ export const SocketContextProvider = ({ children }: PropsWithChildren) => { const ws = new WebSocket(process.env.NEXT_PUBLIC_WSS_URL as string); ws.onopen = () => { setSocket(ws); - setUser(session.data?.user); + setUser(session.data?.user || null); + setLoading(false); }; ws.onclose = () => { setSocket(null); + setLoading(false); }; ws.onerror = () => { setSocket(null); setConnectionError(true); + setLoading(false); }; () => { @@ -54,6 +64,8 @@ export const SocketContextProvider = ({ children }: PropsWithChildren) => { socket, user, connectionError, + setUser, + loading, }} > {children} @@ -62,16 +74,20 @@ export const SocketContextProvider = ({ children }: PropsWithChildren) => { }; export const useSocket = () => { - const { socket, user, connectionError } = useContext(SocketContext); + const { socket, user, setUser, connectionError, loading } = + useContext(SocketContext); const sendMessage = (type: string, data: { [key: string]: any }) => { socket?.send( JSON.stringify({ type, - data, - }), + data: { + ...data, + token: user?.token, + }, + }) ); }; - return { socket, sendMessage, user, connectionError }; + return { socket, loading, setUser, sendMessage, user, connectionError }; }; diff --git a/ws/package.json b/ws/package.json index cc810d7..e0e9921 100644 --- a/ws/package.json +++ b/ws/package.json @@ -5,6 +5,7 @@ "main": "index.js", "scripts": { "preinstall": "npx only-allow pnpm", + "postinstall": "prisma generate", "dev": "nodemon src/app.ts", "start": "node dist/app.js", "build": "prisma generate && npx tsc -b", @@ -14,7 +15,7 @@ "author": "", "license": "ISC", "dependencies": { - "@prisma/client": "^5.19.1", + "@prisma/client": "5.19.1", "axios": "^1.7.7", "bullmq": "^5.12.14", "cors": "^2.8.5", @@ -32,4 +33,4 @@ "ts-node": "^10.9.2", "typescript": "^5.6.2" } -} +} \ No newline at end of file diff --git a/ws/src/StramManager.ts b/ws/src/StramManager.ts index cc20061..36741ef 100644 --- a/ws/src/StramManager.ts +++ b/ws/src/StramManager.ts @@ -1,12 +1,12 @@ import { WebSocket } from "ws"; import { createClient, RedisClientType } from "redis"; -import { PrismaClient } from "@prisma/client"; //@ts-ignore import youtubesearchapi from "youtube-search-api"; import { Job, Queue, Worker } from "bullmq"; +import { PrismaClient } from "@prisma/client"; -const TIME_SPAN_FOR_VOTE = 1200000 / 40; // 20min -const TIME_SPAN_FOR_QUEUE = 1200000 / 40; // 20min +const TIME_SPAN_FOR_VOTE = 1200000; // 20min +const TIME_SPAN_FOR_QUEUE = 1200000; // 20min const TIME_SPAN_FOR_REPEAT = 3600000; const MAX_QUEUE_LENGTH = 20; @@ -91,9 +91,6 @@ export class RoomManager { await this.redisClient.connect(); await this.subscriber.connect(); await this.publisher.connect(); - this.worker.on("error", () => { - console.log("Worker ready"); - }); } onSubscribeRoom(message: string, creatorId: string) { @@ -141,14 +138,20 @@ export class RoomManager { } } - async addUser(userId: string, ws: WebSocket) { + async addUser(userId: string, ws: WebSocket, token: string) { this.users.set(userId, { userId, ws, + token, }); } - async joinRoom(creatorId: string, userId: string, ws: WebSocket) { + async joinRoom( + creatorId: string, + userId: string, + ws: WebSocket, + token: string + ) { let room = this.rooms.get(creatorId); let user = this.users.get(userId); @@ -158,7 +161,7 @@ export class RoomManager { } if (!user) { - await this.addUser(userId, ws); + await this.addUser(userId, ws, token); user = this.users.get(userId); } @@ -645,6 +648,7 @@ export class RoomManager { type User = { userId: string; ws: WebSocket; + token: string; }; type Room = { diff --git a/ws/src/app.ts b/ws/src/app.ts index a5c2d16..19efd9e 100644 --- a/ws/src/app.ts +++ b/ws/src/app.ts @@ -56,33 +56,58 @@ async function main() { RoomManager.getInstance().joinRoom( decoded.creatorId, decoded.userId, - ws + ws, + data.token ); } } ); - } else if (type === "cast-vote") { - await RoomManager.getInstance().castVote( - data.creatorId, - data.userId, - data.streamId, - data.vote - ); - } else if (type === "add-to-queue") { - await RoomManager.getInstance().addToQueue( - data.creatorId, - data.userId, - data.url - ); - } else if (type === "play-next") { - await RoomManager.getInstance().queue.add("play-next", { - creatorId: data.creatorId, - userId: data.userId, - }); - } else if (type === "remove-song") { - await RoomManager.getInstance().queue.add("remove-song", data); - } else if (type === "empty-queue") { - await RoomManager.getInstance().queue.add("empty-queue", data); + } else { + const user = RoomManager.getInstance().users.get(data.userId); + // Adding this to verify the user who is sending this message is not mocking other user. + if (user && data.token === user?.token) { + data.userId = user.userId; + if (type === "cast-vote") { + await RoomManager.getInstance().castVote( + data.creatorId, + data.userId, + data.streamId, + data.vote + ); + } else if (type === "add-to-queue") { + await RoomManager.getInstance().addToQueue( + data.creatorId, + data.userId, + data.url + ); + } else if (type === "play-next") { + await RoomManager.getInstance().queue.add("play-next", { + creatorId: data.userId, + userId: data.userId, + }); + } else if (type === "remove-song") { + await RoomManager.getInstance().queue.add("remove-song", { + ...data, + creatorId: data.userId, + userId: data.userId, + }); + } else if (type === "empty-queue") { + await RoomManager.getInstance().queue.add("empty-queue", { + ...data, + creatorId: data.userId, + userId: data.userId, + }); + } + } else { + ws.send( + JSON.stringify({ + type: "error", + data: { + message: "You are unauthorized to perform this action", + }, + }) + ); + } } });