[Frontend][V1] Online serving performance improvements #12287

Open · wants to merge 2 commits into base: main
6 changes: 6 additions & 0 deletions vllm/entrypoints/openai/api_server.py
@@ -1,5 +1,6 @@
import asyncio
import atexit
import gc
import importlib
import inspect
import multiprocessing
@@ -104,6 +105,11 @@ async def _force_log():
task.add_done_callback(_running_tasks.remove)
else:
task = None

# Mark the startup heap as static so that it's ignored by GC.
# Reduces pause times of oldest generation collections.
gc.collect()
gc.freeze()
Collaborator: Do we need to call unfreeze at some point?

Member (Author): No, this is mostly static stuff that will be around for the lifetime of the process anyhow.
https://www.rippling.com/blog/the-garbage-collector-fights-back

try:
yield
finally:
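Below is a minimal, hedged sketch of the startup-freeze pattern shown in this hunk (standard-library gc only; the function name is illustrative). gc.freeze() moves every object the collector currently tracks into the permanent generation, so later collections, including full gen-2 passes, skip them.

```python
import gc


def freeze_startup_heap() -> int:
    """Collect once, then exclude the surviving startup objects from future GC passes."""
    # Drop short-lived startup garbage first so only long-lived objects get frozen.
    gc.collect()
    # Move everything currently tracked into the permanent generation;
    # subsequent collections ignore these objects entirely.
    gc.freeze()
    return gc.get_freeze_count()


if __name__ == "__main__":
    print("objects excluded from future GC passes:", freeze_startup_heap())
```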
30 changes: 19 additions & 11 deletions vllm/entrypoints/openai/protocol.py
@@ -3,7 +3,7 @@
import re
import time
from argparse import Namespace
from typing import Any, Dict, List, Literal, Optional, Union
from typing import Any, ClassVar, Dict, List, Literal, Optional, Set, Union

import torch
from pydantic import BaseModel, ConfigDict, Field, model_validator
@@ -42,23 +42,31 @@ class OpenAIBaseModel(BaseModel):
# OpenAI API does allow extra fields
model_config = ConfigDict(extra="allow")

# Cache class field names
field_names: ClassVar[Optional[Set[str]]] = None
Member (Author): There was noticeable overhead creating this set every time one of these objects is instantiated.


@model_validator(mode="before")
@classmethod
def __log_extra_fields__(cls, data):
if isinstance(data, dict):

field_names = cls.field_names
if field_names is None:
if not isinstance(data, dict):
return data
# Get all class field names and their potential aliases
field_names = set()
for field_name, field in cls.model_fields.items():
field_names.add(field_name)
if hasattr(field, 'alias') and field.alias:
field_names.add(field.alias)

# Compare against both field names and aliases
extra_fields = data.keys() - field_names
if extra_fields:
logger.warning(
"The following fields were present in the request "
"but ignored: %s", extra_fields)
if alias := getattr(field, 'alias', None):
field_names.add(alias)
cls.field_names = field_names

# Compare against both field names and aliases
if any(k not in field_names for k in data):
logger.warning(
"The following fields were present in the request "
"but ignored: %s",
data.keys() - field_names)
return data


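For reference, here is a standalone sketch of the caching idea from this diff (assumes pydantic v2; LenientRequest and its fields are illustrative, not vLLM's actual protocol models). The set of field names and aliases is computed once per class instead of on every request object.

```python
import logging
from typing import Any, ClassVar, Optional, Set

from pydantic import BaseModel, ConfigDict, Field, model_validator

logger = logging.getLogger(__name__)


class LenientRequest(BaseModel):
    # Extra fields are allowed but logged, mirroring OpenAIBaseModel above.
    model_config = ConfigDict(extra="allow")

    # Cached once per class; stays None until the first validation runs.
    field_names: ClassVar[Optional[Set[str]]] = None

    model: str
    max_tokens: Optional[int] = Field(default=None, alias="max_completion_tokens")

    @model_validator(mode="before")
    @classmethod
    def _warn_on_extra_fields(cls, data: Any) -> Any:
        if not isinstance(data, dict):
            return data
        names = cls.field_names
        if names is None:
            # Build the field-name/alias set once and cache it on the class.
            names = set()
            for name, field in cls.model_fields.items():
                names.add(name)
                if alias := getattr(field, "alias", None):
                    names.add(alias)
            cls.field_names = names
        if any(k not in names for k in data):
            logger.warning("Fields present in the request but ignored: %s",
                           data.keys() - names)
        return data
```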
58 changes: 42 additions & 16 deletions vllm/v1/engine/async_llm.py
@@ -1,7 +1,10 @@
import asyncio
import math
import os
from typing import AsyncGenerator, List, Mapping, Optional, Type, Union

import numpy as np

from vllm.config import ModelConfig, VllmConfig
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.protocol import EngineClient
@@ -26,6 +29,11 @@

logger = init_logger(__name__)

# For now determined empirically.
# Larger => higher ITL variance
# Smaller => higher TTFT, throughput impacted
OUTPUT_PROCESSING_CHUNK_SIZE = 128


class AsyncLLM(EngineClient):

@@ -205,17 +213,15 @@ async def generate(

# The output_handler task pushes items into the queue.
# This task pulls from the queue and yields to caller.
while True:
finished = False
while not finished:
# Note: drain queue without await if possible (avoids
# task switching under load which helps performance).
out = q.get_nowait() if q.qsize() > 0 else await q.get()
out = q.get_nowait() if not q.empty() else await q.get()

# Note: both OutputProcessor and EngineCore handle their
# own request cleanup based on finished.
if out.finished:
yield out
break

finished = out.finished
yield out

# If the request is disconnected by the client, the
@@ -233,22 +239,42 @@ async def _run_output_handler(self):
# 1) Pull EngineCoreOutputs from the EngineCore.
outputs = await self.engine_core.get_output_async()

# 2) Process EngineCoreOutputs.
processed_outputs = self.output_processor.process_outputs(
outputs.outputs)
# NOTE: RequestOutputs are pushed to their queues.
assert len(processed_outputs.request_outputs) == 0

# 3) Abort any reqs that finished due to stop strings.
await self.engine_core.abort_requests_async(
processed_outputs.reqs_to_abort)
# Split outputs into chunks of at most
# OUTPUT_PROCESSING_CHUNK_SIZE, so that we don't block the
# event loop for too long.
num_outputs = len(outputs.outputs)
if num_outputs <= OUTPUT_PROCESSING_CHUNK_SIZE:
slices = (outputs.outputs, )
else:
slices = np.array_split(
outputs.outputs,
math.ceil(num_outputs / OUTPUT_PROCESSING_CHUNK_SIZE))

iteration_stats = None
for i, outputs_slice in enumerate(slices):

# 2) Process EngineCoreOutputs.
processed_outputs = self.output_processor.process_outputs(
outputs_slice, iteration_stats)
# NOTE: RequestOutputs are pushed to their queues.
assert not processed_outputs.request_outputs
iteration_stats = processed_outputs.iteration_stats

# Allow other asyncio tasks to run between chunks
if i + 1 < len(slices):
await asyncio.sleep(0)

# 3) Abort any reqs that finished due to stop strings.
await self.engine_core.abort_requests_async(
processed_outputs.reqs_to_abort)

# 4) Logging.
# TODO(rob): make into a coroutine and launch it in
# background thread once we add Prometheus.
assert iteration_stats is not None
self._log_stats(
scheduler_stats=outputs.scheduler_stats,
iteration_stats=processed_outputs.iteration_stats,
iteration_stats=iteration_stats,
)

except Exception as e:
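A rough sketch of the chunked-processing loop introduced above, with handle_slice standing in for output_processor.process_outputs (illustrative names, not the real API): large batches are split into slices of at most OUTPUT_PROCESSING_CHUNK_SIZE and the event loop gets a chance to run other tasks between slices.

```python
import asyncio
import math
from typing import Callable, Sequence

import numpy as np

OUTPUT_PROCESSING_CHUNK_SIZE = 128


async def process_in_chunks(outputs: Sequence,
                            handle_slice: Callable[[Sequence], None]) -> None:
    num_outputs = len(outputs)
    if num_outputs <= OUTPUT_PROCESSING_CHUNK_SIZE:
        slices = (outputs, )
    else:
        # np.array_split keeps the slices roughly equal in size.
        slices = np.array_split(
            outputs, math.ceil(num_outputs / OUTPUT_PROCESSING_CHUNK_SIZE))

    for i, outputs_slice in enumerate(slices):
        handle_slice(outputs_slice)
        # Yield control between chunks so per-request generators can make
        # progress instead of waiting for the whole batch.
        if i + 1 < len(slices):
            await asyncio.sleep(0)
```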
20 changes: 17 additions & 3 deletions vllm/v1/engine/core_client.py
@@ -1,8 +1,9 @@
import asyncio
import os
import signal
import weakref
from abc import ABC, abstractmethod
from typing import List, Type
from typing import List, Optional, Type

import msgspec
import zmq
@@ -242,10 +243,23 @@ def __init__(self, vllm_config: VllmConfig,
log_stats=True,
)

self.outputs_queue: Optional[asyncio.Queue[bytes]] = None
self.queue_task: Optional[asyncio.Task] = None

async def get_output_async(self) -> EngineCoreOutputs:
if self.outputs_queue is None:
# Perform IO in separate task to parallelize as much as possible
self.outputs_queue: asyncio.Queue[bytes] = asyncio.Queue()

async def process_outputs_socket():
while True:
(frame, ) = await self.output_socket.recv_multipart(
copy=False)
self.outputs_queue.put_nowait(frame.buffer)

self.queue_task = asyncio.create_task(process_outputs_socket())

frames = await self.output_socket.recv_multipart(copy=False)
return self.decoder.decode(frames[0].buffer)
return self.decoder.decode(await self.outputs_queue.get())

async def _send_input(self, request_type: EngineCoreRequestType,
request: EngineCoreRequestUnion) -> None:
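A simplified sketch of the lazy background-reader pattern from get_output_async above. A generic read_frame coroutine stands in for the ZMQ socket recv, and msgspec decoding is left out; the point is that socket IO runs in its own task and feeds an asyncio.Queue.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class AsyncOutputReader:
    """Reads frames in a dedicated task so IO overlaps with downstream decoding."""

    def __init__(self, read_frame: Callable[[], Awaitable[bytes]]):
        self._read_frame = read_frame
        self._queue: Optional[asyncio.Queue] = None
        self._task: Optional[asyncio.Task] = None

    async def get(self) -> bytes:
        if self._queue is None:
            # Lazily start the reader task on first use, as the client above does.
            self._queue = asyncio.Queue()

            async def _pump() -> None:
                while True:
                    self._queue.put_nowait(await self._read_frame())

            self._task = asyncio.create_task(_pump())
        return await self._queue.get()
```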
6 changes: 4 additions & 2 deletions vllm/v1/engine/output_processor.py
@@ -101,6 +101,7 @@ def add_request(
def process_outputs(
self,
engine_core_outputs: List[EngineCoreOutput],
iteration_stats: Optional[IterationStats] = None,
) -> OutputProcessorOutput:
"""
Process the EngineCoreOutputs:
@@ -133,7 +134,8 @@ def process_outputs(

request_outputs: List[RequestOutput] = []
reqs_to_abort: List[str] = []
iteration_stats = IterationStats(self.log_stats)
if not iteration_stats:
iteration_stats = IterationStats(self.log_stats)
Comment on lines -136 to +138
Member: Why pass in an iteration stats just to override it? Is the arg more like a bool?

for engine_core_output in engine_core_outputs:
req_id = engine_core_output.request_id
req_state = self.request_states.get(req_id)
@@ -175,8 +177,8 @@ def process_outputs(
iteration_stats=iteration_stats,
)

@staticmethod
def _make_request_output(
self,
request_state: RequestState,
detokenizer_output: Optional[DetokenizerOutput],
) -> Optional[RequestOutput]:
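Regarding the question above about the iteration_stats argument: as the async_llm.py loop shows, the first chunk creates the stats object and later chunks pass the same object back in, so one IterationStats accumulates across all slices of a single engine step. A toy sketch with a stand-in stats class (illustrative, not the real IterationStats API):

```python
from typing import List, Optional


class IterationStatsSketch:
    """Stand-in for IterationStats that just counts processed outputs."""

    def __init__(self) -> None:
        self.num_outputs = 0


def process_outputs_sketch(
        outputs: List[object],
        iteration_stats: Optional[IterationStatsSketch] = None
) -> IterationStatsSketch:
    # Created only for the first chunk; later chunks keep accumulating into
    # the object the caller passes back in.
    if iteration_stats is None:
        iteration_stats = IterationStatsSketch()
    iteration_stats.num_outputs += len(outputs)
    return iteration_stats


# One engine step processed in two chunks accumulates into a single stats object.
stats = process_outputs_sketch(list(range(128)))
stats = process_outputs_sketch(list(range(40)), stats)
assert stats.num_outputs == 168
```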
18 changes: 6 additions & 12 deletions vllm/v1/request.py
@@ -64,6 +64,12 @@ def __init__(
# recomputing.
self._kv_block_hashes: List[BlockHashType] = []

# Read-only views
# Prevent directly appending to these lists since
# they should also be updated simultaneously.
self.output_token_ids = ConstantList(self._output_token_ids)
self.all_token_ids = ConstantList(self._all_token_ids)

@classmethod
def from_engine_core_request(cls, request: EngineCoreRequest) -> "Request":
return cls(
Expand All @@ -79,18 +85,6 @@ def from_engine_core_request(cls, request: EngineCoreRequest) -> "Request":
lora_request=request.lora_request,
)

@property
def output_token_ids(self) -> ConstantList[int]:
# Prevent directly appending to the output_token_ids since
# all_token_ids should also be updated simultaneously.
return ConstantList(self._output_token_ids)
Member (Author): Avoid constructing these objects every time the properties are accessed.

Collaborator: Nice catch!

Member: I actually thought properties were cached after the first call, nice call

Member (replying): That would involve the use of cached_property.


@property
def all_token_ids(self) -> ConstantList[int]:
# Prevent directly appending to the all_token_ids since
# output_token_ids should also be updated simultaneously
return ConstantList(self._all_token_ids)

def append_output_token_ids(
self,
token_ids: Union[int, List[int]],
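A small sketch of the trade-off discussed in the thread above (ReadOnlyView and RequestSketch are illustrative stand-ins for ConstantList and Request): a plain property rebuilds the wrapper on every access, functools.cached_property builds it on first access, and this PR builds the views eagerly in __init__.

```python
from functools import cached_property
from typing import List


class ReadOnlyView:
    """Tiny stand-in for ConstantList: wraps a list without copying it."""

    def __init__(self, items: List[int]):
        self._items = items

    def __getitem__(self, idx: int) -> int:
        return self._items[idx]


class RequestSketch:
    def __init__(self, output_token_ids: List[int]):
        self._output_token_ids = output_token_ids
        # Eager construction (the approach taken in this PR): built exactly once.
        self.output_token_ids = ReadOnlyView(self._output_token_ids)

    @property
    def output_view_property(self) -> ReadOnlyView:
        # A plain property allocates a new wrapper on every attribute access.
        return ReadOnlyView(self._output_token_ids)

    @cached_property
    def output_view_cached(self) -> ReadOnlyView:
        # cached_property stores the result in the instance dict on first use.
        return ReadOnlyView(self._output_token_ids)
```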