单机八卡,我们按照PP + TP的方式来进行方案说明,使用的是vllm框架,主要命令和函数如下:
python single_node_multi_gpu_demo.py --mode pipeline_parallel --tensor-parallel 4 --pipeline-parallel 2 --model facebook/opt-13b
def pipeline_parallel_inference(self, model_name: str, tensor_parallel_size: int, pipeline_parallel_size: int):
"""流水线并行推理 - 将模型层分布到多个GPU上"""
print(f"🚀 启动流水线并行推理 - 模型: {model_name}")
print(f" 张量并行: {tensor_parallel_size}, 流水线并行: {pipeline_parallel_size}")
try:
# 创建LLM实例,使用张量并行 + 流水线并行
self.llm = LLM(
model=model_name,
tensor_parallel_size=tensor_parallel_size,
pipeline_parallel_size=pipeline_parallel_size,
# 流水线并行配置
max_num_seqs=16, # 流水线并行时建议较小的batch size
gpu_memory_utilization=0.8,
trust_remote_code=True,
)
prompts = self.setup_test_prompts()
print(f"📝 处理 {len(prompts)} 个提示词...")
# 批量推理
start_time = time.time()
outputs = self.llm.generate(prompts, self.sampling_params)
end_time = time.time()
# 显示结果
self._display_results(outputs, end_time - start_time)
except Exception as e:
print(f"❌ 流水线并行推理失败: {e}")在单机八卡的场景下,我们假设使用如下配置:
# 配置示例
parallel_config = ParallelConfig(
tensor_parallel_size=4, # 每个stage内4-way头部并行
pipeline_parallel_size=2, # 2个pipeline stage
)
# GPU拓扑结构
"""
Pipeline Stage 0 (前16层):
├── GPU 0 ← 头部 0-7, layers 0-15
├── GPU 1 ← 头部 8-15, layers 0-15
├── GPU 2 ← 头部 16-23, layers 0-15
└── GPU 3 ← 头部 24-31, layers 0-15
Pipeline Stage 1 (后16层):
├── GPU 4 ← 头部 0-7, layers 16-31
├── GPU 5 ← 头部 8-15, layers 16-31
├── GPU 6 ← 头部 16-23, layers 16-31
└── GPU 7 ← 头部 24-31, layers 16-31
"""既然是TP + PP,我们先来看TP是怎么实现的

由上图不难知,这里Q要和K乘,然后再和V乘,所以这里有两个矩阵乘法。如果是多头注意力的话,是GPU并行的计算不同的注意力,然后进行汇总。
详细过程请看下面:
输入: [batch, seq_len, hidden_size]
=== 第1步: QKV投影 (ColumnParallelLinear) ===
GPU 0: Input @ W_qkv_0 → QKV_0 [batch, seq_len, (heads_0 + 2*kv_heads_0) * head_dim]
GPU 1: Input @ W_qkv_1 → QKV_1 [batch, seq_len, (heads_1 + 2*kv_heads_1) * head_dim]
GPU 2: Input @ W_qkv_2 → QKV_2 [batch, seq_len, (heads_2 + 2*kv_heads_2) * head_dim]
GPU 3: Input @ W_qkv_3 → QKV_3 [batch, seq_len, (heads_3 + 2*kv_heads_3) * head_dim]
权重分布:
- W_qkv_0: 负责头部 0-7 的Q、K、V权重
- W_qkv_1: 负责头部 8-15 的Q、K、V权重
- W_qkv_2: 负责头部 16-23 的Q、K、V权重
- W_qkv_3: 负责头部 24-31 的Q、K、V权重
=== 第2步: QKV分割 (在各自GPU上) ===
GPU 0: QKV_0.split() → Q_0, K_0, V_0 (头部 0-7)
GPU 1: QKV_1.split() → Q_1, K_1, V_1 (头部 8-15)
GPU 2: QKV_2.split() → Q_2, K_2, V_2 (头部 16-23)
GPU 3: QKV_3.split() → Q_3, K_3, V_3 (头部 24-31)
=== 第3步: 注意力计算 (各GPU独立) ===
GPU 0: Attention(Q_0, K_0, V_0) → AttnOut_0 [batch, seq_len, heads_0 * head_dim]
GPU 1: Attention(Q_1, K_1, V_1) → AttnOut_1 [batch, seq_len, heads_1 * head_dim]
GPU 2: Attention(Q_2, K_2, V_2) → AttnOut_2 [batch, seq_len, heads_2 * head_dim]
GPU 3: Attention(Q_3, K_3, V_3) → AttnOut_3 [batch, seq_len, heads_3 * head_dim]
关键点: 每个GPU的Q、K、V都是完整的,可以独立计算注意力!
=== 第4步: 输出投影 (RowParallelLinear) ===
GPU 0: W_o @ AttnOut_0 → PartialOut_0 # 前1/4行
GPU 1: W_o @ AttnOut_1 → PartialOut_1 # 第2个1/4行
GPU 2: W_o @ AttnOut_2 → PartialOut_2 # 第3个1/4行
GPU 3: W_o @ AttnOut_3 → PartialOut_3 # 最后1/4行
=== 第5步: All-Reduce聚合 ===
FinalOut = Sum(PartialOut_0, PartialOut_1, PartialOut_2, PartialOut_3)在Prefill阶段,需要把输入的token进行RoPE,也就是embedding,这就是第一步,产生对应的QKV,然后对QKV进行分割,计算对应的attention。此后已经算过的KV可以进行缓存,也就是KV cache。
这里重点是矩阵乘法的时候,产生QKV_0、QKV_1、QKV_2、QKV_3的时候,第三维度hidden_size膨胀了,所以要按列切分放到不同的GPU中进行多头的计算。每个GPU有自己的完整的QKV矩阵。
在GPU进行attention计算的时候,要对输出进行投影。由于行×列可知,AttnOut的第三维是列,所以投影矩阵需要从行切开,所以这里是行切分,得到的但是结果的部分连续行。
其QKV在每个GPU的完整性是有利于KV cache,每个GPU可以独立缓存自己完整的KV数据,无需跨GPU去访问数据,别的GPU cache miss也不用理会,节省通信开销

