Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/web socket #89

Merged
merged 7 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions next-app/app/creator/[creatorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { useSocket } from "@/context/socket-context";
import { useSession } from "next-auth/react";
import { useEffect } from "react";
import jwt from "jsonwebtoken";
import ErrorScreen from "@/components/ErrorScreen";
import LoadingScreen from "@/components/LoadingScreen";

export default function Creator({
params: { creatorId },
Expand All @@ -13,18 +15,17 @@ export default function Creator({
creatorId: string;
};
}) {
const { socket, user } = useSocket();
const session = useSession();
const { socket, user, connectionError, loading, setUser } = useSocket();
useRedirect();

useEffect(() => {
if (user) {
if (user && !user.token) {
const token = jwt.sign(
{
creatorId: creatorId,
userId: user.id,
},
process.env.NEXT_PUBLIC_SECRET ?? "secret",
process.env.NEXT_PUBLIC_SECRET ?? "secret"
);

socket?.send(
Expand All @@ -33,14 +34,25 @@ export default function Creator({
data: {
token,
},
}),
})
);

setUser({ ...user, token });
}
}, [user]);

if (!session.data) {
return <h1>Please Log in....</h1>;
if (connectionError) {
return <ErrorScreen>Cannot connect to socket server</ErrorScreen>;
}

if (loading) {
return <LoadingScreen />;
}

if (!user) {
return <ErrorScreen>Please Log in....</ErrorScreen>;
}

return <StreamView creatorId={creatorId} playVideo={false} />;
}

Expand Down
19 changes: 13 additions & 6 deletions next-app/app/dashboard/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import { useSocket } from "@/context/socket-context";
import useRedirect from "../../hooks/useRedirect";
import jwt from "jsonwebtoken";
import StreamView from "../../components/StreamView";
import ErrorScreen from "@/components/ErrorScreen";
import LoadingScreen from "@/components/LoadingScreen";

export default function Component() {
const { socket, user, connectionError } = useSocket();
const { socket, user, loading, setUser, connectionError } = useSocket();
useRedirect();

useEffect(() => {
if (user) {
if (user && !user.token) {
const token = jwt.sign(
{
creatorId: user?.id,
Expand All @@ -19,7 +21,7 @@ export default function Component() {
process.env.NEXT_PUBLIC_SECRET || "",
{
expiresIn: "24h",
},
}
);

socket?.send(
Expand All @@ -28,17 +30,22 @@ export default function Component() {
data: {
token,
},
}),
})
);
setUser({ ...user, token });
}
}, [user]);

if (connectionError) {
return <h1>Cannot connect to socket server</h1>;
return <ErrorScreen>Cannot connect to socket server</ErrorScreen>;
}

if (loading) {
return <LoadingScreen />;
}

if (!user) {
return <h1>Please Log in....</h1>;
return <ErrorScreen>Please Log in....</ErrorScreen>;
}

return <StreamView creatorId={user.id} playVideo={true} />;
Expand Down
9 changes: 9 additions & 0 deletions next-app/components/ErrorScreen.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import React, { PropsWithChildren } from "react";

export default function ErrorScreen({ children }: PropsWithChildren) {
return (
<div className="w-screen h-screen flex justify-center items-center bg-[rgb(10,10,10)] text-gray-200">
{children}
</div>
);
}
14 changes: 14 additions & 0 deletions next-app/components/LoadingScreen.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import React from "react";

export default function LoadingScreen() {
return (
<div className=" bg-[rgb(10,10,10)] text-gray-200">
<div className="flex space-x-2 justify-center items-center w-screen h-screen dark:invert">
<span className="sr-only">Loading...</span>
<div className="h-8 w-8 bg-white rounded-full animate-bounce [animation-delay:-0.3s]"></div>
<div className="h-8 w-8 bg-white rounded-full animate-bounce [animation-delay:-0.15s]"></div>
<div className="h-8 w-8 bg-white rounded-full animate-bounce"></div>
</div>
</div>
);
}
30 changes: 23 additions & 7 deletions next-app/context/socket-context.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { useSession } from "next-auth/react";
import {
Dispatch,
PropsWithChildren,
SetStateAction,
createContext,
useContext,
useEffect,
Expand All @@ -9,37 +11,45 @@ import {

type SocketContextType = {
socket: null | WebSocket;
user: null | { id: string };
user: null | { id: string; token?: string };
connectionError: boolean;
setUser: Dispatch<SetStateAction<{ id: string; token?: string } | null>>;
loading: boolean;
};

const SocketContext = createContext<SocketContextType>({
socket: null,
user: null,
connectionError: false,
setUser: () => {},
loading: true,
});

export const SocketContextProvider = ({ children }: PropsWithChildren) => {
const [socket, setSocket] = useState<WebSocket | null>(null);
const [user, setUser] = useState<{ id: string } | null>(null);
const [user, setUser] = useState<{ id: string; token?: string } | null>(null);
const [connectionError, setConnectionError] = useState<boolean>(false);
const [loading, setLoading] = useState(true);
const session = useSession();

useEffect(() => {
if (!socket && session.data?.user.id) {
const ws = new WebSocket(process.env.NEXT_PUBLIC_WSS_URL as string);
ws.onopen = () => {
setSocket(ws);
setUser(session.data?.user);
setUser(session.data?.user || null);
setLoading(false);
};

ws.onclose = () => {
setSocket(null);
setLoading(false);
};

ws.onerror = () => {
setSocket(null);
setConnectionError(true);
setLoading(false);
};

() => {
Expand All @@ -54,6 +64,8 @@ export const SocketContextProvider = ({ children }: PropsWithChildren) => {
socket,
user,
connectionError,
setUser,
loading,
}}
>
{children}
Expand All @@ -62,16 +74,20 @@ export const SocketContextProvider = ({ children }: PropsWithChildren) => {
};

export const useSocket = () => {
const { socket, user, connectionError } = useContext(SocketContext);
const { socket, user, setUser, connectionError, loading } =
useContext(SocketContext);

const sendMessage = (type: string, data: { [key: string]: any }) => {
socket?.send(
JSON.stringify({
type,
data,
}),
data: {
...data,
token: user?.token,
},
})
);
};

return { socket, sendMessage, user, connectionError };
return { socket, loading, setUser, sendMessage, user, connectionError };
};
5 changes: 3 additions & 2 deletions ws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"main": "index.js",
"scripts": {
"preinstall": "npx only-allow pnpm",
"postinstall": "prisma generate",
"dev": "nodemon src/app.ts",
"start": "node dist/app.js",
"build": "prisma generate && npx tsc -b",
Expand All @@ -14,7 +15,7 @@
"author": "",
"license": "ISC",
"dependencies": {
"@prisma/client": "^5.19.1",
"@prisma/client": "5.19.1",
"axios": "^1.7.7",
"bullmq": "^5.12.14",
"cors": "^2.8.5",
Expand All @@ -32,4 +33,4 @@
"ts-node": "^10.9.2",
"typescript": "^5.6.2"
}
}
}
22 changes: 13 additions & 9 deletions ws/src/StramManager.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { WebSocket } from "ws";
import { createClient, RedisClientType } from "redis";
import { PrismaClient } from "@prisma/client";
//@ts-ignore
import youtubesearchapi from "youtube-search-api";
import { Job, Queue, Worker } from "bullmq";
import { PrismaClient } from "@prisma/client";

const TIME_SPAN_FOR_VOTE = 1200000 / 40; // 20min
const TIME_SPAN_FOR_QUEUE = 1200000 / 40; // 20min
const TIME_SPAN_FOR_VOTE = 1200000; // 20min
const TIME_SPAN_FOR_QUEUE = 1200000; // 20min
const TIME_SPAN_FOR_REPEAT = 3600000;
const MAX_QUEUE_LENGTH = 20;

Expand Down Expand Up @@ -91,9 +91,6 @@ export class RoomManager {
await this.redisClient.connect();
await this.subscriber.connect();
await this.publisher.connect();
this.worker.on("error", () => {
console.log("Worker ready");
});
}

onSubscribeRoom(message: string, creatorId: string) {
Expand Down Expand Up @@ -141,14 +138,20 @@ export class RoomManager {
}
}

async addUser(userId: string, ws: WebSocket) {
async addUser(userId: string, ws: WebSocket, token: string) {
this.users.set(userId, {
userId,
ws,
token,
});
}

async joinRoom(creatorId: string, userId: string, ws: WebSocket) {
async joinRoom(
creatorId: string,
userId: string,
ws: WebSocket,
token: string
) {
let room = this.rooms.get(creatorId);
let user = this.users.get(userId);

Expand All @@ -158,7 +161,7 @@ export class RoomManager {
}

if (!user) {
await this.addUser(userId, ws);
await this.addUser(userId, ws, token);
user = this.users.get(userId);
}

Expand Down Expand Up @@ -645,6 +648,7 @@ export class RoomManager {
type User = {
userId: string;
ws: WebSocket;
token: string;
};

type Room = {
Expand Down
Loading
Loading