diff --git a/README.md b/README.md index 87cf0c9..d257886 100644 --- a/README.md +++ b/README.md @@ -232,7 +232,110 @@ ws.on('error', (err) => { }); ``` -See [websocket-client.ts](./src/websocket-client.ts) for further information and make sure to check the [examples](./examples/) folder for much more detail. +See [WebsocketClient.ts](./src/WebsocketClient.ts) for further information and make sure to check the [examples](./examples/) folder for much more detail. + +### Websocket API + +The [WebsocketClient.ts](./src/WebsocketClient.ts) supports this exchange's Websocket API. There are two ways to use the WS API, depending on individual preference: + +- event-driven: + - send requests via `client.sendWSAPIRequest(wsKey, channel, params)`, fire and forget, don't use await + - handle async replies via event handlers on `client.on('exception', cb)` and `client.on('response', cb)` +- promise-driven: + - send requests via `const result = await client.sendWSAPIRequest(wsKey, channel, params)`, which returns a promise + - await each call + - use try/catch blocks to handle promise rejections + +The below example demonstrates the promise-driven approach, which behaves similar to a REST API. For more detailed examples, refer to the [examples](./examples/) folder (e.g the [ws-private-spot-wsapi.ts](./examples/ws-private-spot-wsapi.ts) example). + +```javascript +const { WebsocketClient } = require('gateio-api'); + +const API_KEY = 'xxx'; +const PRIVATE_KEY = 'yyy'; + +async function start() { + const client = new WebsocketClient({ + apiKey: API_KEY, + apiSecret: PRIVATE_KEY, + // Automatically re-auth WS API, if we were auth'd before and get reconnected + reauthWSAPIOnReconnect: true, + }); + + /** + * Setup basic event handlers for core connectivity events. + * Note for this approach, the `response` and `update` events are not needed (but you can use them too/instead if you prefer). + **/ + + // Successfully connected + client.on('open', (data) => { + console.log(new Date(), 'ws connected ', data?.wsKey); + }); + + // Something happened, attempting to reconenct + client.on('reconnect', (data) => { + console.log(new Date(), 'ws reconnect: ', data); + }); + + // Reconnect successful + client.on('reconnected', (data) => { + console.log(new Date(), 'ws reconnected: ', data); + }); + + // Connection closed. If unexpected, expect reconnect -> reconnected. + client.on('close', (data) => { + console.error(new Date(), 'ws close: ', data); + }); + + client.on('exception', (data) => { + console.error(new Date(), 'ws exception: ', data); + }); + + client.on('authenticated', (data) => { + console.error(new Date(), 'ws authenticated: ', data); + }); + + try { + /** + * All WebSocket API (WS API) messaging should be done via the sendWSAPIRequest method. + */ + + // The WSKey identifies which connection this request is for. + // (e.g. "spotV4" | "perpFuturesUSDTV4" | "perpFuturesBTCV4" | "deliveryFuturesUSDTV4" | "deliveryFuturesBTCV4" | "optionsV4") + const wsKey = 'spotV4'; + + /** + * To authenticate, send an empty request to "spot.login". The SDK will handle all the parameters. + * + * By default (reauthWSAPIOnReconnect: true), if we get reconnected later on (e.g. connection temporarily lost), we will try to re-authenticate the WS API automatically when the connection is restored. + */ + console.log(new Date(), 'try authenticate'); + const loginResult = await client.sendWSAPIRequest(wsKey, 'spot.login'); + console.log(new Date(), 'authenticated!', loginResult); + + /** + * For other channels, you should include any parameters for the request (the payload) in your call. + * + * Note that internal parameters such as "signature" etc are all handled automatically by the SDK. Only the core request parameters are needed. + */ + console.log(new Date(), 'try get order status'); + const orderStatus = await client.sendWSAPIRequest( + wsKey, + 'spot.order_status', + { + order_id: '600995435390', + currency_pair: 'BTC_USDT', + }, + ); + + console.log(new Date(), 'orderStatus result!', orderStatus); + } catch (e) { + console.error(`WS API Error: `, e); + } +} + +start(); +``` --- diff --git a/examples/ws-private-spot-wsapi.ts b/examples/ws-private-spot-wsapi.ts new file mode 100644 index 0000000..8f24a34 --- /dev/null +++ b/examples/ws-private-spot-wsapi.ts @@ -0,0 +1,114 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ + +import { LogParams, WebsocketClient } from '../src'; + +// eslint-disable-next-line @typescript-eslint/no-unused-vars +const account = { + key: process.env.API_KEY || 'apiKeyHere', + secret: process.env.API_SECRET || 'apiSecretHere', +}; + +const customLogger = { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + trace: (...params: LogParams): void => { + // console.log(new Date(), 'trace', ...params); + }, + info: (...params: LogParams): void => { + console.log(new Date(), 'info', ...params); + }, + error: (...params: LogParams): void => { + console.error(new Date(), 'error', ...params); + }, +}; + +async function start() { + const client = new WebsocketClient( + { + apiKey: account.key, + apiSecret: account.secret, + reauthWSAPIOnReconnect: true, + }, + customLogger, + ); + + client.on('open', (data) => { + console.log(new Date(), 'ws connected ', data?.wsKey); + }); + + // // Data received + // client.on('update', (data) => { + // // console.info(new Date(), 'ws data received: ', JSON.stringify(data)); + // console.info(new Date(), 'ws data received: ', JSON.stringify(data, null, 2)); + // }); + + // Something happened, attempting to reconenct + client.on('reconnect', (data) => { + console.log(new Date(), 'ws reconnect: ', data); + }); + + // Reconnect successful + client.on('reconnected', (data) => { + console.log(new Date(), 'ws reconnected: ', data); + }); + + // Connection closed. If unexpected, expect reconnect -> reconnected. + client.on('close', (data) => { + console.error(new Date(), 'ws close: ', data); + }); + + // Reply to a request, e.g. "subscribe"/"unsubscribe"/"authenticate" + client.on('response', (data) => { + console.info( + new Date(), + 'ws server reply ', + JSON.stringify(data, null, 2), + '\n', + ); + }); + + client.on('exception', (data) => { + console.error(new Date(), 'ws exception: ', data); + }); + + client.on('authenticated', (data) => { + console.error(new Date(), 'ws authenticated: ', data); + }); + + try { + /** + * All WebSocket API (WS API) messaging should be done via the sendWSAPIRequest method. + * + * You have two ways to handle responses on the WS API. You can either: + * - process async `response` and `update` events from the websocket client, OR + * - await every call to `client.sendWSAPIRequest`, this can behave similar to using a REST API (successful responses resolve, exceptions reject). + */ + + /** + * To authenticate, send an empty request to "spot.login". The SDK will handle all the parameters. + */ + console.log(new Date(), 'try authenticate'); + const loginResult = await client.sendWSAPIRequest('spotV4', 'spot.login'); + console.log(new Date(), 'authenticated!', loginResult); + + /** + * For other channels, the 3rd parameter should have any parameters for the request (the payload). + * + * Note that internal parameters such as "signature" etc are all handled automatically by the SDK. + */ + console.log(new Date(), 'try get order status'); + const orderStatus = await client.sendWSAPIRequest( + 'spotV4', + 'spot.order_status', + { + order_id: '600995435390', + currency_pair: 'BTC_USDT', + }, + ); + + console.log(new Date(), 'orderStatus result!', orderStatus); + } catch (e) { + console.error(`WS API Error: `, e); + } +} + +start(); diff --git a/src/WebsocketClient.ts b/src/WebsocketClient.ts index 9543df9..8caa929 100644 --- a/src/WebsocketClient.ts +++ b/src/WebsocketClient.ts @@ -1,8 +1,6 @@ -import WebSocket from 'isomorphic-ws'; - import { BaseWebsocketClient, EmittableEvent } from './lib/BaseWSClient.js'; import { neverGuard } from './lib/misc-util.js'; -import { MessageEventLike } from './lib/requestUtils.js'; +import { CHANNEL_ID, MessageEventLike } from './lib/requestUtils.js'; import { SignAlgorithm, SignEncodeMethod, @@ -15,11 +13,14 @@ import { WsMarket, WsTopicRequest, } from './lib/websocket/websocket-util.js'; +import { DeferredPromise } from './lib/websocket/WsStore.types.js'; import { + WSAPIRequest, WsOperation, WsRequestOperationGate, WsRequestPing, } from './types/websockets/requests.js'; +import { SpotWSAPITopic, WSAPITopic } from './types/websockets/shared.js'; export const WS_LOGGER_CATEGORY = { category: 'gate-ws' }; @@ -47,7 +48,7 @@ function getPrivateSpotTopics(): string[] { ]; // WebSocket API for spot - const privateSpotWSAPITopics = [ + const privateSpotWSAPITopics: SpotWSAPITopic[] = [ 'spot.login', 'spot.order_place', 'spot.order_cancel', @@ -109,7 +110,7 @@ export class WebsocketClient extends BaseWebsocketClient { /** * Request connection of all dependent (public & private) websockets, instead of waiting for automatic connection by library */ - public connectAll(): Promise[] { + public connectAll(): (DeferredPromise['promise'] | undefined)[] { return [ this.connect(WS_KEY_MAP.spotV4), this.connect(WS_KEY_MAP.perpFuturesUSDTV4), @@ -201,8 +202,8 @@ export class WebsocketClient extends BaseWebsocketClient { } } - const timeInMs = Date.now(); - const timeInS = (timeInMs / 1000).toFixed(0); + const signTimestamp = Date.now() + this.options.recvWindow; + const timeInS = (signTimestamp / 1000).toFixed(0); return this.tryWsSend( wsKey, `{ "time": ${timeInS}, "channel": "${pingChannel}" }`, @@ -257,7 +258,10 @@ export class WebsocketClient extends BaseWebsocketClient { /** * Parse incoming events into categories */ - protected resolveEmittableEvents(event: MessageEventLike): EmittableEvent[] { + protected resolveEmittableEvents( + wsKey: WsKey, + event: MessageEventLike, + ): EmittableEvent[] { const results: EmittableEvent[] = []; try { @@ -266,7 +270,87 @@ export class WebsocketClient extends BaseWebsocketClient { const responseEvents = ['subscribe', 'unsubscribe']; const authenticatedEvents = ['auth']; - const eventAction = parsed.event || parsed.action; + const eventHeaders = parsed?.header; + const eventChannel = eventHeaders?.channel; + const eventType = eventHeaders?.event; + const eventStatusCode = eventHeaders?.status; + const requestId = parsed?.request_id; + + const promiseRef = [eventChannel, requestId].join('_'); + + const eventAction = parsed.event || parsed.action || parsed?.header.data; + + // const promise = this.getWsStore().getDeferredPromise(wsKey, promiseRef); + // console.error(`Event action: `, { + // // parsed: JSON.stringify(parsed, null, 2), + // eventChannel, + // eventType, + // eventStatusCode, + // promiseRef, + // promise: promise?.promise, + // }); + + if (eventType === 'api') { + const isError = eventStatusCode !== '200'; + + // WS API Exception + if (isError) { + try { + this.getWsStore().rejectDeferredPromise( + wsKey, + promiseRef, + parsed, + true, + ); + } catch (e) { + this.logger.error(`Exception trying to reject WSAPI promise`, { + wsKey, + promiseRef, + parsedEvent: parsed, + }); + } + + results.push({ + eventType: 'exception', + event: parsed, + }); + return results; + } + + // WS API Success + try { + this.getWsStore().resolveDeferredPromise( + wsKey, + promiseRef, + parsed, + true, + ); + } catch (e) { + this.logger.error(`Exception trying to resolve WSAPI promise`, { + wsKey, + promiseRef, + parsedEvent: parsed, + }); + } + + if (eventChannel.includes('.login')) { + results.push({ + eventType: 'authenticated', + event: { + ...parsed, + isWSAPI: true, + WSAPIAuthChannel: eventChannel, + }, + }); + } + + results.push({ + eventType: 'response', + event: parsed, + }); + return results; + } + if (typeof eventAction === 'string') { if (parsed.success === false) { results.push({ @@ -302,15 +386,6 @@ export class WebsocketClient extends BaseWebsocketClient { return results; } - // if (eventAction === 'ping') { - // this.logger.trace('Received ping - preparing pong', { - // ...WS_LOGGER_CATEGORY, - // wsKey, - // }); - // this.sendPongEvent(wsKey, ws); - // return; - // } - this.logger.error( `!! Unhandled string event type "${eventAction}. Defaulting to "update" channel...`, parsed, @@ -492,7 +567,8 @@ export class WebsocketClient extends BaseWebsocketClient { switch (market) { case 'all': { for (const request of requests) { - const timeInSeconds = +(Date.now() / 1000).toFixed(0); + const signTimestamp = Date.now() + this.options.recvWindow; + const timeInSeconds = +(signTimestamp / 1000).toFixed(0); const wsEvent: WsRequestOperationGate = { time: timeInSeconds, @@ -600,20 +676,12 @@ export class WebsocketClient extends BaseWebsocketClient { const signTimestamp = Date.now() + this.options.recvWindow; const signMessageInput = `|${signTimestamp}`; - let signature: string; - if (typeof this.options.customSignMessageFn === 'function') { - signature = await this.options.customSignMessageFn( - signMessageInput, - this.options.apiSecret, - ); - } else { - signature = await signMessage( - signMessageInput, - this.options.apiSecret, - 'hex', - 'SHA-512', - ); - } + const signature = await this.signMessage( + signMessageInput, + this.options.apiSecret, + 'hex', + 'SHA-512', + ); switch (market) { case 'all': { @@ -635,13 +703,118 @@ export class WebsocketClient extends BaseWebsocketClient { } } + async signWSAPIRequest( + requestEvent: WSAPIRequest, + ): Promise> { + if (!this.options.apiSecret) { + throw new Error(`API Secret missing`); + } + + const payload = requestEvent.payload; + + const toSign = [ + requestEvent.event, + requestEvent.channel, + JSON.stringify(payload.req_param), + requestEvent.time, + ].join('\n'); + + const signEncoding: SignEncodeMethod = 'hex'; + const signAlgoritm: SignAlgorithm = 'SHA-512'; + + return { + ...requestEvent, + payload: { + ...requestEvent.payload, + req_header: { + 'X-Gate-Channel-Id': CHANNEL_ID, + }, + signature: await this.signMessage( + toSign, + this.options.apiSecret, + signEncoding, + signAlgoritm, + ), + }, + }; + } + + getPromiseRefForWSAPIRequest(requestEvent: WSAPIRequest): string { + const promiseRef = [ + requestEvent.channel, + requestEvent.payload?.req_id, + ].join('_'); + return promiseRef; + } + /** - * This exchange API is split into "markets" that behave differently (different base URLs). - * The market can easily be resolved using the topic name. + * WS API Methods */ - private getMarketForTopic(topic: string): WsMarket { - return 'all'; - throw new Error(`Could not resolve "market" for topic: "${topic}"`); + /** + * Send a Websocket API event on a connection. Returns a promise that resolves on reply. + * + * Returned promise is rejected if an exception is detected in the reply OR the connection disconnects for any reason (even if automatic reconnect will happen). + * + * After a fresh connection, you should always send a login request first. + * + * If you authenticated once and you're reconnected later (e.g. connection temporarily lost), the SDK will by default automatically: + * - Detect you were authenticated to the WS API before + * - Try to re-authenticate (up to 5 times, in case something (bad timestamp) goes wrong) + * - If it succeeds, it will emit the 'authenticated' event. + * - If it fails and gives up, it will emit an 'exception' event (type: 'wsapi.auth', reason: detailed text). + * + * You can turn off the automatic re-auth WS API logic using `reauthWSAPIOnReconnect: false` in the WSClient config. + * + * @param wsKey - The connection this event is for (e.g. "spotV4" | "perpFuturesUSDTV4" | "perpFuturesBTCV4" | "deliveryFuturesUSDTV4" | "deliveryFuturesBTCV4" | "optionsV4") + * @param channel - The channel this event is for (e.g. "spot.login" to authenticate) + * @param params - Any request parameters for the payload. Signature generation is automatic, only send parameters such as order ID as per the docs. + * @returns + */ + async sendWSAPIRequest( + wsKey: WsKey, + channel: WSAPITopic, + params?: TRequestParams, + ) { + this.logger.trace(`sendWSAPIRequest(): assert "${wsKey}" is connected`); + await this.assertIsConnected(wsKey); + + const signTimestamp = Date.now() + this.options.recvWindow; + const timeInSeconds = +(signTimestamp / 1000).toFixed(0); + + const requestEvent: WSAPIRequest = { + time: timeInSeconds, + // id: timeInSeconds, + channel, + event: 'api', + payload: { + req_id: this.getNewRequestId(), + req_header: { + 'X-Gate-Channel-Id': CHANNEL_ID, + }, + api_key: this.options.apiKey, + req_param: params ? params : '', // should this be string, not sure + timestamp: `${timeInSeconds}`, + }, + }; + + // Sign request + const signedEvent = await this.signWSAPIRequest(requestEvent); + + // Store deferred promise + const promiseRef = this.getPromiseRefForWSAPIRequest(requestEvent); + const deferredPromise = this.getWsStore().createDeferredPromise( + wsKey, + promiseRef, + false, + ); + + // Send event + this.tryWsSend(wsKey, JSON.stringify(signedEvent)); + + this.logger.trace(`sendWSAPIRequest(): sent ${channel} event`); + + // Return deferred promise, so caller can await this call + return deferredPromise.promise; } } diff --git a/src/lib/BaseRestClient.ts b/src/lib/BaseRestClient.ts index 26c97cc..1950b5a 100644 --- a/src/lib/BaseRestClient.ts +++ b/src/lib/BaseRestClient.ts @@ -336,8 +336,8 @@ export abstract class BaseRestClient { const encodeQueryStringValues = true; if (signMethod === 'gateV4') { - const signAlgoritm: SignAlgorithm = 'SHA-512'; const signEncoding: SignEncodeMethod = 'hex'; + const signAlgoritm: SignAlgorithm = 'SHA-512'; const queryStringToSign = data?.query ? serializeParams( diff --git a/src/lib/BaseWSClient.ts b/src/lib/BaseWSClient.ts index 3f071cf..c30ab32 100644 --- a/src/lib/BaseWSClient.ts +++ b/src/lib/BaseWSClient.ts @@ -14,7 +14,10 @@ import { WsTopicRequestOrStringTopic, } from './websocket/websocket-util.js'; import { WsStore } from './websocket/WsStore.js'; -import { WsConnectionStateEnum } from './websocket/WsStore.types.js'; +import { + DeferredPromise, + WsConnectionStateEnum, +} from './websocket/WsStore.types.js'; interface WSClientEventMap { /** Connection opened. If this connection was previously opened and reconnected, expect the reconnected event instead */ @@ -99,6 +102,8 @@ export abstract class BaseWebsocketClient< protected logger: typeof DefaultLogger; protected options: WebsocketClientOptions; + private wsApiRequestId: number = 0; + constructor( options?: WSClientConfigurableOptions, logger?: typeof DefaultLogger, @@ -113,11 +118,13 @@ export abstract class BaseWebsocketClient< pongTimeout: 1500, pingInterval: 10000, reconnectTimeout: 500, - recvWindow: 0, + recvWindow: -1000, // Gate.io only has one connection (for both public & private). Auth works with every sub, not on connect, so this is turned off. authPrivateConnectionsOnConnect: false, // Gate.io requires auth to be added to every request, when subscribing to private topics. This is handled automatically. authPrivateRequests: true, + // Automatically re-auth WS API, if we were auth'd before and get reconnected + reauthWSAPIOnReconnect: true, ...options, }; } @@ -155,18 +162,28 @@ export abstract class BaseWebsocketClient< * Abstraction called to sort ws events into emittable event types (response to a request, data update, etc) */ protected abstract resolveEmittableEvents( + wsKey: TWSKey, event: MessageEventLike, ): EmittableEvent[]; /** * Request connection of all dependent (public & private) websockets, instead of waiting for automatic connection by library */ - abstract connectAll(): Promise[]; + protected abstract connectAll(): (DeferredPromise['promise'] | undefined)[]; protected isPrivateWsKey(wsKey: TWSKey): boolean { return this.getPrivateWSKeys().includes(wsKey); } + /** Returns auto-incrementing request ID, used to track promise references for async requests */ + protected getNewRequestId(): string { + return `${++this.wsApiRequestId}`; + } + + protected abstract sendWSAPIRequest< + TRequestParams extends object | string = object, + >(wsKey: TWSKey, channel: string, params?: TRequestParams); + /** * Don't call directly! Use subscribe() instead! * @@ -196,15 +213,13 @@ export abstract class BaseWebsocketClient< WsConnectionStateEnum.CONNECTED, ); + // TODO: test this, spam loads of subscribe topic calls in intervals (and add a log line on this logic) + const isConnectionInProgress = + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTING) || + this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.RECONNECTING); + // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect - if ( - !isConnected && - !this.wsStore.isConnectionState( - wsKey, - WsConnectionStateEnum.CONNECTING, - ) && - !this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.RECONNECTING) - ) { + if (!isConnected && !isConnectionInProgress) { return this.connect(wsKey); } @@ -337,14 +352,16 @@ export abstract class BaseWebsocketClient< /** * Request connection to a specific websocket, instead of waiting for automatic connection. */ - protected async connect(wsKey: TWSKey): Promise { + protected async connect( + wsKey: TWSKey, + ): Promise { try { if (this.wsStore.isWsOpen(wsKey)) { this.logger.error( 'Refused to connect to ws with existing active connection', { ...WS_LOGGER_CATEGORY, wsKey }, ); - return this.wsStore.getWs(wsKey); + return this.wsStore.getConnectionInProgressPromise(wsKey); } if ( @@ -364,16 +381,39 @@ export abstract class BaseWebsocketClient< this.setWsState(wsKey, WsConnectionStateEnum.CONNECTING); } + if (!this.wsStore.getConnectionInProgressPromise(wsKey)) { + this.wsStore.createConnectionInProgressPromise(wsKey, false); + } + const url = this.getWsUrl(wsKey); const ws = this.connectToWsUrl(url, wsKey); - return this.wsStore.setWs(wsKey, ws); + this.wsStore.setWs(wsKey, ws); + + return this.wsStore.getConnectionInProgressPromise(wsKey)?.promise; } catch (err) { this.parseWsError('Connection failed', err, wsKey); this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); } } + private connectToWsUrl(url: string, wsKey: TWSKey): WebSocket { + this.logger.trace(`Opening WS connection to URL: ${url}`, { + ...WS_LOGGER_CATEGORY, + wsKey, + }); + + const ws = new WebSocket(url, undefined); + + ws.onopen = (event: any) => this.onWsOpen(event, wsKey); + ws.onmessage = (event: any) => this.onWsMessage(event, wsKey, ws); + ws.onerror = (event: any) => + this.parseWsError('websocket error', event, wsKey); + ws.onclose = (event: any) => this.onWsClose(event, wsKey); + + return ws; + } + private parseWsError(context: string, error: any, wsKey: TWSKey) { if (!error.message) { this.logger.error(`${context} due to unexpected error: `, error); @@ -601,24 +641,11 @@ export abstract class BaseWebsocketClient< } } - private connectToWsUrl(url: string, wsKey: TWSKey): WebSocket { - this.logger.trace(`Opening WS connection to URL: ${url}`, { - ...WS_LOGGER_CATEGORY, - wsKey, - }); - - const ws = new WebSocket(url, undefined); - - ws.onopen = (event: any) => this.onWsOpen(event, wsKey); - ws.onmessage = (event: any) => this.onWsMessage(event, wsKey, ws); - ws.onerror = (event: any) => - this.parseWsError('websocket error', event, wsKey); - ws.onclose = (event: any) => this.onWsClose(event, wsKey); - - return ws; - } - private async onWsOpen(event: any, wsKey: TWSKey) { + const didReconnectSuccessfully = this.wsStore.isConnectionState( + wsKey, + WsConnectionStateEnum.RECONNECTING, + ); if ( this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.CONNECTING) ) { @@ -627,9 +654,7 @@ export abstract class BaseWebsocketClient< wsKey, }); this.emit('open', { wsKey, event }); - } else if ( - this.wsStore.isConnectionState(wsKey, WsConnectionStateEnum.RECONNECTING) - ) { + } else if (didReconnectSuccessfully) { this.logger.info('Websocket reconnected', { ...WS_LOGGER_CATEGORY, wsKey, @@ -645,6 +670,24 @@ export abstract class BaseWebsocketClient< this.options.pingInterval, ); + // Resolve & cleanup deferred "connection attempt in progress" promise + try { + const connectionInProgressPromise = + this.wsStore.getConnectionInProgressPromise(wsKey); + if (connectionInProgressPromise?.resolve) { + connectionInProgressPromise.resolve({ + wsKey, + }); + } + } catch (e) { + this.logger.error( + `Exception trying to resolve "connectionInProgress" promise`, + ); + } + + // Remove before resolving, in case there's more requests queued + this.wsStore.removeConnectingInProgressPromise(wsKey); + // Some websockets require an auth packet to be sent after opening the connection if ( this.isPrivateWsKey(wsKey) && @@ -664,9 +707,58 @@ export abstract class BaseWebsocketClient< // Request sub to private topics, if auth on connect isn't needed if (!this.options.authPrivateConnectionsOnConnect) { - // TODO: check for API keys and throw if missing this.requestSubscribeTopics(wsKey, privateReqs); } + + const wsStoredState = this.wsStore.get(wsKey, true); + + const { didAuthWSAPI, WSAPIAuthChannel } = wsStoredState; + + // If enabled, automatically reauth WS API if reconnected + if ( + didReconnectSuccessfully && + this.options.reauthWSAPIOnReconnect && + didAuthWSAPI && + WSAPIAuthChannel + ) { + this.logger.info( + `WS API was authenticated before reconnect - re-authenticating WS API...`, + ); + + let attempt = 0; + const maxReAuthAttempts = 5; + + while (attempt <= maxReAuthAttempts) { + attempt++; + try { + this.logger.trace( + `try reauthenticate (attempt ${attempt}/${maxReAuthAttempts})`, + ); + const loginResult = await this.sendWSAPIRequest( + wsKey, + WSAPIAuthChannel, + ); + this.logger.trace('reauthenticated!', loginResult); + break; + } catch (e) { + const giveUp = attempt >= maxReAuthAttempts; + + const suffix = giveUp + ? 'Max tries reached. Giving up!' + : 'Trying again...'; + + this.logger.error( + `Exception trying to reauthenticate WS API on reconnect... ${suffix}`, + ); + + this.emit('exception', { + wsKey, + type: 'wsapi.auth', + reason: `automatic WS API reauth failed after ${attempt} attempts`, + }); + } + } + } } /** @@ -674,17 +766,27 @@ export abstract class BaseWebsocketClient< * * Only used for exchanges that require auth before sending private topic subscription requests */ - private onWsAuthenticated(wsKey: TWSKey) { + private onWsAuthenticated( + wsKey: TWSKey, + event: { isWSAPI?: boolean; WSAPIAuthChannel?: string }, + ) { const wsState = this.wsStore.get(wsKey, true); wsState.isAuthenticated = true; - const topics = [...this.wsStore.getTopics(wsKey)]; - const privateTopics = topics.filter((topic) => - this.isPrivateTopicRequest(topic, wsKey), - ); + if (this.options.authPrivateConnectionsOnConnect) { + const topics = [...this.wsStore.getTopics(wsKey)]; + const privateTopics = topics.filter((topic) => + this.isPrivateTopicRequest(topic, wsKey), + ); + + if (privateTopics.length) { + this.subscribeTopicsForWsKey(privateTopics, wsKey); + } + } - if (privateTopics.length) { - this.subscribeTopicsForWsKey(privateTopics, wsKey); + if (event?.isWSAPI && event?.WSAPIAuthChannel) { + wsState.didAuthWSAPI = true; + wsState.WSAPIAuthChannel = event.WSAPIAuthChannel; } } @@ -716,7 +818,7 @@ export abstract class BaseWebsocketClient< const data = event.data; const dataType = event.type; - const emittableEvents = this.resolveEmittableEvents(event); + const emittableEvents = this.resolveEmittableEvents(wsKey, event); if (!emittableEvents.length) { // console.log(`raw event: `, { data, dataType, emittableEvents }); @@ -750,7 +852,7 @@ export abstract class BaseWebsocketClient< wsKey, }); this.emit(emittable.eventType, { ...emittable.event, wsKey }); - this.onWsAuthenticated(wsKey); + this.onWsAuthenticated(wsKey, emittable.event); continue; } @@ -788,9 +890,19 @@ export abstract class BaseWebsocketClient< if ( this.wsStore.getConnectionState(wsKey) !== WsConnectionStateEnum.CLOSING ) { + // clean up any pending promises for this connection + this.getWsStore().rejectAllDeferredPromises( + wsKey, + 'connection lost, reconnecting', + ); + + this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); + this.reconnectWithDelay(wsKey, this.options.reconnectTimeout!); this.emit('reconnect', { wsKey, event }); } else { + // clean up any pending promises for this connection + this.getWsStore().rejectAllDeferredPromises(wsKey, 'disconnected'); this.setWsState(wsKey, WsConnectionStateEnum.INITIAL); this.emit('close', { wsKey, event }); } @@ -803,4 +915,42 @@ export abstract class BaseWebsocketClient< private setWsState(wsKey: TWSKey, state: WsConnectionStateEnum) { this.wsStore.setConnectionState(wsKey, state); } + + /** + * Promise-driven method to assert that a ws has successfully connected (will await until connection is open) + */ + protected async assertIsConnected(wsKey: TWSKey): Promise { + const isConnected = this.getWsStore().isConnectionState( + wsKey, + WsConnectionStateEnum.CONNECTED, + ); + + if (!isConnected) { + const inProgressPromise = + this.getWsStore().getConnectionInProgressPromise(wsKey); + + // Already in progress? Await shared promise and retry + if (inProgressPromise) { + this.logger.trace( + `sendWSAPIRequest(): Awaiting EXISTING connection promise...`, + ); + await inProgressPromise.promise; + this.logger.trace( + `sendWSAPIRequest(): EXISTING connection promise resolved!`, + ); + return; + } + + // Start connection, it should automatically store/return a promise. + this.logger.trace( + `sendWSAPIRequest(): Not connected yet...queue await connection...`, + ); + + await this.connect(wsKey); + + this.logger.trace( + `sendWSAPIRequest(): New connection promise resolved! `, + ); + } + } } diff --git a/src/lib/websocket/WsStore.ts b/src/lib/websocket/WsStore.ts index 41ae5c5..075b758 100644 --- a/src/lib/websocket/WsStore.ts +++ b/src/lib/websocket/WsStore.ts @@ -1,7 +1,11 @@ import WebSocket from 'isomorphic-ws'; import { DefaultLogger } from '../logger.js'; -import { WsConnectionStateEnum, WsStoredState } from './WsStore.types.js'; +import { + DeferredPromise, + WsConnectionStateEnum, + WsStoredState, +} from './WsStore.types.js'; /** * Simple comparison of two objects, only checks 1-level deep (nested objects won't match) @@ -26,6 +30,13 @@ export function isDeepObjectMatch(object1: unknown, object2: unknown): boolean { return true; } +const DEFERRED_PROMISE_REF = { + CONNECTION_IN_PROGRESS: 'CONNECTION_IN_PROGRESS', +} as const; + +type DeferredPromiseRef = + (typeof DEFERRED_PROMISE_REF)[keyof typeof DEFERRED_PROMISE_REF]; + export class WsStore< WsKey extends string, TWSTopicSubscribeEventArgs extends string | object, @@ -74,6 +85,7 @@ export class WsStore< this.wsState[key] = { subscribedTopics: new Set(), connectionState: WsConnectionStateEnum.INITIAL, + deferredPromiseStore: {}, }; return this.get(key); } @@ -113,6 +125,162 @@ export class WsStore< return wsConnection; } + getDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + ): DeferredPromise | undefined { + const storeForKey = this.get(wsKey); + if (!storeForKey) { + return; + } + + const deferredPromiseStore = storeForKey.deferredPromiseStore; + return deferredPromiseStore[promiseRef]; + } + + createDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + throwIfExists: boolean, + ): DeferredPromise { + const existingPromise = this.getDeferredPromise(wsKey, promiseRef); + if (existingPromise) { + if (throwIfExists) { + throw new Error(`Promise exists for "${wsKey}"`); + } else { + // console.log('existing promise'); + return existingPromise; + } + } + + // console.log('create promise'); + const createIfMissing = true; + const storeForKey = this.get(wsKey, createIfMissing); + + const deferredPromise: DeferredPromise = {}; + + deferredPromise.promise = new Promise((resolve, reject) => { + deferredPromise.resolve = resolve; + deferredPromise.reject = reject; + }); + + const deferredPromiseStore = storeForKey.deferredPromiseStore; + + deferredPromiseStore[promiseRef] = deferredPromise; + + return deferredPromise; + } + + resolveDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + value: unknown, + removeAfter: boolean, + ): void { + const promise = this.getDeferredPromise(wsKey, promiseRef); + if (promise?.resolve) { + promise.resolve(value); + } + if (removeAfter) { + this.removeDeferredPromise(wsKey, promiseRef); + } + } + + rejectDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + value: unknown, + removeAfter: boolean, + ): void { + const promise = this.getDeferredPromise(wsKey, promiseRef); + if (promise?.reject) { + promise.reject(value); + } + if (removeAfter) { + this.removeDeferredPromise(wsKey, promiseRef); + } + } + + removeDeferredPromise( + wsKey: WsKey, + promiseRef: string | DeferredPromiseRef, + ): void { + const storeForKey = this.get(wsKey); + if (!storeForKey) { + return; + } + + const deferredPromise = storeForKey.deferredPromiseStore[promiseRef]; + if (deferredPromise) { + // Just in case it's pending + if (deferredPromise.resolve) { + deferredPromise.resolve('promiseRemoved'); + } + + delete storeForKey.deferredPromiseStore[promiseRef]; + } + } + + rejectAllDeferredPromises(wsKey: WsKey, reason: string): void { + const storeForKey = this.get(wsKey); + const deferredPromiseStore = storeForKey.deferredPromiseStore; + if (!storeForKey || !deferredPromiseStore) { + return; + } + + const reservedKeys = Object.values(DEFERRED_PROMISE_REF) as string[]; + + for (const promiseRef in deferredPromiseStore) { + // Skip reserved keys, such as the connection promise + if (reservedKeys.includes(promiseRef)) { + continue; + } + + try { + this.rejectDeferredPromise(wsKey, promiseRef, reason, true); + } catch (e) { + this.logger.error( + `rejectAllDeferredPromises(): Exception rejecting deferred promise`, + { wsKey: wsKey, reason, promiseRef, exception: e }, + ); + } + } + } + + /** Get promise designed to track a connection attempt in progress. Resolves once connected. */ + getConnectionInProgressPromise( + wsKey: WsKey, + ): DeferredPromise | undefined { + return this.getDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.CONNECTION_IN_PROGRESS, + ); + } + + /** + * Create a deferred promise designed to track a connection attempt in progress. + * + * Will throw if existing promise is found. + */ + createConnectionInProgressPromise( + wsKey: WsKey, + throwIfExists: boolean, + ): DeferredPromise { + return this.createDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.CONNECTION_IN_PROGRESS, + throwIfExists, + ); + } + + /** Remove promise designed to track a connection attempt in progress */ + removeConnectingInProgressPromise(wsKey: WsKey): void { + return this.removeDeferredPromise( + wsKey, + DEFERRED_PROMISE_REF.CONNECTION_IN_PROGRESS, + ); + } + /* connection state */ isWsOpen(key: WsKey): boolean { diff --git a/src/lib/websocket/WsStore.types.ts b/src/lib/websocket/WsStore.types.ts index da0953a..96df534 100644 --- a/src/lib/websocket/WsStore.types.ts +++ b/src/lib/websocket/WsStore.types.ts @@ -9,6 +9,12 @@ export enum WsConnectionStateEnum { // ERROR = 5, } +export interface DeferredPromise { + resolve?: (value: TSuccess) => TSuccess; + reject?: (value: TError) => TError; + promise?: Promise; +} + export interface WsStoredState { /** The currently active websocket connection */ ws?: WebSocket; @@ -20,6 +26,13 @@ export interface WsStoredState { activePongTimer?: ReturnType | undefined; /** If a reconnection is in progress, this will have the timer for the delayed reconnect */ activeReconnectTimer?: ReturnType | undefined; + /** + * When a connection attempt is in progress (even for reconnect), a promise is stored here. + * + * This promise will resolve once connected (and will then get removed); + */ + // connectionInProgressPromise?: DeferredPromise | undefined; + deferredPromiseStore: Record; /** * All the topics we are expected to be subscribed to on this connection (and we automatically resubscribe to if the connection drops) * @@ -28,4 +41,8 @@ export interface WsStoredState { subscribedTopics: Set; /** Whether this connection has completed authentication (only applies to private connections) */ isAuthenticated?: boolean; + /** Whether this connection has completed authentication before for the Websocket API, so it knows to automatically reauth if reconnected */ + didAuthWSAPI?: boolean; + /** To reauthenticate on the WS API, which channel do we send to? */ + WSAPIAuthChannel?: string; } diff --git a/src/types/websockets/client.ts b/src/types/websockets/client.ts index f1f0e7e..9ab10d2 100644 --- a/src/types/websockets/client.ts +++ b/src/types/websockets/client.ts @@ -42,6 +42,11 @@ export interface WSClientConfigurableOptions { * Look in the examples folder for a demonstration on using node's createHmac instead. */ customSignMessageFn?: (message: string, secret: string) => Promise; + + /** + * If you authenticated the WS API before, automatically try to re-authenticate the WS API if you're disconnected/reconnected for any reason. + */ + reauthWSAPIOnReconnect?: boolean; } /** @@ -55,4 +60,5 @@ export interface WebsocketClientOptions extends WSClientConfigurableOptions { recvWindow: number; authPrivateConnectionsOnConnect: boolean; authPrivateRequests: boolean; + reauthWSAPIOnReconnect: boolean; } diff --git a/src/types/websockets/requests.ts b/src/types/websockets/requests.ts index 88c09ed..85b5d1f 100644 --- a/src/types/websockets/requests.ts +++ b/src/types/websockets/requests.ts @@ -1,3 +1,6 @@ +import { CHANNEL_ID } from '../../lib/requestUtils'; +import { WSAPITopic } from './shared'; + export type WsOperation = 'subscribe' | 'unsubscribe' | 'auth'; export interface WsRequestAuthGate { @@ -22,3 +25,20 @@ export interface WsRequestOperationGate< event?: WsOperation; payload?: TWSPayload; } + +export interface WSAPIRequest { + time: number; + id?: number; + channel: WSAPITopic; + event: 'api'; + payload: { + req_id: string; + req_param?: TRequestParams | string; + req_header: { + 'X-Gate-Channel-Id': typeof CHANNEL_ID; + }; + api_key?: string; + signature?: string; + timestamp?: string; + }; +} diff --git a/src/types/websockets/shared.ts b/src/types/websockets/shared.ts new file mode 100644 index 0000000..c24d22d --- /dev/null +++ b/src/types/websockets/shared.ts @@ -0,0 +1,10 @@ +export type SpotWSAPITopic = + | 'spot.login' + | 'spot.order_place' + | 'spot.order_cancel' + | 'spot.order_cancel_ids' + | 'spot.order_cancel_cp' + | 'spot.order_amend' + | 'spot.order_status'; + +export type WSAPITopic = SpotWSAPITopic;