0x00 摘要

NVIDIA Megatron 是一个基于 PyTorch 的分布式练习框架,用来练习超大Transformer言语模型,其经过归纳应用了数据并行,Tensor并行和Pipeline并行来复现 GPT3,值得咱们深化剖析其背面机理。

本系列大概有 5 篇文章,经过论文和源码和大家一同学习研讨。本文将看看 Megatron 怎样处理设置并行。

本系列其他文章为:

[源码解析] 模型并行分布式练习Megatron (1) — 论文 & 根底

[源码解析] 模型并行分布式练习Megatron (2) — 全体架构

[源码解析] 模型并行分布式练习 Megatron (3) —模型并行实现

另外,本文深度学习了 [细读经典]Megatron论文和代码详细剖析(2),强烈推荐有 Transformer 困惑的朋友深化阅览作者专栏。

0x01 前文回顾

前文咱们对模型并行的原理和代码进行了剖析,关于给定的模型,现在还需求处理几个问题:

  • 怎样把模型切分给节点,比方哪个节点担任哪些层。
  • 数据并行,模型并行,流水线并行这几种并行之中,每个节点别离归于哪个部分?
  • 怎样防止流水线带来的问题。

咱们接下来就仔细剖析一下。

0x02 初始化

initialize_model_parallel 办法用来设置模型并行,所以咱们接下来就详细剖析。

2.1 全局变量

由于前文_initialize_distributed之中调用了torch.distributed.init_process_group 初始化分布式环境,所以咱们知道,每个进程都有自己的 gloabl rank 和 local rank,都有自己的全局变量。

首要变量如下(详细比方能够结合 initialize_model_parallel 之中的注释来看):

  • _TENSOR_MODEL_PARALLEL_GROUP :当时 rank 所归于的Intra-layer model parallel group,便是tensor 并行进程组。
    • 假设每一层分为两个tensor,则 _TENSOR_MODEL_PARALLEL_GROUP 比方为:[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]。
  • _PIPELINE_MODEL_PARALLEL_GROUP :当时 rank 所归于的Intra-layer model parallel group,便是流水线进程组。
    • 假设流水线深度为4,则比方为 [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]。
  • _MODEL_PARALLEL_GROUP :当时 rank 所归于的模型并行进程组,包含了以上两组。
    • 针对咱们比方,便是完好模型被仿制了两份,其 GPU 节点详细是[0, 1, 4, 5, 8, 9, 12, 13],[2, 3, 6, 7, 10, 11, 14, 15]
  • _EMBEDDING_GROUP : 嵌入对应的进程组。
  • _DATA_PARALLEL_GROUP :当时 rank 所归于的Data parallel group。
    • 假设数据并行度数为2,则比方为[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]。

详细如下:

# Intra-layer model parallel group that the current rank belongs to.
_TENSOR_MODEL_PARALLEL_GROUP = None
# Inter-layer model parallel group that the current rank belongs to.
_PIPELINE_MODEL_PARALLEL_GROUP = None
# Model parallel group (both intra- and pipeline) that the current rank belongs to.
_MODEL_PARALLEL_GROUP = None
# Embedding group.
_EMBEDDING_GROUP = None
# Data parallel group that the current rank belongs to.
_DATA_PARALLEL_GROUP = None
_VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = None
_VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None
_PIPELINE_MODEL_PARALLEL_SPLIT_RANK = None
# These values enable us to change the mpu sizes on the fly.
_MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_TENSOR_MODEL_PARALLEL_RANK = None
_MPU_PIPELINE_MODEL_PARALLEL_RANK = None
# A list of ranks that have a copy of the embedding.
_EMBEDDING_GLOBAL_RANKS = None
# A list of global ranks for each pipeline group to ease calculation of the source
# rank when broadcasting from the first or last pipeline stage.
_PIPELINE_GLOBAL_RANKS = None

2.2 初始化代码

咱们首要把 initialize_model_parallel 代码摘抄出来。initialize_model_parallel 作用便是对模型进行分组,然后初始化进程组相关的各种全局变量。

