-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathaggregator.py
75 lines (55 loc) · 2.14 KB
/
aggregator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import aioprocessing
from decimal import Decimal
from operator import itemgetter
from typing import Any, Dict, List
def aggregate_cex_orderbooks(orderbooks: Dict[str, Dict[str, Any]]) -> Dict[str, List[List[Any]]]:
    """
    Merge the orderbooks of several exchanges into one sorted book.

    Every level of every exchange is copied into a new list with the
    exchange name appended as the last item, so the input orderbooks are
    never mutated.

    :param orderbooks: mapping of exchange name to an orderbook dict holding
        'bids' and 'asks' entries, e.g.
        {
            'binance': {'source': 'cex', 'type': 'orderbook', ... },
            'okx': {'source': 'cex', 'type': 'orderbook', ... },
        }
        Each level may be any sequence (list or tuple); its first item
        (presumably the price - confirm against the stream producers) is
        the sort key.
    :return: {'bids': [...], 'asks': [...]} where bids are sorted by their
        first item in descending order and asks in ascending order. Levels
        with an equal first item keep the iteration order of `orderbooks`.
    :raises KeyError: if an orderbook has no 'bids' or 'asks' entry.
    """
    bids: List[List[Any]] = []
    asks: List[List[Any]] = []

    for exchange, orderbook in orderbooks.items():
        # Unpacking (instead of `level + [exchange]`) accepts tuple levels
        # as well as list levels and always builds a fresh list.
        bids.extend([*level, exchange] for level in orderbook['bids'])
        asks.extend([*level, exchange] for level in orderbook['asks'])

    # list.sort is stable, so ties stay in exchange insertion order.
    bids.sort(key=itemgetter(0), reverse=True)
    asks.sort(key=itemgetter(0))

    return {'bids': bids, 'asks': asks}
async def event_handler(event_queue: aioprocessing.AioQueue):
    """
    Consume events from `event_queue` forever and print the merged CEX book.

    For every event whose 'source' is 'cex', the event is stored as the
    latest orderbook for its ('symbol', 'exchange') pair, then all stored
    orderbooks of that symbol are aggregated and printed. Events from any
    other source are read and discarded.

    :param event_queue: queue whose `coro_get()` yields event dicts carrying
        at least 'symbol' and 'source' keys (plus 'exchange' for CEX events).
    """
    books_by_symbol = {}

    while True:
        event = await event_queue.coro_get()
        symbol = event['symbol']

        # Only centralized-exchange events feed the aggregator.
        if event['source'] != 'cex':
            continue

        # Keep only the most recent orderbook per exchange for this symbol.
        per_exchange = books_by_symbol.setdefault(symbol, {})
        per_exchange[event['exchange']] = event

        print(aggregate_cex_orderbooks(per_exchange))
if __name__ == '__main__':
    # Manual smoke test: stream Binance and OKX orderbooks into one queue
    # and let event_handler print the aggregated book for each update.
    import asyncio
    import nest_asyncio
    from functools import partial

    from utils import reconnecting_websocket_loop
    from cex_streams import stream_binance_usdm_orderbook, stream_okx_usdm_orderbook

    nest_asyncio.apply()

    symbols = ['ETH/USDT']
    event_queue = aioprocessing.AioQueue()

    # Testing CEX aggregator
    binance_stream = reconnecting_websocket_loop(
        partial(stream_binance_usdm_orderbook, symbols, event_queue, False),
        tag='binance_stream'
    )
    okx_stream = reconnecting_websocket_loop(
        partial(stream_okx_usdm_orderbook, symbols, event_queue, False),
        tag='okx_stream'
    )
    event_handler_loop = event_handler(event_queue)

    loop = asyncio.get_event_loop()

    # asyncio.wait() no longer accepts bare coroutine objects (deprecated in
    # Python 3.8, TypeError since 3.11), so schedule every job as a
    # task/future on the loop first. ensure_future also passes through
    # objects that already are futures.
    tasks = [
        asyncio.ensure_future(job, loop=loop)
        for job in (binance_stream, okx_stream, event_handler_loop)
    ]
    loop.run_until_complete(asyncio.wait(tasks))