前文我们对DDP的一些支撑模块已经做了介绍,这为本文做了必要的铺垫,本文就开始介绍Python世界代码和C++世界的初始化部分。下文介绍C++世界的核心代码。
本系列其他文章如下:
深度学习利器之自动微分(1)
深度学习利器之自动微分(2)
[源码解析]深度学习利器之自动微分(3) --- 示例解读
[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)
[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)
[源码解析] PyTorch如何实现前向传播(3) --- 具体实现
[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎
[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构
[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑
[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法
[源码解析] PyTorch 分布式(1)------历史和概述
[源码解析] PyTorch 分布式(2) ----- DataParallel(上)
[源码解析] PyTorch 分布式(3) ----- DataParallel(下)
[源码解析] PyTorch 分布式(4)------分布式应用基础概念
[源码解析] PyTorch分布式(5) ------ DistributedDataParallel 总述&如何使用
[源码解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store
[源码解析] PyTorch 分布式(7) ----- DistributedDataParallel 之进程组
[源码解析] PyTorch 分布式(8) -------- DistributedDataParallel之论文篇
DDP是数据并行训练的实现,为了唤醒大家的记忆,我们还是要看看数据并行的一个整体流程,来自fairscale github源码。
以下文字翻译自 https://pytorch.org/docs/master/notes/ddp.html,这是DDP架构的一个总论。
下面是 DDP 实现组件。堆栈图显示了代码的结构。
我们顺着此架构图从上往下看。
最上面是分布式数据并行组件。
nn.parallel.DistributedDataParallel
模块的forward
函数,该模块会调用C++库。_sync_param
功能是:当一个DDP进程在多个设备上工作时,会执行进程内参数同步,并且它还从rank 0 进程向所有其他进程广播模型缓冲区。Reducer.cpp
之中实现。Reducer
: 其构造函数在distributed.py
被调用,Reducer
将注册 Reducer::autograd_hook()
到梯度累加器。autograd_hook()
当梯度就绪时,autograd 引擎将调用该函数。prepare_for_backward()
在 distributed.py
之中,当 DDP 前向传递结束时,会调用prepare_for_backward()
。如果在DDP构造函数中,把find_unused_parameters
设置为True
,DDP 会遍历 autograd 计算图以查找未使用的参数。以下是两个进程相关组件。
c10d
库提供了 3 个开箱即用的实现,即 ProcessGroupGloo,ProcessGroupNCCL和ProcessGroupMPI。 DistributedDataParallel
用ProcessGroup::broadcast()
在初始化期间将模型状态从rank 0 的进程发送到其他进程,并对ProcessGroup::allreduce()
梯度求和。我们把论文和 https://pytorch.org/docs/master/notes/ddp.html 结合起来,看看 DDP 总体实现。
我们总结一次DistributedDataParallel迭代中的步骤如下(与上图不完全一致,有部分细化):
Prerequisite:
ProcessGroup
进行通信。因此,应用程序必须ProcessGroup
在构建 DDP 之前创建实例。Constuctor:
rank 0 进程会引用本地模块,把模型state_dict()
参数广播到所有进程之中,这样可以保证所有进程使用同样初始化数值和模型副本进行训练。
每个 DDP 进程创建一个 local Reducer
,稍后将在向后传递期间处理梯度同步。
为了提高通信效率,Reducer
将参数梯度组织成桶,一次规约一个桶。
Model.parameters()
与给定模型相反的顺序分配到桶中 。使用相反顺序的原因是因为 DDP 期望梯度在反向传递期间以大约该顺序准备就绪。grad0
和grad1
在 bucket1
中,另外两个梯度在 bucket0
中。当然,这种假设可能并不总是正确的,当这种情况发生时,它可能会损害 DDP 后向速度,因为它无法 Reducer
尽早开始通信。除了分桶,Reducer
还在构造期间注册 autograd 钩子,每个参数一个钩子。当梯度准备好时,将在向后传递期间触发这些钩子。具体就是遍历参数,为每个参数加上 grad_accumulator 和 autograd_hook。
Forward Pass:
find_unused_parameters
设置为True
,DDP 会分析本地模型的输出,从 out 开始遍历计算图,把未使用参数标示为 ready,因为每次计算图都会改变,所以每次都要遍历。
Reducer
只会等待未准备好的参数,但它仍然会规约所有桶。将参数梯度标记为就绪并不能帮助 DDP 跳过桶,但它会阻止 DDP 在向后传递期间永远等待不存在的梯度。find_unused_parameters
为True
。Backward Pass:
backward()
在 loss 上直接调用该函数 Tensor
,这是 DDP 无法控制的,DDP 使用构造时注册的 autograd hooks 来触发梯度同步。当一个梯度准备好时,它在该梯度累加器上的相应 DDP 钩子将触发。Reducer
启动异步allreduce
以计算所有进程的梯度平均值。Reducer
将阻塞等待所有allreduce
操作完成。完成此操作后,将平均梯度写入param.grad
所有参数的字段。Optimizer Step:
因为 Python 世界是可以在很多时刻给类设置成员变量,因此我们还是从 __init__
看起。
__init__
其核心逻辑是:
设置设备类型。
设置设备IDs。
设置 self.process_group,默认就是 GroupMember.WORLD。
配置各种类成员变量。
检查 parameters。
设定bucket大小。
构建参数。
将 rank 0 的state_dict() 广播到其他worker,以保证所有worker的模型初始状态相同。
建立reducer。
具体代码如下:
class DistributedDataParallel(Module): def __init__( self, module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, ): super(DistributedDataParallel, self).__init__() # 设置设备类型 self.is_multi_device_module = len({p.device for p in module.parameters()}) > 1 distinct_device_types = {p.device.type for p in module.parameters()} self.device_type = list(distinct_device_types)[0] # 设置设备IDs if ( device_ids is None or len(device_ids) == 0 # For backward compatibility. or self.device_type == "cpu" or self.is_multi_device_module ): self.device_ids = None self.output_device = None else: self.device_ids = [_get_device_index(x, True) for x in device_ids] if output_device is None: output_device = device_ids[0] self.output_device = _get_device_index(output_device, True) # 设置process group if process_group is None: self.process_group = _get_default_group() else: self.process_group = process_group # 配置各种成员变量 self.static_graph = False self.dim = dim self.module = module self.device = list(self.module.parameters())[0].device self.broadcast_buffers = broadcast_buffers self.find_unused_parameters = find_unused_parameters self.require_backward_grad_sync = True self.require_forward_param_sync = True self.ddp_uneven_inputs_config = _DDPUnevenInputsConfig( ddp_join_enabled=False, ddp_join_divide_by_initial_world_size=False, ddp_join_throw_on_early_termination=False, ) self.gradient_as_bucket_view = gradient_as_bucket_view if hasattr(module, "_ddp_params_and_buffers_to_ignore"): self.parameters_to_ignore = module._ddp_params_and_buffers_to_ignore else: self.parameters_to_ignore = [] # 检查 parameters # Check that a module does not have Uninitialized parameters for param in module.parameters(): if isinstance(param, torch.nn.parameter.UninitializedParameter): raise RuntimeError( "Modules with uninitialized parameters can't be used with `DistributedDataParallel`. " "Run a dummy forward pass to correctly initialize the modules" ) # used for intra-node param sync and inter-node sync as wel self.broadcast_bucket_size = int(250 * 1024 * 1024) # reduction bucket size self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024) # Whether to perform input tensor CPU to GPU copies on a side-stream self.use_side_stream_for_tensor_copies = ( os.environ.get("PYTORCH_DDP_USE_SIDE_STREAM", "1") == "1" ) # 构建参数 # TODO(wayi@): Remove this field since SPMD is no longer supported, # and also remove all the relevant unnecessary loops. # Module replication within process (single-process multi device) # 这里需要注意,就是以后不支持了 self._module_copies = [self.module] # Build parameters for reducer. parameters, expect_sparse_gradient = self._build_params_for_reducer() # Verify model equivalence. dist._verify_model_across_ranks(self.process_group, parameters) # Sync params and buffers. Ensures all DDP models start off at the same value. # 将 rank 0 的state_dict() 广播到其他worker,以保证所有worker的模型初始状态相同; self._sync_params_and_buffers(authoritative_rank=0) # In debug mode, build a mapping of parameter index -> parameter. if dist._get_debug_mode() != dist._DistributedDebugLevel.OFF: param_to_name_mapping = self._build_param_to_name_mapping(parameters) else: param_to_name_mapping = {} # Builds reducer. self._ddp_init_helper(parameters, expect_sparse_gradient, param_to_name_mapping)
我们接下来选择一些重要步骤进行分析。
对于 DDP,第一个关键步就是构建参数,这里要注意,如果目前情况是单机多GPU,也就是单进程多设备(和DP一样了)情况,那么需要在进程之内进行模型复制。
但是未来不会支持了,会去掉。所以 parameters 就是 [ToyModel] 的参数集合,parameters[0] 就是 ToyModel 的参数。后面介绍 BucketReplica 会提到。
# TODO(wayi@): Remove this field since SPMD is no longer supported, # and also remove all the relevant unnecessary loops. # Module replication within process (single-process multi device) self._module_copies = [self.module] # 构建一个比如 [ToyModel] 这样的列表 # Build parameters for reducer. parameters, expect_sparse_gradient = self._build_params_for_reducer()
我们看看模型中有哪些重要参数:
model.parameters()
得到这些参数。model.buffers()
得到这些参数。具体 _build_params_for_reducer 就为reducer建立参数,逻辑大致如下:
# 之前在初始化过程中,设定了 self._module_copies = [self.module] def _build_params_for_reducer(self): # Build tuple of (module, parameter) for all parameters that require grads. modules_and_parameters = [ [ (module, parameter) # 得到module列表 for module_name, module in replica.named_modules() # 得到参数列表,并且参数是需要求导,不在忽略列表之中 for parameter in [ param # Note that we access module.named_parameters instead of # parameters(module). parameters(module) is only needed in the # single-process multi device case, where it accesses replicated # parameters through _former_parameters. for param_name, param in module.named_parameters(recurse=False) if param.requires_grad and f"{module_name}.{param_name}" not in self.parameters_to_ignore ] ] for replica in self._module_copies ] # Deduplicate any parameters that might be shared across child modules. # 用集合去除可能在多个modules中共享的参数 memo = set() modules_and_parameters = [ # "p not in memo" is the deduplication check. # "not memo.add(p)" is always True, and it's only there to cause "add(p)" if needed. [(m, p) for m, p in replica_mps if p not in memo and not memo.add(p)] for replica_mps in modules_and_parameters ] # Build list of parameters. # 构建一个参数列表 parameters = [ list(parameter for _, parameter in replica) for replica in modules_and_parameters ] # Checks if a module will produce a sparse gradient. def produces_sparse_gradient(module): if isinstance(module, torch.nn.Embedding) or isinstance( module, torch.nn.EmbeddingBag ): return module.sparse return False # Build list of booleans indicating whether or not to expect sparse # gradients for the corresponding parameters. # 参数是否期盼sparse gradients expect_sparse_gradient = [ list(produces_sparse_gradient(module) for module, _ in replica) for replica in modules_and_parameters ] # The following modules_params and modules_buffers are used for # param/buffer sync in _sync_params. # 得到module的参数,与下面的buffer一起,都是用来同步到其他worker的 self.modules_params = [ list(self._get_parameters(m)) for m in self._module_copies ] # Collect buffers for modules, filtering out buffers that should be ignored. # 得到module的buffer,module_buffers 在后续同步时候会用到 named_module_buffers = [ [(buffer, buffer_name) for buffer_name, buffer in m.named_buffers()] for m in self._module_copies ] self.modules_buffers = [ [ buffer for (buffer, buffer_name) in module_buffers if buffer_name not in self.parameters_to_ignore ] for module_buffers in named_module_buffers ] return parameters, expect_sparse_gradient
此时 parameters 示例如下,可以看到其只有 [0] 元素有意义,这个 [0] 原始本身包括4个元素:
parameters = {list: 1} 0 = {list: 4} 0 = {Parameter: 10} Parameter containing:\ntensor([[-4.0381e-02, 3.8828e-02, 1 ) 1 = {Parameter: 10} Parameter containing:\ntensor([-0.0438, -0.2033, 0.2771, 0.0721, ) 2 = {Parameter: 5} Parameter containing:\ntensor([[-0.0094, -0.1319, 0.0713, 0.3155, ) 3 = {Parameter: 5} Parameter containing:\ntensor([-0.0008, 0.0582, -0.1245, -0.2538, ) __len__ = {int} 4 __len__ = {int} 1
这里多说一句,何处用到 self.modules_buffers?后来在广播参数时候就会用到,比如:
# When running in join mode, checks and performs sync of module buffers if # the models have buffers that should be synchronized in the forward pass. def _check_and_sync_module_buffers(self): if self.will_sync_module_buffers(): authoritative_rank = self._find_common_rank(self._distributed_rank, False) self._distributed_broadcast_coalesced( self.modules_buffers[0], self.broadcast_bucket_size, authoritative_rank )
这里使用了 _find_common_rank 来得到目前 DDP 使用的所有有效 ranks。
def _find_common_rank(self, input_rank, rank_cond): # -1 indicates that this rank is not under consideration to be the # common_rank rank_to_use = torch.tensor( [input_rank if rank_cond else -1], device=self.device, ) # 使用MAX操作得到最大数值 dist.all_reduce(rank_to_use, op=ReduceOp.MAX, group=self.process_group) if rank_to_use.item() == -1: raise ValueError( "BUG! Expected rank_cond to be true for at least one process." ) return rank_to_use.item() # 返回全部ranks
接下来是验证模型阶段。
因为后续用到了如下代码,所以我们首先看看背景知识 broadcast。不熟悉这部分的朋友会有疑问是:为什么 broadcast 可以从 rank 0 广播到其他rank,明明所有rank都调用到了同样的 broadcast 代码。
process_group->broadcast(vec)->wait(); // 把 rank 0 的 meta 广播到对应的设备
我们来到 torch/lib/c10d/ProcessGroupMPI.cpp。可以看到,其使用了 MPI 的 MPI_Bcast API 来进行广播操作,其中 opts.rootRank是关键所在。
c10::intrusive_ptr<ProcessGroup::Work> ProcessGroupMPI::broadcast( std::vector<at::Tensor>& tensors, const BroadcastOptions& opts) { checkSingleTensor(tensors); std::function<void(std::unique_ptr<WorkEntry>&)> runFunc = [opts, this](std::unique_ptr<WorkEntry>& entry) { auto data = (entry->src)[0]; c10::DeviceGuard guard(data.device()); std::unique_lock<std::mutex> globalLock(pgGlobalMutex_); MPI_CHECK(MPI_Bcast( // 调用MPI API data.data_ptr(), data.numel(), mpiDatatype.at(data.scalar_type()), opts.rootRank, // 这里是关键,只是从root广播其他rank pgComm_)); }; auto entry = std::make_unique<WorkEntry>(&tensors, &tensors, std::move(runFunc)); return enqueue( std::move(entry), "mpi:broadcast", c10::optional<std::vector<at::Tensor>>(tensors)); }
opts 是 BroadcastOptions 的实例。
class BroadcastOptions: rootRank: int rootTensor: int timeout: timedelta
在 C++ 世界对应了如下:
struct BroadcastOptions { int rootRank = 0; int rootTensor = 0; std::chrono::milliseconds timeout = kUnsetTimeout; };
在定义时候看到,BroadcastOptions 被C++自动初始化为0,所以所有 rank 的进程都是使用 rootRank = 0 进行调用 MPI_Bcast,结果就是从 rank = 0 来向其他 rank 进行广播。
c10::intrusive_ptr<ProcessGroup::Work> broadcast( std::vector<at::Tensor>& data, const BroadcastOptions& opts = BroadcastOptions()) override;
我们接下来看看如何验证模型。
_verify_model_across_ranks 的作用是验证模型(replica 0)的相关参数在广播之后,跨进程时候拥有同样的size/strides。
# Verify model equivalence. dist._verify_model_across_ranks(self.process_group, parameters)
通过下面代码我们可知,_verify_model_across_ranks 实际调用到verify_replica0_across_processes。
module.def( "_verify_model_across_ranks", &::c10d::verify_replica0_across_processes, py::arg("process_group"), py::arg("replicas"), py::call_guard<py::gil_scoped_release>());
verify_replica0_across_processes 之中,参数model_replicas 就是前面的 parameters,其逻辑如下:
具体代码如下:
// Verifies corresponding params in replica 0 have the same sizes/strides // across processes. void verify_replica0_across_processes( c10::intrusive_ptr<c10d::ProcessGroup> process_group, std::vector<std::vector<at::Tensor>> model_replicas) { size_t i = 0; for (const auto& t : model_replicas[0]) { i += 2 * t.dim(); } at::TensorOptions options; options = options.dtype(at::kLong); auto metadata = at::empty({static_cast<long>(i)}, options); // Technically, process 0 is the broadcast source, so only process 0 needs // to populate metadata. But no harm keeping work aligned across processes. auto metadata_accessor = metadata.accessor<int64_t, 1>(); i = 0; // 把model_replicas[0]拷贝到metadata_accessor,其实就是metadata for (const auto& t : model_replicas[0]) { for (const auto& sz : t.sizes()) { metadata_accessor[i++] = sz; } for (const auto& str : t.strides()) { metadata_accessor[i++] = str; } } // 然后把metadata克隆到metadata_dev auto metadata_dev = metadata.clone().to(model_replicas[0][0].device()); std::vector<at::Tensor> vec{metadata_dev}; // 广播metadata_dev process_group->broadcast(vec)->wait(); // 把process 0 的 meta 广播到对应的设备 // 这之后,metadata_dev 就是所有进程的结果大家都一样了 // Technically, process 0 doesn't need to double-check metadata, because it // was the source. But no harm keeping work aligned. auto control = at::empty({static_cast<long>(i)}, options); // 把 metadata_dev 拷贝回 control control.copy_(metadata_dev, /*non_blocking=*/false); // 然后把 control 和 model_replicas[0]比较,看看是否和原来相等 auto control_accessor = control.accessor<int64_t, 1>(); i = 0; for (size_t p = 0; p < model_replicas[0].size(); p++) { const auto& t = model_replicas[0][p]; // I'd like to include which process we are in the message, // but ProcessGroup::getRank is not public! for (const auto& sz : t.sizes()) { TORCH_CHECK( sz == control_accessor[i++], "replicas[0][", p, "] in this process" " with sizes ", t.sizes(), " appears not to match sizes of the same param in process 0."); } for (const auto& str : t.strides()) { TORCH_CHECK( str == control_accessor[i++], "replicas[0][", p, "] in this process" " with strides ", t.strides(), " appears not to match strides of the same param in process 0."); } } }
下一步是广播状态,把模型初始参数和变量从 rank 0 广播到其他 ranks。
# Sync params and buffers. Ensures all DDP models start off at the same value. # 将 rank 0 的state_dict() 广播到其他worker,以保证所有worker的模型初始状态相同; self._sync_params_and_buffers(authoritative_rank=0)
我们先来看看需要广播什么。
pytorch 的 state_dict 是一个字典对象,其将模型的每一层与它的对应参数建立映射关系,比如 model 每一层的weights及偏置等等。只有那些参数可以训练的层(比如卷积层,线性层等)才会被保存到模型的state_dict中,池化层、BN层这些本身没有参数的层就不会保存在 state_dict 之中,比如针对下面模型。
class ToyModel(nn.Module): def __init__(self): super(ToyModel, self).__init__() self.net1 = nn.Linear(10, 10) self.relu = nn.ReLU() self.net2 = nn.Linear(10, 5)
state_dict 如下:
self.module.state_dict() = {OrderedDict: 4} 'net1.weight' = {Tensor: 10} tensor([[ 0.2687, 0.0840, -0.1032, 0.3079, 0.0385, -0.0495, -0.3068, -0.1271,\n -0.1067, -0.1966],\n [-0.1203, 0.1789, 0.0666, 0.1882, 0.1335, 0.1921, -0.1145, -0.1781,\n 0.0661, -0.2339],\n [ 0.1865, -0.2076, 0.2071, 0 'net1.bias' = {Tensor: 10} tensor([ 0.2146, -0.1599, 0.2350, -0.2843, -0.0773, -0.2151, 0.1864, -0.3068,\n -0.2093, 0.1365]) 'net2.weight' = {Tensor: 5} tensor([[ 0.1922, -0.0148, -0.1884, 0.2124, -0.1361, 0.0172, -0.2371, 0.1946,\n 0.2047, -0.2697],\n [-0.2690, 0.1372, 0.2269, 0.0436, -0.1353, -0.2054, -0.2418, -0.2300,\n 0.1987, 0.0007],\n [ 0.0995, -0.2659, -0.2374, -0 'net2.bias' = {Tensor: 5} tensor([0.1488, 0.0791, 0.1667, 0.1449, 0.0545])
_sync_params_and_buffers 是依据 module的state_dict 来收集可以训练的参数,然后把这些参数广播出去。
具体代码是:
def _sync_params_and_buffers(self, authoritative_rank=0): module_states = [] for name, param in self.module.state_dict().items(): if name not in self.parameters_to_ignore: module_states.append(param) # module_states = {list: 4} [tensor([[ 0.2687, 0.0840, -0.1032, 0.3079, 0.0385, -0.0495, -0.3068, -0.1271,\n -0.1067, -0.1966],\n [-0.1203, 0.1789, 0.0666, 0.1882, 0.1335, 0.1921, -0.1145, -0.1781,\n 0.0661, -0.2339],\n [ 0.1865, -0.2076, 0.2071, if len(module_states) > 0: self._distributed_broadcast_coalesced( module_states, self.broadcast_bucket_size, authoritative_rank )
我们看看,_distributed_broadcast_coalesced
调用了 dist._broadcast_coalesced
import torch.distributed as dist def _distributed_broadcast_coalesced( self, tensors, buffer_size, authoritative_rank=0 ): dist._broadcast_coalesced( self.process_group, tensors, buffer_size, authoritative_rank )
我们沿着代码来寻找,首先来到 torch\distributed_init_.py,这里会导入 _broadcast_coalesced。
if is_available(): from torch._C._distributed_c10d import ( Store, FileStore, TCPStore, ProcessGroup, PrefixStore, Reducer, Logger, BuiltinCommHookType, GradBucket, _DEFAULT_FIRST_BUCKET_BYTES, _register_comm_hook, _register_builtin_comm_hook, _broadcast_coalesced, # 在这里导入 _compute_bucket_assignment_by_size, _verify_model_across_ranks, _test_python_store, _DistributedDebugLevel, _get_debug_mode ) if sys.platform != 'win32': from torch._C._distributed_c10d import ( HashStore, _round_robin_process_groups, ) from .distributed_c10d import * # noqa: F403 # Variables prefixed with underscore are not auto imported # See the comment in `distributed_c10d.py` above `_backend` on why we expose # this. from .distributed_c10d import _backend, _all_gather_base
我们继续找到 torch\csrc\distributed\c10d\init.cpp
module.def( "_broadcast_coalesced", // Define a lambda such that the pybind11 prototype can take a std::vector // for the tensor list argument, but still pass it to the underlying // function as a c10::ArrayRef. [](c10::intrusive_ptr<::c10d::ProcessGroup> process_group, std::vector<at::Tensor> tensors, // NOLINT size_t buffer_size, int rank) { broadcast_coalesced( // 在这里 std::move(process_group), tensors, buffer_size, rank); }, py::arg("process_group"), py::arg("tensors"), py::arg("buffer_size"), // The source of truth rank to broadcast the tensors from. py::arg("src") = 0, py::call_guard<py::gil_scoped_release>());
最后来到了 torch/lib/c10d/comm.cpp,这里利用 ProcessGroup 对张量进行广播。
// Broadcast many tensors to all processes in the process group. void broadcast_coalesced( c10::intrusive_ptr<c10d::ProcessGroup> process_group, at::TensorList tensors, size_t buffer_size, int rank) { // Coalesce tensors into buckets taking into account the maximum buffer size. // This routine is multi-device aware, so the tensors can be split across // multiple devices and can contain a mix of CPU and CUDA tensors. // 首先计算出桶 const auto buckets = compute_bucket_assignment_by_size(tensors.vec(), {buffer_size}); // Returns tensor at specified index in input tensor list. const auto lookup = [&tensors](size_t index) { return tensors[index]; }; // We maintain a maximum of 2 in flight broadcast operations to avoid // allocating too much memory (in case the specified tensors are very large). std::deque<BroadcastWork> in_flight; // 建立一个广播work列表 constexpr auto max_in_flight = 2; for (const auto& bucket : buckets) { // 遍历桶 if (in_flight.size() >= max_in_flight) { // 由注释可以知道,广播维度是2,这样避免内存占用过大 in_flight.front().finish(); // 广播变量 in_flight.pop_front(); } in_flight.emplace_back(process_group, c10::fmap(bucket, lookup), rank); } while (!in_flight.empty()) { in_flight.front().finish(); in_flight.pop_front(); } }
对于BroadcastWork,我们补充说明一下,就是利用 ProcessGroup 来把张量广播出去,ProcessGroup 具体可以参见前面文章。
class BroadcastWork { public: BroadcastWork( const c10::intrusive_ptr<c10d::ProcessGroup>& process_group, std::vector<at::Tensor> bucket_tensors, int root_rank = 0) : bucket_tensors_(std::move(bucket_tensors)), flat_tensor_({torch::utils::flatten_dense_tensors(bucket_tensors_)}) { BroadcastOptions broadcastOptions; broadcastOptions.rootRank = root_rank; work_ = process_group->broadcast(flat_tensor_, broadcastOptions); } void finish() { work_->wait(); // Copy the output of the broadcast operation back. auto output_tensors = torch::utils::unflatten_dense_tensors( flat_tensor_.front(), bucket_tensors_); TORCH_INTERNAL_ASSERT(output_tensors.size() == bucket_tensors_.size()); for (size_t i = 0; i < output_tensors.size(); i++) { bucket_tensors_[i].copy_(output_tensors[i], /*non_blocking=*/true); } } protected: // The list of tensors to broadcast. They are guaranteed to be // placed on the same device and have the same dtype. std::vector<at::Tensor> bucket_tensors_; // The vector with a single flattened tensor containing the contents // of the tensors in bucket_tensors_. It must be stored in a vector // because c10d::ProcessGroup::broadcast takes a vector argument. std::vector<at::Tensor> flat_tensor_; private: // The broadcast work that is kicked off upon construction. c10::intrusive_ptr<c10d::ProcessGroup::Work> work_; };
接下来会调用 _ddp_init_helper 进行初始化业务函数。
_ddp_init_helper 是用来初始化业务的函数,其主要逻辑如下:
具体代码如下:
def _ddp_init_helper(self, parameters, expect_sparse_gradient, param_to_name_mapping): """ Initialization helper function that does the following: (1) bucketing the parameters for reductions (2) resetting the bucketing states (3) registering the grad hooks (4) Logging constructin-time DDP logging data (5) passing a handle of DDP to SyncBatchNorm Layer """ self.num_iterations = 0 # The bucket size limit is specified in the constructor. # Additionally, we allow for a single small bucket for parameters # that are defined first, such that their gradients don't spill into # a much larger bucket, adding unnecessary latency after gradient # computation finishes. Experiments showed 1MB is a reasonable value. bucket_indices = dist._compute_bucket_assignment_by_size( parameters[0], [dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap], expect_sparse_gradient[0], ) # Note: reverse list of buckets because we want to approximate the # order in which their gradients are produced, and assume they # are used in the forward pass in the order they are defined. self.reducer = dist.Reducer( parameters, list(reversed(bucket_indices)), # 利用桶index self.process_group, expect_sparse_gradient, self.bucket_bytes_cap, self.find_unused_parameters, self.gradient_as_bucket_view, param_to_name_mapping, ) self.logger = dist.Logger(self.reducer) # Set logging data that can be got during construction time. self.logger.set_construction_data_and_log( self.module.__class__.__name__, [] if self.device_ids is None else self.device_ids, -1 if self.output_device is None else self.output_device, self.broadcast_buffers, ) # passing a handle to torch.nn.SyncBatchNorm layer self._passing_sync_batchnorm_handle(self._module_copies)
首先,_compute_bucket_assignment_by_size 完成了分桶功能。这里parameters[0] 就是对应的张量列表。
_DEFAULT_FIRST_BUCKET_BYTES = 1048576 # reduction bucket size self.bucket_bytes_cap = int(bucket_cap_mb * 1024 * 1024) bucket_indices = dist._compute_bucket_assignment_by_size( parameters[0], # 桶的大小限制是一个数组 [dist._DEFAULT_FIRST_BUCKET_BYTES, self.bucket_bytes_cap], expect_sparse_gradient[0], )
我们接下来就要结合论文内容来分析。
梯度bucketing的思想是基于这样一个观察,即集合通信在大张量上更有效。
实验表明,如果DDP在短时间内等待并将多个梯度存储到一个AllReduce操作中,它可以实现更高的吞吐量和更低的延迟,而不是在每个梯度存储可用时立即启动专用的AllReduce。这对于具有许多小参数的模型尤其有用。但是,DDP不应在一个AllReduce中传输所有数据,否则,在计算结束之前无法启动任何通信。
参数到桶映射(Parameter-to-Bucket Mapping)对DDP速度有相当大的影响。在每次向后传播中,将所有参数梯度中的张量复制到桶中,并在AllReduce之后将平均梯度复制回桶中。为了加速复制操作,存储桶始终与参数在同一设备上创建。如果模型跨越多个设备,DDP会考虑设备关联性,以确保同一存储桶中的所有参数都位于同一设备上。AllReduce的顺序也会对结果产生影响,因为它决定了多少通信可以与计算重叠。DDP按model.parameters()的相反顺序启动AllReduce。
所以,为了提高通信效率,DDP 将Reducer
参数梯度组织成为桶,一次规约一个桶。从参数梯度到桶的映射是在构建时根据桶大小限制和参数大小确定的,。用户可以通过设置bucket_cap_mb来配置桶的大小。
模型参数以(大致)Model.parameters()
与给定模型相反的顺序分配到桶中 。使用相反顺序的原因是:
DDP 按照类型和设备作为key来分组,因为不同设备上的tensor不应该分在一组上,同类型张量应该分在一桶。用类型和设备作为key 就可以保证同设备上同类型张量分配在同一个桶里。
// Tensors may be coalesced into buckets. Buckets must contain tensors of // the same type, on the same device, so a bucket can identified by a // composite key of a tensor's type identifier and its device. struct BucketKey { BucketKey(c10::ScalarType type, c10::Device device) : type(std::move(type)), device(std::move(device)) {} const c10::ScalarType type; const c10::Device device; // See torch/csrc/utils/hash.h for dispatch code. static size_t hash(const BucketKey& key) { return c10::get_hash(key.type, key.device); // 用类型和设备作为key } };
其关键结构如下,BucketAccumulator 可以认为是实际的桶。
struct BucketAccumulator { std::vector<size_t> indices; // 桶内容,是张量列表 size_t size = 0; // 桶大小,比如若干mb }; // 桶的逻辑内容 // Keep vector of indices and size accumulator by tensor type and device. std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>> buckets; // 所有桶的列表,每一个实际桶可以认为是 BucketAccumulator
我们来看看 compute_bucket_assignment_by_size的具体逻辑:
std::vector<std::vector<size_t>> compute_bucket_assignment_by_size( const std::vector<at::Tensor>& tensors, const std::vector<size_t>& bucket_size_limits, // 桶大小限制 const std::vector<bool>& expect_sparse_gradient, const std::vector<int64_t>& tensor_indices) { //实际上,初始化时候没有传入 tensor_indices // Either expect_sparse_gradient is not specified or it has as many elements // as the vector with tensors. TORCH_INTERNAL_ASSERT( expect_sparse_gradient.empty() || (tensors.size() == expect_sparse_gradient.size())); TORCH_INTERNAL_ASSERT(tensors.size() > 0); std::vector<std::vector<size_t>> result; result.reserve(tensors.size()); // 预留大小 // Keep iterator into the size_limit vector by tensor type and device. // This is done so that we can use the consecutive bucket limits per type. std::unordered_map< BucketKey, std::vector<size_t>::const_iterator, c10::hash<BucketKey>> bucket_size_limit_iterators; // Local accumulator type for a single bucket. struct BucketAccumulator { std::vector<size_t> indices; // 桶内容,是张量列表 size_t size = 0; // 桶大小,比如若干mb }; // 桶的逻辑内容 // Keep vector of indices and size accumulator by tensor type and device. std::unordered_map<BucketKey, BucketAccumulator, c10::hash<BucketKey>> buckets; // 所有桶的列表,每一个实际桶可以认为是 BucketAccumulator for (size_t i = 0; i < tensors.size(); i++) { // 遍历传入的所有张量 const auto& tensor = tensors[i]; //拿到张量 TORCH_CHECK(!tensor.is_sparse(), "No support for sparse tensors."); // when tensor_indices is empty, the index of tensors[i] assigned to // bucket is i, otherwise the tensor index is tensor_indices[i]. auto tensor_index = i; // 就是给所有的tensor一个index,从0开始递增,一直到 tensors.size() if (!tensor_indices.empty()) { tensor_index = tensor_indices[i]; // 如果有index,就拿到张量的index } // If we expect a sparse gradient to be produced for this tensor, it cannot // be grouped together with other gradients and gets its own bucket. // 如果配置了期待sparse gradient,则把这个张量自己放入一个桶,因为没法和其他张量放在一起 if (!expect_sparse_gradient.empty() && expect_sparse_gradient[tensor_index]) { result.push_back({tensor_index}); continue; } auto key = BucketKey(tensor.scalar_type(), tensor.device()); //使用张量信息构建桶的key auto& bucket = buckets[key]; // 找到对应的桶, 拿到BucketAccumulator bucket.indices.push_back(tensor_index); // 往该桶的张量列表里面插入新张量的index,indices 是 tensor index list bucket.size += tensor.numel() * tensor.element_size();// 增加对应桶大小 // Initialize bucket size limit iterator if necessary. // 如果需要,就设定成大小限制的初始值 if (bucket_size_limit_iterators.count(key) == 0) { bucket_size_limit_iterators[key] = bucket_size_limits.begin(); } // bucket_size_limit_iterator 就是桶大小的范围, 即 [_DEFAULT_FIRST_BUCKET_BYTES, int(bucket_cap_mb * 1024 * 1024)] auto& bucket_size_limit_iterator = bucket_size_limit_iterators[key]; const auto bucket_size_limit = *bucket_size_limit_iterator; // 当前最小值限制 if (bucket.size >= bucket_size_limit) { // 如果桶的尺寸大于最小值限制,就是说目前桶的尺寸已经达到了桶的最大限制,按说需要转移到新桶了(实际上确实转移到了逻辑上的新桶,但是实际还是在现有桶内执行,因为 type, device 还是同样的,还是应该在原有桶内继续累积,不过原有桶的indice已经转移到了result之中,就相当于清空了) result.emplace_back(std::move(bucket.indices)); // 把桶内容插入到返回result,就是说,当桶尺寸过大的时候,就先插入到result之中。 bucket = BucketAccumulator(); // 重新生成桶,bucket是个引用,所以直接赋值,就相当于清空原有的桶,就是原来桶继续用,但是桶内原有的indices已经转移到了result之中。 // Advance to the next bucket size limit for this type/device. // 前进到下一个尺寸限制 auto next = bucket_size_limit_iterator + 1; if (next != bucket_size_limits.end()) { bucket_size_limit_iterator = next; } } } // Add remaining buckets. 把剩余的桶内indices插入到返回值,因为之前已经有些直接插入到了result之中 for (auto& it : buckets) { auto& bucket = it.second; if (!bucket.indices.empty()) { result.emplace_back(std::move(bucket.indices)); } } // If tensor_indices is not empty, the order of the tensors is in the gradient // ready order, so no need to sort. // If tensor_indices is empty, sort resulting buckets by the minimum tensor // index they include. We assume that the order of the tensors is the order in // which they are used (or the reverse order in which their gradients are // produced). This sorting step ensures that the buckets are ready in // consecutive order. // 如果 tensor_indices 非空,说明张量的顺序已经是梯度准备好的顺序,不需要再排序了 // 如果 tensor_indices 是空的,依据最小张量index来排序,这里假定张量的顺序是他们使用的顺序(或者说是他们梯度产生次序的反序)。这种排序可保证桶是按照连续不断的顺序准备好。 // 注意,这里就是正序排列,等到创建Reducer的时候,才反序传入:list(reversed(bucket_indices)) if (tensor_indices.empty()) { std::sort( result.begin(), result.end(), [](const std::vector<size_t>& a, const std::vector<size_t>& b) { // 对于任意两个vector,排序的依据是:用这两个vector之中最小index来排序 const auto amin = std::min_element(a.begin(), a.end()); // a中的最小index const auto bmin = std::min_element(b.begin(), b.end()); // b中的最小index return *amin < *bmin; }); } return result; // result 最终如下,里面每个vector 都对应了一个bucket,里面是都是 tensor 的 index,这里都是从小到大顺序排序。 }
result 最终如下,里面每个vector 都对应了一个bucket,里面是都是 tensor 的 index,这里都是从小到大顺序排序。
这里注意的是:因为 传入参数 tensors就是 parameters[0],而 parameters[0] 是按照 parametes() 的返回结果来的,即,模型参数以(大致)Model.parameters()
与给定模型相反的顺序分配到桶中 。使用相反顺序的原因是因为 DDP 期望梯度在反向传递期间以大约该顺序准备就绪。最终 DDP 是按model.parameters()的相反顺序启动AllReduce。
+-----------------------------------------------------------------------+ | | | <tensor index 1, tensor index 2, tensor index 3, tensor index 4> | | | | | | <tensor index 5, tensor index 6, tensor 7> | | | | | | ...... | | | | | | <tensor index 8, tensor index 9, tensor index 10, tensor index 11> | | | +-----------------------------------------------------------------------+
接下来的代码就是生成了一个Reducer。
self.reducer = dist.Reducer( parameters, list(reversed(bucket_indices)), # 利用桶index self.process_group, expect_sparse_gradient, self.bucket_bytes_cap, self.find_unused_parameters, self.gradient_as_bucket_view, param_to_name_mapping, )
我们在后续文章中会详细介绍 Reducer。
pytorch分布式系列3——分布式训练时,torch.utils.data.distributed.DistributedSampler做了什么?
pytorch分布式系列1——搞清torch.distributed.launch相关的环境变量
pytorch分布式系列2——DistributedDataParallel是如何做同步的?
pytorch(分布式)数据并行个人实践总结——DataParallel/DistributedDataParallel
Pytorch的nn.DataParallel
https://discuss.pytorch.org/t/dataparallel-imbalanced-memory-usage/22551/20
https://pytorch.org/docs/stable/distributed.html
PyTorch 源码解读之分布式训练了解一下?
实操教程|PyTorch AutoGrad C++层实现
PYTORCH 自动微分(一)
PyTorch如何加速数据并行训练?分布式秘籍大揭秘
pytorch分布式训练(二init_process_group)
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
https://pytorch.org/docs/master/notes/ddp.html
https://pytorch.org/tutorials/intermediate/dist_tuto.html
PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析
Pytorch模型中的parameter与buffer
https://pytorch.org/docs/master/notes/ddp.html