C/C++教程

[源码解析] PyTorch 流水线并行实现 (2)--如何划分模型

本文主要是介绍[源码解析] PyTorch 流水线并行实现 (2)--如何划分模型,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

[源码解析] PyTorch 流水线并行实现 (2)--如何划分模型

目录
  • [源码解析] PyTorch 流水线并行实现 (2)--如何划分模型
    • 0x00 摘要
    • 0x01 问题
    • 0x01 自动平衡
      • 1.1 Automatic Balancing
      • 1.2 基础函数/函数
        • 1.2.1 Batch
        • 1.2.2 layerwise_sandbox
        • 1.2.3 detach
      • 1.3 据计算时间来平衡
      • 1.4 据内存大小来平衡
      • 1.5 分割算法
    • 0x02 模型划分
      • 2.1 调用
      • 2.2 GPipe构建
      • 2.3 示例
      • 2.4 总结
    • 0xFF 参考

0x00 摘要

上一篇文章我们介绍了 PyTorch 流水线并行的基本知识,本文我们介绍其自动平衡机制和模型分割。

流水线并行其他文章链接如下:

[源码解析] 深度学习流水线并行Gpipe(1)---流水线基本实现

[源码解析] 深度学习流水线并行GPipe (2) ----- 梯度累积

[源码解析] 深度学习流水线并行 GPipe(3) ----重计算

[源码解析] 深度学习流水线并行之PipeDream(1)--- Profile阶段

[源码解析] 深度学习流水线并行 PipeDream(2)--- 计算分区

[源码解析] 深度学习流水线并行 PipeDream(3)--- 转换模型

[源码解析] 深度学习流水线并行 PipeDream(4)--- 运行时引擎

[源码解析] 深度学习流水线并行 PipeDream(5)--- 通信模块

[源码解析] 深度学习流水线并行 PipeDream(6)--- 1F1B策略

[源码解析] PyTorch 流水线并行实现 (1)--基础知识

本文图来自论文和github源码。

0x01 问题

流水线并行首先面对的问题就是:

  • 如何把一个大模型切分成若干小模型?切分的算法是什么?
  • 如何把这些小模型分配到多个设备之上?分配的算法是什么?
  • 如何做到整体性能最优或者近似最优?衡量标准是什么?

比如一个拥有 6 个层的大模型,如何切分成三个小模型?

+-----------------------------------------------------------------------------------------+
|                                                                                         |
| Layer 1 +--->  Layer 2 +-----> Layer 3 +----->  Layer 4 +-----> Layer 5  +---> Layer 6  |
|                                                                                         |
+------------------------------------------+----------------------------------------------+
                                           |
                                           |
                                           | ? ? ? ? ?
                                           |
                                           |
                                           v

  +--------------------+         +---------------------+      +--------------------+
  |Device 1            |         |Device 2             |      |Device 3            |
  |                    |         |                     |      |                    |
  |      Layer 1       |    +---------> Layer 4        |      |                    |
  |         +          |    |    |         +           |  +------->   Layer 6      |
  |         |          |    |    |         |           |  |   |                    |
  |         v          |    |    |         |           |  |   |                    |
  |      Layer 2       |    |    |         |           |  |   |                    |
  |         +          |    |    |         v           |  |   |                    |
  |         |          |    |    |      Layer 5 +---------+   |                    |
  |         v          |    |    |                     |      |                    |
  |      Layer 3  +---------+    |                     |      |                    |
  |                    |         |                     |      |                    |
  +--------------------+         +---------------------+      +--------------------+

接下来,我们就看看 torchgpipe 是如何解决这些问题的。

0x01 自动平衡

torchgpipe提供了子模块 torchgpipe.balance 来计算得到分区,目的是让两两分区(pairwise)之间的资源差别尽量小。资源占用情况是通过分析(profile)来计算。

1.1 Automatic Balancing

切分模型会影响GPU的利用率,比如其中计算量较大的层会减慢下游的速度,所以需要找到一个模型的最佳平衡点。但是,确定模型的最佳平衡点是很难的,特别是,如果用户仍在设计模型阶段,则模型体系结构可能会随着时间的推移而改变。在这种情况下,TorchPipe 强烈建议使用 torchgpipe.balance来自动平衡。这不会给用户提供最佳的平衡,但这是一个足够好的平衡。

