Skip to content

Commit

Permalink
feat: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl committed Oct 15, 2024
1 parent b4fdf77 commit 48ec355
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 49 deletions.
47 changes: 45 additions & 2 deletions keep-ui/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions keep-ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
"has-symbols": "^1.0.3",
"has-tostringtag": "^1.0.0",
"hoist-non-react-statics": "^3.3.2",
"http-proxy": "^1.18.1",
"human-signals": "^4.3.1",
"ignore": "^5.2.4",
"import-fresh": "^3.3.0",
Expand Down Expand Up @@ -368,6 +369,7 @@
"util-deprecate": "^1.0.2",
"uuid": "^8.3.2",
"victory-vendor": "^36.6.10",
"ws": "^8.18.0",
"yallist": "^4.0.0",
"yaml": "^2.2.2",
"yocto-queue": "^0.1.0",
Expand All @@ -377,6 +379,7 @@
"@next/bundle-analyzer": "^14.2.15",
"@tailwindcss/typography": "^0.5.12",
"@types/d3-time-format": "^4.0.3",
"@types/http-proxy": "^1.17.15",
"@types/js-cookie": "^3.0.3",
"@types/js-yaml": "^4.0.5",
"@types/json-logic-js": "^2.0.7",
Expand All @@ -388,6 +391,7 @@
"@types/react-modal": "^3.16.0",
"@types/react-timeago": "^4.1.7",
"@types/uuid": "^9.0.1",
"@types/ws": "^8.5.12",
"@typescript-eslint/parser": "^5.62.0",
"clsx": "^2.1.1",
"elkjs": "^0.9.3",
Expand Down
71 changes: 71 additions & 0 deletions keep-ui/pages/api/pusher/proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// pages/api/pusher/proxy.ts
import { NextApiRequest, NextApiResponse } from "next";
import { createServer, IncomingMessage, ServerResponse } from "http";
import { parse } from "url";
import { WebSocketServer, WebSocket } from "ws";
import httpProxy from "http-proxy";

const pusherHost =
process.env.PUSHER_HOST || `ws-${process.env.PUSHER_CLUSTER}.pusher.com`;
const pusherPort = process.env.PUSHER_PORT
? parseInt(process.env.PUSHER_PORT)
: 443;
const pusherProtocol = process.env.PUSHER_PORT ? "ws:" : "wss:";

const proxy = httpProxy.createProxyServer({
target: {
protocol: pusherProtocol,
host: pusherHost,
port: pusherPort,
},
ws: true,
changeOrigin: true,
});

const wsServer = new WebSocketServer({ noServer: true });

const handler = (req: NextApiRequest, res: NextApiResponse) => {
if (req.method === "GET") {
res.status(200).json({ message: "WebSocket proxy server" });
} else {
res.status(405).json({ message: "Method not allowed" });
}
};

export const config = {
api: {
bodyParser: false,
},
};

if (!(process as any).wss) {
(process as any).wss = createServer(
(req: IncomingMessage, res: ServerResponse) => {
const parsedUrl = parse(req.url!, true);
if (parsedUrl.pathname === "/api/pusher/proxy") {
handler(req as NextApiRequest, res as NextApiResponse);
} else {
res.writeHead(404).end();
}
}
);

(process as any).wss.on(
"upgrade",
(request: IncomingMessage, socket: any, head: Buffer) => {
const { pathname } = parse(request.url || "");

if (pathname === "/api/pusher/proxy") {
wsServer.handleUpgrade(request, socket, head, (ws: WebSocket) => {
proxy.ws(request, socket, head);
});
} else {
socket.destroy();
}
}
);

(process as any).wss.listen(parseInt(process.env.PORT || "3000"));
}

export default handler;
122 changes: 75 additions & 47 deletions keep-ui/utils/hooks/usePusher.ts
Original file line number Diff line number Diff line change
@@ -1,98 +1,126 @@
import Pusher from "pusher-js";
import { useConfig } from "./useConfig";
import { useSession } from "next-auth/react";
import { getApiURL } from "utils/apiUrl";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";

let PUSHER: Pusher | null = null;
const POLLING_INTERVAL = 3000;