def initialize_model_parallel(tensor_model_parallel_size_=1,
                              pipeline_model_parallel_size_=1,
                              virtual_pipeline_model_parallel_size_=None,
                              pipeline_model_parallel_split_rank_=None):
    """
    Initialize model data parallel groups.
    Arguments:
        tensor_model_parallel_size: number of GPUs used for tensor model parallelism.
        pipeline_model_parallel_size: number of GPUs used for pipeline model parallelism.
        virtual_pipeline_model_parallel_size: number of virtual stages (interleaved
                                              pipeline).
        pipeline_model_parallel_split_rank: for models with both encoder and decoder,
                                            rank in pipeline with split point.
    Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
    and 8 data-parallel groups as:
        8 data_parallel groups:
            [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
        8 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
        4 pipeline model-parallel groups:
            [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    """
    if torch.distributed.get_rank() == 0:
        print('> initializing tensor model parallel with size {}'.format(
            tensor_model_parallel_size_))
        print('> initializing pipeline model parallel with size {}'.format(
            pipeline_model_parallel_size_))
    # Get world size and rank. Ensure some consistencies.
    world_size = torch.distributed.get_world_size()
    tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)
    pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)
    ensure_divisibility(world_size,
                        tensor_model_parallel_size * pipeline_model_parallel_size)
    data_parallel_size = world_size // (tensor_model_parallel_size *
                                        pipeline_model_parallel_size)
    num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size
    num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
    num_data_parallel_groups = world_size // data_parallel_size
    if virtual_pipeline_model_parallel_size_ is not None:
        global _VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK
        global _VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
        _VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = 0
        _VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = virtual_pipeline_model_parallel_size_
    if pipeline_model_parallel_split_rank_ is not None:
        global _PIPELINE_MODEL_PARALLEL_SPLIT_RANK
        _PIPELINE_MODEL_PARALLEL_SPLIT_RANK = pipeline_model_parallel_split_rank_
    rank = torch.distributed.get_rank()
    # Build the data-parallel groups.
    global _DATA_PARALLEL_GROUP
    all_data_parallel_group_ranks = []
    for i in range(pipeline_model_parallel_size):
        start_rank = i * num_pipeline_model_parallel_groups
        end_rank = (i + 1) * num_pipeline_model_parallel_groups
        for j in range(tensor_model_parallel_size):
            ranks = range(start_rank + j, end_rank,
                          tensor_model_parallel_size)
            all_data_parallel_group_ranks.append(list(ranks))
            group = torch.distributed.new_group(ranks)
            if rank in ranks:
                _DATA_PARALLEL_GROUP = group
    # Build the model-parallel groups.
    global _MODEL_PARALLEL_GROUP
    for i in range(data_parallel_size):
        ranks = [data_parallel_group_ranks[i]
                 for data_parallel_group_ranks in all_data_parallel_group_ranks]
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _MODEL_PARALLEL_GROUP = group
    # Build the tensor model-parallel groups.
    global _TENSOR_MODEL_PARALLEL_GROUP
    for i in range(num_tensor_model_parallel_groups):
        ranks = range(i * tensor_model_parallel_size,
                      (i + 1) * tensor_model_parallel_size)
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _TENSOR_MODEL_PARALLEL_GROUP = group
    # Build the pipeline model-parallel groups and embedding groups
    # (first and last rank in each pipeline model-parallel group).
    global _PIPELINE_MODEL_PARALLEL_GROUP
    global _PIPELINE_GLOBAL_RANKS
    global _EMBEDDING_GROUP
    global _EMBEDDING_GLOBAL_RANKS
    for i in range(num_pipeline_model_parallel_groups):
        ranks = range(i, world_size,
                      num_pipeline_model_parallel_groups)
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _PIPELINE_MODEL_PARALLEL_GROUP = group
            _PIPELINE_GLOBAL_RANKS = ranks
        # Setup embedding group (to exchange gradients between
        # first and last stages).
        if len(ranks) > 1:
            embedding_ranks = [ranks[0], ranks[-1]]
            if pipeline_model_parallel_split_rank_ is not None and \
                    pipeline_model_parallel_split_rank_ not in embedding_ranks:
                embedding_ranks = [ranks[0],
                                   ranks[pipeline_model_parallel_split_rank_],
                                   ranks[-1]]
        else:
            embedding_ranks = ranks
        group = torch.distributed.new_group(embedding_ranks)
        if rank in embedding_ranks:
            _EMBEDDING_GROUP = group
        if rank in ranks:
            _EMBEDDING_GLOBAL_RANKS = embedding_ranks

0x03 切分样例

咱们运用注释内容来进行学习怎样切分模型,怎样把多种并行模式组合在一同。

3.1 注释

