Skip to content

Commit

Permalink
Merge pull request #1146 from yutiansut/master
Browse files Browse the repository at this point in the history
新增一个异步线程执行器 超级牛逼 QA_AsyncThread
  • Loading branch information
yutiansut authored Apr 30, 2019
2 parents 7bfe830 + a329037 commit 3d4d078
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 72 deletions.
111 changes: 52 additions & 59 deletions QUANTAXIS/Exp/test_async.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from QUANTAXIS.QAEngine.asyncexec import QA_AsyncExec


#from QUANTAXIS.QAEngine.asyncexec import QA_AsyncExec
from QUANTAXIS.QAEngine.QAAsyncThread import QA_AsyncThread

"""这里展示了如何超级简便的使用QA异步执行线程
Expand All @@ -19,80 +18,74 @@
CEP的基础
"""
class job1(QA_AsyncExec):
def do(self):


class job1(QA_AsyncThread):
def do(self, event):
try:
x=self.queue.get()
print('job1 do {}'.format(x))
print('job1 do {}'.format(event))
except:
pass

class job2(QA_AsyncExec):
def do(self):

class job2(QA_AsyncThread):
def do(self, event):
try:
x=self.queue.get()
print('job2 do {}'.format(x))
print('job2 do {}'.format(event))
except:
pass


j1 = job1()
j2 = job2()

print(j1)
print(j2)
j1.start()
j2.start()


for i in range(100):

j1.put(i)
j2.put(i)

