福生无量摸鱼天尊

vllm v1 源码解析 —— 单机八卡推理

2025/09/26
72
0

单机八卡,我们按照PP + TP的方式来进行方案说明,使用的是vllm框架,主要命令和函数如下:

python single_node_multi_gpu_demo.py --mode pipeline_parallel --tensor-parallel 4 --pipeline-parallel 2 --model facebook/opt-13b

def pipeline_parallel_inference(self, model_name: str, tensor_parallel_size: int, pipeline_parallel_size: int):
    """流水线并行推理 - 将模型层分布到多个GPU上"""
    print(f"🚀 启动流水线并行推理 - 模型: {model_name}")
    print(f"   张量并行: {tensor_parallel_size}, 流水线并行: {pipeline_parallel_size}")
    
    try:
        # 创建LLM实例,使用张量并行 + 流水线并行
        self.llm = LLM(
            model=model_name,
            tensor_parallel_size=tensor_parallel_size,
            pipeline_parallel_size=pipeline_parallel_size,
            # 流水线并行配置
            max_num_seqs=16,  # 流水线并行时建议较小的batch size
            gpu_memory_utilization=0.8,
            trust_remote_code=True,
        )
        
        prompts = self.setup_test_prompts()
        print(f"📝 处理 {len(prompts)} 个提示词...")
        
        # 批量推理
        start_time = time.time()
        outputs = self.llm.generate(prompts, self.sampling_params)
        end_time = time.time()
        
        # 显示结果
        self._display_results(outputs, end_time - start_time)
        
    except Exception as e:
        print(f"❌ 流水线并行推理失败: {e}")

在单机八卡的场景下,我们假设使用如下配置:

# 配置示例
parallel_config = ParallelConfig(
    tensor_parallel_size=4,      # 每个stage内4-way头部并行
    pipeline_parallel_size=2,    # 2个pipeline stage
)

# GPU拓扑结构
"""
Pipeline Stage 0 (前16层):
├── GPU 0 ← 头部 0-7,   layers 0-15
├── GPU 1 ← 头部 8-15,  layers 0-15  
├── GPU 2 ← 头部 16-23, layers 0-15
└── GPU 3 ← 头部 24-31, layers 0-15

Pipeline Stage 1 (后16层):
├── GPU 4 ← 头部 0-7,   layers 16-31
├── GPU 5 ← 头部 8-15,  layers 16-31
├── GPU 6 ← 头部 16-23, layers 16-31
└── GPU 7 ← 头部 24-31, layers 16-31
"""

既然是TP + PP,我们先来看TP是怎么实现的

Attention的TP

由上图不难知,这里Q要和K乘,然后再和V乘,所以这里有两个矩阵乘法。如果是多头注意力的话,是GPU并行的计算不同的注意力,然后进行汇总。

详细过程请看下面:

输入: [batch, seq_len, hidden_size]

=== 第1步: QKV投影 (ColumnParallelLinear) ===
GPU 0: Input @ W_qkv_0 → QKV_0 [batch, seq_len, (heads_0 + 2*kv_heads_0) * head_dim]
GPU 1: Input @ W_qkv_1 → QKV_1 [batch, seq_len, (heads_1 + 2*kv_heads_1) * head_dim]  
GPU 2: Input @ W_qkv_2 → QKV_2 [batch, seq_len, (heads_2 + 2*kv_heads_2) * head_dim]
GPU 3: Input @ W_qkv_3 → QKV_3 [batch, seq_len, (heads_3 + 2*kv_heads_3) * head_dim]

权重分布:
- W_qkv_0: 负责头部 0-7 的Q、K、V权重
- W_qkv_1: 负责头部 8-15 的Q、K、V权重  
- W_qkv_2: 负责头部 16-23 的Q、K、V权重
- W_qkv_3: 负责头部 24-31 的Q、K、V权重

=== 第2步: QKV分割 (在各自GPU上) ===
GPU 0: QKV_0.split() → Q_0, K_0, V_0 (头部 0-7)
GPU 1: QKV_1.split() → Q_1, K_1, V_1 (头部 8-15)
GPU 2: QKV_2.split() → Q_2, K_2, V_2 (头部 16-23)  
GPU 3: QKV_3.split() → Q_3, K_3, V_3 (头部 24-31)

=== 第3步: 注意力计算 (各GPU独立) ===
GPU 0: Attention(Q_0, K_0, V_0) → AttnOut_0 [batch, seq_len, heads_0 * head_dim]
GPU 1: Attention(Q_1, K_1, V_1) → AttnOut_1 [batch, seq_len, heads_1 * head_dim]
GPU 2: Attention(Q_2, K_2, V_2) → AttnOut_2 [batch, seq_len, heads_2 * head_dim]
GPU 3: Attention(Q_3, K_3, V_3) → AttnOut_3 [batch, seq_len, heads_3 * head_dim]

关键点: 每个GPU的Q、K、V都是完整的,可以独立计算注意力!

=== 第4步: 输出投影 (RowParallelLinear) ===
GPU 0: W_o  @ AttnOut_0 → PartialOut_0  # 前1/4行
GPU 1: W_o @ AttnOut_1 → PartialOut_1  # 第2个1/4行
GPU 2: W_o @ AttnOut_2 → PartialOut_2  # 第3个1/4行  
GPU 3: W_o @ AttnOut_3 → PartialOut_3  # 最后1/4行

