0x00 摘要

NVIDIA Megatron 是一个根据 PyTorch 的分布式练习结构,用来练习超大Transformer语言模型,其通过综合应用了数据并行,Tensor并行和Pipeline并行来复现 GPT3,值得咱们深化剖析其背后机理。

本系列大概有6~7篇文章,通过论文和源码和咱们一同学习研究。本文将看看 Megatron 怎么处理模型并行。

本系列其他文章为:

[源码解析] 模型并行分布式练习Megatron (1) — 论文&根底

[源码解析] 模型并行分布式练习Megatron (2) — 全体架构

0x01 并行Transformer层

在论文篇之中,咱们了解到,由于模型越来越大,其尺寸远远超过了处理器的内存约束,因而产生了比方激活检查点(activation checkpointing)这样的内存管理技能。而模型并行则通过对模型进行各种分片来战胜单个处理器内存约束,这样模型权重和其关联的优化器状态就能够分散到多个设备之上。

ParallelTransformerLayer 便是对 Transformer 层的并行完结,所以咱们接着剖析。

1.1 初始化

ParallelTransformerLayer 初始化方法之中,树立了如下:

  • 生成一个LayerNorm处理输入数据。
  • 生成并行Attention。
  • 生成处理attention输出的LayerNorm。
  • 假如是decoder,则生成一个ParallelAttention。
  • 生成一个并行MLP。
class ParallelTransformerLayer(MegatronModule):
    """A single transformer layer.
    Transformer layer takes input with size [b, s, h] and returns an
    output of the same size.
    """
    def __init__(self, init_method, output_layer_init_method,
                 layer_number, layer_type=LayerType.encoder,
                 self_attn_mask_type=AttnMaskType.padding):
        args = get_args()
        super(ParallelTransformerLayer, self).__init__()
        self.layer_number = layer_number
        self.layer_type = layer_type
        self.apply_residual_connection_post_layernorm \
            = args.apply_residual_connection_post_layernorm
        self.bf16 = args.bf16
        self.fp32_residual_connection = args.fp32_residual_connection
        # Layernorm on the input data.
        self.input_layernorm = LayerNorm( # 生成一个LayerNorm处理输入数据
            args.hidden_size,
            eps=args.layernorm_epsilon,
            no_persist_layer_norm=args.no_persist_layer_norm)
        # Self attention.
        self.self_attention = ParallelAttention( # 生成并行Attention
            init_method,
            output_layer_init_method,
            layer_number,
            attention_type=AttnType.self_attn,
            attn_mask_type=self_attn_mask_type)
        self.hidden_dropout = args.hidden_dropout
        self.bias_dropout_fusion = args.bias_dropout_fusion
        # Layernorm on the attention output
        self.post_attention_layernorm = LayerNorm( # 生成处理attention输出的LayerNorm
            args.hidden_size,
            eps=args.layernorm_epsilon,
            no_persist_layer_norm=args.no_persist_layer_norm)
        if self.layer_type == LayerType.decoder: # 假如本层是decoder
            self.inter_attention = ParallelAttention( # 则生成一个ParallelAttention
                init_method,
                output_layer_init_method,
                layer_number,
                attention_type=AttnType.cross_attn)
            # Layernorm on the attention output.
            self.post_inter_attention_layernorm = LayerNorm(
                args.hidden_size,
                eps=args.layernorm_epsilon,
                no_persist_layer_norm=args.no_persist_layer_norm)
        # MLP
        self.mlp = ParallelMLP(init_method, # 生成一个并行MLP
                               output_layer_init_method)

对应便是:

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

1.2 前向传达

其前向传达方法如下,便是调用各种成员函数进行前向操作。

def forward(self, hidden_states, attention_mask,
            encoder_output=None, enc_dec_attn_mask=None,
            inference_params=None):
    # hidden_states: [b, s, h]
    # Layer norm at the beginning of the transformer layer.
    layernorm_output = self.input_layernorm(hidden_states) # 对输入进行处理
    # Self attention.
    attention_output, attention_bias = \ # attention操作
        self.self_attention(
            layernorm_output,
            attention_mask,
            inference_params=inference_params)
    # Residual connection. 残差衔接
    if self.apply_residual_connection_post_layernorm:
        residual = layernorm_output #norm之后成果作为X
    else:
        residual = hidden_states # 原始输入X
    # jit scripting for a nn.module (with dropout) is not
    # trigerring the fusion kernel. For now, we use two
    # different nn.functional routines to account for varying
    # dropout semantics during training and inference phases.
    if self.bias_dropout_fusion: # dropout操作
        if self.training:
            bias_dropout_add_func = bias_dropout_add_fused_train
        else:
            bias_dropout_add_func = bias_dropout_add_fused_inference
    else:
        bias_dropout_add_func = get_bias_dropout_add(self.training)
    # re-enable torch grad to enable fused optimization.
    with torch.enable_grad():
        layernorm_input = bias_dropout_add_func( # dropout操作
            attention_output,
            attention_bias.expand_as(residual),
            residual,
            self.hidden_dropout)
    # Layer norm post the self attention.
    layernorm_output = self.post_attention_layernorm(layernorm_input) # 处理attention输出
    if self.layer_type == LayerType.decoder:
        attention_output, attention_bias = \
            self.inter_attention(layernorm_output,
                                 enc_dec_attn_mask,
                                 encoder_output=encoder_output)
        # residual connection
        if self.apply_residual_connection_post_layernorm:
            residual = layernorm_output
        else:
            residual = layernorm_input
        # re-enable torch grad to enable fused optimization.
        with torch.enable_grad():
            layernorm_input = bias_dropout_add_func(
                attention_output,
                attention_bias.expand_as(residual),
                residual,
                self.hidden_dropout)
        # Layer norm post the decoder attention
        layernorm_output = self.post_inter_attention_layernorm(layernorm_input)
    # MLP.
    mlp_output, mlp_bias = self.mlp(layernorm_output) # MLP操作 
    # Second residual connection.
    if self.apply_residual_connection_post_layernorm: # 残差操作
        residual = layernorm_output
    else:
        residual = layernorm_input
    # re-enable torch grad to enable fused optimization.
    with torch.enable_grad():
        output = bias_dropout_add_func( # dropout操作
            mlp_output,
            mlp_bias.expand_as(residual),
            residual,
            self.hidden_dropout)
    return output

