feat: improved logging #89

Merged · 12 commits · Jan 21, 2025
62 changes: 12 additions & 50 deletions aidial_analytics_realtime/analytics.py
@@ -1,26 +1,20 @@
 from datetime import datetime
 from decimal import Decimal
 from enum import Enum
-from logging import Logger
-from typing import Awaitable, Callable
 from uuid import uuid4
 
 from influxdb_client import Point
-from langid.langid import LanguageIdentifier, model
 from typing_extensions import assert_never
 
 from aidial_analytics_realtime.dial import (
     get_chat_completion_request_contents,
     get_chat_completion_response_contents,
     get_embeddings_request_contents,
 )
+from aidial_analytics_realtime.influx_writer import InfluxWriterAsync
+from aidial_analytics_realtime.langid import detect_lang_by_text
 from aidial_analytics_realtime.rates import RatesCalculator
 from aidial_analytics_realtime.topic_model import TopicModel
-from aidial_analytics_realtime.utils.concurrency import (
-    run_in_cpu_tasks_executor,
-)
-
-identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True)
 
 
 class RequestType(Enum):
@@ -29,41 +23,21 @@ class RequestType(Enum):
 
 
 async def detect_lang(
-    logger: Logger, request: dict, response: dict, request_type: RequestType
+    request: dict, response: dict, request_type: RequestType
 ) -> str:
     match request_type:
         case RequestType.CHAT_COMPLETION:
-            request_contents = get_chat_completion_request_contents(
-                logger, request
-            )
-            response_content = get_chat_completion_response_contents(
-                logger, response
-            )
+            request_contents = get_chat_completion_request_contents(request)
+            response_content = get_chat_completion_response_contents(response)
             text = "\n\n".join(request_contents[-1:] + response_content)
         case RequestType.EMBEDDING:
-            text = "\n\n".join(get_embeddings_request_contents(logger, request))
+            text = "\n\n".join(get_embeddings_request_contents(request))
         case _:
             assert_never(request_type)
 
     return to_string(await detect_lang_by_text(text))
 
 
-async def detect_lang_by_text(text: str) -> str | None:
-    text = text.strip()
-
-    if not text:
-        return None
-
-    try:
-        lang, prob = await run_in_cpu_tasks_executor(identifier.classify, text)
-        if prob > 0.998:
-            return lang
-    except Exception:
-        pass
-
-    return None
-
-
 def to_string(obj: str | None) -> str:
     return obj or "undefined"
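Note: the deleted `detect_lang_by_text` helper is now imported from a new `aidial_analytics_realtime.langid` module that is not part of this diff. A plausible sketch of that module, assuming the deleted implementation was moved over verbatim:

```python
# Hypothetical aidial_analytics_realtime/langid.py, reconstructed from the
# code removed above; the module itself is not shown in this PR.
from langid.langid import LanguageIdentifier, model

from aidial_analytics_realtime.utils.concurrency import (
    run_in_cpu_tasks_executor,
)

identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True)


async def detect_lang_by_text(text: str) -> str | None:
    text = text.strip()

    if not text:
        return None

    try:
        # Classification is CPU-bound, so it runs in the CPU task executor
        # instead of blocking the event loop.
        lang, prob = await run_in_cpu_tasks_executor(identifier.classify, text)
        if prob > 0.998:
            return lang
    except Exception:
        pass

    return None
```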
@@ -73,7 +47,6 @@ def build_execution_path(path: list | None):
 
 
 async def make_point(
-    logger: Logger,
     deployment: str,
     model: str,
     project_id: str,
@@ -100,25 +73,21 @@
     match request_type:
         case RequestType.CHAT_COMPLETION:
             response_contents = get_chat_completion_response_contents(
-                logger, response
-            )
-            request_contents = get_chat_completion_request_contents(
-                logger, request
+                response
             )
+            request_contents = get_chat_completion_request_contents(request)
 
             request_content = "\n".join(request_contents)
             response_content = "\n".join(response_contents)
 
             if chat_id:
                 topic = to_string(
                     await topic_model.get_topic_by_text(
-                        "\n\n".join(request_contents + response_contents)
+                        "\n\n".join(request_contents + response_contents),
                     )
                 )
         case RequestType.EMBEDDING:
-            request_contents = get_embeddings_request_contents(
-                logger, request
-            )
+            request_contents = get_embeddings_request_contents(request)
 
             request_content = "\n".join(request_contents)
             if chat_id:
@@ -167,7 +136,7 @@ async def make_point(
             (
                 "undefined"
                 if not chat_id or request is None or response is None
-                else await detect_lang(logger, request, response, request_type)
+                else await detect_lang(request, response, request_type)
             ),
         )
         .tag("upstream", to_string(upstream_url))
@@ -267,8 +236,7 @@ async def parse_usage_per_model(response: dict | None):
 
 
 async def on_message(
-    logger: Logger,
-    influx_writer: Callable[[Point], Awaitable[None]],
+    influx_writer: InfluxWriterAsync,
     deployment: str,
     model: str,
     project_id: str,
@@ -287,14 +255,11 @@
     trace: dict | None,
     execution_path: list | None,
 ):
-    logger.info(f"Chat completion response length {len(response or [])}")
-
     usage_per_model = await parse_usage_per_model(response)
     response_usage = None if response is None else response.get("usage")
 
     if token_usage is not None:
         point = await make_point(
-            logger,
             deployment,
             model,
             project_id,
@@ -316,7 +281,6 @@
         await influx_writer(point)
     elif len(usage_per_model) == 0:
         point = await make_point(
-            logger,
             deployment,
             model,
             project_id,
@@ -338,7 +302,6 @@
         await influx_writer(point)
     else:
         point = await make_point(
-            logger,
             deployment,
             model,
             project_id,
@@ -361,7 +324,6 @@
 
         for usage in usage_per_model:
             point = await make_point(
-                logger,
                 deployment,
                 usage["model"],
                 project_id,
88 changes: 70 additions & 18 deletions aidial_analytics_realtime/app.py
@@ -1,9 +1,13 @@
+import asyncio
 import contextlib
 import json
 import logging
 import re
+import sys
 from datetime import datetime
 
+import aiohttp
+import starlette.requests
 import uvicorn
 from fastapi import Depends, FastAPI, Request
 from fastapi.responses import JSONResponse
@@ -22,7 +26,10 @@
 from aidial_analytics_realtime.topic_model import TopicModel
 from aidial_analytics_realtime.universal_api_utils import merge
 from aidial_analytics_realtime.utils.concurrency import cpu_task_executor
-from aidial_analytics_realtime.utils.log_config import configure_loggers, logger
+from aidial_analytics_realtime.utils.logging import add_logger_prefix
+from aidial_analytics_realtime.utils.logging import app_logger as logger
+from aidial_analytics_realtime.utils.logging import configure_loggers
+from aidial_analytics_realtime.utils.timer import Timer
 
 RATE_PATTERN = r"/v1/(.+?)/rate"
 CHAT_COMPLETION_PATTERN = r"/openai/deployments/(.+?)/chat/completions"
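Note: the new `aidial_analytics_realtime.utils.logging` module is referenced here but not included in the diff. A minimal sketch of one plausible shape for it, assuming a contextvar-based prefix so the `[i/n]` labels set by concurrently running tasks do not leak into each other's records (only `add_logger_prefix`, `app_logger`, and `configure_loggers` are known from the imports; the rest is an assumption):

```python
# Hypothetical sketch of aidial_analytics_realtime/utils/logging.py.
import logging
from contextvars import ContextVar

# Per-task prefix, e.g. "[3/10]". Each task spawned by asyncio.gather runs
# in its own context copy, so setting the var never affects sibling tasks.
_log_prefix: ContextVar[str] = ContextVar("_log_prefix", default="")

app_logger = logging.getLogger("aidial_analytics_realtime")


def add_logger_prefix(prefix: str) -> None:
    _log_prefix.set(prefix)


class _PrefixFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        # Prepend the current task's prefix to every record it emits.
        prefix = _log_prefix.get()
        if prefix:
            record.msg = f"{prefix} {record.msg}"
        return True


def configure_loggers() -> None:
    handler = logging.StreamHandler()
    handler.addFilter(_PrefixFilter())
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
    )
    app_logger.addHandler(handler)
    app_logger.setLevel(logging.INFO)
```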
@@ -61,7 +68,6 @@ async def on_rate_message(
     response: dict,
     influx_writer: InfluxWriterAsync,
 ):
-    logger.info(f"Rate message length {len(request) + len(response)}")
     request_body = json.loads(request["body"])
     point = make_rate_point(
         deployment,
@@ -133,7 +139,6 @@ async def on_chat_completion_message(
     response_body = json.loads(response["body"])
 
     await on_message(
-        logger,
         influx_writer,
         deployment,
         model or deployment,
@@ -187,7 +192,6 @@ async def on_embedding_message(
     )
 
     await on_message(
-        logger,
         influx_writer,
         deployment,
         deployment,
@@ -298,28 +302,76 @@ async def on_log_messages(
     topic_model: TopicModel = Depends(),
     rates_calculator: RatesCalculator = Depends(),
 ):
+
     data = await request.json()
 
-    statuses = []
-    for idx, item in enumerate(data):
-        try:
-            await on_log_message(
-                json.loads(item["message"]),
-                influx_writer,
-                topic_model,
-                rates_calculator,
-            )
-        except Exception as e:
-            logging.exception(f"Error processing message #{idx}")
-            statuses.append({"status": "error", "error": str(e)})
-        else:
-            statuses.append({"status": "success"})
+    n = len(data)
+    logger.info(f"number of messages: {n}")
+
+    statuses: list[dict] = []
+
+    async with Timer(logger.debug, format="request {elapsed}"):
+
+        async def _task(i: int, message_str: str) -> dict:
+            add_logger_prefix(f"[{i}/{n}]")
+
+            async with Timer(logger.debug, format="message {elapsed}"):
+                return await process_message(
+                    json.loads(message_str),
+                    influx_writer,
+                    topic_model,
+                    rates_calculator,
+                )
+
+        statuses = await asyncio.gather(
+            *[
+                _task(i, message["message"])
+                for i, message in enumerate(data, start=1)
+            ]
+        )
+
+    if logger.isEnabledFor(logging.DEBUG):
+        logger.debug(f"response: {json.dumps(statuses)}")
 
     # Returning 200 code even if processing of some messages has failed,
     # since the log broker that sends the messages may decide to retry the failed requests.
     return JSONResponse(content=statuses, status_code=200)
 
 
+async def process_message(
+    message: dict,
+    influx_writer: InfluxWriterAsync,
+    topic_model: TopicModel,
+    rates_calculator: RatesCalculator,
+) -> dict:
+    def _error(reason: str | None = None) -> dict:
+        error = str(sys.exc_info()[1])
+        ret = {"status": "error"}
+        if error:
+            ret["error"] = error
+        if reason:
+            ret["reason"] = reason
+            logger.error(reason)
+        else:
+            logger.exception("caught exception")
+        return ret
+
+    try:
+        await on_log_message(
+            message, influx_writer, topic_model, rates_calculator
+        )
+        logger.info("success")
+        return {"status": "success"}
+    except starlette.requests.ClientDisconnect:
+        return _error("client disconnect")
+    except aiohttp.ClientConnectionError:
+        return _error("connection error")
+    except asyncio.TimeoutError:
+        return _error("timeout")
+    except Exception:
+        return _error()
+
+
 @app.get("/health")
 def health():
     return {"status": "ok"}
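Note: since `asyncio.gather` preserves argument order, the i-th entry of `statuses` still corresponds to the i-th submitted message, as in the sequential loop it replaces. A hedged example of the new per-message payloads (the exact strings depend on the exception raised):

```python
# Example response body after this change; "reason" comes from the except
# clauses in process_message, "error" from str(sys.exc_info()[1]).
statuses = [
    {"status": "success"},
    {
        "status": "error",
        "reason": "connection error",
        "error": "Cannot connect to host influxdb:8086",
    },
    {"status": "success"},
]
```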
39 changes: 14 additions & 25 deletions aidial_analytics_realtime/dial.py
@@ -1,40 +1,31 @@
-from logging import Logger
 from typing import Iterator, List
 
+from aidial_analytics_realtime.utils.logging import app_logger as logger
+
 
-def get_chat_completion_request_contents(
-    logger: Logger, request: dict
-) -> List[str]:
-    return list(_chat_completion_request_contents(logger, request))
+def get_chat_completion_request_contents(request: dict) -> List[str]:
+    return list(_chat_completion_request_contents(request))
 
 
-def get_chat_completion_response_contents(
-    logger: Logger, response: dict
-) -> List[str]:
-    return list(_chat_completion_response_contents(logger, response))
+def get_chat_completion_response_contents(response: dict) -> List[str]:
+    return list(_chat_completion_response_contents(response))
 
 
-def get_embeddings_request_contents(logger: Logger, request: dict) -> List[str]:
-    return list(_embeddings_request_contents(logger, request))
+def get_embeddings_request_contents(request: dict) -> List[str]:
+    return list(_embeddings_request_contents(request))
 
 
-def _chat_completion_request_contents(
-    logger: Logger, request: dict
-) -> Iterator[str]:
+def _chat_completion_request_contents(request: dict) -> Iterator[str]:
     for message in request["messages"]:
-        yield from _chat_completion_message_contents(logger, message)
+        yield from _chat_completion_message_contents(message)
 
 
-def _chat_completion_response_contents(
-    logger: Logger, response: dict
-) -> Iterator[str]:
+def _chat_completion_response_contents(response: dict) -> Iterator[str]:
     message = response["choices"][0]["message"]
-    yield from _chat_completion_message_contents(logger, message)
+    yield from _chat_completion_message_contents(message)
 
 
-def _embeddings_request_contents(
-    logger: Logger, request: dict
-) -> Iterator[str]:
+def _embeddings_request_contents(request: dict) -> Iterator[str]:
     inp = request.get("input")
 
     if isinstance(inp, str):
@@ -47,9 +38,7 @@ def _embeddings_request_contents(
         logger.warning(f"Unexpected type of embeddings input: {type(inp)}")
 
 
-def _chat_completion_message_contents(
-    logger: Logger, message: dict
-) -> Iterator[str]:
+def _chat_completion_message_contents(message: dict) -> Iterator[str]:
     content = message.get("content")
     if content is None:
         return
6 changes: 5 additions & 1 deletion aidial_analytics_realtime/influx_writer.py
@@ -4,6 +4,9 @@
 from influxdb_client import Point
 from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
 
+from aidial_analytics_realtime.utils.logging import app_logger as logger
+from aidial_analytics_realtime.utils.timer import Timer
+
 InfluxWriterAsync = Callable[[Point], Awaitable[None]]
 
 
@@ -19,6 +22,7 @@ def create_influx_writer() -> Tuple[InfluxDBClientAsync, InfluxWriterAsync]:
     influx_write_api = client.write_api()
 
     async def influx_writer_impl(record: Point):
-        await influx_write_api.write(bucket=influx_bucket, record=record)
+        with Timer(logger.debug, format="influx {elapsed}"):
+            await influx_write_api.write(bucket=influx_bucket, record=record)
 
     return client, influx_writer_impl
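Note: `Timer` is used as an async context manager in app.py and as a plain one here, so it presumably implements both protocols. A minimal sketch under that assumption; the module itself is not shown in this PR, and the constructor signature is inferred from the call sites `Timer(logger.debug, format="... {elapsed}")`:

```python
# Hypothetical sketch of aidial_analytics_realtime/utils/timer.py.
import time
from typing import Any, Callable


class Timer:
    def __init__(
        self, log: Callable[[str], None], *, format: str = "{elapsed}"
    ) -> None:
        self._log = log
        self._format = format
        self._start = 0.0

    def __enter__(self) -> "Timer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc: Any) -> None:
        # Log the elapsed wall-clock time through the supplied callable,
        # e.g. logger.debug("influx 0.012s").
        elapsed = time.perf_counter() - self._start
        self._log(self._format.format(elapsed=f"{elapsed:.3f}s"))

    # Mirror the sync protocol so the same object works with "async with".
    async def __aenter__(self) -> "Timer":
        return self.__enter__()

    async def __aexit__(self, *exc: Any) -> None:
        self.__exit__(*exc)
```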