initialize_model_parallel 的注释值得咱们深化学习,详细如下:

Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
the model pipeline. The present function will
create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
and 8 data-parallel groups as:
    8 data_parallel groups:
        [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
    8 tensor model-parallel groups:
        [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
    4 pipeline model-parallel groups:
        [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
Note that for efficiency, the caller should make sure adjacent ranks
are on the same DGX box. For example if we are using 2 DGX-1 boxes
with a total of 16 GPUs, rank 0 to 7 belong to the first box and
ranks 8 to 15 belong to the second box.

从注释能够知道如下信息:

  • 假定现在有16个GPU,归于两个node,rank 0 ~7 归于第一个节点,rank 8 ~ 15 归于第二个节点。

  • create 8 tensor model-parallel groups, 4 pipeline model-parallel groups,这说明将一个完好模型切分如下:

    • 沿着行横向切了一刀:tensor_model_parallel_size = 16 / 8 = 2,便是2个 GPUs 来进行模型张量并行。
    • 沿着列纵向切了三刀:pipeline_model_parallel_size = 16 /4 = 4,便是4个GPUs 进行流水线并行。
    • 因而,一个模型分为8块,每一块放在一个GPU之上,便是8个GPU。而经过如下核算能够知 16 GPUs / 8 GPUs = 2 models。即,16张卡能够放置两个完好模型。
  • 由于张量模型并行组巨细是2,即16个GPU被分红8组,则这8组内容是 [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]。

  • 由于流水线并行组巨细是4,即16个GPU被分红4组,则这4组内容是[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]。

  • 由于数据并行组巨细是2,16个GPU被分红8组,则这8组内容是[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]。

  • 以上这些进程组都是经过 torch.distributed.new_group 来完成,这样组内进程之间就知道哪些进程是在同一个组内,是在一同练习的,也知道怎样通讯。

3.2 切分状况

模型原始图如下

[源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行

模型切分之后如下,总共被分红8块。其间,第一层被切分为 A,B,所以 A,B 之间便是 Tensor Model parallel。后边 C,D 之间也是 Tensor Model parallel,把两层都做了切分,顺次类推。

[源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行

咱们的方针便是用代码来看看怎样生成注释里边的各种模型组

3.3 切分战略

咱们接下来看看详细切分的战略,也便是GPU分配战略。切分需求归纳考虑多种状况,首要看看模型并行的通讯状况。

  • 张量并行:通讯发生在每层的前向传达和后向传达过程之中,通讯类型是all-reduce,不但单次通讯数据量大,并且通讯频频。
  • 流水线并行:通讯在流水线阶段相邻的切分点之上,通讯类型是P2P通讯,单词通讯数据量较少可是比较频频,并且由于流水线的特色,会发生GPU闲暇时刻,这儿称为流水线气泡(Bubble)。

咱们接下来看看各种并行机制的比照。

  • Tensor versus Pipeline Parallelism. 张量模型的并行性在节点内是最好的,由于它会减少通讯量。另一方面,流水线模型并行运用更便宜的点对点通讯,能够跨节点履行,而不会约束整个核算。然而,流水线并行性会在流水线气泡中花费很多时刻,因而,应约束流水线级的总数,以便流水线中的microbatches数量是流水线深度的合理倍数。当张量并行巨细等于单个节点中的GPU数量时会到达峰值功能。
  • Pipeline versus Data Parallelism. 关于每个batch size,吞吐量跟着流水线并行规划的添加而下降。流水线模型并行应该首要用于支持不适合单个 worker 的大型模型练习。而数据并行应该用于扩大练习规划。
  • Tensor versus Data Parallelism. 接下来看看数据和张量模型的并行性对功能的影响。在较大的批处理量和微批处理量为1的状况下,数据并行通讯并不频频;张量模型并行需求对批处理中的每个微批进行all-to-all通讯。这种all-to-all的通讯主导了端到端的练习时刻,特别是当通讯需求在多GPU节点进步行时。此外,跟着张量模型并行规划的添加,咱们在每个GPU上履行较小的矩阵乘法(由于会把模型张量进行切分),这下降了每个GPU的运用率。

终究看看结论

  • Tensor模型并行被用于intra-node transformer 层,由于张量并行核算密布且是消耗很多带宽,这样会在HGX based系统上高效运转。
  • Pipeline 模型并行首要被用于inter-node transformer 层,由于Pipeline 并行的通讯带宽占用少,其能够有效运用集群中多网卡规划。
  • 数据并行则在前两者根底之进步行加持,使得练习能够扩展到更大规划和更快的速度。咱们应该注意到,尽管数据并行能够带来高效的扩展,但咱们不能单独运用数据并行来处理练习超大模型,由于 a)内存容量缺乏,b)数据并行的扩展约束。

3.4 试验

咱们接下来做一个试验看看。

import torch
world_size = 16
tensor_model_parallel_size = 2 # 2 GPUs to parallelize the model tensor
pipeline_model_parallel_size = 4 # 4 GPUs to parallelize the model pipeline
data_parallel_size = world_size // (tensor_model_parallel_size *
                                    pipeline_model_parallel_size) # 2
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size # 8
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size # 4
num_data_parallel_groups = world_size // data_parallel_size # 8
# Build the data-parallel groups.
print("------ Build the data-parallel groups -----")
all_data_parallel_group_ranks = []
for i in range(pipeline_model_parallel_size):
    start_rank = i * num_pipeline_model_parallel_groups
    end_rank = (i + 1) * num_pipeline_model_parallel_groups
    for j in range(tensor_model_parallel_size):
        ranks = range(start_rank + j, end_rank,
                      tensor_model_parallel_size)
        all_data_parallel_group_ranks.append(list(ranks))
print(all_data_parallel_group_ranks)
# Build the model-parallel groups.
print("------ Build the model-parallel groups -----")
for i in range(data_parallel_size):
    ranks = [data_parallel_group_ranks[i]
             for data_parallel_group_ranks in all_data_parallel_group_ranks]
    print(list(ranks))
# Build the tensor model-parallel groups.
print("------ Build the tensor model-parallel groups -----")
for i in range(num_tensor_model_parallel_groups):
    ranks = range(i * tensor_model_parallel_size,
                  (i + 1) * tensor_model_parallel_size)
    print(list(ranks))
# Build the pipeline model-parallel groups and embedding groups
# (first and last rank in each pipeline model-parallel group).
print("------ Build the pipeline model-parallel groups -----")
for i in range(num_pipeline_model_parallel_groups):
    ranks = range(i, world_size,
                  num_pipeline_model_parallel_groups)
    print(list(ranks))

输出如下。需求注意,这儿都是 GPU 的序列号,[0,2] 便是 [g0, g2]:

------ Build the data-parallel groups -----
[[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]
------ Build the model-parallel groups -----
[0, 1, 4, 5, 8, 9, 12, 13]
[2, 3, 6, 7, 10, 11, 14, 15]
------ Build the tensor model-parallel groups -----
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]
------ Build the pipeline model-parallel groups -----
[0, 4, 8, 12]
[1, 5, 9, 13]
[2, 6, 10, 14]
[3, 7, 11, 15]
咱们比照一下注释,发现代码打印结果能够和注释对应上:
    Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
    and 8 data-parallel groups as:
        8 data_parallel groups:
            [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
        8 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
        4 pipeline model-parallel groups:
            [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]

咱们接下来会进行详细剖析。

0x04 开始状况

4.1 GPU 状况

从注释中能够看到:

Note that for efficiency, the caller should make sure adjacent ranks are on the same DGX box. For example if we are using 2 DGX-1 boxes with a total of 16 GPUs, rank 0 to 7 belong to the first box and ranks 8 to 15 belong to the second box.

意思便是:调用者需求保证相邻的rank在同一个节点上,咱们比方有两个Node,其间第一个Node拥有 GPU 0 ~ 7,便是 rank 0 ~ 7,第二个Node是 GPU 8~15,便是 rank 8 ~ 15。

详细如下,这儿每行4个GPU,是由于 4 GPUs to parallelize the model pipeline,所以流水线每个stage是4个GPU。

[源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行

4.2 符号说明

下面是论文之中说到的一些符号,这儿有必要再取出来温习一下:

  • (, , ): Parallelization dimensions.

  • for the pipeline-modelparallel size,

  • for the tensor-model-parallel size, and for the data-parallel size.

  • : Number of GPUs. We require = .

4.3 初始分组

依据注释,咱们得出现在分组状况和一些全局信息。

  • 总共16个GPU,所以 world_size 为 16。便是 Notation 之中的 n。
  • 运用两个GPU进行 model tensor 并行,所以 tensor_model_parallel_size = 2。便是 Notation 之中的 t。
  • 运用四个GPU进行模型流水线并行,所以 pipeline_model_parallel_size = 4。便是 Notation 之中的 p。其实,便是流水线深度为 4,即,4 个 GPU 是串行的。
  • 依据上面定义,d = n / ( t * p) = 2,便是 data_parallel_size = 2。由于 t * p 便是一个模型所需求的 GPU,d = (总 GPU / 一个模型需求的 GPU),结果是这些GPU能够练习 d 个模型,便是能够用 d 个 mini-batches 进行这个 d个模型一同练习,所以数据并行度为 d。

接下来结合代码看看需求分红多少个process groups,他们在代码之中的变量是什么。

  • num_tensor_model_parallel_groups 便是从 tensor model 并行视点看,分红8 个进程roup。
  • num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size 便是从 model 并行视点看,分红 4 个 进程group。
  • num_data_parallel_groups = world_size // data_parallel_size 便是从data 并行视点看,分红8 个 进程group。便是会有 8 个 DDP,每个 DDP 包含 2 个 rank。
  • 还有一个 _MODEL_PARALLEL_GROUP,

详细如下:

world_size = 16
tensor_model_parallel_size = 2 # 2 GPUs to parallelize the model tensor
pipeline_model_parallel_size = 4 # 4 GPUs to parallelize the model pipeline
data_parallel_size = world_size // (tensor_model_parallel_size *
                                    pipeline_model_parallel_size) # 2
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size # 8
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size # 4
num_data_parallel_groups = world_size // data_parallel_size # 8

0x05 Tensor model-parallel

本节咱们剖析的是,怎样将 Node 上的 GPU 分给 tensor model 并行组。

5.1 分组

关于注释比方,16 / 2 = 8,分红 8 个进程组,每个组 两个 rank。这些分组别离是:[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15],咱们得到了如下信息:

  • [g0, g1] 便是某一层分切为2半,别离被 g0, g1 来履行,[g2, g3] 表示另一层被分为两层,别离被 g2,g3 来履行。

  • 咱们能够看到,每一个 tensor-model-parallel group的 rank一定是相邻的,比方 [g0, g1], [g2, g3]。

  • 注意,0 ~ 7 不代表是同一个模型。0 ~ 7 是同一个 Node 上的 GPU,这点容易被混淆。

咱们再看看代码:

    # Build the tensor model-parallel groups.
    global _TENSOR_MODEL_PARALLEL_GROUP
    for i in range(num_tensor_model_parallel_groups): # 8
        ranks = range(i * tensor_model_parallel_size,
                      (i + 1) * tensor_model_parallel_size)
        group = torch.distributed.new_group(ranks) # 就有生成 8 组
        if rank in ranks: 
            # 假设本rank在某一list之中,即1 在 [0,1] 之中,则本 rank 就归于 new_group([0,1])
            _TENSOR_MODEL_PARALLEL_GROUP = group 

咱们试验之中在这儿得到:

------ Build the tensor model-parallel groups -----
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]

对应咱们图上如下,每个 tensor model group 用一个虚线小矩形框标示,总共8个:

[源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行

_TENSOR_MODEL_PARALLEL_GROUP = group 就记录了本rank的进程组信息,比方 rank 2,它的 _TENSOR_MODEL_PARALLEL_GROUP 内容便是:group([g2, g3])。

5.2 运用

咱们接下来看看怎样运用。

get_tensor_model_parallel_group 返回了自己 rank 对应的 tensor model group。

def get_tensor_model_parallel_group():
    """Get the tensor model parallel group the caller rank belongs to."""
    return _TENSOR_MODEL_PARALLEL_GROUP

在 megatron/mpu/mappings.py 之中有对 tensor model group 的运用:

def _reduce(input_):
    """All-reduce the input tensor across model parallel group."""
    # Bypass the function if we are using only 1 GPU.
    if get_tensor_model_parallel_world_size()==1:
        return input_
    # All-reduce.
    torch.distributed.all_reduce(input_, group=get_tensor_model_parallel_group())
    return input_

便是当流水线反向传达时分,运用 _TENSOR_MODEL_PARALLEL_GROUP 进行在组内进行集合通讯。

0x06 Pipe-parallel

本节咱们剖析的是,怎样将 Node 上的 GPU 分给 pipeline model 并行组。

6.1 分组

从注释能够看到,流水线分组便是把这个16个GPU 分红 4 组,每组 4 个 GPU,得到 [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15],咱们得到了如下信息:

  • 每组的四个GPU进行模型流水线并行,所以 pipeline_model_parallel_size = 4。便是 Notation 之中的 p。其实,便是流水线深度为 4, 每组内 4 个 GPU 是串行的。即, [g0, g4, g8, g12] 这4个 GPU是串行的。

  • 再看看流水线的每一层,含有 16 / 4 = 4 个 GPU,能看到第一层是 0 ~ 4,第二层是 5 ~ 8,……。

  • 能够看到,流水线的 group是隔 n // p个取一个,比方[0, 4, 8, 12]。

  • 关于流水线每个stage,则是stage i 的 rank 范围是:[(i-1) * n//p, (i) * n//p],即 rank 2 地点的stage 的rank是 [0,1,2,3]。

  • _PIPELINE_MODEL_PARALLEL_GROUP 得到了本rank对应的流水线进程组。

  • _PIPELINE_GLOBAL_RANKS 得到了进程组的ranks。

  • 假设本进程是 rank 2,则流水线进程组 ranks 是 [g2, g6, g10, g14]。

详细代码如下:

    # Build the pipeline model-parallel groups and embedding groups
    # (first and last rank in each pipeline model-parallel group).
    global _PIPELINE_MODEL_PARALLEL_GROUP
    global _PIPELINE_GLOBAL_RANKS
    global _EMBEDDING_GROUP
    for i in range(num_pipeline_model_parallel_groups): # 4
        ranks = range(i, world_size, # 每隔 n // p个取一个
                      num_pipeline_model_parallel_groups)
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _PIPELINE_MODEL_PARALLEL_GROUP = group
            _PIPELINE_GLOBAL_RANKS = ranks
        # Setup embedding group (to exchange gradients between
        # first and last stages).
        if len(ranks) > 1:
            embedding_ranks = [ranks[0], ranks[-1]]
        else:
            embedding_ranks = ranks
        group = torch.distributed.new_group(embedding_ranks)
        if rank in embedding_ranks:
            _EMBEDDING_GROUP = group

咱们拓宽之前图如下,现在看到添加了 4 条从上到下的虚线箭头,别离对应了 4 组流水线串行。横向层是从 Stage 0 ~ Stage 3。

[源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行

6.2 运用

接下来看看怎样运用。

get_pipeline_model_parallel_group 返回了自己 rank 对应的 pipeline model group。

def get_pipeline_model_parallel_group():
    """Get the pipeline model parallel group the caller rank belongs to."""
    return _PIPELINE_MODEL_PARALLEL_GROUP

详细运用是在 megatron/p2p_communication.py,_communicate 之中会用流水线组信息来进行通讯。这儿省略了大部分代码。


def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
                 use_ring_exchange=False, tensor_shape=None,
                 override_scatter_gather_tensors_in_pipeline=False,
                 dtype_=None):
    """Communicate tensors between stages. Used as helper method in other
    communication methods that are used in megatron/schedules.py.
    """
    # Send tensors in both the forward and backward directions as appropriate.
    if use_ring_exchange: # 这儿运用get_pipeline_model_parallel_group 进行通讯
        torch.distributed.ring_exchange(tensor_send_prev=tensor_send_prev,
                                        tensor_recv_prev=tensor_recv_prev,
                                        tensor_send_next=tensor_send_next,
                                        tensor_recv_next=tensor_recv_next,
                                        group=mpu.get_pipeline_model_parallel_group())
    else:
        ops = []
        if tensor_send_prev is not None:
            send_prev_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_prev,
                mpu.get_pipeline_model_parallel_prev_rank()) # 得到流水线前一个rank
            ops.append(send_prev_op)
        if tensor_recv_prev is not None:
            recv_prev_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_prev,
                mpu.get_pipeline_model_parallel_prev_rank())
            ops.append(recv_prev_op)
        if tensor_send_next is not None:
            send_next_op = torch.distributed.P2POp(
                torch.distributed.isend, tensor_send_next,
                mpu.get_pipeline_model_parallel_next_rank()) # 得到流水线下一个rank
            ops.append(send_next_op)
        if tensor_recv_next is not None:
            recv_next_op = torch.distributed.P2POp(
                torch.distributed.irecv, tensor_recv_next,
                mpu.get_pipeline_model_parallel_next_rank())
            ops.append(recv_next_op)

6.2.1 上下游rank

详细怎样得到流水线上下游的rank?是经过 get_pipeline_model_parallel_next_rank 和 get_pipeline_model_parallel_prev_rank 来完成。其间_PIPELINE_GLOBAL_RANKS 得到了进程组的ranks,假设本进程是 rank 2,则流水线进程组 ranks 是 [g2, g6, g10, g14]。

def get_pipeline_model_parallel_next_rank():
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline + 1) % world_size]
def get_pipeline_model_parallel_prev_rank():
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline - 1) % world_size]

