-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathcex_streams.py
108 lines (92 loc) · 3.62 KB
/
cex_streams.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import json
import asyncio
import requests
import websockets
import aioprocessing
from typing import List
from decimal import Decimal
# Binance USDM-Futures orderbook stream
async def stream_binance_usdm_orderbook(symbols: List[str],
                                        event_queue: aioprocessing.AioQueue,
                                        debug: bool = False):
    """Stream 5-level order books from the Binance USDM-Futures websocket.

    Each symbol is turned into a ``<symbol>@depth5@100ms`` stream name
    (slash removed, lower-cased) and subscribed to in a single request.
    Every frame received afterwards is converted into a dict with the keys
    ``source``, ``type``, ``exchange``, ``symbol``, ``bids`` and ``asks``,
    where bids/asks are lists of ``[Decimal(price), Decimal(size)]``.

    Args:
        symbols: Pairs written like ``'ETH/USDT'``.
        event_queue: Queue that receives each order book via ``put`` when
            ``debug`` is false.
        debug: When true, order books are printed instead of queued.

    Raises:
        asyncio.TimeoutError: No frame arrived within 15 seconds.
    """

    def as_levels(rows):
        # Only the first two fields of each level (price, size) are used.
        return [[Decimal(row[0]), Decimal(row[1])] for row in rows]

    async with websockets.connect('wss://fstream.binance.com/ws/') as ws:
        stream_names = []
        for pair in symbols:
            stream_names.append(f'{pair.replace("/", "").lower()}@depth5@100ms')

        request = {'method': 'SUBSCRIBE', 'params': stream_names, 'id': 1}
        await ws.send(json.dumps(request))

        # The first frame is discarded unread (presumably the reply to the
        # SUBSCRIBE request).
        await ws.recv()

        while True:
            raw = await asyncio.wait_for(ws.recv(), timeout=15)
            payload = json.loads(raw)

            book = {
                'source': 'cex',
                'type': 'orderbook',
                'exchange': 'binance',
                'symbol': payload['s'],
                'bids': as_levels(payload['b']),
                'asks': as_levels(payload['a']),
            }

            if debug:
                print(book)
            else:
                event_queue.put(book)
# OKX USDM-Futures orderbook stream
# OKX refers to perpetual contracts as "swaps" (hence the -SWAP instrument suffix).
async def stream_okx_usdm_orderbook(symbols: List[str],
                                    event_queue: aioprocessing.AioQueue,
                                    debug: bool = False):
    """Stream 5-level order books for OKX swap instruments.

    Fetches the SWAP instrument list once to build a per-instrument size
    multiplier, subscribes to the ``books5`` channel for every symbol, and
    converts each data frame into a dict with the keys ``source``, ``type``,
    ``exchange``, ``symbol``, ``bids`` and ``asks``. Bids/asks are lists of
    ``[Decimal(price), Decimal(size) * multiplier]``.

    Args:
        symbols: Pairs written like ``'ETH/USDT'``; each one is subscribed
            as the instrument id ``'ETH-USDT-SWAP'``.
        event_queue: Queue that receives each order book via ``put`` when
            ``debug`` is false.
        debug: When true, order books are printed instead of queued.

    Raises:
        asyncio.TimeoutError: No frame arrived within 15 seconds.
        KeyError: A data frame refers to an instrument id that is not in
            the multiplier table.
    """
    url = 'https://www.okx.com/api/v5/public/instruments?instType=SWAP'

    # requests is synchronous: run it on the default executor so that other
    # coroutines on this event loop are not stalled during the download, and
    # bound it with a timeout so a dead endpoint cannot hang the stream.
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(
        None, lambda: requests.get(url, timeout=10))
    instruments = response.json()

    # NOTE(review): the key rewrite maps 'X-USD-SWAP' to 'X-USDT-SWAP' (and
    # 'X-USDT-SWAP' to 'X-USDTT-SWAP'), so a USDT instrument is looked up
    # with the ctMult / ctVal ratio of the like-named USD instrument. Kept
    # unchanged here; confirm this is the intended contract-size conversion.
    multipliers = {
        d['instId'].replace('USD', 'USDT'): Decimal(d['ctMult']) / Decimal(d['ctVal'])
        for d in instruments['data']
    }

    async with websockets.connect('wss://ws.okx.com:8443/ws/v5/public') as ws:
        args = [{'channel': 'books5', 'instId': f'{s.replace("/", "-")}-SWAP'} for s in symbols]
        subscription = {
            'op': 'subscribe',
            'args': args,
        }
        await ws.send(json.dumps(subscription))

        while True:
            msg = await asyncio.wait_for(ws.recv(), timeout=15)
            data = json.loads(msg)

            # Frames without a non-empty 'data' list carry no order book
            # (e.g. subscription replies, of which there may be one per
            # requested instrument). Skip them instead of assuming exactly
            # one such frame precedes the data.
            if not data.get('data'):
                continue

            inst_id = data['arg']['instId']
            multiplier = multipliers[inst_id]
            symbol = inst_id.replace('-SWAP', '').replace('-', '')
            book = data['data'][0]
            bids = [[Decimal(d[0]), Decimal(d[1]) * multiplier] for d in book['bids']]
            asks = [[Decimal(d[0]), Decimal(d[1]) * multiplier] for d in book['asks']]

            orderbook = {
                'source': 'cex',
                'type': 'orderbook',
                'exchange': 'okx',
                'symbol': symbol,
                'bids': bids,
                'asks': asks,
            }

            if not debug:
                event_queue.put(orderbook)
            else:
                print(orderbook)
if __name__ == '__main__':
    # Manual smoke test: run both streams in debug mode (order books are
    # printed, no queue is used) for a single symbol.
    import nest_asyncio
    from functools import partial

    from utils import reconnecting_websocket_loop

    nest_asyncio.apply()

    symbols = ['ETH/USDT']

    binance_stream = reconnecting_websocket_loop(
        partial(stream_binance_usdm_orderbook, symbols, None, True),
        tag='binance_stream'
    )

    okx_stream = reconnecting_websocket_loop(
        partial(stream_okx_usdm_orderbook, symbols, None, True),
        tag='okx_stream'
    )

    loop = asyncio.get_event_loop()

    # asyncio.wait() no longer accepts bare coroutines (TypeError on
    # Python 3.11+), so wrap each awaitable in a future first.
    # ensure_future passes existing futures/tasks through unchanged.
    tasks = [
        asyncio.ensure_future(stream, loop=loop)
        for stream in (binance_stream, okx_stream)
    ]
    loop.run_until_complete(asyncio.wait(tasks))