"""
λ python .\QUANTAXIS\Exp\test_async.py
job2 do 0
job1 do 0
job1 do 1
job1 do 2
job2 do 1
job2 do 2
job1 do 3
job2 do 3
job1 do 4
job2 do 4
job1 do 5
job1 do 6
job2 do 5
job1 do 7
job1 do 8
job2 do 6
job1 do 9
job2 do 7
job1 do 10
job2 do 8
job2 do 9
job1 do 11
job1 do 12
job2 do 10
job1 do 13
job2 do 11
job1 do 14
job1 do 15
job2 do 12
job1 do 16
job2 do 13
job1 do 17
job2 do 14
job1 do 18
job2 do 15
job1 do 19
job1 do 20
job2 do 16
job1 do 21
job2 do 17
job1 do 22
job2 do 18
job2 do 19
job1 do 23
job2 do 20
job2 do 21
<QA_AsyncThread: QA_AsyncThread_SRq id=1872684598216 ident None>
<QA_AsyncThread: QA_AsyncThread_2tf id=1872684555904 ident None>
start
start
job1 do < QA_Event None None False , id = 1872759171784 >
job2 do < QA_Event None None False , id = 1872759172848 >
job1 do < QA_Event None None False , id = 1872759243216 >
job2 do < QA_Event None None False , id = 1872759243944 >
job1 do < QA_Event None None False , id = 1872759244896 >
job2 do < QA_Event None None False , id = 1872759245512 >
job1 do < QA_Event None None False , id = 1872759245176 >
job2 do < QA_Event None None False , id = 1872759246240 >
job1 do < QA_Event None None False , id = 1872759244056 >
job2 do < QA_Event None None False , id = 1872759244952 >
job1 do < QA_Event None None False , id = 1872759301736 >
job2 do < QA_Event None None False , id = 1872759172904 >
job1 do < QA_Event None None False , id = 1872759171784 >
job2 do < QA_Event None None False , id = 1872759303696 >
job1 do < QA_Event None None False , id = 1872759244224 >
job2 do < QA_Event None None False , id = 1872759370528 >
job1 do < QA_Event None None False , id = 1872759371032 >
job2 do < QA_Event None None False , id = 1872759172848 >
job1 do < QA_Event None None False , id = 1872759373496 >
job2 do < QA_Event None None False , id = 1872759373048 >
job1 do < QA_Event None None False , id = 1872759243944 >
job2 do < QA_Event None None False , id = 1872759371592 >
job1 do < QA_Event None None False , id = 1872759246240 >
job2 do < QA_Event None None False , id = 1872759244056 >
job1 do < QA_Event None None False , id = 1872759438360 >
job2 do < QA_Event None None False , id = 1872759244952 >
job1 do < QA_Event None None False , id = 1872760546640 >
job2 do < QA_Event None None False , id = 1872759438976 >
job1 do < QA_Event None None False , id = 1872759172904 >
job2 do < QA_Event None None False , id = 1872759171784 >
job1 do < QA_Event None None False , id = 1872759303696 >
job2 do < QA_Event None None False , id = 1872759244224 >
.........
"""
"""
79 changes: 79 additions & 0 deletions QUANTAXIS/QAEngine/QAAsyncThread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import asyncio
import threading
from functools import wraps

from janus import Queue as QAAsyncQueue

from QUANTAXIS.QAEngine.QAEvent import QA_Event
from QUANTAXIS.QAUtil import QA_util_log_info, QA_util_random_with_topic


class QA_AsyncThread(threading.Thread):
_loop = asyncio.new_event_loop()
_queue: QAAsyncQueue = QAAsyncQueue(loop=_loop)

def __init__(self, name=None, *args, **kwargs):
super().__init__(*args, **kwargs)

self._stopped = False
self._main_loop = self.get_event_loop
self.name = QA_util_random_with_topic(
topic='QA_AsyncThread',
lens=3
) if name is None else name

def __repr__(self):
return '<QA_AsyncThread: {} id={} ident {}>'.format(
self.name,
id(self),
self.ident
)

@property
def queue(self):
return self._queue.async_q

def run(self):
asyncio.new_event_loop().run_until_complete(self.main())

async def event_hadler(self, event):
self.do(event)

def do(self, event):
raise NotImplementedError('QA ASYNCTHREAD 需要重载do方法')

def put(self, event):

event = event if isinstance(event, QA_Event) else QA_Event(
data=event, event_type=None)
self.queue.put_nowait(event)

def put_nowait(self, event):
self.put(event)

def stop(self):
self._stopped = True

def set_main_event_loop(self, loop):
self._main_loop = loop

def get_event_loop(self):
return self._loop

async def main(self):
print('start')
async_q = self._queue.async_q
main_loop = asyncio.get_event_loop()
while not (self._stopped and async_q.empty()):

try:
event = self.queue.get_nowait()
except asyncio.QueueEmpty:
pass
else:
asyncio.run_coroutine_threadsafe(
self.event_hadler(event),
main_loop
)
async_q.task_done()
await asyncio.sleep(0.0001)
74 changes: 62 additions & 12 deletions QUANTAXIS/QAEngine/asyncexec.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
import asyncio
import random
import time

import sys
import threading
from QUANTAXIS.QAEngine.asyncschedule import create_scheduler
from QUANTAXIS.QAEngine.QAThreadEngine import QA_Thread
from QUANTAXIS.QAEngine.QAEvent import QA_Event
from QUANTAXIS.QAEngine.QAAsyncThread import QA_AsyncThread, QAAsyncEMS

import asyncio
import janus



class QA_AsyncExec(QA_Thread):
async def coro(self,timeout=0.01):
class QA_AsyncExec1(QA_Thread):
# sys.setrecursionlimit(10000)
async def coro(self, timeout=0.01):
z = timeout*random.randint(2, 10)
await asyncio.sleep(z)

self.do()

await self.coro(timeout)


async def main(self):
scheduler = await create_scheduler()
# for i in range(2):
# # spawn jobs
await scheduler.spawn(self.coro(1/100))
await asyncio.sleep(100000000.0)
await scheduler.spawn(self.coro(0.001))

await asyncio.sleep(60*60*24)
await scheduler.close()



def do(self):
try:
event = self.queue.get()
Expand All @@ -35,3 +38,50 @@ def do(self):
def run(self):
asyncio.new_event_loop().run_until_complete(self.main())


class QA_AsyncExec(threading.Thread):
asyncEMS= QAAsyncEMS()
asyncThread = QA_AsyncThread(asyncEMS)


@asyncEMS.register(QA_Event)
async def event_hadler(self, event):
self.do(event)

def put(self, event):

event = event if isinstance(event, QA_Event) else QA_Event(data=event, event_type=None)

future = asyncio.run_coroutine_threadsafe(
self.asyncThread.queue.put(event),
asyncio.get_event_loop()
)
future.result() # wait for the event to be saved in the queue

def put_nowait(self, event):

event = event if isinstance(event, QA_Event) else QA_Event(data=event, event_type=None)

future = asyncio.run_coroutine_threadsafe(
self.asyncThread.queue.put_nowait(event),
worker.get_event_loop()
)
future.result() # wait for the event to be saved in the queue

async def main(self):
self.asyncThread.set_main_event_loop(asyncio.get_event_loop())
#threading.Thread(target=self.asyncThread.start).start()
self.asyncThread.start()
await asyncio.sleep(60*60*24)

self.asyncThread.stop()
self.asyncThread.join()

def do(self, event):
try:
print(event)
except:
pass

def run(self):
asyncio.new_event_loop().run_until_complete(self.main())
2 changes: 1 addition & 1 deletion QUANTAXIS/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
2017/4/8
"""

__version__ = '1.4.6'
__version__ = '1.4.7'
__author__ = 'yutiansut'

# fetch methods
Expand Down

0 comments on commit 3d4d078

Please sign in to comment.