V0.9.62 Update a batch of code (#220)
* 0.9.62 start coding

* 0.9.62 test rs_czsc

* 0.9.62 add incremental data caching functions

* 0.9.62 add incremental data caching functions

* 0.9.62 optimize streamlit components

* 0.9.62 add show_weight_distribution component

* 0.9.62 add CZSC_DATA_API environment variable

* 0.9.62 rs_czsc

* 0.9.62 rs_czsc

* 0.9.62 drop support for python 3.7

* 0.9.62 drop support for python 3.7

* 0.9.62 remove pandas_ta

* 0.9.62 remove pandas_ta

* 0.9.62 fix bug

* 0.9.62 add K-line quality check tool

* 0.9.62 update limit leverage

* 0.9.62 add clickhouse weights client

* 0.9.62 fix check_kline_quality

* 0.9.62 fix Optional

* 0.9.62 fix Optional

* 0.9.62 fix Optional
zengbin93 authored Jan 1, 2025
1 parent 66dfdc1 commit 8daa121
Showing 14 changed files with 1,574 additions and 132 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/pythonpackage.yml
@@ -5,7 +5,7 @@ name: Python package

on:
push:
branches: [ master, 'V0.9.61' ]
branches: [ master, 'V0.9.62' ]
pull_request:
branches: [ master ]

@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.7, 3.8, 3.9, '3.10', '3.11']
python-version: [3.8, 3.9, '3.10', '3.11', '3.12']

steps:
- uses: actions/checkout@v2
@@ -30,7 +30,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install lxml==4.9.2
pip install lxml
pip install -r requirements.txt
- name: Lint with flake8
run: |
14 changes: 5 additions & 9 deletions czsc/__init__.py
@@ -164,6 +164,7 @@
show_classify,
show_df_describe,
show_date_effect,
show_weight_distribution,
)

from czsc.utils.bi_info import (
@@ -196,13 +197,8 @@
)


from czsc.utils.kline_quality import (
check_high_low,
check_price_gap,
check_abnormal_volume,
check_zero_volume,
)

from czsc.utils.kline_quality import check_kline_quality
from czsc.traders import cwc

from czsc.utils.portfolio import (
max_sharp,
@@ -225,10 +221,10 @@
)


__version__ = "0.9.61"
__version__ = "0.9.62"
__author__ = "zengbin93"
__email__ = "[email protected]"
__date__ = "20241101"
__date__ = "20241208"


def welcome():
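The four standalone checks (check_high_low, check_price_gap, check_abnormal_volume, check_zero_volume) are consolidated behind a single check_kline_quality entry point. A minimal usage sketch follows; the exact signature and return format of check_kline_quality are assumptions here, not confirmed by this diff, and the feather file name is purely illustrative.

```python
# Hedged sketch: assumes check_kline_quality accepts a standard K-line DataFrame
# (columns such as dt, symbol, open, close, high, low, vol) and returns a quality report.
import pandas as pd
from czsc.utils.kline_quality import check_kline_quality

df = pd.read_feather("kline_cache.feather")  # hypothetical local K-line cache
report = check_kline_quality(df)             # consolidated quality check
print(report)
```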
185 changes: 182 additions & 3 deletions czsc/connectors/cooperation.py
@@ -4,23 +4,24 @@
email: [email protected]
create_dt: 2023/11/15 20:45
describe: data API for internal use by the CZSC open-source collaboration team
API docs: https://s0cqcxuy3p.feishu.cn/wiki/StQbwOrWdiJPpikET9EcrRVEnrd
"""
import os
import time
import czsc
import requests
import loguru
import pandas as pd
from tqdm import tqdm
from pathlib import Path
from datetime import datetime
from czsc import RawBar, Freq

# On first use, open a python terminal and set the token as shown below, or set CZSC_TOKEN as an environment variable
# czsc.set_url_token(token='your token', url='http://zbczsc.com:9106')

cache_path = os.getenv("CZSC_CACHE_PATH", os.path.expanduser("~/.quant_data_cache"))
dc = czsc.DataClient(token=os.getenv("CZSC_TOKEN"), url="http://zbczsc.com:9106", cache_path=cache_path)
url = os.getenv("CZSC_DATA_API", "http://zbczsc.com:9106")
dc = czsc.DataClient(token=os.getenv("CZSC_TOKEN"), url=url, cache_path=cache_path)


def get_groups():
@@ -315,3 +316,181 @@ def get_stk_strategy(name="STK_001", **kwargs):
dfw = pd.merge(dfw, dfb, on=["dt", "symbol"], how="left")
dfh = dfw[["dt", "symbol", "weight", "n1b"]].copy()
return dfh


# ======================================================================================================================
# Incrementally update the local data cache
# ======================================================================================================================
def get_all_strategies(ttl=3600 * 24 * 7, logger=loguru.logger, path=cache_path):
"""获取所有策略的元数据
:param ttl: int, optional, 缓存时间,单位秒,默认为 7 天
:param logger: loguru.logger, optional, 日志记录器
:param path: str, optional, 缓存路径
:return: pd.DataFrame, 包含字段 name, description, author, base_freq, outsample_sdt;示例如下:
=========== ===================== ========= ========= ============
name description author base_freq outsample_sdt
=========== ===================== ========= ========= ============
STK_001 A股选股策略 ZB 1分钟 20220101
STK_002 A股选股策略 ZB 1分钟 20220101
STK_003 A股选股策略 ZB 1分钟 20220101
=========== ===================== ========= ========= ============
"""
path = Path(path) / "strategy"
path.mkdir(exist_ok=True, parents=True)
file_metas = path / "metas.feather"

if file_metas.exists() and (time.time() - file_metas.stat().st_mtime) < ttl:
logger.info("【缓存命中】获取所有策略的元数据")
dfm = pd.read_feather(file_metas)

else:
logger.info("【全量刷新】获取所有策略的元数据并刷新缓存")
dfm = dc.get_all_strategies(v=2, ttl=0)
dfm.to_feather(file_metas)

return dfm


def __update_strategy_dailys(file_cache, strategy, logger=loguru.logger):
"""更新策略的日收益数据"""
# 刷新缓存数据
if file_cache.exists():
df = pd.read_feather(file_cache)

cache_sdt = (df["dt"].max() - pd.Timedelta(days=3)).strftime("%Y%m%d")
cache_edt = (pd.Timestamp.now() + pd.Timedelta(days=1)).strftime("%Y%m%d")
logger.info(f"【增量刷新缓存】获取策略 {strategy} 的日收益数据:{cache_sdt} - {cache_edt}")

dfc = dc.sub_strategy_dailys(strategy=strategy, v=2, sdt=cache_sdt, edt=cache_edt, ttl=0)
dfc["dt"] = pd.to_datetime(dfc["dt"])
df = pd.concat([df, dfc]).drop_duplicates(["dt", "symbol", "strategy"], keep="last")

else:
cache_edt = (pd.Timestamp.now() + pd.Timedelta(days=1)).strftime("%Y%m%d")
logger.info(f"【全量刷新缓存】获取策略 {strategy} 的日收益数据:20170101 - {cache_edt}")
df = dc.sub_strategy_dailys(strategy=strategy, v=2, sdt="20170101", edt=cache_edt, ttl=0)

df = df.reset_index(drop=True)
df["dt"] = pd.to_datetime(df["dt"])
df.to_feather(file_cache)
return df


def get_strategy_dailys(
strategy="FCS001", symbol=None, sdt="20240101", edt=None, logger=loguru.logger, path=cache_path
):
"""获取策略的历史日收益数据
:param strategy: 策略名称
:param symbol: 品种名称
:param sdt: 开始时间
:param edt: 结束时间
:param logger: loguru.logger, optional, 日志记录器
:param path: str, optional, 缓存路径
:return: pd.DataFrame, 包含字段 dt, symbol, strategy, returns;示例如下:
=================== ========== ======== =========
dt strategy symbol returns
=================== ========== ======== =========
2017-01-10 00:00:00 STK_001 A股选股 0.001
2017-01-11 00:00:00 STK_001 A股选股 0.012
2017-01-12 00:00:00 STK_001 A股选股 0.011
=================== ========== ======== =========
"""
path = Path(path) / "strategy" / "dailys"
path.mkdir(exist_ok=True, parents=True)
file_cache = path / f"{strategy}.feather"

if edt is None:
edt = pd.Timestamp.now().strftime("%Y%m%d %H:%M:%S")

# check whether the cached data already covers the request
if file_cache.exists():
df = pd.read_feather(file_cache)

if df["dt"].max() >= pd.Timestamp(edt):
logger.info(f"【缓存命中】获取策略 {strategy} 的日收益数据:{sdt} - {edt}")

dfd = df[(df["dt"] >= pd.Timestamp(sdt)) & (df["dt"] <= pd.Timestamp(edt))].copy()
if symbol:
dfd = dfd[dfd["symbol"] == symbol].copy()
return dfd

# refresh the cached data
logger.info(f"[cache refresh] fetching daily returns of strategy {strategy}: {sdt} - {edt}")
df = __update_strategy_dailys(file_cache, strategy, logger=logger)
dfd = df[(df["dt"] >= pd.Timestamp(sdt)) & (df["dt"] <= pd.Timestamp(edt))].copy()
if symbol:
dfd = dfd[dfd["symbol"] == symbol].copy()
return dfd


def __update_strategy_weights(file_cache, strategy, logger=loguru.logger):
"""更新策略的持仓权重数据"""
# 刷新缓存数据
if file_cache.exists():
df = pd.read_feather(file_cache)

cache_sdt = (df["dt"].max() - pd.Timedelta(days=3)).strftime("%Y%m%d")
cache_edt = (pd.Timestamp.now() + pd.Timedelta(days=1)).strftime("%Y%m%d")
logger.info(f"【增量刷新缓存】获取策略 {strategy} 的持仓权重数据:{cache_sdt} - {cache_edt}")

dfc = dc.post_request(api_name=strategy, v=2, sdt=cache_sdt, edt=cache_edt, hist=1, ttl=0)
dfc["dt"] = pd.to_datetime(dfc["dt"])
dfc["strategy"] = strategy

df = pd.concat([df, dfc]).drop_duplicates(["dt", "symbol", "weight"], keep="last")

else:
cache_edt = (pd.Timestamp.now() + pd.Timedelta(days=1)).strftime("%Y%m%d")
logger.info(f"【全量刷新缓存】获取策略 {strategy} 的持仓权重数据:20170101 - {cache_edt}")
df = dc.post_request(api_name=strategy, v=2, sdt="20170101", edt=cache_edt, hist=1, ttl=0)
df["dt"] = pd.to_datetime(df["dt"])
df["strategy"] = strategy

df = df.reset_index(drop=True)
df.to_feather(file_cache)
return df


def get_strategy_weights(strategy="FCS001", sdt="20240101", edt=None, logger=loguru.logger, path=cache_path):
"""获取策略的历史持仓权重数据
:param strategy: 策略名称
:param sdt: 开始时间
:param edt: 结束时间
:param logger: loguru.logger, optional, 日志记录器
:param path: str, optional, 缓存路径
:return: pd.DataFrame, 包含字段 dt, symbol, weight, update_time, strategy;示例如下:
=================== ========= ======== =================== ==========
dt symbol weight update_time strategy
=================== ========= ======== =================== ==========
2017-01-09 00:00:00 000001.SZ 0 2024-07-27 16:13:29 STK_001
2017-01-10 00:00:00 000001.SZ 0 2024-07-27 16:13:29 STK_001
2017-01-11 00:00:00 000001.SZ 0 2024-07-27 16:13:29 STK_001
=================== ========= ======== =================== ==========
"""
path = Path(path) / "strategy" / "weights"
path.mkdir(exist_ok=True, parents=True)
file_cache = path / f"{strategy}.feather"

if edt is None:
edt = pd.Timestamp.now().strftime("%Y%m%d %H:%M:%S")

# check whether the cached data already covers the request
if file_cache.exists():
df = pd.read_feather(file_cache)

if df["dt"].max() >= pd.Timestamp(edt):
logger.info(f"【缓存命中】获取策略 {strategy} 的历史持仓权重数据:{sdt} - {edt}")
dfd = df[(df["dt"] >= pd.Timestamp(sdt)) & (df["dt"] <= pd.Timestamp(edt))].copy()
return dfd

# refresh the cached data
logger.info(f"[cache refresh] fetching historical position weights of strategy {strategy}: {sdt} - {edt}")
df = __update_strategy_weights(file_cache, strategy, logger=logger)
dfd = df[(df["dt"] >= pd.Timestamp(sdt)) & (df["dt"] <= pd.Timestamp(edt))].copy()
return dfd
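The new module-level helpers above wrap the data API with a feather-based local cache that is refreshed incrementally. A hedged usage sketch follows: CZSC_DATA_API (new in 0.9.62) must be set before the module is imported because the URL is read at import time, CZSC_TOKEN is assumed to be configured, and the strategy name STK_001 is taken from the docstring examples and may not be available to every token.

```python
# Hedged usage sketch for the incremental-cache helpers in czsc.connectors.cooperation.
import os

# optional endpoint override; read at module import time, so set it first
os.environ.setdefault("CZSC_DATA_API", "http://zbczsc.com:9106")

from czsc.connectors import cooperation as coo  # requires CZSC_TOKEN to be set

metas = coo.get_all_strategies()                                        # cached for up to 7 days
dailys = coo.get_strategy_dailys(strategy="STK_001", sdt="20240101")    # daily returns, cached per strategy
weights = coo.get_strategy_weights(strategy="STK_001", sdt="20240101")  # position weights, cached per strategy
print(metas.shape, dailys.shape, weights.shape)
```

Repeated calls hit the feather cache under CZSC_CACHE_PATH and only fetch the last few days from the API, which is the point of the incremental refresh logic above.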
28 changes: 22 additions & 6 deletions czsc/eda.py
@@ -554,22 +554,38 @@ def limit_leverage(df: pd.DataFrame, leverage: float = 1.0, **kwargs):
- window: int, rolling window size, default 300
- min_periods: int, minimum number of samples; windows with fewer samples skip the calculation, default 50
- weight: str, name of the weight column, default 'weight'
- method: str, how the leverage benchmark is computed, 'abs_mean' or 'abs_max', default 'abs_mean'
abs_mean: use the rolling mean of absolute weights as the leverage-adjustment benchmark
abs_max: use the rolling max of absolute weights as the leverage-adjustment benchmark
:return: DataFrame
"""
window = kwargs.get("window", 300)
min_periods = kwargs.get("min_periods", 50)
weight = kwargs.get("weight", "weight")
method = kwargs.get("method", "abs_mean")

assert weight in df.columns, f"weight column {weight} not found in the data"
assert df['symbol'].nunique() == 1, "data contains multiple symbols; a single symbol is required"
assert df['dt'].is_monotonic_increasing, "data is not sorted by dt; ascending order is required"
assert df['dt'].is_unique, "data contains duplicate dt values; dt must be unique"

if kwargs.get("copy", False):
df = df.copy()

abs_mean = df[weight].abs().rolling(window=window, min_periods=min_periods).mean().fillna(leverage)
adjust_ratio = leverage / abs_mean
df[weight] = (df[weight] * adjust_ratio).clip(-leverage, leverage)
df = df.sort_values(["dt", "symbol"], ascending=True).reset_index(drop=True)

for symbol in df['symbol'].unique():
dfx = df[df['symbol'] == symbol].copy()
# assert dfx['dt'].is_monotonic_increasing, f"{symbol} data is not sorted by dt; ascending order is required"
assert dfx['dt'].is_unique, f"{symbol} data contains duplicate dt values; dt must be unique"

if method == "abs_mean":
bench = dfx[weight].abs().rolling(window=window, min_periods=min_periods).mean().fillna(leverage)
elif method == "abs_max":
bench = dfx[weight].abs().rolling(window=window, min_periods=min_periods).max().fillna(leverage)
else:
raise ValueError(f"unsupported method: {method}")

adjust_ratio = leverage / bench
df.loc[df['symbol'] == symbol, weight] = (dfx[weight] * adjust_ratio).clip(-leverage, leverage)

return df
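For reference, a self-contained sketch of what the abs_mean branch above does for a single symbol, on synthetic data rather than the library call itself:

```python
# Mirrors the "abs_mean" branch of limit_leverage: rescale weights so that the rolling
# mean of |weight| tracks the target leverage, then clip to [-leverage, +leverage].
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
df = pd.DataFrame({
    "dt": pd.date_range("2024-01-01", periods=500, freq="D"),
    "symbol": "DEMO",
    "weight": rng.normal(0, 2, 500),
})

leverage, window, min_periods = 1.0, 300, 50
bench = df["weight"].abs().rolling(window=window, min_periods=min_periods).mean().fillna(leverage)
df["weight_adj"] = (df["weight"] * leverage / bench).clip(-leverage, leverage)

print(df["weight"].abs().mean(), df["weight_adj"].abs().mean())
```

Compared with the previous version, the rewrite sorts by (dt, symbol) and applies the adjustment per symbol, so a multi-symbol weight frame no longer has to be split by the caller, and abs_max is available as an alternative benchmark to abs_mean.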

2 changes: 1 addition & 1 deletion czsc/strategies.py
@@ -101,7 +101,7 @@ def init_bar_generator(self, bars: List[RawBar], **kwargs):
:return:
"""
base_freq = str(bars[0].freq.value)
bg: BarGenerator = kwargs.get("bg", None)
bg: BarGenerator = kwargs.pop("bg", None)
freqs = self.sorted_freqs[1:] if base_freq in self.sorted_freqs else self.sorted_freqs

if bg is None:
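The switch from kwargs.get("bg", None) to kwargs.pop("bg", None) presumably keeps bg from being forwarded a second time through the remaining **kwargs. A generic, self-contained illustration of that pitfall (not czsc code; the function names are hypothetical):

```python
# If "bg" is only read with get(), it stays in kwargs and collides with the explicit
# bg argument when kwargs is forwarded downstream; pop() removes it and avoids
# "got multiple values for keyword argument 'bg'".
def setup(bars, **kwargs):
    bg = kwargs.pop("bg", None)  # pop() removes the key; get() would leave it behind
    return build_trader(bars, bg=bg, **kwargs)  # no duplicate "bg" keyword now

def build_trader(bars, bg=None, **kwargs):
    return {"bars": bars, "bg": bg, **kwargs}

print(setup([1, 2, 3], bg="BarGenerator", sdt="20240101"))
```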
