LLM Part 3: Parallelism

Lecture 07与08介绍了深度学习中的并行训练方法,包括Data Parallelism, Model Parallelism,ZeRO,Pipleline Parallelism等技术。重点讲解了各类并行方法的原理、实现及其在大规模模型训练中的应用。个人认为这两节内容非常重要,理解这些并行训练技术对于处理大规模深度学习模型至关重要。
Author

Yuyang Zhang

在Lecture07和08的课程中,我们讲探讨不同的Parallelism的技巧,包括Data Parallelism, Model Parallelism,ZeRO,Pipleline Parallelism等技术。我们将重点讲解各类并行方法的原理、实现及其在大规模模型训练中的应用。个人认为这两节内容非常重要,理解这些并行训练技术对于处理大规模深度学习模型至关重要。

废话不多说,让我们开始学习!

使用GPUs很简单,但是想要

Reduce communication across GPUs/nodes, via replication / sharding

1 为什么要并行训练?

Intra-node communication: GPU-GPU communication within a single node (e.g., via NVLink) Inter-node communication: GPU-GPU communication across different nodes (e.g., via Ethernet or InfiniBand)

2 Building Blocks of Distributed Communication / Computation

Figure 1
  • Rank: 每个GPU都有一个唯一的ID。
  • World Size: 所有参与训练的GPU的总数。

2.1 Collective Operations

Collective 意思是:设计一个general的communication pattern,来支持跨不同的devices。接下来看几个比较常见的例子:

2.1.1 Broadcast

Figure 2

Broadcast的意思是:从一个GPU(通常是rank 0)向所有其他GPU发送数据。

通常作为模型参数初始化的第一步,或者在训练过程中需要同步某些数据时使用。Load Initial Checkpoint 并且broadcast to all ranks。

2.1.2 Scatter

Figure 3

Scatter 的意思是:将一个大的数据集分成小块,分别发送到不同的GPU上进行处理。 需要注意的是,Scatter要确保数据的分割方式是合理的。

2.1.3 Gather

Figure 4

Gather 的意思是:将分布在不同GPU上的数据收集到一个GPU上进行处理。通常在训练结束后需要将结果汇总到一个GPU上进行评估或者保存模型时使用。 Gather的实现需要考虑通信效率,尤其是在大规模分布式训练中,可能会成为瓶颈。

2.1.4 Reduce

Figure 5

reduce 的意思是:将分布在不同GPU上的数据进行某种聚合操作(如求和、平均等),并将结果发送到一个GPU上。通常在训练过程中需要对梯度进行reduce操作,以便更新模型参数。

2.1.5 All Gather

Figure 6

在All Gather操作中,每个GPU将自己的数据发送给所有其他GPU,并且每个GPU都接收来自所有其他GPU的数据。这样,每个GPU最终都拥有了所有GPU的数据。

举个例子,每个Rank 有parameter shard, 每个Rank都需要知道其他Rank的parameter shard,才能进行计算。All Gather操作可以让每个Rank都获得所有Rank的parameter shard,从而完成计算。

2.1.6 Reduce-Scatter

Figure 7

reduce-scatter 的意思是:将分布在不同GPU上的数据进行某种聚合操作(如求和、平均等),并将结果分散到不同的GPU上。通常在训练过程中需要对梯度进行reduce-scatter操作,以便每个GPU都能获得更新后的模型参数的一部分。

2.1.7 All Reduce

Figure 8

All Reduce 的意思是:将分布在不同GPU上的数据进行某种聚合操作(如求和、平均等),并将结果发送到所有GPU上。通常在训练过程中需要对梯度进行all-reduce操作,以便每个GPU都能获得更新后的模型参数。

2.1.8 All-to-All

all to all 是比较general的方法:each rank sends each other rank some tensor (most general)

TIP: How to Remember the Terminology

  • Reduce: 聚合数据并发送到一个GPU上。
  • Scatter: 将数据分割并发送到不同的GPU上。
  • All: 涉及所有GPU。

2.2 Hardware

Figure 9

从图 Figure 9 中可以看出,在相同node中的GPUs,可以通过 PCI(e) bus 进行通信,速度可以到达 (242GB/s)。而在不同node中的GPUs,则需要通过Eterhnet进行通信,速度通常在 (200MB/s) 之间,远远低于PCI(e)的速度。这也是为什么在分布式训练中,通信效率成为一个重要的瓶颈。

接下来我们来看现代的Data Center 里面,不同的GPUs之间是怎么连接的。

Figure 10

从图中看到:

  • 8GPUs per node 并且通过 NVLink 连接到 NVSwitch,速度可以达到(1.8TB/s)。
  • 256 nodes 一个 pod,并且通过 InfiniBand 连接,速度可以达到(200GB/s)。connected by Infiniband (via PCIe -> HCA / Infiniband NIC -> Infiniband cable) (~0.05 TB/s)
  • N pods per cluster, 并且通过 Ethernet 连接,速度可以达到(200MB/s)。

