Skip to content

Commit

Permalink
feat: middleware support (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
Meemaw authored Mar 9, 2023
1 parent cc88de3 commit 49e681d
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jobs:

- name: Install dependencies
run: npm ci

- name: Run linters
run: npm run lint:check

Expand All @@ -37,6 +38,11 @@ jobs:
- name: Run tests
run: npm run test -- --coverage --ci --runInBand

- name: Upload code coverage
uses: coverallsapp/github-action@master
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

build:
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# OpenSea Stream API - JavaScript SDK

[![https://badges.frapsoft.com/os/mit/mit.svg?v=102](https://badges.frapsoft.com/os/mit/mit.svg?v=102)](https://opensource.org/licenses/MIT)
[![Coverage Status](https://coveralls.io/repos/github/ProjectOpenSea/stream-js/badge.svg?branch=main)](https://coveralls.io/github/ProjectOpenSea/stream-js?branch=main)
[![styled with prettier](https://img.shields.io/badge/styled_with-prettier-ff69b4.svg)](https://github.com/prettier/prettier)

A Javascript SDK for receiving updates from the OpenSea Stream API - pushed over websockets. We currently support the following event types on a per-collection basis:
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@opensea/stream-js",
"version": "0.0.23",
"version": "0.0.24",
"description": "An SDK to receive pushed updates from OpenSea over websocket",
"license": "MIT",
"author": "OpenSea Developers",
Expand Down
17 changes: 14 additions & 3 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@ import {
TraitOfferEvent,
Callback,
LogLevel,
Network
Network,
OnClientEvent
} from './types';
import { ENDPOINTS } from './constants';

export class OpenSeaStreamClient {
private socket: Socket;
private channels: Map<string, Channel>;
private logLevel: LogLevel;
private onEvent: OnClientEvent;

constructor({
network = Network.MAINNET,
token,
apiUrl,
connectOptions,
logLevel = LogLevel.INFO,
onError = (error) => this.error(error)
onError = (error) => this.error(error),
onEvent = () => true
}: ClientConfig) {
const endpoint = apiUrl || ENDPOINTS[network];
const webTransportDefault =
Expand All @@ -44,6 +47,7 @@ export class OpenSeaStreamClient {
this.socket.onError(onError);
this.channels = new Map<string, Channel>();
this.logLevel = logLevel;
this.onEvent = onEvent;
}

private debug(message: unknown) {
Expand Down Expand Up @@ -113,7 +117,14 @@ export class OpenSeaStreamClient {
this.debug(`Fetching channel ${topic}`);
const channel = this.getChannel(topic);
this.debug(`Subscribing to ${eventType} events on ${topic}`);
channel.on(eventType, callback);

const onClientEvent = this.onEvent;
channel.on(eventType, (event) => {
if (onClientEvent(collectionSlug, eventType, event)) {
callback(event);
}
});

return () => {
this.debug(`Unsubscribing from ${eventType} events on ${topic}`);
channel.leave().receive('ok', () => {
Expand Down
8 changes: 8 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import type { SocketConnectOption } from 'phoenix';

export type OnClientEvent = <Payload>(
collection: string,
eventType: EventType,
event: BaseStreamMessage<Payload>
) => boolean;

/**
* OpenSea Stream API configuration object
* @param token API key to use for API
Expand All @@ -8,6 +14,7 @@ import type { SocketConnectOption } from 'phoenix';
* @param connectOptions `SocketConnectOption` type to use to connect to the Stream API socket.
* @param onError a callback function to use whenever errors occur in the SDK.
* @param logLevel `LogLevel` type to define the amount of logging the SDK should provide.
* @param onEvent a callback function to use whenever an event is emmited in the SDK. Can be used to globally apply some logic, e.g emitting metric/logging etc. If the onEvent handler returns false, event will be filtered and the subscription callback won't be invoked.
*/
export type ClientConfig = {
network?: Network;
Expand All @@ -16,6 +23,7 @@ export type ClientConfig = {
connectOptions?: Partial<SocketConnectOption>;
onError?: (error: unknown) => void;
logLevel?: LogLevel;
onEvent?: OnClientEvent;
};

export enum Network {
Expand Down
138 changes: 138 additions & 0 deletions tests/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,141 @@ describe('event streams', () => {
});
});
});

describe('middleware', () => {
test('single', () => {
const collectionSlug = 'c1';

const onClientEvent = jest.fn().mockImplementation(() => true);

streamClient = new OpenSeaStreamClient({
token: 'test',
apiUrl: 'ws://localhost:1234',
connectOptions: { transport: WebSocket },
onEvent: onClientEvent
});

const socket = getSocket(streamClient);
jest
.spyOn(socket, 'endPointURL')
.mockImplementation(() => 'ws://localhost:1234');

const onEvent = jest.fn();

const listingEvent = mockEvent(EventType.ITEM_LISTED, {});
const saleEvent = mockEvent(EventType.ITEM_SOLD, {});

streamClient.onEvents(
collectionSlug,
[EventType.ITEM_LISTED, EventType.ITEM_SOLD],
(event) => onEvent(event)
);

server.send(
encode({
topic: collectionTopic(collectionSlug),
event: EventType.ITEM_LISTED,
payload: listingEvent
})
);

server.send(
encode({
topic: collectionTopic(collectionSlug),
event: EventType.ITEM_SOLD,
payload: saleEvent
})
);

expect(onClientEvent).nthCalledWith(
1,
collectionSlug,
EventType.ITEM_LISTED,
listingEvent
);

expect(onClientEvent).nthCalledWith(
2,
collectionSlug,
EventType.ITEM_SOLD,
saleEvent
);

expect(onEvent).nthCalledWith(1, listingEvent);
expect(onEvent).nthCalledWith(2, saleEvent);

streamClient.disconnect();
});

test('filter out events', () => {
const collectionSlug = 'c1';

const onClientEvent = jest
.fn()
.mockImplementation(
(_c, _e, event) => event.payload.chain === 'ethereum'
);

streamClient = new OpenSeaStreamClient({
token: 'test',
apiUrl: 'ws://localhost:1234',
connectOptions: { transport: WebSocket },
onEvent: onClientEvent
});

const socket = getSocket(streamClient);
jest
.spyOn(socket, 'endPointURL')
.mockImplementation(() => 'ws://localhost:1234');

const onEvent = jest.fn();

const ethereumListing = mockEvent(EventType.ITEM_LISTED, {
chain: 'ethereum'
});
const polygonListing = mockEvent(EventType.ITEM_LISTED, {
chain: 'polygon'
});

streamClient.onEvents(
collectionSlug,
[EventType.ITEM_LISTED, EventType.ITEM_SOLD],
(event) => onEvent(event)
);

server.send(
encode({
topic: collectionTopic(collectionSlug),
event: EventType.ITEM_LISTED,
payload: ethereumListing
})
);

server.send(
encode({
topic: collectionTopic(collectionSlug),
event: EventType.ITEM_SOLD,
payload: polygonListing
})
);

expect(onClientEvent).nthCalledWith(
1,
collectionSlug,
EventType.ITEM_LISTED,
ethereumListing
);

expect(onClientEvent).nthCalledWith(
2,
collectionSlug,
EventType.ITEM_SOLD,
polygonListing
);

expect(onEvent).toHaveBeenCalledTimes(1);
expect(onEvent).toHaveBeenCalledWith(ethereumListing);

streamClient.disconnect();
});
});

0 comments on commit 49e681d

Please sign in to comment.