6.2.2 world size

get_pipeline_model_parallel_world_size 得到了进程组的 world size。

def get_pipeline_model_parallel_world_size():
    """Return world size for the pipeline model parallel group."""
    global _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
    if _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE is not None:
        return _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
    return torch.distributed.get_world_size(group=get_pipeline_model_parallel_group())

0x07 Data-parallel

咱们接下来看看数据并行。

7.1 分组

关于注释比方,16 / 2 = 8,分红 8 个进程组,每个组 两个 rank。这些分组别离是:[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15],咱们得到了如下信息:

  • 依据上面剖析, t * p 便是一个模型所需求的 GPU,因而,d = (总 GPU 数目 / 一个模型需求的 GPU 数目) = n / ( t * p),便是说,现在供给的这 n 个GPU能够同时练习 d 个模型,便是能够用 d 个 mini-batches 输入到这 d 个模型一同练习,所以数据并行度为 d。
  • 对应注释比方,便是data_parallel_size = 16 / (2 * 4) = 2。
  • rank 2 对应的数据并行进程组是[g0, g2]。

咱们再看看用代码怎样确定有哪些group,每个group里边包含什么。

  • 首要,流水线被分红了 p 个 stage,关于流水线每个stage,其有 n // p 个GPU,stage i 的 rank 范围是:[i * n//p, (i+1) * n//p],即 rank 2地点的stage 的rank是 [0,1,2,3]。
  • 其次,在每一个stage之中,ranks = range(start_rank + j, end_rank, tensor_model_parallel_size) ,意思是这stage的n//p个GPUs中,每隔 t 个取一个作为数据并行 group 之中的一份子,因而每个data-parallel group巨细为 n // p // t = d。

