When a client is created, a core engine is created along with it, and the client-side configuration is sent to that core engine over ZMQ IPC.
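As a minimal illustration of that idea (not vLLM's actual message format or socket path, both of which are made up here), shipping a config object to another process over a ZMQ IPC socket with pyzmq can look like this:

# Illustration only: the socket path and payload are invented, and both ends
# live in one process here; in vLLM the client and engine core are separate.
import zmq

IPC_PATH = "ipc:///tmp/engine_core.sock"   # hypothetical socket path

# Engine-core side: bind and wait for the configuration.
ctx = zmq.Context.instance()
pull = ctx.socket(zmq.PULL)
pull.bind(IPC_PATH)

# Client side (normally another process): connect and push the config.
push = ctx.socket(zmq.PUSH)
push.connect(IPC_PATH)
push.send_pyobj({"model": "facebook/opt-125m", "tensor_parallel_size": 1})

print(pull.recv_pyobj())                   # engine core receives the config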
In the MultiprocExecutor class, three clear steps can be identified:
1. Create the RPC message queue
# Initialize worker and set up message queues for SchedulerOutputs
# and ModelRunnerOutputs
max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024
self.rpc_broadcast_mq = MessageQueue(self.world_size,
                                     self.world_size,
                                     max_chunk_bytes=max_chunk_bytes)
scheduler_output_handle = self.rpc_broadcast_mq.export_handle()

2. Initialize the workers and start them
for rank in range(self.world_size):
    unready_workers.append(
        WorkerProc.make_worker_process(
            vllm_config=self.vllm_config,
            local_rank=rank,
            rank=rank,
            distributed_init_method=distributed_init_method,
            input_shm_handle=scheduler_output_handle,
        ))
# Workers must be created before wait_for_ready to avoid
# deadlock, since worker.init_device() does a device sync.
self.workers = WorkerProc.wait_for_ready(unready_workers)
# Ensure message queues are ready. Will deadlock if re-ordered
# Must be kept consistent with the WorkerProc.
self.rpc_broadcast_mq.wait_until_ready()
for w in self.workers:
    w.worker_response_mq.wait_until_ready()
self.start_worker_monitor()
success = True

3. Run the model and collect its output
# For pipeline parallel, we use a thread pool for asynchronous
# execute_model.
if self.max_concurrent_batches > 1:
    # Note: must use only 1 IO thread to keep dequeue sequence
    # from the response queue
    # _async_aggregate_workers_output also assumes a single IO thread
    self.io_thread_pool = ThreadPoolExecutor(
        max_workers=1, thread_name_prefix="mp_exec_io")
self.output_rank = self._get_output_rank()
self.has_connector = self.vllm_config.kv_transfer_config is not None
self.kv_output_aggregator = KVOutputAggregator(
    self.parallel_config.world_size)

As the code above shows, the whole execution path hinges on WorkerProc and MessageQueue, so let's walk through each of them in turn.
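Before diving in, here is a rough sketch of how one execute_model round trip flows through these two pieces on the executor side. This is not the actual MultiprocExecutor code: enqueue()/dequeue() are assumptions about the MessageQueue API, and status checking, timeouts and the async pipeline-parallel path are all omitted.

# Rough sketch only -- simplified view of one request/response round trip.
def execute_model_sketch(executor, scheduler_output):
    # 1. Broadcast the work item to all workers through the shared queue.
    executor.rpc_broadcast_mq.enqueue(("execute_model", scheduler_output))

    # 2. Every worker pushes its result onto its own 1-to-1 response queue;
    #    collecting from all of them also surfaces per-worker failures.
    results = [w.worker_response_mq.dequeue() for w in executor.workers]

    # 3. Only the designated output rank's result is handed back to the
    #    scheduler.
    return results[executor.output_rank]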
During initialization, the message queues are paired up one-to-one between the executor and each worker.
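The pairing is done through exported handles. Conceptually it is the same idea as creating a named shared-memory block in one process and attaching to it by name in another; the snippet below is only an analogy with invented names, not how MessageQueue is implemented.

# Analogy only: the "handle" here is just the name of a shared-memory block.
from multiprocessing import shared_memory

# Creator side: allocate the block and export its handle (the name).
block = shared_memory.SharedMemory(create=True, size=4096)
handle = block.name                      # send this string to the peer process

# Peer side: attach to the very same block using the handle.
peer = shared_memory.SharedMemory(name=handle)
peer.buf[:5] = b"hello"                  # both processes now see this write

peer.close()
block.close()
block.unlink()                           # creator releases the block

In WorkerProc, the executor's exported handle arrives as input_shm_handle, the worker attaches to it with MessageQueue.create_from_handle, and it then creates its own 1-to-1 response queue whose handle is later sent back in the READY message: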
# Initialize MessageQueue for receiving SchedulerOutput
self.rpc_broadcast_mq = MessageQueue.create_from_handle(
    input_shm_handle, self.worker.rank)
# Initializes a message queue for sending the model output
self.worker_response_mq = MessageQueue(1, 1)

The most important piece is the worker_main function.
def monitor_parent_death():
    try:
        # This will block until parent process exits (pipe closes)
        death_pipe.recv()
    except EOFError:
        # Parent process has exited, terminate this worker
        logger.info("Parent process exited, terminating worker")
        # Send signal to self to trigger clean shutdown
        shutdown_event.set()
    except Exception as e:
        logger.warning("Death monitoring error: %s", e)

death_monitor = Thread(target=monitor_parent_death,
                       daemon=True,
                       name="WorkerDeathMonitor")
death_monitor.start()

reader.close()
worker = WorkerProc(*args, **kwargs)
# Send READY once we know everything is loaded
ready_writer.send({
    "status": WorkerProc.READY_STR,
    "handle": worker.worker_response_mq.export_handle(),
})
# Ensure message queues are ready. Will deadlock if re-ordered.
# Must be kept consistent with the Executor
worker.rpc_broadcast_mq.wait_until_ready()
worker.worker_response_mq.wait_until_ready()

Finally, the cleanup path:
finally:
    if ready_writer is not None:
        ready_writer.close()
    if death_pipe is not None:
        death_pipe.close()
    # Clean up once worker exits busy loop
    if worker is not None:
        worker.shutdown()
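Putting the pieces of worker_main together, the startup/shutdown protocol looks roughly like the standalone toy below (this is not vLLM's code and every name in it is invented): the parent waits for a READY message on one pipe, while a daemon thread in the child watches the death pipe and shuts the child down when the parent's end closes.

# Toy version of the handshake: ready pipe for READY + handle, death pipe
# for detecting parent exit. Not vLLM's implementation.
import multiprocessing as mp
from threading import Event, Thread

def child_main(ready_writer, death_pipe):
    shutdown_event = Event()

    def monitor_parent_death():
        try:
            death_pipe.recv()         # blocks until the parent's end closes
        except EOFError:
            shutdown_event.set()      # parent is gone: shut down cleanly

    Thread(target=monitor_parent_death, daemon=True).start()

    # ... a real worker would build the model and response queue here ...
    ready_writer.send({"status": "READY", "handle": "fake-handle"})
    ready_writer.close()

    shutdown_event.wait()             # stand-in for the worker busy loop

if __name__ == "__main__":
    ctx = mp.get_context("spawn")     # spawn: child gets only what we pass it
    ready_reader, ready_writer = ctx.Pipe(duplex=False)
    death_reader, death_writer = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=child_main, args=(ready_writer, death_reader))
    proc.start()
    print(ready_reader.recv())        # block until the child reports READY
    death_writer.close()              # closing this end wakes the monitor
    proc.join()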