0x02 并行MLP

ParallelTransformerLayer 里面包含了 Attention 和 MLP,由于篇幅所限,咱们这儿主要对MLP进行剖析。关于 Attention 则简略研究一下其行切分机制,究竟咱们想了解的是怎么进行模型并行,而非深化理解Transformer。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

Megatron的并行MLP包含了两个线性层,第一个线性层完结了 hidden size 到 4 x hidden size 的转换,第二个线性层完结了 4 x hidden size 回到 hidden size。详细 MLP 的逻辑如下:

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

图:具有模型并行性的 MLP。f和g表示和通讯切块相关的操作,其是共轭的。f 的前向传达是一个identity运算符,而后向传达是一个all-reduce,g 的前向传达是 all-reduce,后向传达是一个identity运算符。这儿的 f 来自 ColumnParallelLinear,g 来自 RowParallelLinear。即,MLP 便是把 ColumnParallelLinear 和 RowParallelLinear 结合起来。

所以,这儿焦点问题便是:怎么把这两种线性层切开到不同的GPU卡之上?参见前文,这儿选用了第二种计划,

另一个选项是沿列拆分A,得到 A=[A1,A2]A=[A_1,A_2]。该分区答应GeLU非线性独立应用于每个分区GEMM的输出:

[Y1Y2]=[GeLU(XA1),GeLU(XA2)]\begin{bmatrix} Y_1& Y_2 \end{bmatrix}= \begin{bmatrix} GeLU(XA_1),GeLU(XA_2) \end{bmatrix}

这个方法更好,由于它删除了同步点,直接把两个 GeLU 的输出拼接在一同就行。因而,咱们以这种列并行方法划分第一个GEMM,并沿其行切割第二个GEMM,以便它直接获取GeLU层的输出,而不需求任何其他通讯(比方 all-reduce 就不需求了),如图所示。

咱们再深化剖析一下为何挑选这个计划。

依照常规逻辑,MLP 的前向传达应该分为两个阶段,别离对应了下面图之中的两行,

  • 第一行是把参数 A 依照列切分,然后把成果依照列拼接起来,得到的成果便是与不运用并行战略彻底等价的成果。
  • 第二行是把激活 Y 依照列切分,参数B依照行切分做并行,终究把输出做加法,得到 Z。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

可是每个split会导致两次额定的通讯(前向传达和后向传达各一次,下面只给出了前向传达)。由于关于第二行来说,其输入Y其实本质是 XA1,XA2并行的,所以为了下降通讯量,咱们能够把数据通讯延后或许爽性取消通讯,便是把第一行终究的 all_gather 和第二行最初的 split 省略掉,这其实便是数学上的传递性和结合律(部分和之和为大局和)。所以咱们就得到了论文之中的第二种计划。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

结合代码,便是:

  • ColumnParallelLinear 完结了 MLP 的前半部分或许考虑了这个线性层独立运用的状况。
  • RowParallelLinear 完结了 MLP 的后半部分或许考虑了这个线性层独立运用的状况。

2.1 命名标准

咱们首要看看命名标准,后文运用如下:

  • h: hidden size
  • n: number of attention heads
  • p: number of model parallel partitions
  • np: n/p
  • hp: h/p
  • hn: h/n
  • b: batch size
  • s: sequence length
  • l: number of layers
  • Transformer 的输入size是 [s, b, h],返回一个相同size的张量,咱们运用 hyperparameters 作为transformer 的超参数。

2.2 MLP 代码

2.2.1 初始化

megatron/model/transformer.py 之中有 ParallelMLP 界说如下:

  • 界说了一个 ColumnParallelLinear 用来进行第一个 H 到 4 H 的转换。
  • 然后是一个 gelu。
  • 接着是 RowParallelLinear 用来进行 4H 到 H 的转换回来。

dropout操作是在上面ParallelTransformerLayer的forward之中进行。

所以,MLP大致如图,这儿A,B是各自的权重矩阵

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

也便是对应论文之中这个图形。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

代码如下。

class ParallelMLP(MegatronModule):
    """MLP.
    MLP will take the input with h hidden state, project it to 4*h
    hidden dimension, perform nonlinear transformation, and project the
    state back into h hidden dimension.
    """
    def __init__(self, init_method, output_layer_init_method):
        super(ParallelMLP, self).__init__()
        args = get_args()
        # Project to 4h.
        self.dense_h_to_4h = mpu.ColumnParallelLinear( # 列切分
            args.hidden_size,
            args.ffn_hidden_size,
            gather_output=False, # 这儿是false,选用第二种计划
            init_method=init_method,
            skip_bias_add=True)
        self.bias_gelu_fusion = args.bias_gelu_fusion # gelu
        self.activation_func = F.gelu
        if args.openai_gelu:
            self.activation_func = openai_gelu
        elif args.onnx_safe:
            self.activation_func = erf_gelu
        # Project back to h.
        self.dense_4h_to_h = mpu.RowParallelLinear( # 行切分
            args.ffn_hidden_size,
            args.hidden_size,
            input_is_parallel=True,
            init_method=output_layer_init_method,
            skip_bias_add=True)

2.2.2 前向操作

这儿别离调用了 ColumnParallelLinear 完结了 H 到 4H 的转换,RowParallelLinear 完结了 4H 到 H 的转换。

