目录
Horovod 是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。
本系列将通过源码分析来带领大家了解 Horovod。本文是系列第五篇,看看 Horovod 如何融合各个机器学习框架。
前面几篇链接如下:
[源码解析] 深度学习分布式训练框架 Horovod (1) --- 基础知识
[源码解析] 深度学习分布式训练框架 horovod (2) --- 从使用者角度切入
[源码解析] 深度学习分布式训练框架 horovod (3) --- Horovodrun背后做了什么
[源码解析] 深度学习分布式训练框架 horovod (4) --- 网络基础 & Driver
我们需要一些问题来引导分析:
我们接下来看看 Horovod 如何融合。
我们通过架构图来看看。
以下是网上一位同学的架构图带你了解当红炸子鸡Horovod分布式训练框架,为了尽力保持风格统一,我重新绘制如下:
他分层思路如下:
MPI在Hovorod的角色比较特殊:
我们现在知道,Horovod 内部实现(封装)了 allreduce 功能,借以实现梯度规约。
但是,hvd.allreduce又是如何实现对不同通信library的调用的呢?Horovod 使用一个统一层来完成。
首先,我们看看每个 rank 节点的运行机制,这样知道统一层的实现需要考虑哪些因素:
其次,统一层的实现是:
我们下面就逐一分析下这几个方面。
Horovod OP 类体系如下:
逻辑如下:
+---------------+ | HorovodOp | +----+-----+---++ ^ ^ ^ ^ ^ | | | | | +----------------------------+ | | | | | +---------------------+ | | +-----------------+ | | +-------+ | | | | | | | +------+-----+ +---+----+ +---------+---+ +----+--------+ +----------+--+ | AlltoallOp | | JoinOp | | AllreduceOp | | AllgatherOp | | BroadcastOp | +------------+ +--------+ ++---+----+--++ +-------------+ +-------------+ ^ ^ ^ ^ | | | | +------------------+ | | +-----------------------------------+ | +-------+ +-------------+ | | | | | +-----+--------+ +---+----------+ +------------+--------+ +------------+---+ | MPIAllreduce | | GPUAllreduce | | AdasumMPIAllreduceOp| | GlooAllreduce | +--------------+ +--------------+ +---------------------+ +----------------+
手机上如图:
HorovodOp 是所有类的基类,其主要作用是:
class HorovodOp { public: HorovodOp::HorovodOp(HorovodGlobalState* global_state) : global_state_(global_state) {} int64_t HorovodOp::NumElements(std::vector<TensorTableEntry>& entries) { int64_t num_elements = 0; for (auto& e : entries) { num_elements += e.tensor->shape().num_elements(); } return num_elements; } virtual Status Execute(std::vector<TensorTableEntry>& entries, const Response& response) = 0; protected: HorovodGlobalState* global_state_; };
HorovodOp 的派生类有几个,其功能望文生义,比如:AllreduceOp ,AllgatherOp,BroadcastOp,AlltoallOp,JoinOp(弹性训练使用)。
我们以 AllreduceOp 为例,其定义如下,主要函数是:
class AllreduceOp : public HorovodOp { public: virtual Status Execute(std::vector<TensorTableEntry>& entries, const Response& response) = 0; virtual bool Enabled(const ParameterManager& param_manager, const std::vector<TensorTableEntry>& entries, const Response& response) const = 0; protected: virtual void MemcpyInFusionBuffer(const std::vector<TensorTableEntry>& entries, const void*& fused_input_data, void*& buffer_data, size_t& buffer_len); ...... };
接下来是具体的实现类,和具体通讯框架有关,比如:MPIAllreduce,GPUAllreduce,AdasumMPIAllreduceOp,GlooAllreduce。在 common/ops 中可以看到具体种类有 NCCL/Gloo/MPI 等等。
这些 op 由 op_manager 管理,op_manager 会根据优先级找到可以用来计算的 op 进行计算,比如:
我们以 MPIAllreduce 为例进行说明,其定义如下:
class MPIAllreduce : public AllreduceOp { public: MPIAllreduce(MPIContext* mpi_context, HorovodGlobalState* global_state); Status Execute(std::vector<TensorTableEntry>& entries, const Response& response) override; bool Enabled(const ParameterManager& param_manager, const std::vector<TensorTableEntry>& entries, const Response& response) const override; protected: MPIContext* mpi_context_; };
具体 Execute 就是调用 MPI_Allreduce 来完成操作,比如:
Status MPIAllreduce::Execute(std::vector<TensorTableEntry>& entries, const Response& response) { // Copy memory into the fusion buffer. ... MemcpyInFusionBuffer(entries, fused_input_data, buffer_data, buffer_len); ... // Do allreduce. timeline.ActivityStartAll(entries, MPI_ALLREDUCE); const void* sendbuf = entries.size() > 1 || fused_input_data == buffer_data ? MPI_IN_PLACE : fused_input_data; int op = MPI_Allreduce(sendbuf, buffer_data, (int) num_elements, mpi_context_->GetMPIDataType(first_entry.tensor), mpi_context_->GetMPISumOp(first_entry.tensor->dtype()), mpi_context_->GetMPICommunicator(Communicator::GLOBAL)); // Copy memory out of the fusion buffer. ... MemcpyOutFusionBuffer(buffer_data, entries); ... }
因为 Horovod 主要是由一个后台线程完成梯度操作,所以让我们看看这个后台线程之中如何调用到 Hovorod OP。
Horovod的工作流程比较简单:
Horovod 的后台线程拿到需要融合的tensor 之后,会调用 PerformOperation 进行具体的collective 操作。在 PerformOperation 之中有调用
void PerformOperation(Response response, HorovodGlobalState& state) { ...... Status status; try { // 进行collective的操作 status = op_manager->ExecuteOperation(entries, response); } catch (const std::exception& ex) { status = Status::UnknownError(ex.what()); } ...... }
逻辑如下:
+---------------------------------+ | | +-----------------------------+ | BackgroundThreadLoop | | | | | | OperationManager | | +--------------------------+ | | | | | RunLoopOnce | | | | | | | | | | | | | | | | | | ComputeResponseList | | +----------> ExecuteOperation | | | + | | | | | | | | | | | | | | | | | | | | | | | | | | | 1 | | | | v | | | | | | | | | | | | | | PerformOperation +----------+ | | | | | | | | | +--------------------------+ | | | | | | | +---------------------------------+ +-----------------------------+
然后 status = op_manager->ExecuteOperation(entries, response) 会调用不同的 op->Execute(entries, response) 执行reduce 运算。
比如 ALLREDUCE 就调用了 ExecuteAllreduce(entries, response)。
Status OperationManager::ExecuteOperation(std::vector<TensorTableEntry>& entries, const Response& response) const { if (response.response_type() == Response::ALLREDUCE) { return ExecuteAllreduce(entries, response); // 这里 } else if (response.response_type() == Response::ALLGATHER) { return ExecuteAllgather(entries, response); } else if (response.response_type() == Response::BROADCAST) { return ExecuteBroadcast(entries, response); } else if (response.response_type() == Response::ALLTOALL) { return ExecuteAlltoall(entries, response); } else if (response.response_type() == Response::JOIN) { return ExecuteJoin(entries, response); } ..... }
逻辑如下:
+---------------------------------+ | | +-----------------------+ | BackgroundThreadLoop | | | | | | OperationManager | | +--------------------------+ | | | | | RunLoopOnce | | | | | | | | | | | | | | | | | | ComputeResponseList | | +----------> ExecuteOperation | | | + | | | | + | | | | | | | | | | | | | | | | | | | | | | | | | 1 | | 2 | | | v | | | | | | | | | | | | | | | | PerformOperation +----------+ | v | | | | | | ExecuteAllreduce | | +--------------------------+ | | | | | | | +---------------------------------+ +-----------------------+
具体就是从 allreduce_ops_ 之中选取一个合适的 op,调用其Execute。
Status OperationManager::ExecuteAllreduce(std::vector<TensorTableEntry>& entries, const Response& response) const { for (auto& op : allreduce_ops_) { if (op->Enabled(*param_manager_, entries, response)) { return op->Execute(entries, response); } } }
allreduce_ops_ 是从哪里来的?在 OperationManager 构建函数中有。
allreduce_ops_(std::move(allreduce_ops)),
所以我们看看allreduce_ops 如何构建。
在 CreateOperationManager 之中对 allreduce_ops 进行添加。
可以看到,添加的类型大致如下:
OperationManager* CreateOperationManager(HorovodGlobalState& state) { // Order of these operations is very important. Operations will be checked // sequentially from the first to the last. The first 'Enabled' operation will // be executed. std::vector<std::shared_ptr<AllreduceOp>> allreduce_ops; std::vector<std::shared_ptr<AllgatherOp>> allgather_ops; std::vector<std::shared_ptr<BroadcastOp>> broadcast_ops; std::vector<std::shared_ptr<AllreduceOp>> adasum_ops; std::vector<std::shared_ptr<AlltoallOp>> alltoall_ops; #if HAVE_MPI && HAVE_GPU // 如果配置了MPI if (mpi_context.IsEnabled()) { #if HOROVOD_GPU_ALLREDUCE == 'M' allreduce_ops.push_back(std::shared_ptr<AllreduceOp>( new MPI_GPUAllreduce(&mpi_context, &gpu_context, &state))); allreduce_ops.push_back( std::shared_ptr<AllreduceOp>(new NCCLHierarchicalAllreduce( &nccl_context, &mpi_context, &gpu_context, &state))); #elif HAVE_DDL && HOROVOD_GPU_ALLREDUCE == 'D' //如果配置了DDL allreduce_ops.push_back(std::shared_ptr<AllreduceOp>( new DDLAllreduce(&ddl_context, &gpu_context, &state))); #endif #if HAVE_NCCL && HOROVOD_GPU_ALLREDUCE == 'N'//如果配置了NCCL allreduce_ops.push_back(std::shared_ptr<AllreduceOp>( new NCCLAllreduce(&nccl_context, &gpu_context, &state))); #endif ......
因此我们知道,如何使用这些 Operation。
流程如下:
+---------------------------------+ | | +-----------------------+ | BackgroundThreadLoop | | | | | | OperationManager | | +--------------------------+ | | | | | RunLoopOnce | | | | | | | | | | | | | | | | +--> GPUAllreduce | | ComputeResponseList | | +----------> ExecuteOperation | | | | + | | | | + | | | | | | | | | | | +--> NCCLHierarchicalAllreduce | | | | | | | | | | | | | | | | 1 | | 2 | | | | v | | | | | | +--> NCCLAllreduce | | | | | | | | | | | PerformOperation +----------+ | v | | | | | | | ExecuteAllreduce | +--> DDLAllreduce | +--------------------------+ | | + | | | | | | | | +---------------------------------+ | | | +--> GlooAllreduce | | allreduce_ops----------+ | | | | +----------------+ | | | +--> | MPIAllreduce | +-----------------------+ | | | | | +----------------------------------> Execute | 3 | | +----------------+
手机如下:
回顾下每个 rank 节点的运行机制,每个rank有两个thread:
到目前为止,我们其实分析的是第二部分:background thread 是负责通讯和allreduce。
下面我们要看看第一部分的某些环节,即 Tensorflow 这样的框架是如何把 tensor & op 发送给 后台线程。
Horovod 定义的这套HVD OP是跟具体深度学习框架无关的,比如使用 TensorFlow时候,是无法直接insert到TF Graph中执行的,所以还需要注册TF的OP。
Horovod 针对各个框架定义了不同的实现。
针对 TensorFlow 模型分布式训练,Horovod 开发了 TensorFlow ops 来实现 Tensorflow tensor 的 AllReduce。而且这些 op 可以融入 TensorFlow 的计算图中,利用 TensorFlow graph 的 runtime 实现计算与通信的 overlapping,从而提高通信效率。
以 TensorFlow 模型的 AllReduce 分布式训练为例,Horovod 开发了 allreduce ops 嵌入 TensorFlow 的反向计算图中,从而获取 TensorFlow 反向计算的梯度并进行梯度汇合。allreduce ops 可以通过调用 gloo 提供的 allreduce API 来实现梯度汇合的。
比如 在 horovod/tensorflow/mpi_ops.cc 之中,就针对 tensorflow 定义了 HorovodAllreduceOp。
对于 TensorFlow,可以自定义 Operation,即如果现有的库没有涵盖你想要的操作, 你可以自己定制一个。
为了使定制的 Op 能够兼容原有的库,你必须做以下工作:
HorovodAllreduceOp 就是一种TF Async OP,然后其内部实现中调用了 HVD OP,这是比较巧妙的组合模式。显然继承了TP Aysnc OP的HorovodAllReduce 是可以插入到TF Graph里面,然后被正常执行的。
添加新的OP需要3步,我们具体看看。
第一步是定义Op 的接口,使用REGISTER_OP()向 TensorFlow 系统注册来定义 Op 的接口,该OP就是HorovodAllreduceOp。
// 1. 定义 Op 的接口 // REGISTER_OP()向 TensorFlow 系统注册来定义 Op 的接口,该OP就是HorovodAllreduceOp. // 在注册时, 指定 Op 的名称: REGISTER_OP("HorovodAllreduce") // 输入(类型和名称): Input("tensor: T") // 输出(类型和名称): Output("sum: T") // 和所需要任何 属性的文档说明Doc(R"doc(...)doc"); // // 该 Op 接受一个 T 类型 tensor 作为输入, T 类型可以是{int32, int64, float32, float64} // 输出一个 T 类型 tensor sum,sum是在所有的MPI进程中求和 REGISTER_OP("HorovodAllreduce") .Attr("T: {int32, int64, float16, float32, float64}") .Attr("reduce_op: int") .Attr("prescale_factor: float") .Attr("postscale_factor: float") .Attr("ignore_name_scope: bool = False") .Input("tensor: T") .Output("sum: T") .SetShapeFn([](shape_inference::InferenceContext* c) { c->set_output(0, c->input(0)); return Status::OK(); });
第二步是为 Op 实现 kernel。在定义接口之后, 每一个实现称之为一个 "kernel",提供一个或多个 Op 的实现,即可以存在多个 kernel。
HorovodAllreduceOp 类继承 AsyncOpKernel,覆盖 其ComputeAsync() 方法。ComputeAsync()方法提供一个类型为 OpKernelContext* 的参数 context, 用于访问一些有用的信息, 例如输入和输出的 tensor。
在 ComputeAsync 里,会把这一 AllReduce 的请求入队。可以看到,在 TensorFlow 支持的实现上,Horovod 与百度大同小异。都是自定义了 AllReduce Op,在 Op 中把请求入队。
// 2. 为 Op 实现 kernel。 // 在定义接口之后, 每一个实现称之为一个 "kernel",提供一个或多个 Op 的实现,即可以存在多个 kernel。 // 为这些 kernel 的每一个创建一个对应的类, 继承 AsyncOpKernel, 覆盖 ComputeAsync 方法。 // ComputeAsync 方法提供一个类型为 OpKernelContext* 的参数 context, 用于访问一些有用的信息, 例如输入和输出的 tensor class HorovodAllreduceOp : public AsyncOpKernel { public: // 防止类构造函数的隐式自动转换,只能显示调用该构造函数 explicit HorovodAllreduceOp(OpKernelConstruction* context) : AsyncOpKernel(context) { OP_REQUIRES_OK(context, context->GetAttr("reduce_op", &reduce_op_)); OP_REQUIRES_OK(context, context->GetAttr("prescale_factor", &prescale_factor_)); OP_REQUIRES_OK(context, context->GetAttr("postscale_factor", &postscale_factor_)); OP_REQUIRES_OK(context, context->GetAttr("ignore_name_scope", &ignore_name_scope_)); } // 重写ComputeAsync()方法 void ComputeAsync(OpKernelContext* context, DoneCallback done) override { OP_REQUIRES_OK_ASYNC(context, ConvertStatus(common::CheckInitialized()), done); auto node_name = name(); if (ignore_name_scope_) { auto pos = node_name.find_last_of('/'); if (pos != std::string::npos) { node_name = node_name.substr(pos + 1); } } auto device = GetDeviceID(context); auto tensor = context->input(0); horovod::common::ReduceOp reduce_op = static_cast<horovod::common::ReduceOp>(reduce_op_); Tensor* output; OP_REQUIRES_OK_ASYNC( context, context->allocate_output(0, tensor.shape(), &output), done); // ReadyEvent makes sure input tensor is ready, and output is allocated. // shared_ptr 是一个标准的共享所有权的智能指针, 允许多个指针指向同一个对象 auto ready_event = std::shared_ptr<common::ReadyEvent>(RecordReadyEvent(context)); // 模板函数 std::make_shared 可以返回一个指定类型的 std::shared_ptr auto hvd_context = std::make_shared<TFOpContext>(context); auto hvd_tensor = std::make_shared<TFTensor>(tensor); auto hvd_output = std::make_shared<TFTensor>(*output); // 将张量的Allreduce操作OP加入队列 auto enqueue_result = EnqueueTensorAllreduce( hvd_context, hvd_tensor, hvd_output, ready_event, node_name, device, [context, done](const common::Status& status) { context->SetStatus(ConvertStatus(status)); done(); }, reduce_op, (double) prescale_factor_, (double) postscale_factor_); OP_REQUIRES_OK_ASYNC(context, ConvertStatus(enqueue_result), done); } private: int reduce_op_; // Using float since TF does not support double OP attributes float prescale_factor_; float postscale_factor_; bool ignore_name_scope_; };
第三步是注册OP到 TensorFlow 系统。
// 3. 注册OP到 TensorFlow 系统 // 注册时可以指定该 kernel 运行时的多个约束条件. 例如可以指定一个 kernel 在 CPU 上运行, 另一个在 GPU 上运行 REGISTER_KERNEL_BUILDER(Name("HorovodAllreduce").Device(DEVICE_CPU), HorovodAllreduceOp); // 如果执行了GPU #if HOROVOD_GPU_ALLREDUCE REGISTER_KERNEL_BUILDER(Name("HorovodAllreduce").Device(DEVICE_GPU), HorovodAllreduceOp); #endif
具体可以参考 add new op,里面规范了 Tensorflow 自定义算子的实现。
请注意,生成的函数将获得一个蛇形名称(以符合 PEP8)。因此,如果您的操作在 C++ 文件中命名为 ZeroOut,则 Python 函数将称为 zero_out。
C++ 的定义是驼峰的,生成出来的 python 函数是下划线小写的,所以最后对应的是,适配Op的代码在 horovod/tensorflow 目录下面。
C++ | Python |
---|---|
HorovodAllgather | horovod_allgather |
HorovodAllreduce | horovod_allreduce |
HorovodBroadcast | horovod_broadcast |
所以,在 python 世界中,当 _DistributedOptimizer 调用 compute_gradients 来优化的时候,会通过 _allreduce 来调用到 MPI_LIB.horovod_allreduce,也就是调用到 HorovodAllreduceOp 这里。
具体 _DistributedOptimizer 如何调用到 _allreduce,我们在后续文章中会讲解。
def _allreduce(tensor, name=None, op=Sum): if name is None and not _executing_eagerly(): name = 'HorovodAllreduce_%s' % _normalize_name(tensor.name) return MPI_LIB.horovod_allreduce(tensor, name=name, reduce_op=op)
HorovodAllreduceOp 类会调用 EnqueueTensorAllreduce() 方法,将张量的Allreduce操作OP加入HorovodGlobalState的队列中。
EnqueueTensorAllreduce 位于:/horovod/common/operations.cc。
具体方法就是构建contexts,callbacks等各种支撑数据,然后调用 EnqueueTensorAllreduces 进行处理。
// Contexts and controller must be initialized and the background thread // must be running before this function is called. Status EnqueueTensorAllreduce(std::shared_ptr<OpContext> context, std::shared_ptr<Tensor> tensor, std::shared_ptr<Tensor> output, std::shared_ptr<ReadyEvent> ready_event, std::string name, const int device, StatusCallback callback, ReduceOp reduce_op, double prescale_factor, double postscale_factor) { // Wrap inputs in std::vector and pass onto multi tensor implementation std::vector<std::shared_ptr<OpContext>> contexts; std::vector<std::shared_ptr<Tensor>> tensors; std::vector<std::shared_ptr<Tensor>> outputs; std::vector<std::shared_ptr<ReadyEvent>> ready_events; std::vector<std::string> names; std::vector<StatusCallback> callbacks; contexts.emplace_back(std::move(context)); tensors.emplace_back(std::move(tensor)); outputs.emplace_back(std::move(output)); ready_events.emplace_back(std::move(ready_event)); names.emplace_back(std::move(name)); callbacks.emplace_back(std::move(callback)); return EnqueueTensorAllreduces(contexts, tensors, outputs, ready_events, names, device, callbacks, reduce_op, prescale_factor, postscale_factor); }
EnqueueTensorAllreduces 主要就是调用 AddToTensorQueueMulti 向 tensor queue 提交操作,方法逻辑为:
具体代码如下:
Status EnqueueTensorAllreduces(std::vector<std::shared_ptr<OpContext>>& contexts, std::vector<std::shared_ptr<Tensor>>& tensors, std::vector<std::shared_ptr<Tensor>>& outputs, std::vector<std::shared_ptr<ReadyEvent>>& ready_events, std::vector<std::string>& names, const int device, std::vector<StatusCallback>& callbacks, ReduceOp reduce_op, double prescale_factor, double postscale_factor) { Status status; ...... std::vector<Request> messages; std::vector<TensorTableEntry> entries; messages.reserve(tensors.size()); entries.reserve(tensors.size()); for (int n = 0; n < tensors.size(); ++n) { // 遍历需要 reduce 的 tensor // 把tensor组装成一个Request Request message; message.set_request_rank(horovod_global.controller->GetRank()); message.set_tensor_name(names[n]); message.set_tensor_type(tensors[n]->dtype()); message.set_device(device); message.set_prescale_factor(prescale_factor); message.set_postscale_factor(postscale_factor); if (reduce_op == ReduceOp::ADASUM) { message.set_request_type(Request::ADASUM); } else { message.set_request_type(Request::ALLREDUCE); } message.set_tensor_shape(tensors[n]->shape().to_vector()); messages.push_back(std::move(message)); TensorTableEntry e; e.tensor_name = names[n]; e.context = std::move(contexts[n]); // input and output can be the same, only move when safe if (tensors[n] != outputs[n]) { e.tensor = std::move(tensors[n]); e.output = std::move(outputs[n]); } else { e.tensor = tensors[n]; e.output = outputs[n]; } e.ready_event = std::move(ready_events[n]); e.device = device; e.callback = std::move(callbacks[n]); // 针对每个 tensor,会创建对应 TensorTableEntry,用于保存tensor 的权重,message 主要是一些 元信息 metadata entries.push_back(std::move(e)); } std::string tensors_enqueued; for (const auto& n : names) { tensors_enqueued += n + "; "; } // Only create groups larger than 1 tensor, unless disable_group_fusion is requested. // In that case, even single tensor groups are created to enforce disabling fusion. if (tensors.size() > 1 || horovod_global.disable_group_fusion) { auto group_id = horovod_global.group_table.RegisterGroup(std::move(names)); for (auto& message : messages) { message.set_group_id(group_id); } } // 往 GlobalState 的 tensor_queue 里面添加 status = horovod_global.tensor_queue.AddToTensorQueueMulti(entries, messages); return status; }
Tensor 和 op 主要是添加到 TensorQueue,具体就是调用 如下:
status = horovod_global.tensor_queue.AddToTensorQueueMulti(entries, messages);
AddToTensorQueue 和 AddToTensorQueueMulti 函数基本逻辑类似,只不过后者是处理多个message,具体如下:
// Add a TensorTableEntry as well as its message to the queue. Status TensorQueue::AddToTensorQueue(TensorTableEntry& e, Request& message) { std::lock_guard<std::mutex> guard(mutex_); if (tensor_table_.find(e.tensor_name) != tensor_table_.end()) { return DUPLICATE_NAME_ERROR; } tensor_table_.emplace(e.tensor_name, std::move(e)); message_queue_.push(std::move(message)); return Status::OK(); } Status TensorQueue::AddToTensorQueueMulti(std::vector<TensorTableEntry>& entries, std::vector<Request>& messages) { std::lock_guard<std::mutex> guard(mutex_); for (int i = 0; i < entries.size(); ++i) { if (tensor_table_.find(entries[i].tensor_name) != tensor_table_.end()) { return DUPLICATE_NAME_ERROR; } tensor_table_.emplace(entries[i].tensor_name, std::move(entries[i])); message_queue_.push(std::move(messages[i])); } return Status::OK(); }
这样就添加到了 message queue,我们的逻辑也完成了。
总结Horovod的梯度同步更新以及AllReduce操作的全过程如下:
具体如下图:
+----------------------+ + | Computation Graph | Execution Thread | Background Communication Thread +---------+------------+ | | | | | v | | +----------------+ | | | | | TF Aysnc Op | | | | | +------+---------+ | | | | | | | v | +-----------------------+ + | HorovodGlobalState | +---------------------+ EnqueueTensorAllreduce(tensor, op) | | | | +---------------+ | | | HorovodAllreduceOp | +--------------------------------------> | HorovodOp | +-------------------------> message_queue | | | +----+-----+---++ | | +---------------------+ ^ ^ ^ ^ ^ | tensor_table | | | | | | | | +----------------------------+ | | | | +-----------------------+ | +---------------------+ | | +-----------------+ | | +-------+ | | | | | | | +------+-----+ +---+----+ +---------+---+ +----+--------+ +----------+--+ | AlltoallOp | | JoinOp | | AllreduceOp | | AllgatherOp | | BroadcastOp | +------------+ +--------+ ++---+----+--++ +-------------+ +-------------+ ^ ^ ^ ^ | | | | +------------------+ | | +-----------------------------------+ | +-------+ +-------------+ | | | | | +-----+--------+ +---+----------+ +------------+--------+ +------------+---+ | MPIAllreduce | | GPUAllreduce | | AdasumMPIAllreduceOp| | GlooAllreduce | +--------------+ +--------------+ +---------------------+ +----------------+
手机如下: