diff --git a/.github/workflows/go_ci.yml b/.github/workflows/go_ci.yml index 01428540..ab708c1f 100644 --- a/.github/workflows/go_ci.yml +++ b/.github/workflows/go_ci.yml @@ -1,6 +1,10 @@ on: push: branches: ['main'] + paths: + - "go/**" + - "python/qianfan/tests/utils/mock_server.py" + - ".github/workflows/go_ci.yml" pull_request: paths: - "go/**" diff --git a/.github/workflows/java_ci.yml b/.github/workflows/java_ci.yml index c9f6aa9d..1fb1ed42 100644 --- a/.github/workflows/java_ci.yml +++ b/.github/workflows/java_ci.yml @@ -3,6 +3,8 @@ name: Java CI on: push: branches: ["main"] + paths: + - "java/**" pull_request: paths: - "java/**" diff --git a/.github/workflows/js_ci.yml b/.github/workflows/js_ci.yml index 5107995b..4ab536f4 100644 --- a/.github/workflows/js_ci.yml +++ b/.github/workflows/js_ci.yml @@ -3,6 +3,8 @@ name: JS CI on: push: branches: ["main"] + paths: + - "javascript/**" pull_request: paths: - "javascript/**" diff --git a/.github/workflows/py_ci.yml b/.github/workflows/py_ci.yml index 22935b8e..fbbc3a10 100644 --- a/.github/workflows/py_ci.yml +++ b/.github/workflows/py_ci.yml @@ -1,6 +1,9 @@ on: push: branches: ['main'] + paths: + - "python/**" + - ".github/workflows/py_ci.yml" pull_request: paths: - "python/**" diff --git a/docs/stress_test.md b/docs/stress_test.md index 01c2f1a9..2d85fbaf 100644 --- a/docs/stress_test.md +++ b/docs/stress_test.md @@ -1,9 +1,11 @@ # LLM服务性能测试 -千帆 Python SDK 提供了基于locust工具的对大模型服务进行快速压测以及性能评估的功能。 -该功能入口在Dataset对象的stress_test方法中。 +千帆 Python SDK 提供了基于locust工具的对大模型服务进行单轮/多轮快速压测以及性能评估的功能。 +该功能入口在Dataset对象的stress_test方法以及multi_stress_test方法中。 -> 当前使用notebook进行stress_test调用需要先设置环境变量,具体见下文以及cookbook/dataset/stress_test.ipynb。 + +两种方法的使用场景不同。stress_test方法用于单轮压测,multi_stress_test方法用于多轮压测。 +> 当前在notebook环境中进行压测调用需要先设置环境变量,具体设置以及使用方法见下文。 ## 安装准备 @@ -23,8 +25,9 @@ pip install 'qianfan[dataset_base]' ## 启动压测 +### stress_test方法 -以下为Python环境和Notebook环境的示例代码: +以下为stress_test方法的示例代码: Python环境: @@ -45,21 +48,21 @@ ds = Dataset.load(data_file="...") ds.stress_test( users=1, - workers=1, - spawn_rate=128, - model="ERNIE-Bot", + #model="ERNIE-Speed-8K", + endpoint="YourEndpoint", model_type="ChatCompletion" ) ``` -Notebook环境: +### multi_stress_test方法 + +接下来为multi_stress_test方法的示例代码: + +Python环境: ```python -from gevent import monkey -monkey.patch_all() -``` -```python + import os os.environ['QIANFAN_ENABLE_STRESS_TEST'] = "true" @@ -73,18 +76,34 @@ os.environ["QIANFAN_SECRET_KEY"] = "..." ds = Dataset.load(data_file="...") -ds.stress_test( - users=1, - workers=1, - spawn_rate=128, - model="ERNIE-Bot", - model_type="ChatCompletion" +ds.multi_stress_test( + origin_users=2, + workers=2, + spawn_rate=2, + #model="ERNIE-Speed-8K, + endpoint="YourEndpoint", + model_type="ChatCompletion", + runtime="10s", + rounds=2, + interval=2, + first_latency_threshold = 2, + round_latency_threshold = 30, + success_rate_threshold = 0.8, ) + ``` + +Notebook中需要首先配置的环境变量如下,其他运行代码同上: + +```python +from gevent import monkey +monkey.patch_all() +``` + ## 方法参数 -stress_test支持以下参数: +### stress_test支持以下参数: -- **workers (int)**:指定发压使用的worker数目,每个worker为1个进程; +- **workers (int)**:指定发压使用的worker数目,每个worker为1个进程,默认为1个进程; - **users (int)**:指定发压使用的总user数,必须大于worker数目;每个worker负责模拟${users}/${workers}个虚拟用户; - **runtime (str)**:指定发压任务的最大运行时间,格式为带时间单位的字符串,例如(300s, 20m, 3h, 1h30m);压测任务启动后会一直运行到数据集内所有数据都请求完毕,或到达该参数指定的最大运行时间;该参数默认值为'0s',表示不设最大运行时间; - **spawn_rate (int)**:指定每秒启动的user数目; @@ -93,9 +112,24 @@ stress_test支持以下参数: - **model_type (str)**:指定被测服务的模型类型。 目前只支持'ChatCompletion'与'Completion两类';默认值为'ChatCompletion'; - **hyperparameters (Optional[Dict[str, Any]])**:指定压测时使用的超参数; +### multi_stress_test方法支持以下参数: + +- **workers (int)**:指定发压使用的worker数目,每个worker为1个进程,默认为1个进程; +- **origin_users (int)**:指定发压使用的初始user数,必须大于worker数目;每个worker负责模拟${users}/${workers}个虚拟用户; +- **runtime (str)**:指定发压任务的最大运行时间,格式为带时间单位的字符串,例如(300s, 20m, 3h, 1h30m);压测任务启动后会一直运行到数据集内所有数据都请求完毕,或到达该参数指定的最大运行时间;该参数默认值为'0s',表示不设最大运行时间; +- **spawn_rate (int)**:指定每秒启动的user数目; +- **model (str)**:指定需要压测服务的模型名称。该参数与endpoint只能指定一个; +- **endpoint (str)**:指定需要压测服务的url路径。该参数与model只能指定一个; +- **model_type (str)**:指定被测服务的模型类型。 目前只支持'ChatCompletion'与'Completion两类';默认值为'ChatCompletion'; +- **hyperparameters (Optional[Dict[str, Any]])**:指定压测时使用的超参数; +- **rounds (int)**:指定压测轮数; +- **interval (int)**:指定压测轮之间的加压并发数,比如interval=2,则在第1轮压测结束后,会在第2轮开始时,额外启动两个user的并发,以此类推; +- **first_latency_threshold (float)**:指定首句时延的阈值,超过该阈值会停止在本轮压测,单位为秒; +- **round_latency_threshold (float)**:指定全长时延的阈值,超过该阈值会停止在本轮压测,单位为秒; +- **success_rate_threshold (float)**:指定请求成功率的阈值,低于该阈值会停止在本轮压测,单位为百分比; ## 数据格式 -可用于stress_test的数据集目前支持以下三种格式: +可用于压测的数据集目前支持以下三种格式: jsonl格式示例 @@ -120,33 +154,42 @@ txt格式示例 ## 输出内容 运行过程中会实时输出已发送请求的聚合指标。 -运行结束后会输出任务的日志路径,以及整体的聚合数据。 +运行结束后会输出整体的聚合数据,以及任务的日志路径。日志路径中的performance_table.html为压测结果的可视化表格。 整体聚合数据内容示例: - QPS: 4.02 - RPM: 55.46 - Latency Avg: 3.61 - Latency Min: 2.45 - Latency Max: 4.7 - Latency 50%: 3.6 - Latency 80%: 4.2 - FirstTokenLatency Avg: 1.54 - FirstTokenLatency Min: 0.85 - FirstTokenLatency Max: 2.62 - FirstTokenLatency 50%: 1.6 - FirstTokenLatency 80%: 1.9 - InputTokens Avg: 78.0 - OutputTokens Avg: 49.6 - TotalQuery: 11100 - SuccessQuery: 5800 - FailureQuery: 11042 - TotalTime: 62.75 - SuccessRate: 0.52% + user_num: 4 + worker_num: 2 + spawn_rate: 2 + model_type: ChatCompletion + hyperparameters: None + QPS: 1.16 + Latency Avg: 3.08 + Latency Min: 1.95 + Latency Max: 4.56 + Latency 50%: 3.0 + Latency 80%: 3.6 + FirstTokenLatency Avg: 0.74 + FirstTokenLatency Min: 0.0 + FirstTokenLatency Max: 2.15 + FirstTokenLatency 50%: 0.64 + FirstTokenLatency 80%: 0.8 + InputTokens Avg: 69.6 + OutputTokens Avg: 43.67 + TotalInputTokens Avg: 2088.0 + TotalOutputTokens Avg: 1266.33 + TotalQuery: 30 + SuccessQuery: 29 + FailureQuery: 1 + TotalTime: 28.63 + SuccessRate: 96.67% 各项指标含义如下: - +- **user_num**: 压测使用的本轮user数,即本轮发压的并发数; +- **worker_num**: 压测使用的worker数目,即进程数; +- **spawn_rate**: 每秒启动的user数目; +- **model_type**: 被压测服务的模型类型; +- **hyperparameters**: 压测使用的超参数; - **QPS**:服务每秒实际处理的请求数; -- **RPM**:每分钟实际处理的请求数; - **Latency Avg/Min/Max/50%/80%**:全长时延的平均值/最小值/最大值/50分位值/80分位值; - **FirstTokenLatency Avg/Min/Max/50%/80%**:首句时延的平均值/最小值/最大值/50分位值/80分位值; - **InputTokens Avg**:单次请求输入的token长度平均值; @@ -158,5 +201,4 @@ txt格式示例 ## Q&A - Q: 为什么要先执行`monkey.patch_all()`? -- A: 由于Locust中使用了gevent库来保证高并发性能,而gevent的高并发依赖于monkey patching的非阻塞I/O机制,但该机制在Notebook环境中默认未开启。因此,在开始测试前,需要进行monkey patching操作。这样做是为了确保整个环境,包括IPython/Jupyter自己的组件,都使用gevent兼容的非阻塞版本,从而避免因混合使用阻塞和非阻塞操作导致的不一致性和潜在的死锁问题。 - +- A: 由于Locust中使用了gevent库来保证高并发性能,而gevent的高并发依赖于monkey patching的非阻塞I/O机制,但该机制在Notebook环境中默认未开启。因此,在开始测试前,需要进行monkey patching操作。这样做是为了确保整个环境,包括IPython/Jupyter自己的组件,都使用gevent兼容的非阻塞版本,从而避免因混合使用阻塞和非阻塞操作导致的不一致性和潜在的死锁问题。 \ No newline at end of file diff --git a/python/pyproject.toml b/python/pyproject.toml index 236b4b56..be0ca5dd 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "qianfan" -version = "0.4.1" +version = "0.4.1.1" description = "文心千帆大模型平台 Python SDK" authors = [] license = "Apache-2.0" diff --git a/python/qianfan/dataset/stress_test/load_runner.py b/python/qianfan/dataset/stress_test/load_runner.py index c5fafb4a..c9b8f202 100644 --- a/python/qianfan/dataset/stress_test/load_runner.py +++ b/python/qianfan/dataset/stress_test/load_runner.py @@ -19,18 +19,21 @@ logger = logging.getLogger("yame.stats") logger.setLevel(logging.INFO) GlobalData.data["threshold_first"] = Value("i", 0) -GlobalData.data["total_requests"] = Value("i", 0) GlobalData.data["success_requests"] = Value("i", 0) GlobalData.data["first_latency_threshold"] = 0 def model_details(endpoint: str) -> Optional[Dict[str, Any]]: - info = resources.Service.V2.service_list() - for inf in info.body["result"]["serviceList"]: - temp = inf["url"].split("/") - if temp[-1] == endpoint: - return inf - return None + try: + info = resources.Service.V2.service_list() + for inf in info.body["result"]["serviceList"]: + temp = inf["url"].split("/") + if temp[-1] == endpoint: + return inf + return None + except Exception as e: + logging.error(f"An error occurred: {e}") + return None class QianfanLocustRunner(LocustRunner): @@ -100,46 +103,36 @@ def __init__( self.spawn_rate = spawn_rate self.rounds = rounds self.interval = interval - self.total_requests = Value("i", 0) + # 初始化基础 model_info 字典 + self.model_info = { + "modelname": None, + "modelVersionId": None, + "serviceId": None, + "serviceUrl": None, + "computer": None, + "replicasCount": None, + "origin_user_num": self.user_num, + "worker": self.worker_num, + "rounds": self.rounds, + "spawn_rate": self.spawn_rate, + "hyperparameters": self.hyperparameters, + "interval": self.interval, + } + # 如果是端点且端点不为空,尝试获取模型信息 if is_endpoint and endpoint is not None: model_info = model_details(endpoint) - if model_info is not None: - modelVersionId = model_info["modelId"] - serviceId = model_info["serviceId"] - serviceUrl = model_info["url"] - computer = model_info["resourceConfig"]["type"] - replicasCount = model_info["resourceConfig"]["replicasCount"] - modelname = model_info["name"] - self.model_info = { - "modelname": modelname, - "modelVersionId": modelVersionId, - "serviceId": serviceId, - "serviceUrl": serviceUrl, - "computer": computer, - "replicasCount": replicasCount, - "origin_user_num": self.user_num, - "worker": self.worker_num, - "rounds": self.rounds, - "spawn_rate": self.spawn_rate, - "hyperparameters": self.hyperparameters, - "interval": self.interval, - } - else: - model_info = None - self.model_info = { - "modelname": None, - "modelVersionId": None, - "serviceId": None, - "serviceUrl": None, - "computer": None, - "replicasCount": None, - "origin_user_num": self.user_num, - "worker": self.worker_num, - "rounds": self.rounds, - "spawn_rate": self.spawn_rate, - "hyperparameters": self.hyperparameters, - "interval": self.interval, - } + if model_info: + # 更新 model_info 字典 + self.model_info.update( + { + "modelname": model_info["name"], + "modelVersionId": model_info["modelId"], + "serviceId": model_info["serviceId"], + "serviceUrl": model_info["url"], + "computer": model_info["resourceConfig"]["type"], + "replicasCount": model_info["resourceConfig"]["replicasCount"], + } + ) def run(self, user_num: Optional[int] = None) -> Dict[str, Any]: """ @@ -202,7 +195,6 @@ def run(self, user_num: Optional[int] = None) -> Dict[str, Any]: logger.info("成功率低于阈值") return ret current_user_num += self.interval if self.interval is not None else 0 - GlobalData.data["total_requests"].value = 0 html_table = generate_html_table(html, self.model_info) html_path = round_result["record_dir"] + "/performance_table.html" diff --git a/python/qianfan/dataset/stress_test/load_statistics.py b/python/qianfan/dataset/stress_test/load_statistics.py index ea52b83b..88827a69 100644 --- a/python/qianfan/dataset/stress_test/load_statistics.py +++ b/python/qianfan/dataset/stress_test/load_statistics.py @@ -87,7 +87,6 @@ def gen_brief( + "model_type: %s\n" % model_type + "hyperparameters: %s\n" % hyperparameters + "QPS: %s\n" % round(qps, 2) - + "RPM: %s\n" % round(success_count / time * 60, 2) + "Latency Avg: %s\n" % round(lat_tuple[0] / 1000, 2) + "Latency Min: %s\n" % round(lat_tuple[1] / 1000, 2) + "Latency Max: %s\n" % round(lat_tuple[2] / 1000, 2) @@ -110,7 +109,6 @@ def gen_brief( ) statistics = { "QPS": round(qps, 2), - "RPM": round(success_count / time * 60, 2), "latency_avg": round(lat_tuple[0] / 1000, 2), "latency_min": round(lat_tuple[1] / 1000, 2), "latency_max": round(lat_tuple[2] / 1000, 2), @@ -227,7 +225,6 @@ def generate_html_table(data_rows: Any, model_info: Any) -> str: columns = [ "并发", "QPS", - "RPM", "Latency Avg", "Latency Min", "Latency Max", @@ -259,8 +256,6 @@ def generate_html_table(data_rows: Any, model_info: Any) -> str: value = row.get("concurrency", "") elif column == "QPS": value = row.get("QPS", "") - elif column == "RPM": - value = row.get("RPM", "") elif column == "Latency Avg": value = row.get("latency_avg", "") elif column == "Latency Min": diff --git a/python/qianfan/dataset/stress_test/qianfan_llm_load.py b/python/qianfan/dataset/stress_test/qianfan_llm_load.py index 888864d6..c557cdea 100644 --- a/python/qianfan/dataset/stress_test/qianfan_llm_load.py +++ b/python/qianfan/dataset/stress_test/qianfan_llm_load.py @@ -217,7 +217,6 @@ def _request_internal( try: kwargs["retry_count"] = 0 responses = self.chat_comp.do(messages=messages, **kwargs) - GlobalData.data["total_requests"].value += 1 except Exception as e: self.exc = e resp = QfResponse(-1) @@ -381,7 +380,6 @@ def _request_internal( start_time = time.time() start_perf_counter = time.perf_counter() responses = self.comp.do(prompt=prompt, **kwargs) - GlobalData.data["total_requests"].value += 1 for resp in responses: setattr(resp, "url", self.model) setattr(resp, "reason", None)