def forward(self, hidden_states):
    # [s, b, 4hp]
    intermediate_parallel, bias_parallel = self.dense_h_to_4h(hidden_states) # 纵向切分
    if self.bias_gelu_fusion:
         intermediate_parallel = \
                 bias_gelu_impl(intermediate_parallel, bias_parallel)
    else:
        intermediate_parallel = \
            self.activation_func(intermediate_parallel + bias_parallel)
    # [s, b, h]
    output, output_bias = self.dense_4h_to_h(intermediate_parallel) # 横向切分
    return output, output_bias

咱们接下来别离介绍 ColumnParallelLinear 和 RowParallelLinear。ColumnParallelLinear 别离能够独立运用或许作为 ParallelMLP 的前半段,RowParallelLinear 也能够独立运用或许作为 ParallelMLP 的后半段。

0x03 ColumnParallelLinear

ColumnParallelLinear 便是按列进行切分,也便是纵刀流。留意,这儿说的是对权重进行列切分。便是:

Y=XA=X[A1,A2]=[XA1,XA2]Y = XA = X[A_1, A_2] = [XA_1, XA_2]

详细切分如下:

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

3.1 界说

由于 Python 语言特性,这儿有用的仅仅注释,从注释中能够看出来,关于 Y=XA+bY = XA + b,A 被以如下方法进行并行化:A=[A1,…,Ap]A = [A_1, …, A_p]

class ColumnParallelLinear(torch.nn.Module):
    """Linear layer with column parallelism.
    The linear layer is defined as Y = XA + b. A is parallelized along
    its second dimension as A = [A_1, ..., A_p].
    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias
        gather_output: If true, call all-gether on output and make Y avaiable
                       to all GPUs, otherwise, every GPU will have its output
                       which is Y_i = XA_i
        init_method: method to initialize weights. Note that bias is always set
                     to zero.
        stride: For the strided linear layers.
        keep_master_weight_for_test: This was added for testing and should be
                                     set to False. It returns the master weights
                                     used for initialization.
        skip_bias_add: This was added to enable performance optimations where bias
                       can be fused with other elementwise operations. we skip 
                       adding bias but instead return it.
    """

3.2 初始化

初始化代码之中主要是用切分的信息来初始化权重。

def __init__(self, input_size, output_size, bias=True, gather_output=True,
             init_method=init.xavier_normal_, stride=1,
             keep_master_weight_for_test=False,
             skip_bias_add=False):
    super(ColumnParallelLinear, self).__init__()
    # Keep input parameters
    self.input_size = input_size
    self.output_size = output_size
    self.gather_output = gather_output
    # Divide the weight matrix along the last dimension.
    world_size = get_tensor_model_parallel_world_size() # 取得本tensor并行组的world size
    self.output_size_per_partition = divide(output_size, world_size) # 取得簿本模型应输出size
    self.skip_bias_add = skip_bias_add
    # Parameters.
    # Note: torch.nn.functional.linear performs XA^T + b and as a result
    # we allocate the transpose.
    # Initialize weight.
    args = get_args()
    if args.use_cpu_initialization:
        # 用切分的size初始化权重
        self.weight = Parameter(torch.empty(self.output_size_per_partition,
                                            self.input_size,
                                            dtype=args.params_dtype))
        self.master_weight = _initialize_affine_weight_cpu( # 初始化权重
            self.weight, self.output_size, self.input_size,
            self.output_size_per_partition, 0, init_method,
            stride=stride, return_master_weight=keep_master_weight_for_test)
    else:
        # 用切分的size初始化权重
        self.weight = Parameter(torch.empty(
            self.output_size_per_partition, self.input_size,
            device=torch.cuda.current_device(), dtype=args.params_dtype))
        _initialize_affine_weight_gpu(self.weight, init_method, # 初始化权重
                                      partition_dim=0, stride=stride)
    if bias:
        if args.use_cpu_initialization:
            # 用切分的size初始化权重
            self.bias = Parameter(torch.empty(
                self.output_size_per_partition, dtype=args.params_dtype))
        else:
            # 用切分的size初始化权重
            self.bias = Parameter(torch.empty(
                self.output_size_per_partition,
                device=torch.cuda.current_device(),
                dtype=args.params_dtype))
        set_tensor_model_parallel_attributes(self.bias, True, 0, stride)
        # Always initialize bias to zero.
        with torch.no_grad():
            self.bias.zero_()
    else:
        self.register_parameter('bias', None)
    self.async_tensor_model_parallel_allreduce = (
            not args.no_async_tensor_model_parallel_allreduce and
            world_size > 1)

3.2.1 切分size

self.output_size_per_partition = divide(output_size, world_size) 这儿有一个切割size操作,得到每个子模型应该拥有的权重巨细。

def ensure_divisibility(numerator, denominator):
    """Ensure that numerator is divisible by the denominator."""
    assert numerator % denominator == 0, '{} is not divisible by {}'.format(
        numerator, denominator)
def divide(numerator, denominator):
    """Ensure that numerator is divisible by the denominator and return
    the division value."""
    ensure_divisibility(numerator, denominator)
    return numerator // denominator

3.2.2 初始化权重

以下代码完结了初始化权重。

def _initialize_affine_weight_gpu(weight, init_method,
                                  partition_dim, stride=1):
    """Initialize affine weight for model parallel on GPU."""
    set_tensor_model_parallel_attributes(tensor=weight,
                                         is_parallel=True,
                                         dim=partition_dim,
                                         stride=stride)
    with get_cuda_rng_tracker().fork():
        init_method(weight)