请注意,这个功能是由torchgpipe提供的,而不是来自Huang等人的GPipe 原始论文。

torchgpipe提供了两个平衡工具,两者都基于每层的profile结果来使用,用户可以根据需要选择平衡工具。

  • ~torchgpipe.balance.balance by_time:跟踪每层的运行时间。
  • ~torchgpipe.balance.balance by_size:检测每层的CUDA内存使用情况。

具体使用方式如下,用户需要向模型中输入一个样本输入。

   from torchgpipe import GPipe
   from torchgpipe.balance import balance_by_time

   partitions = torch.cuda.device_count()
   sample = torch.rand(128, 3, 224, 224) # 用户需要向模型中输入一个样本输入
   balance = balance_by_time(partitions, model, sample)

   model = GPipe(model, balance, chunks=8)

1.2 基础函数/函数

1.2.1 Batch

Batch 是一个基础类,位于 torchgpipe/microbatch.py,其作用是把 tensor 或者 tensors 封装起来做统一处理。Batch 把张量保存在自己的 value 成员变量之中。在调用 call 方法时候,就把传入的方法应用到 value 张量之上

比如后面我们会讲到的 Pipeline.compute 方法之中会有如下,就是把 partition 应用到 batch 内的张量之上:

def compute(batch: Batch = batch,
                            partition: nn.Sequential = partition,
                            skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
                            ) -> Batch:
  with use_skip_tracker(skip_tracker):
  	return batch.call(partition)

	task = Task(streams[j], compute=compute, finalize=None)

具体 Batch 定义如下:

Tensors = Tuple[Tensor, ...]
TensorOrTensors = Union[Tensor, Tensors]
Function = Callable[[TensorOrTensors], TensorOrTensors]

class Batch:
    """An abstraction of an atomic tensor or a tuple of tensors. This
    eliminates every boilerplate code to classify an atomic tensor or a tuple
    of tensors.
    ::

        x = generate_tensor_or_tensors()
        x = Batch(x)

        # in-place update
        x[0] = F.apply(x[0])
        x[:] = F.apply(*x)

        # f(x) if x is a tensor.
        # f(*x) if x is a tuple of tensors.
        # y is also a batch.
        y = x.call(f)

    """

    def __init__(self, value: TensorOrTensors) -> None:
        self.value = value
        self.atomic = torch.is_tensor(value)

    @property
    def tensor(self) -> Tensor:
        """Retrieves the underlying tensor."""
        if not self.atomic:
            raise AttributeError('not atomic batch')
        return cast(Tensor, self.value)

    @property
    def tensors(self) -> Tensors:
        """Retrieves the underlying tensors."""
        if self.atomic:
            raise AttributeError('batch is atomic')
        return cast(Tensors, self.value)

    @property
    def tensor_or_tensors(self) -> TensorOrTensors:
        """Retrieves the underlying tensor or tensors regardless of type."""
        return self.value

    def call(self, function: Function) -> 'Batch': # 这里是关键方法
        """Calls a function by the underlying tensor or tensors. It also wraps
        the output with :class:`Batch`.
        """
        return Batch(function(self.value)) # 调用模型的forward       

1.2.2 layerwise_sandbox

layerwise_sandbox 方法的作用是在不影响原有模型的基础上,拷贝模型的层,这样更容易profile。

def layerwise_sandbox(module: nn.Sequential,
                      device: torch.device,
                      ) -> Generator[nn.Module, None, None]:
    """Copies layers for ease to profile. It doesn't modify the given
    module.
    """
    for layer in module:
        layer_copy = copy.deepcopy(layer)
        layer_copy.to(device)
        layer_copy.train()
        yield layer_copy

1.2.3 detach

detach 方法的作用是从autograd图中detach一些张量,得到一组新的张量。这些张量从当前计算图中被分离下来。但是仍指向原变量的存放位置。detach 可以切断一些分支的反向传播.。

