From 17a210070c7946c813379df723a6d168189fbdad Mon Sep 17 00:00:00 2001 From: Artem Derevnjuk Date: Wed, 24 Apr 2024 11:15:31 +0400 Subject: [PATCH] fix: remove redis client direct dependency closes #24 --- README.md | 6 +++++ lib/adapter.ts | 8 ++++--- lib/util.ts | 59 +++++++++++++++++++++++++++++++------------------- 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 515d6e5..0ccdd5a 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,12 @@ The `@socket.io/redis-streams-adapter` package allows broadcasting packets betwe npm install @socket.io/redis-streams-adapter redis ``` +or + +``` +npm install @socket.io/redis-streams-adapter ioredis +``` + ## Usage ### With the `redis` package diff --git a/lib/adapter.ts b/lib/adapter.ts index 17f082a..c752026 100644 --- a/lib/adapter.ts +++ b/lib/adapter.ts @@ -83,7 +83,7 @@ export function createAdapter( if (response) { for (const entry of response[0].messages) { debug("reading entry %s", entry.id); - const message = entry.message; + const message = entry.message as RawClusterMessage; if (message.nsp) { namespaceToAdapters @@ -267,6 +267,7 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat { return Promise.reject("session or offset not found"); } + // @ts-expect-error rawSession is expected to be a string const session = decode(Buffer.from(rawSession, "base64")) as Session; debug("found session %o", session); @@ -288,8 +289,9 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat { } for (const entry of entries) { - if (entry.message.nsp === this.nsp.name && entry.message.type === "3") { - const message = RedisStreamsAdapter.decode(entry.message); + const rawMessage = entry.message as RawClusterMessage; + if (rawMessage.nsp === this.nsp.name && rawMessage.type === "3") { + const message = RedisStreamsAdapter.decode(rawMessage); // @ts-ignore if (shouldIncludePacket(session.rooms, message.data.opts)) { diff --git a/lib/util.ts b/lib/util.ts index c311ce4..57fb9d2 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -1,5 +1,17 @@ import { randomBytes } from "crypto"; -import { commandOptions } from "redis"; +import { + type RedisClientType, + type RedisFunctions, + type RedisModules, + type RedisScripts, +} from "redis"; +import { type Redis as IORedisClient } from "ioredis"; + +export type RedisClient = RedisClientType< + RedisModules, + RedisFunctions, + RedisScripts +>; export function hasBinary(obj: any, toJSON?: boolean): boolean { if (!obj || typeof obj !== "object") { @@ -43,8 +55,12 @@ export function randomId() { * * @see https://github.com/redis/node-redis */ -function isRedisV4Client(redisClient: any) { - return typeof redisClient.sSubscribe === "function"; +function isRedisV4Client( + redisClient: RedisClient | IORedisClient +): redisClient is RedisClient { + return ( + "sSubscribe" in redisClient && typeof redisClient.sSubscribe === "function" + ); } /** @@ -68,30 +84,29 @@ function mapResult(result) { * @see https://redis.io/commands/xread/ */ export function XREAD( - redisClient: any, + redisClient: RedisClient | IORedisClient, streamName: string, offset: string, readCount: number ) { if (isRedisV4Client(redisClient)) { - return redisClient.xRead( - commandOptions({ - isolated: true, - }), - [ + return redisClient.executeIsolated((isolatedClient) => + isolatedClient.xRead( + [ + { + key: streamName, + id: offset, + }, + ], { - key: streamName, - id: offset, - }, - ], - { - COUNT: readCount, - BLOCK: 5000, - } + COUNT: readCount, + BLOCK: 5000, + } + ) ); } else { return redisClient - .xread("BLOCK", 100, "COUNT", readCount, "STREAMS", streamName, offset) + .xread("COUNT", readCount, "BLOCK", 100, "STREAMS", streamName, offset) .then((results) => { if (results === null) { return null; @@ -109,7 +124,7 @@ export function XREAD( * @see https://redis.io/commands/xadd/ */ export function XADD( - redisClient: any, + redisClient: RedisClient | IORedisClient, streamName: string, payload: any, maxLenThreshold: number @@ -136,7 +151,7 @@ export function XADD( * @see https://redis.io/commands/xrange/ */ export function XRANGE( - redisClient: any, + redisClient: RedisClient | IORedisClient, streamName: string, start: string, end: string @@ -154,7 +169,7 @@ export function XRANGE( * @see https://redis.io/commands/set/ */ export function SET( - redisClient: any, + redisClient: RedisClient | IORedisClient, key: string, value: string, expiryInSeconds: number @@ -171,7 +186,7 @@ export function SET( /** * @see https://redis.io/commands/getdel/ */ -export function GETDEL(redisClient: any, key: string) { +export function GETDEL(redisClient: RedisClient | IORedisClient, key: string) { if (isRedisV4Client(redisClient)) { // note: GETDEL was added in Redis version 6.2 return redisClient.multi().get(key).del(key).exec();