def _initialize_affine_weight_cpu(weight, output_size, input_size,
                                  per_partition_size, partition_dim,
                                  init_method, stride=1,
                                  return_master_weight=False):
    """Initialize affine weight for model parallel.
    Build the master weight on all processes and scatter
    the relevant chunk."""
    set_tensor_model_parallel_attributes(tensor=weight,
                                         is_parallel=True,
                                         dim=partition_dim,
                                         stride=stride)
    # Initialize master weight
    master_weight = torch.empty(output_size, input_size,
                                dtype=torch.float,
                                requires_grad=False)
    init_method(master_weight)
    args = get_args()
    master_weight = master_weight.to(dtype=args.params_dtype)
    # Split and copy
    per_partition_per_stride_size = divide(per_partition_size, stride)
    weight_list = torch.split(master_weight, per_partition_per_stride_size,
                              dim=partition_dim)
    rank = get_tensor_model_parallel_rank()
    world_size = get_tensor_model_parallel_world_size()
    my_weight_list = weight_list[rank::world_size]
    with torch.no_grad():
        torch.cat(my_weight_list, dim=partition_dim, out=weight)
    if return_master_weight:
        return master_weight
    return None

3.3 逻辑整理

为了更好的剖析,咱们引进下图(来自参阅1),这个图对应了 ColumnParallelLinear 类的前向传达和后向传达进程。这儿的 f 和 g 操作其实是从代码之中抽象出来的,能够理解为 f 是对输入的处理,g 则是处理之后得到终究输出。此处对应了论文中描述的粗体字:

Figure 3. Blocks of Transformer with Model Parallelism. f and g are conjugate. f is an identity operator in the forward pass and all reduce in the backward pass while g is an all reduce in the forward pass and identity in the backward pass.

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

图片来自 GTC 2020: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism。

咱们针对上图,整理一下逻辑。

3.3.1 前向传达

咱们一步一步细化。

首要,全体语义为:Y = XA + b。

其次,前向传达时分的逻辑如下:

  • 输入:这儿 A 沿着列做切分,X 是全部的输入(每个GPU都拥有相同的X)。
  • 核算:通过核算之后,输出的 Y1,Y2Y_1, Y_2 也是依照列被切分过的。每个GPU只要自己对应的分区。
  • 输出:Y1,Y2Y_1, Y_2 只要兼并在一同,才干得到终究输出的 Y。

再次,咱们运用operator来细化一下:

  • 输入:由于每个GPU需求拿到一个完好的输入 X,所以前向操作之中需求把X分发到每个GPU,这样就运用了 Identity 操作。
  • 核算:通过核算之后,输出的 Y1,Y2Y_1, Y_2 也是依照列被切分过的。每个GPU只要自己对应的分区。
  • 输出:由于Y1,Y2Y_1, Y_2 需求兼并在一同,才干得到终究输出的 Y。所以需求有一个 all-gather 操作来进行聚合,即得到 Y=[Y1,Y2] Y = [Y_1, Y_2]

咱们把这些逻辑点在上图上用红色方框标明,输入 X 先通过 f 来处理,输出 Y 是 g 整合之后的成果。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

3.3.2 后向传达

咱们接下来看看后向传达,关于上图来说,后向传达是从上至下,梯度先通过 g,终究被 f 处理。

反向传达的逻辑如下:

  • 现在得到了反向传达上游传过来的梯度 ∂L∂Y\frac{\partial L}{\partial Y},现在需求对其进行切分,保证每个GPU之上都有一份梯度 ∂L∂Yi\frac{\partial L}{\partial Y_i}。操作是∂L∂Yi(split)\frac{\partial L}{\partial Y_i}(split)
  • 每个GPU之上会进行关于X的梯度核算,所以每个GPU都有一份对X的梯度(可是其内容不一样)。
  • 终究需求把各个 GPU 之上关于X的梯度进行相加,得到完好梯度,这就需求一个 all-reduce 操作。即 ∂L∂X=∂L∂X∣1+∂L∂X∣2\frac{\partial L}{\partial X} = \frac{\partial L}{\partial X} |_1 + \frac{\partial L}{\partial X} |_2

所以咱们在图上用蓝色圆角矩形标明出来后向传达对应的算子。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

3.4 代码完结

咱们接下来结合代码来剖析。

3.3.1 ColumnParallelLinear

ColumnParallelLinear 的 forward 代码之中,主要是施行了 f 和 g 的forward操作,一起把 f 和 g 的backward 操作树立起来,详细如下:

  • 假如装备了异步操作,则运用 ColumnParallelLinearWithAsyncAllreduce 完结 f 运算符的功能,这一个函数包括了identity 操作,矩阵乘法,树立后向传达操作。
  • 假如是同步操作,则:
    • 运用 copy_to_tensor_model_parallel_region 完结前向传达 identity 操作,树立反向传达all-reduce,便是图中f的backward。identity 操作 便是把输入 X 完好的复制到多个GPU之上,相似 X 通过 f 的前向操作,变成了 [X, X, …, X]。
    • 运用 linear 对 [X, X, …, X] 和 权重 A 完结矩阵乘法操作。
  • 假如gather_output为True,则在前向传达时分把 YiY_i 做all-gather,由于反向传达时需求把完好梯度scatter到对应GPU之上,所以要树立关于的split操作。MLP完结之中,此处设置为 False,这样每个GPU输出的是自己partition 的 4h/p,直接传送给下一个线性层。