def detach(batch: Batch) -> None:
    """Detaches from autograd graph."""
    for i, x in enumerate(batch):
        batch[i] = x.detach().requires_grad_(x.requires_grad)

torchgpipe代码中,经常可以见到 detach 的使用,这个从注释可以看出来,是因为 PyTorch 的一个bug 而采取的workround。

    # A Python autograd function might fail with this error:
    #
    #   RuntimeError: Returning Variables sharing storage with other Variables
    #   that require grad is not supported in Python functions. Please submit a
    #   feature request if you hit this error.
    #
    # It doesn't look like an essential restriction. But it happens on the
    # current PyTorch version. To avoid it, we should detach the tensor before
    # returning by identity autograd functions, such as Wait, Fork, and Join.
    #

1.3 据计算时间来平衡

balance_by_time 方法的作用就是依据运行时间来平衡,其中参数如下:

  • partitions :分区数目

  • module : 需要分区的顺序模型

  • sample :给定 batch size 的样本

其实就是调用 profile_times 依据sample来得到运行时间,然后进行分区。

def balance_by_time(partitions: int,
                    module: nn.Sequential,
                    sample: TensorOrTensors,
                    *,
                    timeout: float = 1.0,
                    device: Device = torch.device('cuda'),
                    ) -> List[int]:
    """Naive automatic balancing by elapsed time per layer.
    ::

        sample = torch.empty(128, 3, 224, 224)
        balance = balance_by_time(torch.cuda.device_count(), model, sample)
        gpipe = GPipe(model, balance, chunks=8)

    Args:
        partitions (int):
            intended number of partitions
        module (torch.nn.Sequential):
            sequential module to be partitioned
        sample (torch.Tensor):
            example input with arbitrary batch size

    Keyword Args:
        timeout (float):
            profiling iterates again if the timeout (in second) is not exceeded
            (default: ``1.0``)
        device ('cpu' or 'cuda' device):
            CPU or CUDA device where each layer is profiled (default: the
            current CUDA device)

    Returns:
        A list of number of layers in each partition. Use it for the `balance`
        parameter of :class:`~torchgpipe.GPipe`.

    .. note::
        `module` and `sample` must be placed on the same device.

    """
    times = profile_times(module, sample, timeout, torch.device(device))
    return balance_cost(times, partitions)

这里的 Batch 类就是对张量或者张量数组进行封装,可以统一使用其方法。

profile_times 依据sample来得到运行时间,具体逻辑是:

  • 遍历模型中的层,针对每个层:
    • 等待当前设备上所有流中的所有kernel完成
    • 记录起始运行时间
    • 对某层进行前向计算
    • 得到需要梯度的张量,如果存在,则进行后向计算
    • 等待当前设备上所有流中的所有kernel完成
    • 记录终止时间
  • 最后返回一个每层运行时间列表。
def profile_times(module: nn.Sequential,
                  sample: TensorOrTensors,
                  timeout: float,
                  device: torch.device,
                  ) -> List[int]:
    """Profiles elapsed times per layer."""
    if any(p.grad is not None for p in module.parameters()):
        raise ValueError('some parameter already has gradient')

    _batch = Batch(sample)
    for i, x in enumerate(_batch):
        _batch[i] = x.detach().to(device).requires_grad_(x.requires_grad)

    time_bufs: List[List[float]] = [[] for _ in module]
    begun_at = time.time()

    while time.time() - begun_at < timeout:
        batch = _batch

        # 遍历模型中的层
        for i, layer in enumerate(layerwise_sandbox(module, device)):
            detach(batch)

            if device.type == 'cuda':
                torch.cuda.synchronize(device) # 等待当前设备上所有流中的所有kernel完成
            tick = time.time()# 起始运行时间

            # Forward
            batch = batch.call(layer) # 对某层进行前向计算

            # Backward
            # 得到需要梯度的张量
            backward_tensors = tuple(y for y in batch if y.requires_grad)
            # 进行后向计算
            if backward_tensors:
                torch.autograd.backward(backward_tensors, backward_tensors)

            if device.type == 'cuda':
                torch.cuda.synchronize(device) # 等待当前设备上所有流中的所有kernel完成
            tock = time.time() # 终止时间

            time_bufs[i].append(tock - tick)

    us = 1_000_000
    return [sum(int(t*us) for t in buf) for buf in time_bufs]