详细代码如下:

    # Build the data-parallel groups.
    global _DATA_PARALLEL_GROUP
    assert _DATA_PARALLEL_GROUP is None, \
        'data parallel group is already initialized'
    all_data_parallel_group_ranks = []
    for i in range(pipeline_model_parallel_size): # 遍历流水线深度
        start_rank = i * num_pipeline_model_parallel_groups # 找到每个stage的开始rank
        end_rank = (i + 1) * num_pipeline_model_parallel_groups # 找到每个stage的停止rank
        for j in range(tensor_model_parallel_size): # 遍历tensor model分组size
            ranks = range(start_rank + j, end_rank, # 每隔 t 个取一个作为数据并行group中的一份子
                          tensor_model_parallel_size)
            all_data_parallel_group_ranks.append(list(ranks))
            group = torch.distributed.new_group(ranks)
            if rank in ranks:
                _DATA_PARALLEL_GROUP = group

打印输出如下,和注释一致。

------ Build the data-parallel groups -----
[[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]

对应图片拓宽如下:其间,每个新增的双箭头对应一个DDP(两个rank),比方[2, 3]对应一个DDP。

[源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行

7.2 怎样运用

咱们接下来看看怎样运用。

get_data_parallel_group 会得到本rank对应的 _DATA_PARALLEL_GROUP。

def get_data_parallel_group():
    """Get the data parallel group the caller rank belongs to."""
    return _DATA_PARALLEL_GROUP

在 allreduce_gradients之中,会对本数据并行组进行all-reduce。


    def allreduce_gradients(self):
        """Reduce gradients across data parallel ranks."""
        # If we have buffers, simply reduce the data in the buffer.
        if self._grad_buffers is not None:
            for _, buffer_ in self._grad_buffers.items():
                buffer_.data /= mpu.get_data_parallel_world_size() # 数据并行 world size
                torch.distributed.all_reduce(
                    buffer_.data, group=mpu.get_data_parallel_group()) # 数据并行组
        else:
            # Otherwise, bucketize and all-reduce
            buckets = {}
            # Pack the buckets.
            for param in self.module.parameters():
                if param.requires_grad and param.grad is not None:
                    tp = param.data.type()
                    if tp not in buckets:
                        buckets[tp] = []
                    buckets[tp].append(param)
                    param.main_grad = param.grad
            # For each bucket, all-reduce and copy all-reduced grads.
            for tp in buckets:
                bucket = buckets[tp]
                grads = [param.grad.data for param in bucket]
                coalesced = _flatten_dense_tensors(grads)
                coalesced /= mpu.get_data_parallel_world_size()
                torch.distributed.all_reduce(
                    coalesced, group=mpu.get_data_parallel_group())
                for buf, synced in zip(grads, _unflatten_dense_tensors(
                        coalesced, grads)):
                    buf.copy_(synced)

0x08 模型组

面试验中,咱们得到模型并行组如下:[0, 1, 4, 5, 8, 9, 12, 13] [2, 3, 6, 7, 10, 11, 14, 15]。生成代码如下:

    # Build the model-parallel groups.
    global _MODEL_PARALLEL_GROUP
    for i in range(data_parallel_size):
        ranks = [data_parallel_group_ranks[i]
                 for data_parallel_group_ranks in all_data_parallel_group_ranks]
        group = torch.distributed.new_group(ranks)
        if rank in ranks:
            _MODEL_PARALLEL_GROUP = group

_MODEL_PARALLEL_GROUP 会得到本rank对应的模型组。

def get_model_parallel_group():
    """Get the model parallel group the caller rank belongs to."""
    return _MODEL_PARALLEL_GROUP

这儿是裁剪梯度会用到,便是在本模型的悉数rank之中进行梯度裁剪相关操作。


def clip_grad_norm_fp32(parameters, max_norm, norm_type=2):
    """Clips gradient norm of an iterable of parameters whose gradients
       are in fp32.
    This is adapted from torch.nn.utils.clip_grad.clip_grad_norm_ and
    added functionality to handle model parallel parameters. Note that
    the gradients are modified in place.
    Arguments:
        parameters (Iterable[Tensor] or Tensor): an iterable of Tensors or a
            single Tensor that will have gradients normalized
        max_norm (float or int): max norm of the gradients
        norm_type (float or int): type of the used p-norm. Can be ``'inf'`` for
            infinity norm.
    Returns:
        Total norm of the parameters (viewed as a single vector).
    """
    if isinstance(parameters, torch.Tensor):
        parameters = [parameters]
    # Filter parameters based on:
    #   - grad should not be none
    #   - parameter should not be shared
    #   - should not be a replica due to tensor model parallelism
    grads = []
    grads_for_norm = []
    for param in parameters:
        grad_not_none = param.grad is not None
        is_not_shared = param_is_not_shared(param)
        is_not_tp_duplicate = param_is_not_tensor_parallel_duplicate(param)
        grad = param.grad.detach()
        if grad_not_none:
            # Make sure the grads are in fp32
            grads.append(grad)
        if grad_not_none and is_not_shared and is_not_tp_duplicate:
            grads_for_norm.append(grad)
    # Norm parameters.
    max_norm = float(max_norm)
    norm_type = float(norm_type)
    total_norm = 0.0
    # Calculate norm.
    if norm_type == inf:
        total_norm = max(grad.abs().max() for grad in grads_for_norm)
        total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
        # Take max across all model-parallel GPUs.
        torch.distributed.all_reduce(total_norm_cuda,
                                     op=torch.distributed.ReduceOp.MAX,
                                     group=mpu.get_model_parallel_group()) # 模型组信息
        total_norm = total_norm_cuda[0].item()
    else:
        if norm_type == 2.0:
            dummy_overflow_buf = torch.cuda.IntTensor([0])
            # Use apex's multi-tensor applier for efficiency reasons.
            # Multi-tensor applier takes a function and a list of list
            # and performs the operation on that list all in one kernel.
            grad_norm, _ = multi_tensor_applier(
                amp_C.multi_tensor_l2norm,
                dummy_overflow_buf,
                [grads_for_norm],
                False # no per-parameter norm
            )
            # Since we will be summing across data parallel groups,
            # we need the pow(norm-type).
            total_norm = grad_norm ** norm_type
        else:
            for grad in grads_for_norm:
                grad_norm = torch.norm(grad, norm_type)
                total_norm += grad_norm ** norm_type
        # Sum across all model-parallel GPUs.
        torch.distributed.all_reduce(total_norm,
                                     op=torch.distributed.ReduceOp.SUM,
                                     group=mpu.get_model_parallel_group()) # 模型组信息
        total_norm = total_norm.item() ** (1.0 / norm_type)
    # Scale.
    clip_coeff = max_norm / (total_norm + 1.0e-6)
    if clip_coeff < 1.0:
        dummy_overflow_buf = torch.cuda.IntTensor([0])
        multi_tensor_applier(amp_C.multi_tensor_scale,
                             dummy_overflow_buf,
                             [grads, grads],
                             clip_coeff)
    return total_norm

之前的图如下,运用看到分红两组,左边是Model 0 对应的悉数ranks,右面是model 1 的ranks。

[源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行

0x09 怎样把模型分到GPU

咱们终究还有一个问题没有触及,便是怎样把模型分块放到对应的GPU之上。便是怎样与最初分红A,B,…, H 的那个图对应起来。其实,不是依据模型来把模型部分复制到对应的rank或许GPU,而是rank或许GPU主动过来复制自己对应的层。

  • 由于调用了 mpu.initialize_model_parallel 来设置模型并行,数据并行等各种进程组,所以每个 rank 对应的进程都有自己的全局变量,详细其实便是进程自动就被映射到GPU上了。比方 rank 2 对应的进程在启动之后才知道自己是 rank 2,然后从初始化的全局变量之中知道自己的 data_parallel group 是 [g0, g2],tensor model-parallel group 是[g2, g3],pipeline model-parallel group 是 [g2, g6, g10, g14]。
  • ParallelTransformer 的初始化之中,offset 便是依据 rank 知道自己应该生成模型的那些层,然后经过 self.layers = torch.nn.ModuleList([build_layer(i + 1 + offset) for i in range(self.num_layers)]) 来生成对应的层。
  • get_model 办法也会依据自己的 pipeline rank 和 is_pipeline_first_stage 来知道是不是第一层或许终究一层,然后做相应处理。
  • 终究把模型参数复制到了自己对应的 GPU 之上。

详细 ParallelTransformer 初始化代码如下:

class ParallelTransformer(MegatronModule):
    """Transformer class."""
    def __init__(self, init_method, output_layer_init_method,
                 layer_type=LayerType.encoder,
                 self_attn_mask_type=AttnMaskType.padding,
                 pre_process=True, post_process=True):
        super(ParallelTransformer, self).__init__()
        args = get_args()
        # 省略代码
        # Transformer layers.
        def build_layer(layer_number):
            return ParallelTransformerLayer(
                init_method,
                output_layer_init_method,
                layer_number,
                layer_type=layer_type,
                self_attn_mask_type=self_attn_mask_type)
        # 下面 offset 便是依据rank知道自己应该生成模型的那些层
        if args.virtual_pipeline_model_parallel_size is not None:
            # Number of layers in each model chunk is the number of layers in the stage,
            # divided by the number of model chunks in a stage.
            self.num_layers = self.num_layers // args.virtual_pipeline_model_parallel_size
            # With 8 layers, 2 stages, and 4 model chunks, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0]  [2]  [4]  [6]
            # Stage 1: [1]  [3]  [5]  [7]
            # With 8 layers, 2 stages, and 2 virtual stages, we want an assignment of
            # layers to stages like (each list is a model chunk):
            # Stage 0: [0, 1]  [4, 5]
            # Stage 1: [2, 3]  [6, 7]
            offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
                args.num_layers // args.virtual_pipeline_model_parallel_size) + \
                (mpu.get_pipeline_model_parallel_rank() * self.num_layers)
        else:
            # Each stage gets a contiguous set of layers.
            offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers
        self.layers = torch.nn.ModuleList(
            [build_layer(i + 1 + offset) for i in range(self.num_layers)])
        if self.post_process:
            # Final layer norm before output.
            self.final_layernorm = LayerNorm(
                args.hidden_size,
                eps=args.layernorm_epsilon)

所以,终究作用如下,其间同名子模块具有同样的参数,能够数据并行,即两个A能够数据并行。一列上的层之间能够流水线串行,比方 A–> C –> E –> G 便是串行,而一个横行4个是流水线的一个stage,其间从0开始,横向相邻两个GPU是 tensor model 并行。

[源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行

0xEE 个人信息

★★★★★★关于日子和技术的考虑★★★★★★

微信公众账号:罗西的考虑

0xFF 参考

[细读经典]Megatron论文和代码详细剖析(2)

[细读经典]Megatron论文和代码详细剖析(1)

Megatron-LM源码阅览(一)

Megatron-LM源码阅览(二)

megatron学习总结

GTC 2020: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism

怎样评价 NVIDIA 发布的 DGX-1?