什么叫Bypassing CPU?

RDMA Remote Direct Memory Access (RDMA) 是一种允许计算机直接访问远程计算机内存的技术,而无需通过CPU进行数据传输。这种技术可以显著降低通信延迟和CPU负载,提高分布式系统中的数据传输效率。

2.2.1 NCLL

NCCL translates collective operations into low-level packets that are sent between GPUs. [talk]

Detects topology of hardware (e.g., number of nodes, switches, NVLink/PCIe)

Optimizes the path between GPUs

Launches GPU kernels to send/receive data

2.3 Torch Distributed

主要介绍了PyTorch中的分布式训练框架torch.distributed。

setup

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

在setup之后,

dist.barrier()  # Synchronize all processes

Wait to all the process to reach this point.

接下来看几个操作:

  • dist.all_reduce(tensor, op=dist.ReduceOp.SUM): 对所有GPU上的tensor进行求和,并将结果发送到所有GPU上。其中 async_op 参数表示是否异步执行,如果设置为True,则函数会立即返回一个Future对象,表示操作的状态,可以通过调用Future对象的wait()方法来等待操作完成。
  • dist.reduce_scatter(tensor, scatter_list, op=dist.ReduceOp.SUM): 将分布在不同GPU上的数据进行求和,并将结果分散到不同的GPU上。其中 scatter_list 是一个列表,包含每个GPU上要发送的数据块。
  • dist.all_gather(tensor_list, tensor): 将每个GPU上的tensor发送给所有其他GPU,并将结果收集到一个列表中。其中 tensor_list 是一个列表,包含每个GPU上要接收的数据块。
  • dist.broadcast(tensor, src=0): 从指定的GPU(通常是rank 0)向所有其他GPU发送数据。其中 src 参数表示源GPU的rank。

在Distributed 最后,记得clean up:

def cleanup():
    dist.destroy_process_group()

2.4 Benchmarking

2.4.1 Measure Speed

2.4.2 Measure Effective Bandwidth

3 Distributed Training

3.1 Data Parallelism

Figure 11

根据Batch Size来切,每个GPU负责不同部分的数据,每个GPU上都有完整的模型副本,计算完梯度之后需要进行通信来同步参数更新。

for p in params:
    dist.all_reduce(p.grad.data, op=dist.ReduceOp.AVG)

接下来让我们看一下Data Parallelism是如何Scaling的:

  • Compute Scaling: 每个GPU都得到了B/M的Batch Size,计算量减少了M倍。
  • Communication Overhead: transmit \(2x\) parameters (forward + backward),每个GPU都需要和其他GPU通信来同步参数更新,通信量随着GPU数量的增加而增加。
  • Memory Scaling: None. 每个GPU上都有完整的模型副本,所以内存需求没有减少。

可见,这种最常见的问题就是Memory的问题,每个GPUs都有一份copy

3.1.1 ZeRO

shard the optimizer state, gradients and parameters across GPUs, this corresponing to the 3 stages of ZeRO.

3.1.1.1 ZeRO Stage 1: Optimizer State Sharding

在ZeRO Stage 1中,优化器状态(如动量、二阶矩等)被分割并分布在不同的GPU上。每个GPU只保存自己负责的参数对应的优化器状态,而不是整个模型的优化器状态。

这个是怎么进行的的呢?

3.1.1.2 ZeRO Stage 2: Gradient Sharding

在ZeRO Stage 2中,除了优化器状态之外,梯度也被分割并分布在不同的GPU上。每个GPU只计算自己负责的参数对应的梯度,并且只保存自己负责的参数对应的梯度。

3.1.1.3 ZeRO Stage 3: Parameter Sharding (a.k.a Fully Sharded Data Parallel, FSDP)

在ZeRO Stage 3中,除了优化器状态和梯度之外,模型参数也被分割并分布在不同的GPU上。每个GPU只保存自己负责的参数,并且在计算过程中需要通信来获取其他GPU上的参数。

Critical Batch Size

Still Has Memory Issue

3.2 Model Tensor Parallelism

3.2.1 Pipeline Parallelism

Figure 12

3.2.1.1 Layer-wise Pipeline Parallelism

3.2.1.2 Pipeline Parallelism

The problem of the simple pipeling parallelism is the pipeline bubble. 由于每个GPU需要等待前一个GPU完成计算才能开始计算,所以在训练过程中会出现空闲时间,导致效率降低。

3.2.1.3 Zero bubble Pipeline

3.2.2 Tensor Parallelism

Figure 13

each GPUs 保存一部分的模型参数,计算时需要通信来获取其他GPU上的参数。

3.2.2.1 Column Tensor Parallelism

3.2.2.2 Row Tensor Parallelism

Back to top