Commit: Some cleanup
tlrmchlsmth committed Nov 15, 2024
1 parent f13d18d commit 12b8813
Showing 4 changed files with 39 additions and 48 deletions.
2 changes: 2 additions & 0 deletions vllm/distributed/device_communicators/shm_broadcast.py
@@ -1,6 +1,7 @@
import os
import struct
import time
import sys
from contextlib import contextmanager
from dataclasses import dataclass, field
from multiprocessing import shared_memory
@@ -29,6 +30,7 @@
if sys.version_info[:3] >= (3, 11, 1):
USE_SCHED_YIELD = True


class ShmRingBuffer:

def __init__(self,
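
Aside, not part of the commit: the hunk above compares sys.version_info against (3, 11, 1) to set USE_SCHED_YIELD, which is why this change adds import sys. As context only, a minimal sketch of how such a flag is typically consumed; the sched_yield helper below is an assumed illustration, not the file's actual code.

import os
import sys
import time

# Mirror the version check from the hunk above: prefer os.sched_yield()
# (POSIX) on Python 3.11.1+, otherwise fall back to time.sleep(0).
USE_SCHED_YIELD = sys.version_info[:3] >= (3, 11, 1)


def sched_yield() -> None:
    """Yield the CPU inside a busy-wait loop (illustrative helper)."""
    if USE_SCHED_YIELD:
        os.sched_yield()
    else:
        time.sleep(0)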
15 changes: 8 additions & 7 deletions vllm/v1/core/scheduler_output.py
@@ -7,6 +7,7 @@
from vllm.multimodal import MultiModalKwargs
from vllm.multimodal.base import PlaceholderRange


class NewRequestData(msgspec.Struct,
array_like=True,
omit_defaults=True,
@@ -41,9 +42,9 @@ def from_request(


class ResumedRequestData(msgspec.Struct,
array_like=True,
omit_defaults=True,
gc=False):
array_like=True,
omit_defaults=True,
gc=False):

req_id: str
block_ids: List[int]
@@ -64,9 +65,9 @@ def from_request(


class RunningRequestData(msgspec.Struct,
array_like=True,
omit_defaults=True,
gc=False):
array_like=True,
omit_defaults=True,
gc=False):

req_id: str
new_block_ids: List[int]
@@ -85,6 +86,7 @@ def from_request(
num_computed_tokens=num_computed_tokens,
)


class SchedulerOutput(msgspec.Struct,
array_like=True,
omit_defaults=True,
@@ -101,4 +103,3 @@ class SchedulerOutput(msgspec.Struct,
preempted_req_ids: Set[str]
finished_req_ids: Set[str]
free_encoder_input_ids: List[Tuple[str, int]]

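
Context, not part of the commit: every class in this file uses the same three msgspec.Struct options that the re-indent above lines up. A toy, self-contained example (field names invented here, not taken from the diff) of what array_like=True, omit_defaults=True, and gc=False mean when a struct is round-tripped through msgpack:

from typing import List

import msgspec


class ToyRequestData(msgspec.Struct,
                     array_like=True,     # serialize as a positional array, not a map
                     omit_defaults=True,  # drop trailing fields still at their default
                     gc=False):           # skip cyclic-GC tracking for lower overhead
    req_id: str
    block_ids: List[int]
    num_computed_tokens: int = 0


encoder = msgspec.msgpack.Encoder()
decoder = msgspec.msgpack.Decoder(ToyRequestData)

data = ToyRequestData(req_id="req-0", block_ids=[1, 2, 3])
payload = encoder.encode(data)          # compact: ["req-0", [1, 2, 3]]
print(decoder.decode(payload) == data)  # True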
28 changes: 10 additions & 18 deletions vllm/v1/executor/multiproc_gpu_executor.py
@@ -93,26 +93,15 @@ def __init__(self, vllm_config: VllmConfig) -> None:
result_handler.start()
self.worker_monitor.start()

self._run_workers("initialize")
self._run_workers("load_model")

# Initialize worker and set up message queues for SchedulerOutputs
# and ModelRunnerOutputs
self.scheduler_output_sender = MessageQueue(world_size, world_size)
model_output_receiver_handle = self._run_workers(
"initialize_message_queues",
self.scheduler_output_sender.export_handle())[0]
"initialize", self.scheduler_output_sender.export_handle())[0]
self.model_output_receiver = MessageQueue.create_from_handle(
model_output_receiver_handle, 0)

# Message queues are not valid until all readers and writers call
# wait_until_ready()
wait_futures = self._run_workers("finish_message_queue_initialization",
run_async=True)
self.scheduler_output_sender.wait_until_ready()
self.model_output_receiver.wait_until_ready()
for output in wait_futures:
output.get()
self._run_workers("load_model")

# Flag that's set if workers are waiting in the main execution loop
self.workers_in_busy_loop = False
@@ -187,20 +176,23 @@ def initialize_cache(self, num_gpu_blocks: int) -> None:
self._run_workers("initialize_cache", num_gpu_blocks)
self._run_workers("compile_or_warm_up_model")

def start_workers(self):
self._run_workers("execute_model_busy_loop", run_async=True)
self.scheduler_output_sender.wait_until_ready()
self.model_output_receiver.wait_until_ready()
self.workers_in_busy_loop = True

def execute_model(
self,
scheduler_output,
) -> ModelRunnerOutput:
# TODO: Find a better way to start this loop
if not self.workers_in_busy_loop:
self._run_workers("execute_model_busy_loop", run_async=True)
self.workers_in_busy_loop = True
self.start_workers()

self.scheduler_output_sender.enqueue(scheduler_output)
model_output = self.model_output_receiver.dequeue(ModelRunnerOutput)
return model_output

def check_health(self) -> None:
# GPUExecutor will always be healthy as long as
# it's running.
# GPUExecutor will always be healthy as long as it's running.
return
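
Context, not part of the commit: the refactor above folds message-queue setup into the single "initialize" worker call. The executor builds the SchedulerOutput queue, passes its handle to every worker, rank 0 returns a handle for the ModelRunnerOutput queue, and the wait_until_ready handshake is deferred to start_workers, right before the busy loop begins. The self-contained sketch below mimics that ordering with multiprocessing.Pipe instead of vLLM's shared-memory MessageQueue; all names in it are illustrative.

import multiprocessing as mp


def worker_main(rank: int, sched_conn, output_conn) -> None:
    # Analogue of the worker busy loop: dequeue a scheduler output, run the
    # model, and (on rank 0 only) send back the model output.
    while True:
        scheduler_output = sched_conn.recv()
        if scheduler_output is None:          # shutdown sentinel
            break
        model_output = f"rank {rank} executed {scheduler_output}"
        if rank == 0:
            output_conn.send(model_output)


def main() -> None:
    world_size = 2
    # Driver side: one "scheduler output" channel per worker (vLLM instead
    # uses a single shared-memory broadcast queue with world_size readers).
    pipes = [mp.Pipe(duplex=False) for _ in range(world_size)]
    sched_recv_ends = [recv for recv, _send in pipes]
    sched_send_ends = [send for _recv, send in pipes]
    # Reverse channel carrying rank 0's model output back to the driver.
    output_recv, output_send = mp.Pipe(duplex=False)

    workers = [mp.Process(target=worker_main,
                          args=(rank, sched_recv_ends[rank], output_send))
               for rank in range(world_size)]
    for w in workers:
        w.start()

    # Analogue of execute_model(): broadcast the scheduler output, then block
    # on rank 0's result.
    for step in range(3):
        for send_end in sched_send_ends:
            send_end.send(f"SchedulerOutput(step={step})")
        print(output_recv.recv())

    for send_end in sched_send_ends:
        send_end.send(None)
    for w in workers:
        w.join()


if __name__ == "__main__":
    main()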
42 changes: 19 additions & 23 deletions vllm/v1/worker/gpu_worker.py
@@ -85,7 +85,6 @@ def initialize(self):
self.local_rank)
# Set random seed.
set_random_seed(self.model_config.seed)

# Construct the model runner
self.model_runner = GPUModelRunner(self.vllm_config, self.device)

@@ -187,30 +186,13 @@ def __init__(
self.worker = Worker(vllm_config, local_rank, rank,
distributed_init_method)

def initialize_message_queues(self, scheduler_output_receiver_handle):
# Initialize MessageQueue for receiving SchedulerOutput
# Add 1 rank to account for driver process
self.scheduler_output_receiver = MessageQueue.create_from_handle(
scheduler_output_receiver_handle, self.worker.rank)

# Initialize group coordinator for sending the ModelRunnerOutput
# to the driver process
if self.worker.rank == 0:
self.model_output_sender = MessageQueue(1, 1)
return self.model_output_sender.export_handle()
else:
self.model_output_sender = None
return None

# Message queues are not valid until all readers and writers call
# wait_until_ready()
def finish_message_queue_initialization(self):
# Main busy loop for Multiprocessing Workers
def execute_model_busy_loop(self):
# Ensure shm message queues are ready before entering the busy loop
self.scheduler_output_receiver.wait_until_ready()
if self.worker.rank == 0:
self.model_output_sender.wait_until_ready()

# Main busy loop for Multiprocessing Workers
def execute_model_busy_loop(self):
with torch.profiler.profile(
activities=[
torch.profiler.ProfilerActivity.CPU,
@@ -238,9 +220,23 @@ def execute_model_busy_loop(self):

p.step()

# Wrapper methods defined here
def initialize(self):
def initialize(self, input_shm_handle):
# Initialize MessageQueue for receiving SchedulerOutput
# Add 1 rank to account for driver process
self.scheduler_output_receiver = MessageQueue.create_from_handle(
input_shm_handle, self.worker.rank)

# Initialize group coordinator for sending the ModelRunnerOutput
# to the driver process
if self.worker.rank == 0:
self.model_output_sender = MessageQueue(1, 1)
output_shm_handle = self.model_output_sender.export_handle()
else:
self.model_output_sender = None
output_shm_handle = None

self.worker.initialize()
return output_shm_handle

def load_model(self):
self.worker.load_model()
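
Context, not part of the commit: on the worker side, queue bootstrap now lives in the initialize wrapper, and execute_model_busy_loop calls wait_until_ready itself before looping under torch.profiler. A small standalone sketch of that profiler pattern follows; the schedule values and the matmul workload are placeholders, not vLLM's settings.

import torch


def trace_handler(prof: torch.profiler.profile) -> None:
    # Called once per completed profiling cycle.
    print(prof.key_averages().table(sort_by="cpu_time_total", row_limit=5))


def busy_loop_sketch(num_steps: int = 8) -> None:
    with torch.profiler.profile(
            activities=[torch.profiler.ProfilerActivity.CPU],
            schedule=torch.profiler.schedule(wait=1, warmup=1, active=3,
                                             repeat=1),
            on_trace_ready=trace_handler) as p:
        for _ in range(num_steps):
            # Stand-in for: dequeue SchedulerOutput, execute the model,
            # enqueue ModelRunnerOutput on rank 0.
            torch.matmul(torch.randn(256, 256), torch.randn(256, 256))
            p.step()  # advance the profiler schedule, as in the diff above


if __name__ == "__main__":
    busy_loop_sketch()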