1.4 据内存大小来平衡

balance_by_size 方法的作用就是依据运行时内存大小来平衡,其中参数如下:

  • partitions :分区数目,从示例看,可以认为是设备数。

  • module : 需要分区的顺序模型

  • sample :给定 batch size 的样本

其实就是调用 profile_sizes 依据sample来得到运行时内存大小,然后进行分区。

在训练期间,参数所需的内存取决于使用哪个优化器。优化器可以为每个参数使用缓冲区来在其内部跟踪优化统计信息,例如SGD中的动量缓冲区。

为了获得更可靠的基于大小的平衡,用户应该为优化器指定相应的“param_scale”。默认的“param_scale”是2,而不是1,这是因为梯度累积(gradient accumulation)是每个优化器所必需的。下面注释之中也给出了一些参考取值。

def balance_by_size(partitions: int,
                    module: nn.Sequential,
                    input: TensorOrTensors,
                    *,
                    chunks: int = 1,
                    param_scale: float = 2.0,
                    device: Device = torch.device('cuda'),
                    ) -> List[int]:
    """Naive automatic balancing by CUDA memory usage per layer.

    During training, required memory for parameters depends on which optimizer
    is used. Optimizers may use buffers for each parameter to track
    optimization statistics internally, such as momentum buffer in SGD.

    To get more reliable size based balance, you should specify `param_scale`
    with regard to your optimizer. The default `param_scale` is 2 instead of 1
    due to gradient accumulation which is necessary for every optimizer.

    Follow this guide to choose correct `param_scale` for typical optimizers:

    =========  =============  =========================================
    Optimizer  `param_scale`  Internal State
    =========  =============  =========================================
    SGD        2--3           (momentum_buffer)
    Adam       4--5           exp_avg, exp_avg_sq, (max_exp_avg_sq)
    Adadelta   4              square_avg, acc_delta
    Adagrad    3              sum
    RMSprop    3--5           square_avg, (momentum_buffer), (grad_avg)
    =========  =============  =========================================

    Here's a simple example with the Adam optimizer::

        balance = balance_by_size(
            torch.cuda.device_count(),
            model,

            # Same size with mini-batch to train
            torch.empty(1024, 3, 224, 224),

            # Number of micro-batches to train with GPipe
            chunks=8,

            # 4 for Adam
            param_scale=4.0,
        )

        gpipe = GPipe(model, balance, chunks=8)
        adam = Adam(gpipe.parameters())

    Args:
        partitions (int):
            intended number of partitions
        module (torch.nn.Sequential):
            sequential module to be partitioned
        input (torch.Tensor):
            example mini-batch with the same size to train

    Keyword Args:
        chunks (int):
            number of micro-batches will be used to train (default: ``1``)
        param_scale (float):
            how many copies of parameters would be allocated for training. It
            depends on optimizer. See the above guide. (default: ``2.0``)
        device ('cuda' device):
            CUDA device where each layer is profiled (default: the current CUDA
            device)

    Returns:
        A list of number of layers in each partition. Use it for the `balance`
        parameter of :class:`~torchgpipe.GPipe`.

    .. note::
        `module` and `input` must be placed on the same CUDA device.

    """
    sizes = profile_sizes(module, input, chunks, param_scale, torch.device(device))
    return balance_cost(sizes, partitions)

profile_sizes 逻辑如下:

  • 遍历模型中的层,针对每个层:
    • 使用 torch.cuda.memory_allocated 计算前向传播用到的显存,就是激活值。torch.cuda.memory_allocated(device=None) 返回给定设备device的张量所占用的当前GPU内存。
    • 使用 p.storage().size() * p.storage().element_size() 计算参数尺寸。
      • pytorch中的storage指的是连续的内存块,而tensor可以认为是映射到storage的视图。
      • element_size() 返回单个元素的字节。
    • 把激活值和参数加在一起,插入列表。
  • 返回内存大小列表。
