From a45f8c7051acb4b0385d4ba685c2e254753199f4 Mon Sep 17 00:00:00 2001 From: wilson Date: Wed, 1 May 2019 00:08:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=96=B0=E7=9A=84select=20be?= =?UTF-8?q?st=20ip=E7=9A=84=E6=96=B9=E6=B3=95=E4=B8=BA=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E9=80=89=E5=8F=96ip=E7=9A=84=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 可以用参数恢复到默认的方法 --- QUANTAXIS/QAFetch/QATdx.py | 134 +++++++++++++++++++++++++------------ 1 file changed, 92 insertions(+), 42 deletions(-) diff --git a/QUANTAXIS/QAFetch/QATdx.py b/QUANTAXIS/QAFetch/QATdx.py index 957062c81..ca9baa69b 100755 --- a/QUANTAXIS/QAFetch/QATdx.py +++ b/QUANTAXIS/QAFetch/QATdx.py @@ -50,6 +50,7 @@ from QUANTAXIS.QAUtil import Parallelism from QUANTAXIS.QAUtil.QACache import QA_util_cache + def init_fetcher(): """初始化获取 """ @@ -100,8 +101,16 @@ def ping(ip, port=7709, type_='stock'): return datetime.timedelta(9, 9, 0) -def select_best_ip(): - QA_util_log_info('Selecting the Best Server IP of TDX') +def select_best_ip(fetch_method='tick', reget=0): + """ + 可选择用哪种方法选择最优ip了,默认采用实际获取股票/期货的实际tick数据获取, + fetch_method=='code'代表获取服务器的code_list来计算耗时 + 在fetch_method==tick的时候,reget==1的话可以重新测试ip耗时 + """ + if fetch_method == 'tick': + QA_util_log_info('Selecting the Best Server IP by tick data fetch') + else: + QA_util_log_info('Selecting the Best Server IP by code list fetch') # 删除exclude ip import json @@ -117,15 +126,21 @@ def select_best_ip(): section='IPLIST', option='exclude', default_value=alist) exclude_from_stock_ip_list(ipexclude) - - ipdefault = qasetting.get_config( - section='IPLIST', option='default', default_value=default_ip) - - ipdefault = eval(ipdefault) if isinstance(ipdefault, str) else ipdefault + ipdefault = dict(stock=dict(ip=None, port=None), + future=dict(ip=None, port=None)) + if not reget: + ipdefault = qasetting.get_config( + section='IPLIST', option='default', default_value=default_ip) + + ipdefault = eval(ipdefault) if isinstance( + ipdefault, str) else ipdefault assert isinstance(ipdefault, dict) + # best_stock_ip if ipdefault['stock']['ip'] == None: - - best_stock_ip = get_ip_list_by_ping(stock_ip_list) + if fetch_method == 'code': + best_stock_ip = get_ip_list_by_ping(stock_ip_list) + else: + best_stock_ip = get_best_ip_by_real_data_fetch('stock') else: if ping(ipdefault['stock']['ip'], ipdefault['stock']['port'], 'stock') < datetime.timedelta(0, 1): print('USING DEFAULT STOCK IP') @@ -134,14 +149,19 @@ def select_best_ip(): print('DEFAULT STOCK IP is BAD, RETESTING') best_stock_ip = get_ip_list_by_ping(stock_ip_list) if ipdefault['future']['ip'] == None: - best_future_ip = get_ip_list_by_ping(future_ip_list, _type='future') + if fetch_method == 'code': + best_future_ip = get_ip_list_by_ping( + future_ip_list, _type='future') + elif fetch_method == 'tick': + best_future_ip = get_best_ip_by_real_data_fetch('future') else: if ping(ipdefault['future']['ip'], ipdefault['future']['port'], 'future') < datetime.timedelta(0, 1): print('USING DEFAULT FUTURE IP') best_future_ip = ipdefault['future'] else: print('DEFAULT FUTURE IP {} is BAD, RETESTING'.format(ipdefault)) - best_future_ip = get_ip_list_by_ping(future_ip_list, _type='future') + best_future_ip = get_ip_list_by_ping( + future_ip_list, _type='future') ipbest = {'stock': best_stock_ip, 'future': best_future_ip} qasetting.set_config( section='IPLIST', option='default', default_value=ipbest) @@ -155,6 +175,7 @@ def get_ip_list_by_ping(ip_list=[], _type='stock'): best_ip = get_ip_list_by_multi_process_ping(ip_list, 1, _type) return best_ip[0] + def get_best_ip_by_real_data_fetch(_type='stock'): """ 用特定的数据获取函数测试数据获得的时间,从而选择下载数据最快的服务器ip @@ -162,49 +183,57 @@ def get_best_ip_by_real_data_fetch(_type='stock'): """ from QUANTAXIS.QAUtil.QADate import QA_util_today_str import time - - #找到前两天的有效交易日期 - pre_trade_date=QA_util_get_real_date(QA_util_today_str()) - pre_trade_date=QA_util_get_real_date(pre_trade_date) - + + # 找到前两天的有效交易日期 + pre_trade_date = QA_util_get_real_date(QA_util_today_str()) + pre_trade_date = QA_util_get_real_date(pre_trade_date) + # 某个函数获取的耗时测试 def get_stock_data_by_ip(ips): - start=time.time() + api = TdxHq_API() + start = time.time() try: - QA_fetch_get_stock_transaction('000001',pre_trade_date,pre_trade_date,2,ips['ip'],ips['port']) - end=time.time() - return end-start + with api.connect(ips['ip'], ips['port'], time_out=0.7): + # 加个可以timeout的方法(但是期货好像不适用) + res = QA_fetch_get_stock_transaction( + '000001', pre_trade_date, pre_trade_date, 2, ips['ip'], ips['port']) + end = time.time() + return 9999 if res is None else end-start except: return 9999 def get_future_data_by_ip(ips): - start=time.time() + apix = TdxExHaq_API() + start = time.time() try: - QA_fetch_get_future_transaction('RBL8',pre_trade_date,pre_trade_date,2,ips['ip'],ips['port']) - end=time.time() - return end-start + res = QA_fetch_get_future_transaction( + 'RBL8', pre_trade_date, pre_trade_date, 2, ips['ip'], ips['port']) + end = time.time() + return 9999 if res is None else end-start except: return 9999 - func,ip_list=0,0 - if _type=='stock': - func,ip_list=get_stock_data_by_ip,stock_ip_list + func, ip_list = 0, 0 + if _type == 'stock': + func, ip_list = get_stock_data_by_ip, stock_ip_list else: - func,ip_list=get_future_data_by_ip,future_ip_list + func, ip_list = get_future_data_by_ip, future_ip_list from pathos.multiprocessing import Pool - def multiMap(func,sequence): - res=[] - pool=Pool(4) + + def multiMap(func, sequence): + res = [] + pool = Pool(4) for i in sequence: - res.append(pool.apply_async(func,(i,))) + res.append(pool.apply_async(func, (i,))) pool.close() pool.join() - return list(map(lambda x:x.get(),res)) - - res=multiMap(func,ip_list) - index=res.index(min(res)) + return list(map(lambda x: x.get(), res)) + + res = multiMap(func, ip_list) + index = res.index(min(res)) return ip_list[index] + def get_ip_list_by_multi_process_ping(ip_list=[], n=0, _type='stock'): ''' 根据ping排序返回可用的ip列表 2019 03 31 取消参数filename @@ -234,8 +263,8 @@ def get_ip_list_by_multi_process_ping(ip_list=[], n=0, _type='stock'): results = [x[1] for x in sorted(results, key=lambda x: x[0])] if _type: # store the data as binary data stream - cache.set(_type, results, age=86400) - print('saving ip list to {} cache {}.'.format(_type, len(results))) + cache.set(_type, results, age=86400) + print('saving ip list to {} cache {}.'.format(_type, len(results))) if len(results) > 0: if n == 0 and len(results) > 0: return results @@ -245,6 +274,7 @@ def get_ip_list_by_multi_process_ping(ip_list=[], n=0, _type='stock'): print('ALL IP PING TIMEOUT!') return [{'ip': None, 'port': None}] + global best_ip best_ip = { 'stock': { @@ -262,7 +292,7 @@ def get_ip_list_by_multi_process_ping(ip_list=[], n=0, _type='stock'): def get_extensionmarket_ip(ip, port): global best_ip if ip is None and port is None and best_ip['future']['ip'] is None and best_ip['future']['port'] is None: - best_ip = select_best_ip() + best_ip = select_best_ip(reget=1) ip = best_ip['future']['ip'] port = best_ip['future']['port'] elif ip is None and port is None and best_ip['future']['ip'] is not None and best_ip['future']['port'] is not None: @@ -286,7 +316,7 @@ def get_mainmarket_ip(ip, port): global best_ip if ip is None and port is None and best_ip['stock']['ip'] is None and best_ip['stock']['port'] is None: - best_ip = select_best_ip() + best_ip = select_best_ip(reget=1) ip = best_ip['stock']['ip'] port = best_ip['stock']['port'] elif ip is None and port is None and best_ip['stock']['ip'] is not None and best_ip['stock']['port'] is not None: @@ -296,6 +326,7 @@ def get_mainmarket_ip(ip, port): pass return ip, port + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_security_bars(code, _type, lens, ip=None, port=None): """按bar长度推算数据 @@ -332,6 +363,7 @@ def QA_fetch_get_security_bars(code, _type, lens, ip=None, port=None): else: return None + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_day(code, start_date, end_date, if_fq='00', frequence='day', ip=None, port=None): """获取日线及以上级别的数据 @@ -408,6 +440,7 @@ def QA_fetch_get_stock_day(code, start_date, end_date, if_fq='00', frequence='da else: print(e) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_min(code, start, end, frequence='1min', ip=None, port=None): ip, port = get_mainmarket_ip(ip, port) @@ -448,6 +481,7 @@ def QA_fetch_get_stock_min(code, start, end, frequence='1min', ip=None, port=Non type=type_).set_index('datetime', drop=False, inplace=False)[start:end] return data.assign(datetime=data['datetime'].apply(lambda x: str(x))) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_latest(code, frequence='day', ip=None, port=None): ip, port = get_mainmarket_ip(ip, port) @@ -485,6 +519,7 @@ def QA_fetch_get_stock_latest(code, frequence='day', ip=None, port=None): .set_index('date', drop=False) \ .drop(['year', 'month', 'day', 'hour', 'minute', 'datetime'], axis=1) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_realtime(code=['000001', '000002'], ip=None, port=None): ip, port = get_mainmarket_ip(ip, port) @@ -512,6 +547,7 @@ def QA_fetch_get_stock_realtime(code=['000001', '000002'], ip=None, port=None): 'ask_vol4', 'bid4', 'bid_vol4', 'ask5', 'ask_vol5', 'bid5', 'bid_vol5']] return data.set_index(['datetime', 'code']) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_depth_market_data(code=['000001', '000002'], ip=None, port=None): ip, port = get_mainmarket_ip(ip, port) @@ -631,6 +667,7 @@ def for_sh(code): else: return 'undefined' + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_list(type_='stock', ip=None, port=None): ip, port = get_mainmarket_ip(ip, port) @@ -668,6 +705,7 @@ def QA_fetch_get_stock_list(type_='stock', ip=None, port=None): # .assign(szm=data['name'].apply(lambda x: ''.join([y[0] for y in lazy_pinyin(x)])))\ # .assign(quanpin=data['name'].apply(lambda x: ''.join(lazy_pinyin(x)))) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_index_list(ip=None, port=None): """获取指数列表 @@ -696,6 +734,7 @@ def QA_fetch_get_index_list(ip=None, port=None): return pd.concat([sz, sh]).query('sec=="index_cn"').sort_index().assign( name=data['name'].apply(lambda x: str(x)[0:6])) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_bond_list(ip=None, port=None): """bond @@ -719,6 +758,7 @@ def QA_fetch_get_bond_list(ip=None, port=None): return pd.concat([sz, sh]).query('sec=="bond_cn"').sort_index().assign( name=data['name'].apply(lambda x: str(x)[0:6])) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_bond_day(code, start_date, end_date, frequence='day', ip=None, port=None): ip, port = get_mainmarket_ip(ip, port) @@ -768,6 +808,7 @@ def QA_fetch_get_bond_day(code, start_date, end_date, frequence='day', ip=None, 'minute', 'datetime'], axis=1)[start_date:end_date] return data.assign(date=data['date'].apply(lambda x: str(x)[0:10])) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_index_day(code, start_date, end_date, frequence='day', ip=None, port=None): """指数日线 @@ -822,6 +863,7 @@ def QA_fetch_get_index_day(code, start_date, end_date, frequence='day', ip=None, 'minute', 'datetime'], axis=1)[start_date:end_date] return data.assign(date=data['date'].apply(lambda x: str(x)[0:10])) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_index_min(code, start, end, frequence='1min', ip=None, port=None): '指数分钟线' @@ -871,6 +913,7 @@ def QA_fetch_get_index_min(code, start, end, frequence='1min', ip=None, port=Non # data return data.assign(datetime=data['datetime'].apply(lambda x: str(x))) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_index_latest(code, frequence='day', ip=None, port=None): ip, port = get_mainmarket_ip(ip, port) @@ -902,9 +945,11 @@ def QA_fetch_get_index_latest(code, frequence='day', ip=None, port=None): data = [] for item in code: if str(item)[0] in ['5', '1']: # ETF - data.append(api.to_df(api.get_security_bars(frequence, 1 if str(item)[0] in ['0', '8', '9', '5'] else 0, item, 0, 1)).assign(code=item)) + data.append(api.to_df(api.get_security_bars(frequence, 1 if str(item)[0] in [ + '0', '8', '9', '5'] else 0, item, 0, 1)).assign(code=item)) else: - data.append(api.to_df(api.get_index_bars(frequence, 1 if str(item)[0] in ['0', '8', '9', '5'] else 0, item, 0, 1)).assign(code=item)) + data.append(api.to_df(api.get_index_bars(frequence, 1 if str(item)[0] in [ + '0', '8', '9', '5'] else 0, item, 0, 1)).assign(code=item)) data = pd.concat(data, axis=0) return data \ .assign(date=pd.to_datetime(data['datetime'] @@ -937,6 +982,7 @@ def __QA_fetch_get_stock_transaction(code, day, retry, api): .assign(code=str(code)).assign(order=range(len(data_.index))).set_index('datetime', drop=False, inplace=False) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_transaction(code, start, end, retry=2, ip=None, port=None): ''' @@ -979,6 +1025,7 @@ def QA_fetch_get_stock_transaction(code, start, end, retry=2, ip=None, port=None else: return None + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_transaction_realtime(code, ip=None, port=None): '实时分笔成交 包含集合竞价 buyorsell 1--sell 0--buy 2--盘前' @@ -1000,6 +1047,7 @@ def QA_fetch_get_stock_transaction_realtime(code, ip=None, port=None): except: return None + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_xdxr(code, ip=None, port=None): '除权除息' @@ -1026,6 +1074,7 @@ def QA_fetch_get_stock_xdxr(code, ip=None, port=None): else: return None + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_info(code, ip=None, port=None): '股票基本信息' @@ -1035,6 +1084,7 @@ def QA_fetch_get_stock_info(code, ip=None, port=None): with api.connect(ip, port): return api.to_df(api.get_finance_info(market_code, code)) + @retry(stop_max_attempt_number=3, wait_random_min=50, wait_random_max=100) def QA_fetch_get_stock_block(ip=None, port=None): '板块数据'