def forward(self, input_):
    # 假如挑选疏忽bias,就会设置为None,后续就不用处理了
    bias = self.bias if not self.skip_bias_add else None
    # 下面主要是图中的 f 操作
    if self.async_tensor_model_parallel_allreduce:
        # 树立反向传达时分的异步all-reduce
        input_shape = input_.shape
        input_ = input_.view(input_shape[0] * input_shape[1],input_shape[2])
        # Maxtrix multiply with asynchronouse all-reduce execution
        output_parallel = ColumnParallelLinearWithAsyncAllreduce.apply(
                input_, self.weight, bias)
        output_parallel = output_parallel.view(
                input_shape[0], input_shape[1], output_parallel.shape[1])
    else:
        # Set up backprop all-reduce.、
        # 树立反向传达all-reduce,便是图中f的backward
        input_parallel = copy_to_tensor_model_parallel_region(input_) 
        # Matrix multiply.
        output_parallel = F.linear(input_parallel, self.weight, bias) # 矩阵乘法操作
    # 下面便是图中的 g 操作    
    if self.gather_output: # 是否需求聚合操作
        # All-gather across the partitions.
        # 聚合输出,便是图中g的forward
        output = gather_from_tensor_model_parallel_region(output_parallel) #
    else:
        output = output_parallel
    output_bias = self.bias if self.skip_bias_add else None # 假如不疏忽bias,还得传出去
    return output, output_bias

3.3.2 f 操作

F 操作是对输入进行开始处理,详细是:

  • 前向传达时分直接复制。
  • 后向传达做all-reduce。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

3.3.2.1 同步操作

这儿咱们主要剖析 copy_to_tensor_model_parallel_region,其做了前向copy操作,一起构建了后向 all-reduce。

def copy_to_tensor_model_parallel_region(input_):
    return _CopyToModelParallelRegion.apply(input_)

咱们还是需求看看 _CopyToModelParallelRegion。能够看到,其 forward 便是简略的把输入转移到输出,便是对应了前向仿制identity。

class _CopyToModelParallelRegion(torch.autograd.Function):
    """Pass the input to the model parallel region."""
    @staticmethod
    def symbolic(graph, input_):
        return input_
    @staticmethod
    def forward(ctx, input_):
        return input_ # 简略的把输入转移到输出,便是对应了前向仿制identity
    @staticmethod
    def backward(ctx, grad_output):
        return _reduce(grad_output) # 反向传达时分,输入是多个GPU上的梯度全体,通过all-reduce兼并

对应的后向传达就运用了All-reduce,反向传达时分,输入是多个GPU上的梯度全体,通过all-reduce兼并。

def _reduce(input_):
    """All-reduce the input tensor across model parallel group."""
    # Bypass the function if we are using only 1 GPU.
    if get_tensor_model_parallel_world_size()==1:
        return input_
    # All-reduce.
    torch.distributed.all_reduce(input_, group=get_tensor_model_parallel_group())
    return input_
3.3.2.2 异步 All-Reduce

ColumnParallelLinearWithAsyncAllreduce 这儿把同步之中的乘法操作也放置进来。

class ColumnParallelLinearWithAsyncAllreduce(torch.autograd.Function):
    """
    Column-parallel linear layer execution with asynchronous all-reduce
    execution in backprop.
    """
    @staticmethod
    def forward(ctx, input, weight, bias):
        ctx.save_for_backward(input, weight)
        ctx.use_bias = bias is not None
        output = torch.matmul(input, weight.t()) # 同步时分的乘法也在这儿了
        if bias is not None:
            output = output + bias
        return output
    @staticmethod
    def backward(ctx, grad_output):
        input, weight = ctx.saved_tensors
        use_bias = ctx.use_bias
        grad_input = grad_output.matmul(weight)
        # Asyncronous all-reduce
        handle = torch.distributed.all_reduce( # 反向传达操作
                grad_input, group=get_tensor_model_parallel_group(), async_op=True)
        # Delay the start of weight gradient computation shortly (3us) to have
        # all-reduce scheduled first and have GPU resources allocated
        _ = torch.empty(1, device=grad_output.device) + 1
        grad_weight = grad_output.t().matmul(input)
        grad_bias = grad_output.sum(dim=0) if use_bias else None
        handle.wait()
        return grad_input, grad_weight, grad_bias

3.3.3 g 操作

以下对应了图之中的 g 操作。G操作是终究生成输出Y,逻辑是:

  • 前向传达时分做 all-gather;
  • 后向传达需求履行 split,把梯度scatter到不同GPU之上。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

def gather_from_tensor_model_parallel_region(input_):
    return _GatherFromModelParallelRegion.apply(input_)

详细代码如下:

class _GatherFromModelParallelRegion(torch.autograd.Function):
    """Gather the input from model parallel region and concatinate."""
    @staticmethod
    def symbolic(graph, input_):
        return _gather(input_)
    @staticmethod
    def forward(ctx, input_):
        return _gather(input_)
    @staticmethod
    def backward(ctx, grad_output):
        return _split(grad_output)

3.3.4 根底函数

咱们接下来看看上面用到的一些根底函数。

3.3.4.1 gather

_gather 是沿着终究一个维度进行拼接。

def _gather(input_):
    """Gather tensors and concatinate along the last dimension."""
    world_size = get_tensor_model_parallel_world_size()
    # Bypass the function if we are using only 1 GPU.
    if world_size==1:
        return input_
    # Size and dimension.
    last_dim = input_.dim() - 1
    rank = get_tensor_model_parallel_rank() # 取得本worker在tensor并行之中的rank
    tensor_list = [torch.empty_like(input_) for _ in range(world_size)]
    tensor_list[rank] = input_
    # 在本 tensor 进程组之间进行 all-gather操作
    torch.distributed.all_gather(tensor_list, input_, group=get_tensor_model_parallel_group())
    # Note: torch.cat already creates a contiguous tensor.
    output = torch.cat(tensor_list, dim=last_dim).contiguous()
    return output
3.3.4.2 split

_split 完结了张量切分操作。

def split_tensor_along_last_dim(tensor, num_partitions,
                                contiguous_split_chunks=False):
    """Split a tensor along its last dimension.
    Arguments:
        tensor: input tensor.
        num_partitions: number of partitions to split the tensor
        contiguous_split_chunks: If True, make each chunk contiguous
                                 in memory.
    """
    # Get the size and dimension.
    last_dim = tensor.dim() - 1
    last_dim_size = divide(tensor.size()[last_dim], num_partitions) # 得到每个切分的size
    # Split.
    tensor_list = torch.split(tensor, last_dim_size, dim=last_dim) # 对张量进行切分
    # Note: torch.split does not create contiguous tensors by default.
    if contiguous_split_chunks:
        return tuple(chunk.contiguous() for chunk in tensor_list)
    return tensor_list