def profile_sizes(module: nn.Sequential,
                  input: TensorOrTensors,
                  chunks: int,
                  param_scale: float,
                  device: torch.device,
                  ) -> List[int]:
    """Profiles CUDA memory usage per layer."""
    if device.type != 'cuda':
        raise ValueError('size profiler supports only CUDA device')

    batch = Batch(input)
    sizes: List[int] = []

    latent_scale = batch[0].size(0) / chunks
    for i, x in enumerate(batch):
        batch[i] = x[:1].detach().to(device).requires_grad_(x.requires_grad)

    for layer in layerwise_sandbox(module, device):
        detach(batch)

        # Detect memory usage at forward.
        # 计算前向传播用到的显存,就是激活值
        memory_before = torch.cuda.memory_allocated(device)
        batch = batch.call(layer) # 对某层进行前向传播
        memory_after = torch.cuda.memory_allocated(device)
        latent_size = memory_after - memory_before

        # Analyze size of parameters.
        # 计算参数尺寸
        param_size = sum(p.storage().size() * p.storage().element_size()
                         for p in layer.parameters())

        # 把激活值和参数加在一起,插入列表
        # Combine size of parameters and activations with normalize scales.
        size = latent_size*latent_scale + param_size*param_scale
        sizes.append(int(size))

    return sizes # 返回内存大小列表

1.5 分割算法

得到每层的计算时间或者内存大小之后,会通过如下代码来进行具体分割。

times = profile_times(module, sample, timeout, torch.device(device))
return balance_cost(times, partitions)

具体 balance_cost 只是一个封装而已,算法还是 blockpartition.solve。

def balance_cost(cost: List[int], partitions: int) -> List[int]:
    partitioned = blockpartition.solve(cost, partitions)
    return [len(p) for p in partitioned]

从其注释可知,blockpartition.solve 实现了这篇论文的算法。

Implements "Block Partitions of Sequences" by Imre Bárány et al.Paper: https://arxiv.org/pdf/1308.2452.pdf

这是一篇数学论文,其算法伪代码如下(与后续实现中注释基本一一对应)。

该论文是纯粹的数学论证,我们不去研究其内部机制,只是看看其运行结果。

我们回忆一下,这里支持的模型是顺序模型,所以无论时间还是内存大小,都是一个list。solve的作用就是把这个list尽量平均分配成若干组

假设模型有6层,需要分配到两个device之上,那么应该如何分割呢?

blockpartition.solve([1, 2, 3, 4, 5, 6], partitions=2) 

结果是 [[1, 2, 3, 4], [5, 6]]

如果分成三个device,则:

solve([1, 2, 3, 4, 5, 6], partitions=3)

结果是 [[1, 2, 3], [4, 5], [6]]

然后 balance_cost 会获得每一个 partition 的具体层数,得到balance的最终是:

[3,2,1]

分区算法具体代码如下,有兴趣的朋友可以结合论文仔细研究。