export const useWebsocket = () => {
const apiUrl = getApiURL();
const { data: configData } = useConfig();
const { data: session } = useSession();
let channelName = `private-${session?.tenantId}`;

if (
PUSHER === null &&
configData !== undefined &&
session !== undefined &&
configData.PUSHER_DISABLED === false
) {
channelName = `private-${session?.tenantId}`;
PUSHER = new Pusher(configData.PUSHER_APP_KEY, {
wsHost: configData.PUSHER_HOST,
wsPort: configData.PUSHER_PORT,
forceTLS: false,
disableStats: true,
enabledTransports: ["ws", "wss"],
cluster: configData.PUSHER_CLUSTER || "local",
channelAuthorization: {
transport: "ajax",
endpoint: `${apiUrl}/pusher/auth`,
headers: {
Authorization: `Bearer ${session?.accessToken!}`,
},
},
});
PUSHER.subscribe(channelName);
}
const [error, setError] = useState<string | null>(null);

const channelName = useMemo(() => {
return session?.tenantId ? `private-${session.tenantId}` : null;
}, [session?.tenantId]);

useEffect(() => {
if (
PUSHER === null &&
configData &&
session &&
channelName &&
!configData.PUSHER_DISABLED
) {
try {
PUSHER = new Pusher(configData.PUSHER_APP_KEY, {
cluster: configData.PUSHER_CLUSTER || "local",
forceTLS: true,
authEndpoint: "/api/pusher/auth",
auth: {
headers: {
Authorization: `Bearer ${session.accessToken}`,
},
},
wsHost: window.location.hostname,
wsPort: window.location.port
? parseInt(window.location.port)
: window.location.protocol === "https:"
? 443
: 80,
wssPort: window.location.port
? parseInt(window.location.port)
: window.location.protocol === "https:"
? 443
: 80,
enabledTransports: ["ws", "wss"],
disabledTransports: ["xhr_streaming", "xhr_polling", "sockjs"],
});
PUSHER.subscribe(channelName);
} catch (err) {
setError(`Failed to initialize Pusher: ${(err as Error).message}`);
}
}

return () => {
if (PUSHER && channelName) {
PUSHER.unsubscribe(channelName);
PUSHER.disconnect();
PUSHER = null;
}
};
}, [configData, session, channelName]);

const subscribe = useCallback(() => {
return PUSHER?.subscribe(channelName);
if (PUSHER && channelName) {
return PUSHER.subscribe(channelName);
}
}, [channelName]);

const unsubscribe = useCallback(() => {
return PUSHER?.unsubscribe(channelName);
if (PUSHER && channelName) {
return PUSHER.unsubscribe(channelName);
}
}, [channelName]);

const bind = useCallback(
(event: any, callback: any) => {
return PUSHER?.channel(channelName)?.bind(event, callback);
(event: string, callback: (data: any) => void) => {
if (PUSHER && channelName) {
return PUSHER.channel(channelName)?.bind(event, callback);
}
},
[channelName]
);

const unbind = useCallback(
(event: any, callback: any) => {
return PUSHER?.channel(channelName)?.unbind(event, callback);
(event: string, callback: (data: any) => void) => {
if (PUSHER && channelName) {
return PUSHER.channel(channelName)?.unbind(event, callback);
}
},
[channelName]
);

const trigger = useCallback(
(event: any, data: any) => {
return PUSHER?.channel(channelName).trigger(event, data);
(event: string, data: any) => {
if (PUSHER && channelName) {
return PUSHER.channel(channelName)?.trigger(event, data);
}
},
[channelName]
);

const channel = useCallback(() => {
return PUSHER?.channel(channelName);
}, [channelName]);

return {
subscribe,
unsubscribe,
bind,
unbind,
trigger,
channel,
error,
};
};

export const useAlertPolling = () => {
const { bind, unbind } = useWebsocket();
const { bind, unbind, error } = useWebsocket();
const [pollAlerts, setPollAlerts] = useState(0);
const lastPollTimeRef = useRef(0);

const handleIncoming = useCallback((incoming: any) => {
const handleIncoming = useCallback(() => {
const currentTime = Date.now();
const timeSinceLastPoll = currentTime - lastPollTimeRef.current;

if (timeSinceLastPoll < POLLING_INTERVAL) {
setPollAlerts(0);
} else {
if (timeSinceLastPoll >= POLLING_INTERVAL) {
lastPollTimeRef.current = currentTime;
setPollAlerts(Math.floor(Math.random() * 10000));
}
Expand All @@ -105,5 +133,5 @@ export const useAlertPolling = () => {
};
}, [bind, unbind, handleIncoming]);

return { data: pollAlerts };
return { data: pollAlerts, error };
};

0 comments on commit 48ec355

Please sign in to comment.