=== 第5步: All-Reduce聚合 ===
FinalOut = Sum(PartialOut_0, PartialOut_1, PartialOut_2, PartialOut_3)

  • Prefill阶段,需要把输入的token进行RoPE,也就是embedding,这就是第一步,产生对应的QKV,然后对QKV进行分割,计算对应的attention。此后已经算过的KV可以进行缓存,也就是KV cache

  • 这里重点是矩阵乘法的时候,产生QKV_0QKV_1QKV_2QKV_3的时候,第三维度hidden_size膨胀了,所以要按列切分放到不同的GPU中进行多头的计算。每个GPU有自己的完整的QKV矩阵。

  • GPU进行attention计算的时候,要对输出进行投影。由于行×列可知,AttnOut的第三维是列,所以投影矩阵需要从行切开,所以这里是行切分,得到的但是结果的部分连续行。

  • QKV在每个GPU的完整性是有利于KV cache,每个GPU可以独立缓存自己完整的KV数据,无需跨GPU去访问数据,别的GPU cache miss也不用理会,节省通信开销

  • 每个kv cache都是按照block来管理的,

# 假设32个注意力头在4张GPU上的分布
GPU 0: 头部 0-7   (num_heads_per_gpu = 8)
GPU 1: 头部 8-15  (num_heads_per_gpu = 8) 
GPU 2: 头部 16-23 (num_heads_per_gpu = 8)
GPU 3: 头部 24-31 (num_heads_per_gpu = 8)

# KV Cache存储布局
每个GPU的KV Cache: [num_blocks, block_size, heads_per_gpu, head_size]
                   [    1024,        16,             8,        128]


# 源码: vllm/worker/cache_engine.py:75-76
kv_cache_generic_shape = self.attn_backend.get_kv_cache_shape(
    num_blocks, self.block_size, self.num_kv_heads, self.head_size)

# FlashAttention后端的形状定义 (vllm/attention/backends/flash_attn.py:76-78)
def get_kv_cache_shape(num_blocks, block_size, num_kv_heads, head_size):
    return (2, num_blocks, block_size, num_kv_heads, head_size)
    #      ↑   ↑          ↑           ↑             ↑
    #      K/V 块数量      每块token数  当前GPU头数   每头维度
  • 更具体的kv cache源码在别的博客细讲。

这样,我们就对整体的TP和各个GPU中实际运行的TP有本质上的理解,结合公式,我们从kv cache的角度理解了实际数据的运行过程,接下来我们来看源码中的TP

vllm的TP

TP的作用是把模型的权重进行切分,放到不同的GPU中,计算完了再进行组合,当然有按照行切或者列切的方法,vllm选择"列切分→行切分"只是其中一种策略,理论上来说有四种。

  • 列切分 → 行切分 (VLLM默认)

# 第一层: ColumnParallelLinear (列切分)
W1: [hidden, intermediate] → 按列切分给4个GPU
H_parts = Input @ W1_parts  # 每GPU得到不同的中间激活

# 第二层: RowParallelLinear (行切分)  
W2: [intermediate, hidden] → 按行切分给4个GPU
O_parts = H_parts @ W2_parts  # 每GPU计算部分输出

# 通信: 1次All-Reduce (在最后)
Output = sum(O_parts)

  • 行切分 → 列切分

# 第一层: RowParallelLinear (行切分)
W1: [hidden, intermediate] → 按输出维度切分
H_parts = Input @ W1_parts  # 每GPU计算部分中间激活
H_full = sum(H_parts)  # All-Reduce聚合

# 第二层: ColumnParallelLinear (列切分)
W2: [intermediate, hidden] → 按列切分
O_parts = H_full @ W2_parts  # 每GPU计算部分输出

# 通信: 1次All-Reduce + 1次All-Gather
Output = concat(O_parts)  # 拼接最终输出

  • 纯列切分

# 两层都是列切分
# 通信: 2次All-Gather

  • 纯行切分

# 两层都是行切分  
# 通信: 2次All-Reduce

由上可见,由于attention计算的特殊性,vllm默认的先列后行的切分是巧妙的设计。在vllm中,行切分和列切分的源代码均在vllm/vllm/model_executor/layers/linear.py 。其中对列切分的qkv部分代码如下:

class QKVParallelLinear(ColumnParallelLinear):
"""Linear layers for the attention's QKV transformation."""
    def __init__():
        tp_size = get_tensor_model_parallel_world_size()
        input_size = self.hidden_size
        output_size = (self.num_heads +
                       2 * self.num_kv_heads) * tp_size * self.head_size
        self.output_sizes = [
            self.num_heads * self.head_size * tp_size,  # q_proj
            self.num_kv_heads * self.head_size * tp_size,  # k_proj
            self.num_kv_heads * self.head_size * tp_size,  # v_proj 
        ]


class ColumnParallelLinear(LinearBase):
    """Linear layer with column parallelism."""
    def __init__():
        # Divide the weight matrix along the last dimension.
        self.tp_size = get_tensor_model_parallel_world_size()
        self.input_size_per_partition = input_size
        self.output_size_per_partition = divide(output_size, self.tp_size)
        self.output_partition_sizes = [self.output_size_per_partition]
        # If QKV or MergedColumn, use output size of each partition.
        if hasattr(self, "output_sizes"):
            self.output_partition_sizes = [
                divide(output_size, self.tp_size)
                for output_size in self.output_sizes
            ]

    def weight_loader(self, param: Parameter, loaded_weight: torch.Tensor)

    def forward(
        self, input_
    ) -> Union[torch.Tensor, tuple[torch.Tensor, Optional[Parameter]]]:
        output_parallel = self.quant_method.apply(self, input_, bias)
        output = tensor_model_parallel_all_gather(output_parallel)