diff --git a/aidial_analytics_realtime/analytics.py b/aidial_analytics_realtime/analytics.py
index 20a0dc5..2cd561e 100644
--- a/aidial_analytics_realtime/analytics.py
+++ b/aidial_analytics_realtime/analytics.py
@@ -82,8 +82,8 @@ async def make_point(
     user_hash: str,
     user_title: str,
     timestamp: datetime,
-    request: dict,
-    response: dict,
+    request: dict | None,
+    response: dict | None,
     request_type: RequestType,
     usage: dict | None,
     topic_model: TopicModel,
@@ -95,36 +95,40 @@ async def make_point(
     topic = None
     response_content = ""
     request_content = ""
-    match request_type:
-        case RequestType.CHAT_COMPLETION:
-            response_contents = get_chat_completion_response_contents(
-                logger, response
-            )
-            request_contents = get_chat_completion_request_contents(
-                logger, request
-            )
-            request_content = "\n".join(request_contents)
-            response_content = "\n".join(response_contents)
+    if response is not None and request is not None:
+        match request_type:
+            case RequestType.CHAT_COMPLETION:
+                response_contents = get_chat_completion_response_contents(
+                    logger, response
+                )
+                request_contents = get_chat_completion_request_contents(
+                    logger, request
+                )
+
+                request_content = "\n".join(request_contents)
+                response_content = "\n".join(response_contents)
 
-            if chat_id:
-                topic = to_string(
-                    await topic_model.get_topic_by_text(
-                        "\n\n".join(request_contents + response_contents)
+                if chat_id:
+                    topic = to_string(
+                        await topic_model.get_topic_by_text(
+                            "\n\n".join(request_contents + response_contents)
+                        )
                     )
+            case RequestType.EMBEDDING:
+                request_contents = get_embeddings_request_contents(
+                    logger, request
                 )
-        case RequestType.EMBEDDING:
-            request_contents = get_embeddings_request_contents(logger, request)
-            request_content = "\n".join(request_contents)
-            if chat_id:
-                topic = to_string(
-                    await topic_model.get_topic_by_text(
-                        "\n\n".join(request_contents)
+                request_content = "\n".join(request_contents)
+                if chat_id:
+                    topic = to_string(
+                        await topic_model.get_topic_by_text(
+                            "\n\n".join(request_contents)
+                        )
                     )
-                )
-        case _:
-            assert_never(request_type)
+            case _:
+                assert_never(request_type)
 
     price = Decimal(0)
     deployment_price = Decimal(0)
@@ -162,7 +166,7 @@ async def make_point(
             "language",
             (
                 "undefined"
-                if not chat_id
+                if not chat_id or request is None or response is None
                 else await detect_lang(logger, request, response, request_type)
             ),
         )
@@ -174,6 +178,7 @@ async def make_point(
             (
                 response["id"]
                 if request_type == RequestType.CHAT_COMPLETION
+                and response is not None
                 else uuid4()
             ),
         )
@@ -183,12 +188,16 @@ async def make_point(
         .field(
             "number_request_messages",
             (
-                len(request["messages"])
-                if request_type == RequestType.CHAT_COMPLETION
+                0
+                if request is None
                 else (
-                    1
-                    if isinstance(request["input"], str)
-                    else len(request["input"])
+                    len(request["messages"])
+                    if request_type == RequestType.CHAT_COMPLETION
+                    else (
+                        1
+                        if isinstance(request["input"], str)
+                        else len(request["input"])
+                    )
                 )
             ),
         )
@@ -239,7 +248,10 @@ def make_rate_point(
     return point
 
 
-async def parse_usage_per_model(response: dict):
+async def parse_usage_per_model(response: dict | None):
+    if response is None:
+        return []
+
     statistics = response.get("statistics")
     if statistics is None:
         return []
@@ -265,8 +277,8 @@ async def on_message(
     user_hash: str,
     user_title: str,
     timestamp: datetime,
-    request: dict,
-    response: dict,
+    request: dict | None,
+    response: dict | None,
     type: RequestType,
     topic_model: TopicModel,
     rates_calculator: RatesCalculator,
     parent_deployment: str | None,
     trace: dict | None,
     execution_path: list | None,
 ):
-    logger.info(f"Chat completion response length {len(response)}")
+    logger.info(f"Chat completion response length {len(response or [])}")
 
     usage_per_model = await parse_usage_per_model(response)
+    response_usage = None if response is None else response.get("usage")
+
     if token_usage is not None:
         point = await make_point(
             logger,
@@ -314,7 +328,7 @@ async def on_message(
         request,
         response,
         type,
-        response.get("usage"),
+        response_usage,
         topic_model,
         rates_calculator,
         parent_deployment,
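With request and response now optional in make_point, the "number_request_messages" field falls back to 0 when no request body was captured. A minimal sketch of that fallback, assuming OpenAI-style payloads ("messages" for chat, "input" for embeddings); the helper name is illustrative and not part of the codebase:

    def count_request_messages(request: dict | None, is_chat: bool) -> int:
        if request is None:
            return 0  # no request body captured -> report zero messages
        if is_chat:
            return len(request["messages"])
        # embeddings: "input" is either a single string or a list of inputs
        return 1 if isinstance(request["input"], str) else len(request["input"])

    assert count_request_messages(None, is_chat=True) == 0
    assert count_request_messages({"input": ["a", "b"]}, is_chat=False) == 2

The same guard drives the topic and language fields above: both stay "undefined" unless both bodies are present.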
diff --git a/aidial_analytics_realtime/app.py b/aidial_analytics_realtime/app.py
index 753eb90..c8ed69d 100644
--- a/aidial_analytics_realtime/app.py
+++ b/aidial_analytics_realtime/app.py
@@ -96,42 +96,47 @@ async def on_chat_completion_message(
     if response["status"] != "200":
         return
 
-    request_body = json.loads(request["body"])
-    stream = request_body.get("stream", False)
-    model = request_body.get("model", deployment)
-
     response_body = None
-    if stream:
-        body = response["body"]
-        chunks = body.split("\n\ndata: ")
+    request_body = None
+    model: str | None = None
 
-        chunks = [chunk.strip() for chunk in chunks]
+    if (request_body_str := request.get("body")) is not None:
 
-        chunks[0] = chunks[0][chunks[0].find("data: ") + 6 :]
-        if chunks[-1] == "[DONE]":
-            chunks.pop(len(chunks) - 1)
+        request_body = json.loads(request_body_str)
+        stream = request_body.get("stream", False)
+        model = request_body.get("model", deployment)
 
-        response_body = json.loads(chunks[-1])
-        for chunk in chunks[0 : len(chunks) - 1]:
-            chunk = json.loads(chunk)
+        if stream:
+            body = response["body"]
+            chunks = body.split("\n\ndata: ")
 
-            response_body["choices"] = merge(
-                response_body["choices"], chunk["choices"]
-            )
+            chunks = [chunk.strip() for chunk in chunks]
 
-        for i in range(len(response_body["choices"])):
-            response_body["choices"][i]["message"] = response_body["choices"][
-                i
-            ]["delta"]
-            del response_body["choices"][i]["delta"]
-    else:
-        response_body = json.loads(response["body"])
+            chunks[0] = chunks[0][chunks[0].find("data: ") + 6 :]
+            if chunks[-1] == "[DONE]":
+                chunks.pop(len(chunks) - 1)
+
+            response_body = json.loads(chunks[-1])
+            for chunk in chunks[0 : len(chunks) - 1]:
+                chunk = json.loads(chunk)
+
+                response_body["choices"] = merge(
+                    response_body["choices"], chunk["choices"]
+                )
+
+            for i in range(len(response_body["choices"])):
+                response_body["choices"][i]["message"] = response_body[
+                    "choices"
+                ][i]["delta"]
+                del response_body["choices"][i]["delta"]
+        else:
+            response_body = json.loads(response["body"])
 
     await on_message(
         logger,
         influx_writer,
         deployment,
-        model,
+        model or deployment,
         project_id,
         chat_id,
         upstream_url,
@@ -171,6 +176,16 @@ async def on_embedding_message(
     if response["status"] != "200":
         return
 
+    request_body_str = request.get("body")
+    response_body_str = response.get("body")
+
+    request_body = (
+        None if request_body_str is None else json.loads(request_body_str)
+    )
+    response_body = (
+        None if response_body_str is None else json.loads(response_body_str)
+    )
+
     await on_message(
         logger,
         influx_writer,
@@ -182,8 +197,8 @@ async def on_embedding_message(
         user_hash,
         user_title,
         timestamp,
-        json.loads(request["body"]),
-        json.loads(response["body"]),
+        request_body,
+        response_body,
         RequestType.EMBEDDING,
         topic_model,
         rates_calculator,
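The stream branch above reassembles a chat completion from OpenAI-style server-sent events before logging it. A toy, self-contained illustration of that chunk parsing (the body string is fabricated for the example; the real one comes from response["body"]):

    import json

    body = (
        'data: {"choices": [{"delta": {"content": "Hel"}}]}\n\n'
        'data: {"choices": [{"delta": {"content": "lo"}}]}\n\n'
        'data: [DONE]'
    )
    chunks = [chunk.strip() for chunk in body.split("\n\ndata: ")]
    chunks[0] = chunks[0][chunks[0].find("data: ") + 6 :]  # strip the first "data: " prefix
    if chunks[-1] == "[DONE]":
        chunks.pop()  # drop the stream terminator

    events = [json.loads(chunk) for chunk in chunks]
    print([e["choices"][0]["delta"]["content"] for e in events])  # ['Hel', 'lo']

The handler above seeds response_body from the last JSON chunk and folds each earlier chunk's "choices" into it via merge, then renames the accumulated "delta" to "message".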
diff --git a/tests/test_app.py b/tests/test_app.py
index 438b529..e100c64 100644
--- a/tests/test_app.py
+++ b/tests/test_app.py
@@ -93,6 +93,81 @@ def test_chat_completion_plain_text():
     ]
 
 
+def test_chat_completion_plain_text_no_body():
+    write_api_mock = InfluxWriterMock()
+    app.app.dependency_overrides[app.InfluxWriterAsync] = lambda: write_api_mock
+    app.app.dependency_overrides[app.TopicModel] = lambda: TestTopicModel()
+
+    client = TestClient(app.app)
+    response = client.post(
+        "/data",
+        json=[
+            {
+                "message": json.dumps(
+                    {
+                        "apiType": "DialOpenAI",
+                        "chat": {"id": "chat-1"},
+                        "project": {"id": "PROJECT-KEY"},
+                        "user": {"id": "", "title": ""},
+                        "deployment": "gpt-4",
+                        "token_usage": {
+                            "completion_tokens": 189,
+                            "prompt_tokens": 22,
+                            "total_tokens": 211,
+                            "deployment_price": 0.001,
+                            "price": 0.001,
+                        },
+                        "request": {
+                            "protocol": "HTTP/1.1",
+                            "method": "POST",
+                            "uri": "/openai/deployments/gpt-4/chat/completions?api-version=2023-03-15-preview",
+                            "time": "2023-08-16T19:42:39.997",
+                        },
+                        "response": {"status": "200"},
+                    }
+                )
+            },
+            {
+                "message": json.dumps(
+                    {
+                        "apiType": "DialOpenAI",
+                        "chat": {"id": "chat-2"},
+                        "project": {"id": "PROJECT-KEY-2"},
+                        "user": {"id": "", "title": ""},
+                        "deployment": "gpt-4",
+                        "token_usage": {
+                            "completion_tokens": 189,
+                            "prompt_tokens": 22,
+                            "total_tokens": 211,
+                            "deployment_price": 0.001,
+                            "price": 0.001,
+                        },
+                        "request": {
+                            "protocol": "HTTP/1.1",
+                            "method": "POST",
+                            "uri": "/openai/deployments/gpt-4/chat/completions",
+                            "time": "2023-11-24T03:33:40.39",
+                        },
+                        "response": {"status": "200"},
+                    }
+                )
+            },
+        ],
+    )
+    assert response.status_code == 200
+    assert len(write_api_mock.points) == 2
+
+    assert re.match(
+        r'analytics,core_parent_span_id=undefined,core_span_id=undefined,deployment=gpt-4,execution_path=undefined,language=undefined,model=gpt-4,parent_deployment=undefined,project_id=PROJECT-KEY,response_id=(.+?),title=undefined,trace_id=undefined,upstream=undefined chat_id="chat-1",completion_tokens=189i,deployment_price=0.001,number_request_messages=0i,price=0.001,prompt_tokens=22i,user_hash="undefined" 1692214959997000000',
+        write_api_mock.points[0],
+    )
+
+    assert re.match(
+        r'analytics,core_parent_span_id=undefined,core_span_id=undefined,deployment=gpt-4,execution_path=undefined,language=undefined,model=gpt-4,parent_deployment=undefined,project_id=PROJECT-KEY-2,response_id=(.+?),title=undefined,trace_id=undefined,upstream=undefined chat_id="chat-2",completion_tokens=189i,deployment_price=0.001,number_request_messages=0i,price=0.001,prompt_tokens=22i,user_hash="undefined" 1700796820390000000',
+        write_api_mock.points[1],
+    )
+
+
 def test_chat_completion_list_content():
     write_api_mock = InfluxWriterMock()
     app.app.dependency_overrides[app.InfluxWriterAsync] = lambda: write_api_mock
     app.app.dependency_overrides[app.TopicModel] = lambda: TestTopicModel()
@@ -284,8 +359,8 @@ def test_embeddings_plain_text():
                             "model": "text-embedding-3-small",
                             "object": "list",
                             "usage": {
-                                "prompt_tokens": 2,
-                                "total_tokens": 2,
+                                "prompt_tokens": 43,
+                                "total_tokens": 43,
                             },
                         }
                     ),
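These assertions match InfluxDB line protocol, which is what the mock captures. A hedged sketch of the record shape (measurement plus comma-separated tags, a space, the fields, a space, and a nanosecond timestamp; the values are copied from the first test above):

    point = (
        "analytics,deployment=gpt-4,project_id=PROJECT-KEY"  # measurement + tags
        ' chat_id="chat-1",prompt_tokens=22i,price=0.001'  # fields; trailing "i" marks integers
        " 1692214959997000000"  # 2023-08-16T19:42:39.997 UTC in nanoseconds
    )
    tags, fields, timestamp = point.rsplit(" ", 2)
    assert tags.split(",")[0] == "analytics"
    assert fields.startswith('chat_id="chat-1"')

Fields such as number_request_messages=0i in the new tests reflect the None-request fallback introduced in analytics.py.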
+ "parent_deployment": "assistant", + "trace": { + "trace_id": "5dca3d6ed5d22b6ab574f27a6ab5ec14", + "core_span_id": "9ade2b6fef0a716d", + "core_parent_span_id": "20e7e64715abbe97", + }, + "execution_path": [None, "b", "c"], + "request": { + "protocol": "HTTP/1.1", + "method": "POST", + "uri": "/openai/deployments/text-embedding-3-small/embeddings?api-version=2023-03-15-preview", + "time": "2023-08-16T19:42:39.997", + }, + "response": {"status": "200"}, + } + ) + }, + ], + ) + assert response.status_code == 200 + assert len(write_api_mock.points) == 1 + assert re.match( + r'analytics,core_parent_span_id=20e7e64715abbe97,core_span_id=9ade2b6fef0a716d,deployment=text-embedding-3-small,execution_path=undefined/b/c,language=undefined,model=text-embedding-3-small,parent_deployment=assistant,project_id=PROJECT-KEY,response_id=(.+?),title=undefined,trace_id=5dca3d6ed5d22b6ab574f27a6ab5ec14,upstream=undefined chat_id="chat-1",completion_tokens=0i,deployment_price=0.001,number_request_messages=0i,price=0.001,prompt_tokens=2i,user_hash="undefined" 1692214959997000000', + write_api_mock.points[0], + ) + + def test_embeddings_tokens(): write_api_mock: app.InfluxWriterAsync = InfluxWriterMock() app.app.dependency_overrides[app.InfluxWriterAsync] = lambda: write_api_mock @@ -362,8 +488,8 @@ def test_embeddings_tokens(): "model": "text-embedding-3-small", "object": "list", "usage": { - "prompt_tokens": 2, - "total_tokens": 2, + "prompt_tokens": 43, + "total_tokens": 43, }, } ),