每个kv cache都是按照block来管理的,
# 假设32个注意力头在4张GPU上的分布
GPU 0: 头部 0-7 (num_heads_per_gpu = 8)
GPU 1: 头部 8-15 (num_heads_per_gpu = 8)
GPU 2: 头部 16-23 (num_heads_per_gpu = 8)
GPU 3: 头部 24-31 (num_heads_per_gpu = 8)
# KV Cache存储布局
每个GPU的KV Cache: [num_blocks, block_size, heads_per_gpu, head_size]
[ 1024, 16, 8, 128]
# 源码: vllm/worker/cache_engine.py:75-76
kv_cache_generic_shape = self.attn_backend.get_kv_cache_shape(
num_blocks, self.block_size, self.num_kv_heads, self.head_size)
# FlashAttention后端的形状定义 (vllm/attention/backends/flash_attn.py:76-78)
def get_kv_cache_shape(num_blocks, block_size, num_kv_heads, head_size):
return (2, num_blocks, block_size, num_kv_heads, head_size)
# ↑ ↑ ↑ ↑ ↑
# K/V 块数量 每块token数 当前GPU头数 每头维度更具体的kv cache源码在别的博客细讲。
这样,我们就对整体的TP和各个GPU中实际运行的TP有本质上的理解,结合公式,我们从kv cache的角度理解了实际数据的运行过程,接下来我们来看源码中的TP。
TP的作用是把模型的权重进行切分,放到不同的GPU中,计算完了再进行组合,当然有按照行切或者列切的方法,vllm选择"列切分→行切分"只是其中一种策略,理论上来说有四种。
列切分 → 行切分 (VLLM默认)
# 第一层: ColumnParallelLinear (列切分)
W1: [hidden, intermediate] → 按列切分给4个GPU
H_parts = Input @ W1_parts # 每GPU得到不同的中间激活
# 第二层: RowParallelLinear (行切分)
W2: [intermediate, hidden] → 按行切分给4个GPU
O_parts = H_parts @ W2_parts # 每GPU计算部分输出
# 通信: 1次All-Reduce (在最后)
Output = sum(O_parts)行切分 → 列切分
# 第一层: RowParallelLinear (行切分)
W1: [hidden, intermediate] → 按输出维度切分
H_parts = Input @ W1_parts # 每GPU计算部分中间激活
H_full = sum(H_parts) # All-Reduce聚合
# 第二层: ColumnParallelLinear (列切分)
W2: [intermediate, hidden] → 按列切分
O_parts = H_full @ W2_parts # 每GPU计算部分输出
# 通信: 1次All-Reduce + 1次All-Gather
Output = concat(O_parts) # 拼接最终输出纯列切分
# 两层都是列切分
# 通信: 2次All-Gather纯行切分
# 两层都是行切分
# 通信: 2次All-Reduce由上可见,由于attention计算的特殊性,vllm默认的先列后行的切分是巧妙的设计。在vllm中,行切分和列切分的源代码均在vllm/vllm/model_executor/layers/linear.py 。其中对列切分的qkv部分代码如下:
class QKVParallelLinear(ColumnParallelLinear):
"""Linear layers for the attention's QKV transformation."""
def __init__():
tp_size = get_tensor_model_parallel_world_size()
input_size = self.hidden_size
output_size = (self.num_heads +
2 * self.num_kv_heads) * tp_size * self.head_size
self.output_sizes = [
self.num_heads * self.head_size * tp_size, # q_proj
self.num_kv_heads * self.head_size * tp_size, # k_proj
self.num_kv_heads * self.head_size * tp_size, # v_proj
]
class ColumnParallelLinear(LinearBase):
"""Linear layer with column parallelism."""
def __init__():
# Divide the weight matrix along the last dimension.
self.tp_size = get_tensor_model_parallel_world_size()
self.input_size_per_partition = input_size
self.output_size_per_partition = divide(output_size, self.tp_size)
self.output_partition_sizes = [self.output_size_per_partition]
# If QKV or MergedColumn, use output size of each partition.
if hasattr(self, "output_sizes"):
self.output_partition_sizes = [
divide(output_size, self.tp_size)
for output_size in self.output_sizes
]
def weight_loader(self, param: Parameter, loaded_weight: torch.Tensor)
def forward(
self, input_
) -> Union[torch.Tensor, tuple[torch.Tensor, Optional[Parameter]]]:
output_parallel = self.quant_method.apply(self, input_, bias)
output = tensor_model_parallel_all_gather(output_parallel)