def _split(input_):
    """Split the tensor along its last dimension and keep the
    corresponding slice."""
    world_size = get_tensor_model_parallel_world_size() # 获取本tensor进程组的world size
    # Bypass the function if we are using only 1 GPU.
    if world_size==1:
        return input_
    # Split along last dimension.
    input_list = split_tensor_along_last_dim(input_, world_size)
    # Note: torch.split does not create contiguous tensors by default.
    rank = get_tensor_model_parallel_rank() # 获取自己的rank
    output = input_list[rank].contiguous() # 获取切分后,自己对应的rank
    return output

其间,get_tensor_model_parallel_rank 作用是获取本进程在tensor并行组的rank。

def get_tensor_model_parallel_rank():
    """Return my rank for the tensor model parallel group."""
    global _MPU_TENSOR_MODEL_PARALLEL_RANK
    if _MPU_TENSOR_MODEL_PARALLEL_RANK is not None:
        return _MPU_TENSOR_MODEL_PARALLEL_RANK
    return torch.distributed.get_rank(group=get_tensor_model_parallel_group())

0x04 RowParallelLinear

RowParallelLinear 这儿是依照行进行切分,便是横刀流,留意这儿是对权重A施行行切分。比方公式为 Y = XA,X是输入,A是权重,Y是输出,行切分便是针对A的第一个维度进行切分,这儿 X1X_1 终究一个维度等于 A1A_1 第一个维度。

XA=[X1,X2][A1A2]=X1A1+X2A2=Y1+Y2=YXA = \begin{bmatrix}X_1,X_2\end{bmatrix} \begin{bmatrix}A_1 \\ A_2\end{bmatrix} = X_1 A_1 + X_2 A_2 = Y_1 + Y_2 = Y

详细如下:

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

4.1 界说

界说之中只要注释有用,能够看出来怎么切分。

class RowParallelLinear(torch.nn.Module):
    """Linear layer with row parallelism.
    The linear layer is defined as Y = XA + b. A is parallelized along
    its first dimension and X along its second dimension as:
               -   -
              | A_1 |
              | .   |
          A = | .   |        X = [X_1, ..., X_p]
              | .   |
              | A_p |
               -   -
    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias. Note that bias is not parallelized.
        input_is_parallel: If true, we assume that the input is already
                           split across the GPUs and we do not split
                           again.
        init_method: method to initialize weights. Note that bias is always set
                     to zero.
        stride: For the strided linear layers.
        keep_master_weight_for_test: This was added for testing and should be
                                     set to False. It returns the master weights
                                     used for initialization.
        skip_bias_add: This was added to enable performance optimization where bias
                       can be fused with other elementwise operations. We skip
                       adding bias but instead return it.
    """

4.2 初始化

和列切分相似,初始化之中主要是获取每个权重分区的巨细,然后据此切分权重。

def __init__(self, input_size, output_size, bias=True,
             input_is_parallel=False,
             init_method=init.xavier_normal_, stride=1,
             keep_master_weight_for_test=False,
             skip_bias_add=False):
    super(RowParallelLinear, self).__init__()
    # Keep input parameters
    self.input_size = input_size
    self.output_size = output_size
    self.input_is_parallel = input_is_parallel
    # Divide the weight matrix along the last dimension.
    world_size = get_tensor_model_parallel_world_size()
    self.input_size_per_partition = divide(input_size, world_size) # 获取每个权重分区的巨细
    self.skip_bias_add = skip_bias_add
    # Parameters.
    # Note: torch.nn.functional.linear performs XA^T + b and as a result
    # we allocate the transpose.
    # Initialize weight.
    args = get_args()
    if args.use_cpu_initialization:
        self.weight = Parameter(torch.empty(self.output_size,
                                            self.input_size_per_partition,
                                            dtype=args.params_dtype))
        # 切分权重
        self.master_weight = _initialize_affine_weight_cpu(
            self.weight, self.output_size, self.input_size,
            self.input_size_per_partition, 1, init_method,
            stride=stride, return_master_weight=keep_master_weight_for_test)
    else:
        self.weight = Parameter(torch.empty(
            self.output_size, self.input_size_per_partition,
            device=torch.cuda.current_device(), dtype=args.params_dtype))
        # 切分权重
        _initialize_affine_weight_gpu(self.weight, init_method,
                                      partition_dim=1, stride=stride)
    if bias:
        if args.use_cpu_initialization:
            self.bias = Parameter(torch.empty(self.output_size,
                                              dtype=args.params_dtype))
        else:
            self.bias = Parameter(torch.empty(
                self.output_size, device=torch.cuda.current_device(),
                dtype=args.params_dtype))
        # Always initialize bias to zero.
        with torch.no_grad():
            self.bias.zero_()
    else:
        self.register_parameter('bias', None)

4.3 逻辑整理

为了更好的剖析,咱们引进下图(来自参阅1),这个图对应了 RowParallelLinear 类的前向传达和后向传达进程。这儿的 f 和 g 操作其实是从代码之中抽象出来的,能够理解为 f 是对输入的处理,g 则是处理之后得到终究输出

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

咱们针对上图,整理一下逻辑。

4.3.1 前向传达

咱们一步一步细化。

首要,全体语义为:Y = XA + b。