def solve(sequence: List[int], partitions: int = 1) -> List[List[int]]:
    """Splits a sequence into several partitions to minimize variance for each
    partition.

    The result might not be optimal. However, it can be done only in O(kn³),
    where k is the number of partitions and n is the length of the sequence.

    """
    if partitions < 1:
        raise ValueError(f'partitions must be a positive integer ({partitions} < 1)')

    n = len(sequence)
    if n < partitions:
        raise ValueError(f'sequence is shorter than intended partitions ({n} < {partitions})')

    # Normalize the sequence in [0, 1].
    minimum = min(sequence)
    maximum = max(sequence) - minimum

    normal_sequence: List[float]
    if maximum == 0:
        normal_sequence = [0 for _ in sequence]
    else:
        normal_sequence = [(x-minimum)/maximum for x in sequence]

    splits = [n//partitions * (x+1) for x in range(partitions-1)] + [n]

    def block_size(i: int) -> float:
        start = splits[i-1] if i > 0 else 0
        stop = splits[i]
        return sum(normal_sequence[start:stop])

    def leaderboard() -> Iterator[Tuple[float, int]]:
        return ((block_size(i), i) for i in range(partitions))

    while True:
        """
        (1) Fix p ∈ [k] with M(P) = bp. So Bp is a maximal block of P.
        """
        # max_size: M(P)
        max_size, p = max(leaderboard())

        while True:
            """
            (2) If M(P) ≤ m(P) + 1, then stop.
            """
            # min_size: m(P)
            min_size, q = min(leaderboard())

            if max_size <= min_size + 1:
                return [sequence[i:j] for i, j in zip([0]+splits[:-1], splits)]

            """
            (3) If M(P) > m(P) + 1, then let m(P) = bq for the q ∈ [k] which is
            closest to p (ties broken arbitrarily). Thus Bq is a minimal block
            of P. Let Bh be the block next to Bq between Bp and Bq. (Note that
            Bh is a non-empty block: if it were, then m(P) = 0 and we should
            have chosen Bh instead of Bq.)
            """
            if p < q:
                """
                So either p < q and then h = q−1 and we define P ∗ by moving
                the last element from Bh = Bq−1 to Bq,
                """
                h = q - 1
                splits[h] -= 1
            else:
                """
                or q < p, and then h = q + 1 and P ∗ is obtained by moving the
                first element of Bh = Bq+1 to Bq.
                """
                h = q + 1
                splits[q] += 1

            """
            Set P = P ∗ . If p = h, then go to (1), else go to (2).
            """
            if p == h:
                break

0x02 模型划分

2.1 调用

既然得到了 profile 的结果,下面就是对模型的各个层进行分割。如何分割可以参见下面注释中的使用示例,把balance 作为参数传递给 GPipe构造函数。

'''
If your model is still under development, its optimal balance would change
frequently. In this case, we highly recommend 'torchgpipe.balance' for naive
automatic balancing:

  from torchgpipe import GPipe
  from torchgpipe.balance import balance_by_time

  partitions = torch.cuda.device_count()
  sample = torch.empty(...)
  balance = balance_by_time(partitions, model, sample)

  model = GPipe(model, balance, ...)
'''

2.2 GPipe构建

Gpipe 的 __init__中可以看到,使用了 split_module 函数进行分割:

    def __init__(self,
                 module: nn.Sequential,
                 balance: Optional[Iterable[int]] = None,
                 *,
                 devices: Optional[Devices] = None,
                 chunks: int = chunks,
                 checkpoint: str = checkpoint,
                 spawn_workersdeferred_batch_norm: bool = False,
                 ) -> None:
        super().__init__()

        chunks = int(chunks)
        checkpoint = str(checkpoint)

        verify_module(module)
        # Verify if the underlying skippable modules satisfy integrity. The
        # integrity can be verified before forward() because it is static.
        verify_skippables(module)

        self.chunks = chunks
        self.checkpoint = checkpoint

        if deferred_batch_norm:
            module = DeferredBatchNorm.convert_deferred_batch_norm(module, chunks)

        if devices is None:
            devices = range(torch.cuda.device_count())
        devices = [torch.device(d) for d in devices]
        devices = cast(List[torch.device], devices)

        try:
            # 对模型进行切分
            self.partitions, self.balance, self.devices = split_module(module, balance, devices)
        except BalanceError as exc:
            raise ValueError(recommend_auto_balance(str(exc)))

        self._copy_streams: List[List[AbstractStream]] = []
        self._skip_layout = inspect_skip_layout(self.partitions)

所以我们看看 split_module 函数,其主要逻辑如下:

  • 遍历模型包含的层
    • 把新的层加入到数组layers中
    • 如果数组大小等于balance[j],就是达到了device j应该包含的层数,则:
      • 把分区数组构建成一个sequential module,得到变量 partition。
      • 利用 partition.to(device) 把partition放置到相关设备之上,这就是前文提到的,~torchgpipe.GPipe使用CUDA进行训练。用户不需要自己将模块移动到GPU,因为~torchgpipe.GPipe自动把每个分区移动到不同的设备上。
      • 把这个partition加入到分区数组中
      • 然后去下一个device看看
  • 最后返回 partitions, balance, devices。
def split_module(module: nn.Sequential,
                 balance: Iterable[int],
                 devices: List[torch.device],
                 ) -> Tuple[List[nn.Sequential], List[int], List[torch.device]]:
    """Splits a module into multiple partitions.

    Returns:
        A tuple of (partitions, balance, devices).

        Partitions are represented as a :class:`~torch.nn.ModuleList` whose
        item is a partition. All layers in a partition are placed in the
        same device.

    Raises:
        BalanceError:
            wrong balance
        IndexError:
            the number of devices is fewer than the number of partitions.

    """
    balance = list(balance)

    j = 0
    partitions = []
    layers: NamedModules = OrderedDict()

    for name, layer in module.named_children(): # 遍历模型包含的层
        layers[name] = layer # 把新的层加入到数组中

        if len(layers) == balance[j]: # 如果数组大小等于balance[j],就是达到了device j应该包含的层数
            # Group buffered layers as a partition.
            partition = nn.Sequential(layers) # 把层数组组合成一个sequential module

            device = devices[j]
            partition.to(device) # 把层放置到相关设备之上

            partitions.append(partition) # 这个新module加入到分区数组中

            # Prepare for the next partition.
            layers.clear()
            j += 1 # 去下一个device看看

    partitions = cast(List[nn.Sequential], nn.ModuleList(partitions))
    del devices[j:]

    return partitions, balance, devices

结合上面例子,balance 如下:

[3,2,1]

所以前三个层 [1, 2, 3] 组合成一个module,中间两个层 [4, 5] 组合成一个 module,最后层 [6] 是一个module。

最后分区数组为:

[ module([1, 2, 3]),  module([4, 5]),  module([6])]

2.3 示例

我们再具体打印输出看看,模型包含了6个层,分为 3 个partitions,分区内的层数分别是:3个,2个,1个。

a = nn.Linear(1, 1)
b = nn.Linear(1, 1)
c = nn.Linear(1, 1)
d = nn.Linear(1, 1)
e = nn.Linear(1, 1)
f = nn.Linear(1, 1)

balance = [3,2,1]
model = nn.Sequential(a, b, c, d, e, f)
print(model)
model = GPipe(model, balance, devices=['gpu', 'gpu','gpu'])
print(model)

结果如下,可以看到原模型被分成3个partition,每个 partition 都是一个Sequential。

Sequential(
  (0): Linear(in_features=1, out_features=1, bias=True)
  (1): Linear(in_features=1, out_features=1, bias=True)
  (2): Linear(in_features=1, out_features=1, bias=True)
  (3): Linear(in_features=1, out_features=1, bias=True)
  (4): Linear(in_features=1, out_features=1, bias=True)
  (5): Linear(in_features=1, out_features=1, bias=True)
)

GPipe(
  (partitions): ModuleList(
    (0): Sequential(
      (0): Linear(in_features=1, out_features=1, bias=True)
      (1): Linear(in_features=1, out_features=1, bias=True)
      (2): Linear(in_features=1, out_features=1, bias=True)
    )
    (1): Sequential(
      (3): Linear(in_features=1, out_features=1, bias=True)
      (4): Linear(in_features=1, out_features=1, bias=True)
    )
    (2): Sequential(
      (5): Linear(in_features=1, out_features=1, bias=True)
    )
  )
)

运行时变量如下:

model = {GPipe: 6} 
 balance = {list: 3} [3, 2, 1]
 checkpoint = {str} 'except_last'
 chunks = {int} 1
 devices = {list: 3} 
  0 = {device} gpu
  1 = {device} gpu
  2 = {device} gpu
 partitions = {ModuleList: 3} 
   _modules = 
   '0' = {Sequential: 3} 
        Sequential( 
        (0): Linear(in_features=1, out_features=1, bias=True)  
        (1): Linear(in_features=1, out_features=1, bias=True) 
        (2): Linear(in_features=1, out_features=1, bias=True))   
   '1' = {Sequential: 2} 
        Sequential(  
        (3): Linear(in_features=1, out_features=1, bias=True)  
        (4): Linear(in_features=1, out_features=1, bias=True))
   '2' = {Sequential: 1} 
        Sequential(
        (5): Linear(in_features=1, out_features=1, bias=True))

需要注意一点:GPipe 的 partitions 成员变量是 nn.ModuleList 类型。nn.ModuleList是一个容器,其储存不同 module,并自动将每个 module 的 parameters 添加到网络中。但是nn.ModuleList 并没有定义一个网络,而只是将不同的模块储存在一起,这些模块之间并没有什么先后顺序,网络的执行顺序是根据 forward 函数来决定的。

随之而来问题就是:partition内部可以用Sequential来进行一系列的前向操作,但是如何配置partitions 之间的执行顺序?这个我们会在后续文章中分析。

2.4 总结

最后总结一下,流程是从上至下。

  1. 使用 balance_by_size 或者 balance_by_time 来先运行系统,得到 profile 结果。
  2. 然后使用 split_module 来对模型进行分割。
  3. 最后就得到了一个相对平衡的分区结果。
  4. 把这些分区分配到不同的设备之上。

具体如下图:

+-----------------------------------------------------------------------------------------+
|                                                                                         |
| Layer 1 +--->  Layer 2 +-----> Layer 3 +----->  Layer 4 +-----> Layer 5  +---> Layer 6  |
|                                                                                         |
+--------------------------+---------------------------+----------------------------------+
                           |                           |
           balance_by_size | 1                       1 |  balance_by_time
                           |                           |
                           v                           v
                [[1, 2, 3], [4, 5], [6]]         [[1, 2, 3, 4], [5, 6]]
                           +                           +
                           |                           |
                           +-----------+      +--------+
                                       |      |
                                       v      v
                                 2  split_module
                                          +
                                          |
                                          |
   3                                      v
 +------------------------------------------------------------------------------------+
 | +--------------------+         +---------------------+      +--------------------+ |
 | |Partition 1         |         |Partition 2          |      |Partition 3         | |
 | |                    |         |                     |      |                    | |
 | |      Layer 1       |    +---------> Layer 4        |      |                    | |
 | |         +          |    |    |         +           |  +------->   Layer 6      | |
 | |         |          |    |    |         |           |  |   |                    | |
 | |         v          |    |    |         |           |  |   |                    | |
 | |      Layer 2       |    |    |         |           |  |   |                    | |
 | |         +          |    |    |         v           |  |   |                    | |
 | |         |          |    |    |      Layer 5 +---------+   |                    | |
 | |         v          |    |    |                     |      |                    | |
 | |      Layer 3  +---------+    |                     |      |                    | |
 | |                    |         |                     |      |                    | |
 | +---------+----------+         +---------+-----------+      +-----------+--------+ |
 |           |                              |                              |          |
 +------------------------------------------------------------------------------------+
             |                              |                              |
           4 |                            4 |                            4 |
             v                              v                              v
   +---------+----------+         +---------+-----------+       +----------+---------+
   |                    |         |                     |       |                    |
   |    Device 1        |         |     Device 2        |       |     Device 3       |
   |                    |         |                     |       |                    |
   +--------------------+         +---------------------+       +--------------------+

至此,我们分析了自动平衡机制,下一篇我们看看如何切分数据和一些运行时机制。

0xFF 参考

https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html#stream-sync-behavior

CUDA学习:基础知识小结

CUDA随笔之Stream的使用

NVIDIA解决方案架构师深度解析大规模参数语言模型Megatron-BERT

Accelerating Wide & Deep Recommender Inference on GPUs

HugeCTR: High-Performance Click-Through Rate Estimation Training

https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548

https://github.com/NVIDIA/apex/

https://github.com/justheuristic/prefetch_generator

https://pytorch.org/tutorials/intermediate/model_parallel_turotial.html

https://pytorch.org/docs/stable/autograd.html

https://pytorch.org/docs/notes/cuda.html

https://zhuanlan.zhihu.com/p/61765561

https://pytorch.apachen.org/docs/1.7/64.html

https://zhidx.com/p/217999.html

这篇关于[源码解析] PyTorch 流水线并行实现 (2)--如何划分模型的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!