福生无量摸鱼天尊

vllm v1 源码解析 —— Core

2025/09/23
12
0

一个client建立之后就会建立一个core engine,这些配置会通过QMZ IPC发送给core engine。

Core engine Architecture

Worker and Executor

MultiprocExecutor

在MultiprocExecutor类中,可以清晰的找到三部曲:

1、创建RPC消息队列

# Initialize worker and set up message queues for SchedulerOutputs
# and ModelRunnerOutputs
max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024
self.rpc_broadcast_mq = MessageQueue(self.world_size,
                                     self.world_size,
                                     max_chunk_bytes=max_chunk_bytes)
scheduler_output_handle = self.rpc_broadcast_mq.export_handle()

2、初始化worker和开始工作

for rank in range(self.world_size):
    unready_workers.append(
        WorkerProc.make_worker_process(
            vllm_config=self.vllm_config,
            local_rank=rank,
            rank=rank,
            distributed_init_method=distributed_init_method,
            input_shm_handle=scheduler_output_handle,
        ))

# Workers must be created before wait_for_ready to avoid
# deadlock, since worker.init_device() does a device sync.
self.workers = WorkerProc.wait_for_ready(unready_workers)

# Ensure message queues are ready. Will deadlock if re-ordered
# Must be kept consistent with the WorkerProc.
self.rpc_broadcast_mq.wait_until_ready()
for w in self.workers:
    w.worker_response_mq.wait_until_ready()

self.start_worker_monitor()
success = True

3、执行模型,拿到输出

# For pipeline parallel, we use a thread pool for asynchronous
# execute_model.
if self.max_concurrent_batches > 1:
    # Note: must use only 1 IO thread to keep dequeue sequence
    # from the response queue
    # _async_aggregate_workers_output also assumes a single IO thread
    self.io_thread_pool = ThreadPoolExecutor(
        max_workers=1, thread_name_prefix="mp_exec_io")

self.output_rank = self._get_output_rank()
self.has_connector = self.vllm_config.kv_transfer_config is not None
self.kv_output_aggregator = KVOutputAggregator(
    self.parallel_config.world_size)

由上可知,整个执行依赖于WorkerProc和MessageQueue,接下来一一解析

WorkerProc

初始化的时候会把消息队列进行一对一的配对

# Initialize MessageQueue for receiving SchedulerOutput
self.rpc_broadcast_mq = MessageQueue.create_from_handle(
    input_shm_handle, self.worker.rank)

# Initializes a message queue for sending the model output
self.worker_response_mq = MessageQueue(1, 1)

最主要的就是worker_main函数

def monitor_parent_death():
    try:
        # This will block until parent process exits (pipe closes)
        death_pipe.recv()
    except EOFError:
        # Parent process has exited, terminate this worker
        logger.info("Parent process exited, terminating worker")
        # Send signal to self to trigger clean shutdown
        shutdown_event.set()
    except Exception as e:
        logger.warning("Death monitoring error: %s", e)


death_monitor = Thread(target=monitor_parent_death,
                        daemon=True,
                        name="WorkerDeathMonitor")
death_monitor.start()

reader.close()
worker = WorkerProc(*args, **kwargs)

# Send READY once we know everything is loaded
ready_writer.send({
    "status":
    WorkerProc.READY_STR,
    "handle":
    worker.worker_response_mq.export_handle(),
})

# Ensure message queues are ready. Will deadlock if re-ordered.
# Must be kept consistent with the Executor
worker.rpc_broadcast_mq.wait_until_ready()
worker.worker_response_mq.wait_until_ready()

最后:

finally:
    if ready_writer is not None:
        ready_writer.close()
    if death_pipe is not None:
        death_pipe.close()
    # Clean up once worker exits busy loop
    if worker is not None:
        worker.shutdown()

是通过

参考文献:

deepwiki —— vllm