其次,前向传达时分的逻辑如下:

  • 输入:这儿 A 沿着行做切分,由于A的维度发生了改变,所以X也需求做相应改变,X就必须依照列做切分,这样 X 每个分块才干与A 每个分块进行相乘。这儿假如输入是已经split过的(input_is_parallel 为True),则就不需求再进行split。
  • 核算:核算便是 Y1=X1A1Y_1 = X_1 A_1Y2=X2A2Y_2 = X_2A_2。通过核算之后,输出的 Y1,Y2Y_1, Y_2 的shape便是终究 Y 的shape。每个GPU只要自己对应的分区。
  • 输出:Y1,Y2Y_1, Y_2 只要兼并在一同,才干得到终究输出的 Y。可是由于 Y1,Y2Y_1, Y_2 形状相同,都等于Y的形状,所以只要简略矩阵相加即可。

再次,咱们运用operator来细化一下:

  • 输入:需求对 X 进行纵向切分,这便是一个split操作,得到了 [X1,X2][X_1, X_2],这两个分区要别离放到两个GPU之上。
  • 核算:通过核算之后,每个GPU只要自己对应的分区。
  • 输出:由于Y1,Y2Y_1, Y_2 需求兼并在一同,才干得到终究输出的 Y。这样需求把 Y1Y_1Y2Y_2 相加(由所以两个GPU,所以之间还有等候操作),这便是 all-reduce 操作。

咱们把这些逻辑点在上图上用红色方框标明,输入 X 先通过 f 来处理,输出 Y 是 g 整合之后的成果。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

4.3.2 后向传达

咱们接下来看看后向传达,关于上图来说,后向传达是从上至下,梯度先通过 g,终究被 f 处理。

反向传达的逻辑如下:

  • 现在得到了反向传达上游传过来的梯度 ∂L∂Y\frac{\partial L}{\partial Y},由于 Y1,Y2Y_1, Y_2 的形状相同,所以直接把梯度 ∂L∂Y\frac{\partial L}{\partial Y}传给每个GPU即可,操作是∂L∂Yi=∂L∂Y(identity)\frac{\partial L}{\partial Y_i} = \frac{\partial L}{\partial Y} (id entity)。这儿解释一下,在前向传达时分,XA 的成果需求 all-reduce,能够理解为 sum operator,所以反向传达时分直接复制梯度即可。
  • 每个GPU之上会进行关于X的梯度核算,所以每个GPU都有一份对X的梯度(可是其内容不一样)。
  • 终究需求把各个 GPU 之上关于X的梯度进行聚合,得到完好梯度,便是forward 之中 split 的反向操作,依照终究一列对梯度进行拼接,即all-gather操作。

所以咱们在图上用蓝色圆角矩形标明出来后向传达对应的算子。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

4.4 代码完结

咱们接下来看看代码怎么完结。

4.4.1 RowParallelLinear

RowParallelLinear 的 forward 代码之中,主要是施行了 f 和 g 的forward操作,一起把 f 和 g 的backward 操作树立起来,详细如下:

def forward(self, input_):
    # 这儿,输入的张量已经被切割到每个GPU,输出张量是all-reduce之后的全体
    # Set up backprop all-reduce.
    if self.input_is_parallel:  # 是否已经是split的输入
        # Transformer's MLP 抵达这儿,由于已经split,所以直接就接了输入,不会scatter
        input_parallel = input_
    else:
        # 独立 row parallel 线性层到这儿,会进行前向切分和后向拼接
        input_parallel = scatter_to_tensor_model_parallel_region(input_)
    # Matrix multiply.
    # 比方 X_i 和 A_i 进行乘法操作
    output_parallel = F.linear(input_parallel, self.weight)
    # All-reduce across all the partitions.
    # 进行前向all-reduce操作,这样每个GPU之上都是完好的最新成果,一起树立了后向的identity操作。
    output_ = reduce_from_tensor_model_parallel_region(output_parallel)
    if not self.skip_bias_add:
        # 加上bias
        output = output_ + self.bias if self.bias is not None else output_
        output_bias = None
    else:
        output = output_
        output_bias = self.bias
    return output, output_bias

4.4.1 f 操作

scatter_to_tensor_model_parallel_region 对应了f操作,其作用是:

  • 前向切分split输入,一起树立后向的 all-gather 操作。
  • 后向操作进行 all-gather 操作。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

代码为:

def scatter_to_tensor_model_parallel_region(input_):
    return _ScatterToModelParallelRegion.apply(input_)

详细 _ScatterToModelParallelRegion 完结了实际事务,详细 _split, _gather 操作在前面都介绍过。

class _ScatterToModelParallelRegion(torch.autograd.Function):
    """Split the input and keep only the corresponding chuck to the rank."""
    @staticmethod
    def symbolic(graph, input_):
        return _split(input_)
    @staticmethod
    def forward(ctx, input_):
        return _split(input_)
    @staticmethod
    def backward(ctx, grad_output):
        return _gather(grad_output)

4.4.2 g 操作

reduce_from_tensor_model_parallel_region 对应了 g 操作,作用是:

  • 前向操作是 all-reduce之后得到终究输出.

  • 反向操作则直接复制操作。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

代码为:

def reduce_from_tensor_model_parallel_region(input_):
    return _ReduceFromModelParallelRegion.apply(input_)

详细事务如下:

class _ReduceFromModelParallelRegion(torch.autograd.Function):
    """All-reduce the input from the model parallel region."""
    @staticmethod
    def symbolic(graph, input_):
        return _reduce(input_)
    @staticmethod
    def forward(ctx, input_):
        return _reduce(input_) # 前面有介绍
    @staticmethod
    def backward(ctx, grad_output):
        return grad_output #便是indentity 操作,直接把输入复制到两个GPU之上

0x05 Embedding

咱们接下来看看 embedding。为了让内存做到均衡装备,对embedding也会依照vocab维度来做shard操作,终究把分区放到多个GPU之上。这样每个卡上都有嵌入表的一部分。

