Skip to content

Commit

Permalink
fix: remove redis client direct dependency
Browse files Browse the repository at this point in the history
closes #24
  • Loading branch information
derevnjuk committed Apr 24, 2024
1 parent 2013436 commit 17a2100
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 25 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ The `@socket.io/redis-streams-adapter` package allows broadcasting packets betwe
npm install @socket.io/redis-streams-adapter redis
```

or

```
npm install @socket.io/redis-streams-adapter ioredis
```

## Usage

### With the `redis` package
Expand Down
8 changes: 5 additions & 3 deletions lib/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export function createAdapter(
if (response) {
for (const entry of response[0].messages) {
debug("reading entry %s", entry.id);
const message = entry.message;
const message = entry.message as RawClusterMessage;

if (message.nsp) {
namespaceToAdapters
Expand Down Expand Up @@ -267,6 +267,7 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
return Promise.reject("session or offset not found");
}

// @ts-expect-error rawSession is expected to be a string
const session = decode(Buffer.from(rawSession, "base64")) as Session;

debug("found session %o", session);
Expand All @@ -288,8 +289,9 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
}

for (const entry of entries) {
if (entry.message.nsp === this.nsp.name && entry.message.type === "3") {
const message = RedisStreamsAdapter.decode(entry.message);
const rawMessage = entry.message as RawClusterMessage;
if (rawMessage.nsp === this.nsp.name && rawMessage.type === "3") {
const message = RedisStreamsAdapter.decode(rawMessage);

// @ts-ignore
if (shouldIncludePacket(session.rooms, message.data.opts)) {
Expand Down
59 changes: 37 additions & 22 deletions lib/util.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
import { randomBytes } from "crypto";
import { commandOptions } from "redis";
import {
type RedisClientType,
type RedisFunctions,
type RedisModules,
type RedisScripts,
} from "redis";
import { type Redis as IORedisClient } from "ioredis";

export type RedisClient = RedisClientType<
RedisModules,
RedisFunctions,
RedisScripts
>;

export function hasBinary(obj: any, toJSON?: boolean): boolean {
if (!obj || typeof obj !== "object") {
Expand Down Expand Up @@ -43,8 +55,12 @@ export function randomId() {
*
* @see https://github.com/redis/node-redis
*/
function isRedisV4Client(redisClient: any) {
return typeof redisClient.sSubscribe === "function";
function isRedisV4Client(
redisClient: RedisClient | IORedisClient
): redisClient is RedisClient {
return (
"sSubscribe" in redisClient && typeof redisClient.sSubscribe === "function"
);
}

/**
Expand All @@ -68,30 +84,29 @@ function mapResult(result) {
* @see https://redis.io/commands/xread/
*/
export function XREAD(
redisClient: any,
redisClient: RedisClient | IORedisClient,
streamName: string,
offset: string,
readCount: number
) {
if (isRedisV4Client(redisClient)) {
return redisClient.xRead(
commandOptions({
isolated: true,
}),
[
return redisClient.executeIsolated((isolatedClient) =>
isolatedClient.xRead(
[
{
key: streamName,
id: offset,
},
],
{
key: streamName,
id: offset,
},
],
{
COUNT: readCount,
BLOCK: 5000,
}
COUNT: readCount,
BLOCK: 5000,
}
)
);
} else {
return redisClient
.xread("BLOCK", 100, "COUNT", readCount, "STREAMS", streamName, offset)
.xread("COUNT", readCount, "BLOCK", 100, "STREAMS", streamName, offset)
.then((results) => {
if (results === null) {
return null;
Expand All @@ -109,7 +124,7 @@ export function XREAD(
* @see https://redis.io/commands/xadd/
*/
export function XADD(
redisClient: any,
redisClient: RedisClient | IORedisClient,
streamName: string,
payload: any,
maxLenThreshold: number
Expand All @@ -136,7 +151,7 @@ export function XADD(
* @see https://redis.io/commands/xrange/
*/
export function XRANGE(
redisClient: any,
redisClient: RedisClient | IORedisClient,
streamName: string,
start: string,
end: string
Expand All @@ -154,7 +169,7 @@ export function XRANGE(
* @see https://redis.io/commands/set/
*/
export function SET(
redisClient: any,
redisClient: RedisClient | IORedisClient,
key: string,
value: string,
expiryInSeconds: number
Expand All @@ -171,7 +186,7 @@ export function SET(
/**
* @see https://redis.io/commands/getdel/
*/
export function GETDEL(redisClient: any, key: string) {
export function GETDEL(redisClient: RedisClient | IORedisClient, key: string) {
if (isRedisV4Client(redisClient)) {
// note: GETDEL was added in Redis version 6.2
return redisClient.multi().get(key).del(key).exec();
Expand Down

0 comments on commit 17a2100

Please sign in to comment.