class VocabParallelEmbedding(torch.nn.Module):
    """Embedding parallelized in the vocabulary dimension.
    This is mainly adapted from torch.nn.Embedding and all the default
    values are kept.
    Arguments:
        num_embeddings: vocabulary size.
        embedding_dim: size of hidden state.
        init_method: method to initialize weights.
    """
    def __init__(self, num_embeddings, embedding_dim,
                 init_method=init.xavier_normal_):
        super(VocabParallelEmbedding, self).__init__()
        # Keep the input dimensions.
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        # Set the detauls for compatibility.
        self.padding_idx = None
        self.max_norm = None
        self.norm_type = 2.
        self.scale_grad_by_freq = False
        self.sparse = False
        self._weight = None
        self.tensor_model_parallel_size = get_tensor_model_parallel_world_size()
        # Divide the weight matrix along the vocaburaly dimension.
        self.vocab_start_index, self.vocab_end_index = \ # 得到分区的起始,终止方位
            VocabUtility.vocab_range_from_global_vocab_size(
                self.num_embeddings, get_tensor_model_parallel_rank(),
                self.tensor_model_parallel_size)
        self.num_embeddings_per_partition = self.vocab_end_index - \ # 得到分区内嵌入数目
            self.vocab_start_index
        # Allocate weights and initialize.
        args = get_args()
        if args.use_cpu_initialization:
            self.weight = Parameter(torch.empty(
                self.num_embeddings_per_partition, self.embedding_dim,
                dtype=args.params_dtype))
            _initialize_affine_weight_cpu( # 对权重进行分区
                self.weight, self.num_embeddings, self.embedding_dim,
                self.num_embeddings_per_partition, 0, init_method)
        else:
            self.weight = Parameter(torch.empty(
                self.num_embeddings_per_partition, self.embedding_dim,
                device=torch.cuda.current_device(), dtype=args.params_dtype))
            _initialize_affine_weight_gpu(self.weight, init_method, # 对权重进行分区
                                          partition_dim=0, stride=1)

由于每一个GPU仅仅取得了全体嵌入的一部分,所以关于每个worker来说,可能有一个输入找不到嵌入,因而需求对embedding终究输出做一个 all-reduce操作,这样能够得到完好embedding。

def forward(self, input_):
        if self.tensor_model_parallel_size > 1:
            # Build the mask.
        		# input_mask 意思是单词不在本worker的 embedding 分区范围内,所以设置为0
            input_mask = (input_ < self.vocab_start_index) | \
                         (input_ >= self.vocab_end_index)
            # Mask the input.
            masked_input = input_.clone() - self.vocab_start_index
            masked_input[input_mask] = 0
        else:
            masked_input = input_
            # Get the embeddings.
        output_parallel = F.embedding(masked_input, self.weight,
                                      self.padding_idx, self.max_norm,
                                      self.norm_type, self.scale_grad_by_freq,
                                      self.sparse)
        # Mask the output embedding.
        if self.tensor_model_parallel_size > 1:
            output_parallel[input_mask, :] = 0.0
        # Reduce across all the model parallel GPUs.
        output = reduce_from_tensor_model_parallel_region(output_parallel)
        return output

0x06 总结

6.1 MLP并行

咱们总结一下MLP的并行完结,详细如下图,其间逻辑如下:

  • 中间灰色的是论文中的概念图。
  • 联络代码之后,咱们能够知道,其是由一个 ColumnParallelLinear 接上一个 RowParallelLinear 完结的,咱们把概念图转化为图左边两个方框。
  • ColumnParallelLinear 是对权重进行列切分,RowParallelLinear 是对权重进行行切分。
  • 其间 ColumnParallelLinear 的 Y1,Y2Y_1, Y_2 没有通过 all-gather 操作(便是略过了 g 操作),而是直接输入到了 RowParallelLinear 之中,接到了RowParallelLinear 的 X1,X2X_1, X_2,即,RowParallelLinear 没有 f 操作。
  • 概念图之中的 f 便是ColumnParallelLinear 的 f,g 便是 RowParallelLinear 的 g。详细逻辑如图上所示。

[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现

6.2 共轭函数

论文之中提到了共轭函数。

f and g are conjugate. f is an identity operator in the forward pass and all reduce in the backward pass while g is an all reduce in the forward pass and identity in the backward pass.

咱们前面代码之中也有运用到,咱们整理出来如下,其间两两互为共轭函数。

  • copy_to_tensor_model_parallel_region 是前向操作copy(identity),后向操作 all-reduce。

  • reduce_from_tensor_model_parallel_region 是前向操作 all-reduce,后向操作 copy(identity)。

其实,便是MLP之中的 f,g 操作,这两个是共轭函数。

相似,gather_from_tensor_model_parallel_region 是前向操作 all-gather,后向操作 scatter,这和scatter_to_tensor_model_parallel_region 也是共轭函数。

这些函数代码详细如下:

def copy_to_tensor_model_parallel_region(input_):
    return _CopyToModelParallelRegion.apply(input_)
def reduce_from_tensor_model_parallel_region(input_):
    return _ReduceFromModelParallelRegion.apply(input_)
def scatter_to_tensor_model_parallel_region(input_):
    return _ScatterToModelParallelRegion.apply(input_)
def gather_from_tensor_model_parallel_region(input_):
    return _GatherFromModelParallelRegion.apply(input_)

至此,咱们已经完结了对模型并行完结的剖析,下一篇咱们看看在源码之中怎么设定各种并行装备。

0xEE 个人信息

★★★★★★关于日子和技能的考虑★★★★★★

微信公众账号:罗西的考虑

0xFF 参阅

developer.nvidia.com/gtc/2020/sl…

[细读经典]Megatron论文和代码详细剖析(2)

[细读经典]Megatron论文和代码详细剖析(1)

Megatron-LM源码阅读(一)

Megatron-LM源码阅读(二)

megatron学习总结

GTC 2020: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism

大规模练习之 transformer 中的张量模型并行