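In the previous articles we covered PipeDream's overall architecture, the profiling stage, the computation-partitioning stage, and the model-conversion stage. In this article we look at the runtime execution engine, which is a unified infrastructure layer.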
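Other articles in this pipeline-parallelism series:
[Source Code Analysis] Deep Learning Pipeline Parallelism: GPipe (1) --- Basic Pipeline Implementation
[Source Code Analysis] Deep Learning Pipeline Parallelism: GPipe (2) --- Gradient Accumulation
[Source Code Analysis] Deep Learning Pipeline Parallelism: GPipe (3) --- Recomputation
[Source Code Analysis] Deep Learning Pipeline Parallelism: PipeDream (1) --- Profiling
[Source Code Analysis] Deep Learning Pipeline Parallelism: PipeDream (2) --- Computation Partitioning
[Source Code Analysis] Deep Learning Pipeline Parallelism: PipeDream (3) --- Model Conversion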
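The previous articles went through three stages: profiling, computation partitioning, and model conversion. Together they produce a set of Python files and configuration files. Once PipeDream loads these files, it can start training.
Starting with this article, we therefore look at the supporting systems that training needs, beginning with the runtime execution engine. The main question is what functionality a deep learning training runtime should provide.
Building on the earlier analysis, we first consider why a runtime is needed at all, and what it has to implement for deep learning with pipeline parallelism.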
First, consider what PyTorch provides:
The RPC framework was declared stable in PyTorch 1.5 (released in 2020). The release notes describe it as follows.
Distributed RPC framework APIs [Now Stable]
The `torch.distributed.rpc` package aims at supporting a wide range of distributed training paradigms that do not fit into `DistributedDataParallel`. Examples include parameter server training, distributed model parallelism, and distributed pipeline parallelism. Features in the `torch.distributed.rpc` package can be categorized into four main sets of APIs.
- The RPC API allows running a function on a specified destination worker with given arguments and fetches the return value or creates a distributed reference to the return value.
- The RRef (Remote REFerence) serves as a reference to an object on another worker. A worker holding an RRef can explicitly request copies of the object, and it can also share the light-weight RRef with other workers without worrying about reference counting. This is especially useful when multiple workers need to repeatedly access different versions of the same remote object.
- With Distributed Autograd, applications can automatically compute gradients even if a model is split on multiple workers using RPC. This is achieved by stitching together local autograd graphs at RPC boundaries in the forward pass and reaching out to participants to transparently launch local autograd in the backward pass.
- The Distributed Optimizer uses gradients computed by Distributed Autograd to update model parameters. Its constructor takes a local optimizer (e.g., `SGD`, `Adagrad`, etc.) and a list of parameter RRefs, and its `step()` function automatically uses the local optimizer to update parameters on all distinct RRef owner workers.
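The PipeDream paper, however, was published in 2019, before this API was available. PipeDream therefore could not rely on PyTorch RPC. It had to implement its own communication logic, which amounts to its own support for a computation graph spread across machines.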
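Second, consider PipeDream's own characteristics:
Taken together, these two points mean that plain DDP, model parallelism, and autograd are each insufficient for PipeDream on their own. They have to be combined.
At a minimum, PipeDream has to implement the following itself:
Below we analyze these functional points one by one.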
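The examples in the source show that the main_with_runtime.py script is launched once per process, possibly on several nodes. Each launch receives different arguments, most importantly a different rank. As a result, each process runs the model for a different stage.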
```shell
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 0 --local_rank 0 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 1 --local_rank 1 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 2 --local_rank 2 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 3 --local_rank 3 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
```
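The four commands differ only in `--rank` and `--local_rank`. The sketch below builds them programmatically, assuming a single server with one process per GPU, so that `local_rank == rank`. The helper `launch_commands` and the placeholder data path and master IP are made up for illustration and are not part of PipeDream.

```python
import shlex

def launch_commands(num_ranks, data_dir, master_addr,
                    module="models.vgg16.gpus=4",
                    config="models/vgg16/gpus=4/hybrid_conf.json",
                    batch_size=64, backend="gloo"):
    """Build one main_with_runtime.py command line per rank (hypothetical helper).

    Assumes every rank runs on the same server, so local_rank == rank.
    """
    commands = []
    for rank in range(num_ranks):
        argv = ["python", "main_with_runtime.py",
                "--module", module,
                "-b", str(batch_size),
                "--data_dir", data_dir,
                "--rank", str(rank),
                "--local_rank", str(rank),
                "--master_addr", master_addr,
                "--config_path", config,
                "--distributed_backend", backend]
        # shlex.join quotes any argument that would need quoting in a shell.
        commands.append(shlex.join(argv))
    return commands

for cmd in launch_commands(4, "/data/imagenet", "10.0.0.1"):
    print(cmd)
```

On a multi-server setup, `local_rank` would instead be the GPU index within each machine, so the assumption above would have to change.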
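We use runtime/translation/main_with_runtime.py to illustrate how the runtime is used overall. The main steps, as reflected in the code comments, are:
1. Parse the command-line arguments.
2. Import the generated model package and build the model, a list of stages.
3. Run each stage once on synthetic inputs to determine the shapes and dtypes of all intermediate tensors.
4. Load the configuration file (module_to_stage_map, stage_to_rank_map, stage_to_depth_map).
5. Build a StageRuntime.
6. Build the optimizer (with weight stashing).
7. Load the training and validation datasets.
8. Train, validate, and save checkpoints.
The full code is as follows: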
```python
def main():
    # Parse the command-line arguments
    global args, best_prec1
    args = parser.parse_args()

    # Special case handling for GNMT model
    l2_promote()

    torch.cuda.set_device(args.local_rank)

    # build tokenizer
    tokenizer = Tokenizer(os.path.join(args.data_dir, config.VOCAB_FNAME))

    # define loss function
    criterion = build_gnmt_criterion(
        vocab_size=tokenizer.vocab_size, padding_idx=config.PAD, smoothing=0.1)

    # create stages of the model
    # Import the generated model package
    module = importlib.import_module(args.module)
    args.arch = module.arch()
    # Build the model (a list of stages) from the module
    model = module.model(criterion)

    # Configure tensor shapes from the arguments (input size, batch size, etc.)
    input_size = [args.max_length_train, args.batch_size]
    training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
                              "input2": input_size,
                              "target": [args.max_length_train * args.batch_size],
                              "target_length": [args.batch_size]}
    dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
              "target": torch.int64, "target_length": torch.int32}
    inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
    target_tensor_names = {"target", "target_length"}
    # Iterate over every stage of the model (skipping the final loss layer)
    for module_id, (stage, inputs, outputs) in enumerate(model[:-1]):  # Skip last layer (loss).
        input_tensors = []
        # Build a synthetic tensor for each input of this stage
        for module_input in inputs:
            if module_input in inputs_module_destinations:
                inputs_module_destinations[module_input] = module_id

            input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
                                      dtype=dtypes[module_input])#.cuda()
            input_tensors.append(input_tensor)
        #stage.cuda()
        # PyTorch should not maintain metadata for a backward pass on
        # synthetic inputs. Without the following line, the runtime is
        # as much as 1.5x slower in a full DP configuration.
        with torch.no_grad():
            # Call the stage's forward() to produce its outputs
            output_tensors = stage(*tuple(input_tensors))
        if not type(output_tensors) is tuple:
            output_tensors = [output_tensors]
        # Record the shape and dtype of every output of this stage
        for output, output_tensor in zip(outputs,
                                         list(output_tensors)):
            # outputs is e.g. ['out2', 'out1']
            training_tensor_shapes[output] = list(output_tensor.size())
            dtypes[output] = output_tensor.dtype

    # Build eval_tensor_shapes and convert all shapes to tuples
    eval_tensor_shapes = {}
    for key in training_tensor_shapes:
        eval_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])
        training_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])

    # Load the configuration file
    configuration_maps = {
        'module_to_stage_map': None,
        'stage_to_rank_map': None,
        'stage_to_depth_map': None
    }
    if args.config_path is not None:
        json_config_file = json.load(open(args.config_path, 'r'))
        configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
        configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
        configuration_maps['stage_to_rank_map'] = {
            int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
        configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)

    # Build a StageRuntime
    r = runtime.StageRuntime(
        model=model, distributed_backend=args.distributed_backend,
        fp16=args.fp16, loss_scale=args.loss_scale,
        training_tensor_shapes=training_tensor_shapes,
        eval_tensor_shapes=eval_tensor_shapes,
        training_tensor_dtypes=dtypes,
        inputs_module_destinations=inputs_module_destinations,
        target_tensor_names=target_tensor_names,
        configuration_maps=configuration_maps,
        master_addr=args.master_addr,
        rank=args.rank, local_rank=args.local_rank,
        num_ranks_in_server=args.num_ranks_in_server,
        verbose_freq=args.verbose_frequency,
        model_type=runtime.TRANSLATION,
        enable_recompute=args.recompute)

    # stage needed to determine if current stage is the first stage
    # num_stages needed to determine if current stage is the last stage
    # num_ranks needed to determine number of warmup_minibatches in case of pipelining
    args.stage = r.stage
    args.num_stages = r.num_stages
    args.num_ranks = r.num_ranks
    if not is_first_stage():
        args.synthetic_data = True

    # define optimizer
    if args.no_input_pipelining:
        num_versions = 1
    else:
        # number of versions is the total number of machines following the current
        # stage, shared amongst all replicas in this stage
        num_versions = r.num_warmup_minibatches + 1

    # if specified, resume from checkpoint
    if args.resume:
        checkpoint_file_path = "%s.%d.pth.tar" % (args.resume, r.stage)
        assert os.path.isfile(checkpoint_file_path)
        print("=> loading checkpoint '{}'".format(checkpoint_file_path))
        checkpoint = torch.load(checkpoint_file_path)
        args.start_epoch = checkpoint['epoch']
        best_prec1 = checkpoint['best_prec1']
        r.load_state_dict(checkpoint['state_dict'])
        print("=> loaded checkpoint '{}' (epoch {})"
              .format(checkpoint_file_path, checkpoint['epoch']))

    # TODO: make this configurable by args
    # Build the optimizer
    use_adam_optimizer = True
    if use_adam_optimizer:
        optimizer = adam.AdamWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
            macrobatch=args.macrobatch)
    else:
        optimizer = sgd.SGDWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, momentum=args.momentum,
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)

    if args.resume:
        optimizer.load_state_dict(checkpoint['optimizer'])

    cudnn.benchmark = True

    # Load the datasets
    train_dataset = LazyParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_TRAIN_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_TRAIN_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=False,
        max_size=None)

    val_dataset = ParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_VAL_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_VAL_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=True)

    distributed_sampler = False
    if configuration_maps['stage_to_rank_map'] is not None:
        num_ranks_in_first_stage = len(configuration_maps['stage_to_rank_map'][0])
        if num_ranks_in_first_stage > 1:
            distributed_sampler = True

    # TODO: fix random seeds
    train_loader = train_dataset.get_loader(
        batch_size=args.batch_size, seeds=range(args.epochs),
        batch_first=False, shuffle=True,
        bucketing=not args.no_bucketing, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        rank=r.rank_in_stage if r.stage == 0 else 0
    )

    val_loader = val_dataset.get_loader(
        batch_size=args.batch_size, batch_first=False,
        shuffle=True, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        seeds=range(args.epochs),
        rank=r.rank_in_stage if r.stage == 0 else 0
    )

    # if checkpoint is loaded, start by running validation
    if args.resume:
        assert args.start_epoch > 0
        validate(val_loader, r, args.start_epoch-1)

    # Train, validate, and save checkpoints
    for epoch in range(args.start_epoch, args.epochs):
        if distributed_sampler:
            train_loader.sampler.set_epoch(epoch)
        adjust_learning_rate(optimizer, epoch, args.epochs, r, args.lr_policy)

        # train or run forward pass only for one epoch
        if args.forward_only:
            validate(val_loader, r, epoch)
        else:
            train(train_loader, r, optimizer, epoch)

            # evaluate on validation set
            prec1 = validate(val_loader, r, epoch)
            if r.stage != r.num_stages: prec1 = 0

            # remember best prec@1 and save checkpoint
            best_prec1 = max(prec1, best_prec1)

            should_save_checkpoint = args.checkpoint_dir_not_nfs or r.rank_in_stage == 0
            if args.checkpoint_dir and should_save_checkpoint:
                save_checkpoint({
                    'epoch': epoch + 1,
                    'arch': args.arch,
                    'state_dict': r.state_dict(),
                    'best_prec1': best_prec1,
                    'optimizer' : optimizer.state_dict(),
                    'tokenizer': tokenizer.get_state()
                }, args.checkpoint_dir, r.stage, epoch)
```
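We start with how the model is loaded.
The model files were generated in the previous article, so here they only need to be loaded. As an example we use the model files under ../translation/models/gnmt/gpus=4/.
The `__init__.py` file there is: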
```python
from .gnmt import GNMTSplit
from .stage0 import Stage0
from .stage1 import Stage1
from .stage2 import Stage2
from .stage3 import Stage3

def arch():
    return "gnmt"

def model(criterion):
    return [
        (Stage0(), ["input0", "input1"], ["out2", "out1"]),
        (Stage1(), ["out2", "input1", "input2", "out1"], ["out3", "out7"]),
        (Stage2(), ["out3", "out7"], ["out8", "out9", "out10"]),
        (Stage3(), ["out8", "out9", "out10"], ["out12"]),
        (criterion, ["out12"], ["loss"])
    ]

def full_model():
    return GNMTSplit()
```
Each item has the following format:
(stage, inputs, outputs)
The model therefore has to be loaded according to this format.
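The sketch below makes the format concrete. It walks such a list, with plain strings standing in for the `Stage*` modules, and records which module produces each named tensor and which modules consume it. The helper `producers_and_consumers` is ours, not PipeDream's. Names that no module produces, such as `input0`, are model inputs that come from the data loader.

```python
# Strings stand in for the GNMT stage modules; only the tensor names matter here.
model = [
    ("Stage0", ["input0", "input1"], ["out2", "out1"]),
    ("Stage1", ["out2", "input1", "input2", "out1"], ["out3", "out7"]),
    ("Stage2", ["out3", "out7"], ["out8", "out9", "out10"]),
    ("Stage3", ["out8", "out9", "out10"], ["out12"]),
    ("criterion", ["out12"], ["loss"]),
]

def producers_and_consumers(model):
    """Map each tensor name to its producing module index and consuming module indices."""
    producer = {}
    consumers = {}
    for idx, (_, inputs, outputs) in enumerate(model):
        for name in outputs:
            producer[name] = idx
        for name in inputs:
            consumers.setdefault(name, []).append(idx)
    return producer, consumers

producer, consumers = producers_and_consumers(model)
print(producer["out1"], consumers["out1"])  # 0 [1]
print(consumers["input1"])                  # [0, 1]
print("input0" in producer)                 # False: fed by the data loader
```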
It is loaded as follows:
```python
# create stages of the model
module = importlib.import_module(args.module)
args.arch = module.arch()
```
The resulting module is:
```
module = {module} <module 'translation.models.gnmt.gpus=4' from '../translation/models/gnmt/gpus=4/__init__.py'>
 GNMTSplit = {type} <class 'translation.models.gnmt.gpus=4.gnmt.GNMTSplit'>
 Stage0 = {type} <class 'translation.models.gnmt.gpus=4.stage0.Stage0'>
 Stage1 = {type} <class 'translation.models.gnmt.gpus=4.stage1.Stage1'>
 Stage2 = {type} <class 'translation.models.gnmt.gpus=4.stage2.Stage2'>
 Stage3 = {type} <class 'translation.models.gnmt.gpus=4.stage3.Stage3'>
 gnmt = {module} <module 'translation.models.gnmt.gpus=4.gnmt' from '../translation/models/gnmt/gpus=4/gnmt.py'>
 stage0 = {module} <module 'translation.models.gnmt.gpus=4.stage0' from '../translation/models/gnmt/gpus=4/stage0.py'>
 stage1 = {module} <module 'translation.models.gnmt.gpus=4.stage1' from '../translation/models/gnmt/gpus=4/stage1.py'>
 stage2 = {module} <module 'translation.models.gnmt.gpus=4.stage2' from '../translation/models/gnmt/gpus=4/stage2.py'>
 stage3 = {module} <module 'translation.models.gnmt.gpus=4.stage3' from '../translation/models/gnmt/gpus=4/stage3.py'>
```
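The package name `gpus=4` is not a valid Python identifier, so it cannot appear in an ordinary `import` statement. `importlib.import_module` accepts it as a string, however. The self-contained sketch below builds a throwaway package tree in a temporary directory and loads it the same way. The names `toymodels` and the toy `arch()`/`model()` functions are made up for illustration.

```python
import importlib
import os
import sys
import tempfile

# Build a throwaway package tree: <tmp>/toymodels/gpus=4/__init__.py
root = tempfile.mkdtemp()
pkg_dir = os.path.join(root, "toymodels", "gpus=4")
os.makedirs(pkg_dir)
open(os.path.join(root, "toymodels", "__init__.py"), "w").close()
with open(os.path.join(pkg_dir, "__init__.py"), "w") as f:
    f.write(
        "def arch():\n"
        "    return 'toy'\n"
        "\n"
        "def model(criterion):\n"
        "    return [('Stage0', ['input0'], ['out0']),\n"
        "            (criterion, ['out0'], ['loss'])]\n"
    )

sys.path.insert(0, root)
importlib.invalidate_caches()  # make the import system notice the new files

# `import toymodels.gpus=4` would be a SyntaxError; the string form works.
module = importlib.import_module("toymodels.gpus=4")
print(module.arch())                   # toy
print(module.model("criterion")[-1])   # ('criterion', ['out0'], ['loss'])
```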
Next, the model is built from the module:
```python
model = module.model(criterion)
```
Here criterion is LabelSmoothing().
The model(criterion) call instantiates Stage0() through Stage3() in turn, constructing each stage.
For example, constructing Stage3 calls its `__init__` function:
```python
class Stage3(torch.nn.Module):
    def __init__(self):
        super(Stage3, self).__init__()
        self.layer5 = torch.nn.LSTM(2048, 1024)
        self.layer8 = Classifier(1024, 32320)
```
This yields the following model:
```
model = {list: 5}
 0 = {tuple: 3}
  0 = {Stage0} Stage0(\n (layer4): Embedding(32320, 1024, padding_idx=0)\n (layer5): EmuBidirLSTM(\n (bidir): LSTM(1024, 1024, bidirectional=True)\n (layer1): LSTM(1024, 1024)\n (layer2): LSTM(1024, 1024)\n )\n (layer6): Dropout(p=0.2, inplace=False)\n (layer7): LSTM(2048, 1024)\n (layer9): Dropout(p=0.2, inplace=False)\n)
  1 = {list: 2} ['input0', 'input1']
  2 = {list: 2} ['out2', 'out1']
  __len__ = {int} 3
 1 = {tuple: 3}
  0 = {Stage1} Stage1(\n (layer6): LSTM(1024, 1024)\n (layer9): Embedding(32320, 1024, padding_idx=0)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer12): LSTM(1024, 1024)\n (layer15): RecurrentAttention(\n (rnn): LSTM(1024, 1024)\n (attn): BahdanauAttention(\n (linear_q): Linear(in_features=1024, out_features=1024, bias=False)\n (linear_k): Linear(in_features=1024, out_features=1024, bias=False)\n (dropout): Dropout(p=0, inplace=False)\n )\n (dropout): Dropout(p=0, inplace=False)\n )\n)
  1 = {list: 4} ['out2', 'input1', 'input2', 'out1']
  2 = {list: 2} ['out3', 'out7']
  __len__ = {int} 3
 2 = {tuple: 3}
  0 = {Stage2} Stage2(\n (layer7): Dropout(p=0.2, inplace=False)\n (layer9): LSTM(2048, 1024)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer13): LSTM(2048, 1024)\n (layer16): Dropout(p=0.2, inplace=False)\n)
  1 = {list: 2} ['out3', 'out7']
  2 = {list: 3} ['out8', 'out9', 'out10']
  __len__ = {int} 3
 3 = {tuple: 3}
  0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
  1 = {list: 3} ['out8', 'out9', 'out10']
  2 = {list: 1} ['out12']
  __len__ = {int} 3
 4 = {tuple: 3} (LabelSmoothing(), ['out12'], ['loss'])
  0 = {LabelSmoothing} LabelSmoothing()
  1 = {list: 1} ['out12']
  2 = {list: 1} ['loss']
  __len__ = {int} 3
 __len__ = {int} 5
```
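Once the model is loaded, the input and output tensors are configured. For every stage except the final loss layer, the code builds synthetic input tensors from the known shapes and dtypes, runs the stage's forward pass under torch.no_grad(), and records the shape and dtype of each output. Because stages are processed in order, a later stage can use the recorded output shapes of earlier stages as its own input shapes.
Note that each submodule entry has the following format:
```python
(
    Stage0(),
    ["input0", "input1"],  # inputs
    ["out2", "out1"]       # outputs
)
```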
The annotated code:
```python
# Configure tensor shapes from the arguments (input size, batch size, etc.)
input_size = [args.max_length_train, args.batch_size]
training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
                          "input2": input_size,
                          "target": [args.max_length_train * args.batch_size],
                          "target_length": [args.batch_size]}
dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
          "target": torch.int64, "target_length": torch.int32}
inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
target_tensor_names = {"target", "target_length"}
# Iterate over every stage of the model (skipping the final loss layer)
for module_id, (stage, inputs, outputs) in enumerate(model[:-1]):  # Skip last layer (loss).
    input_tensors = []
    # Build a synthetic tensor for each input of this stage
    for module_input in inputs:
        if module_input in inputs_module_destinations:
            inputs_module_destinations[module_input] = module_id

        input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
                                  dtype=dtypes[module_input]).cuda()
        input_tensors.append(input_tensor)
    stage.cuda()
    # PyTorch should not maintain metadata for a backward pass on
    # synthetic inputs. Without the following line, the runtime is
    # as much as 1.5x slower in a full DP configuration.
    with torch.no_grad():
        # Call the stage's forward() to produce its outputs
        output_tensors = stage(*tuple(input_tensors))
    if not type(output_tensors) is tuple:
        output_tensors = [output_tensors]
    # Record the shape and dtype of every output of this stage
    for output, output_tensor in zip(outputs,
                                     list(output_tensors)):
        # outputs is e.g. ['out2', 'out1']
        training_tensor_shapes[output] = list(output_tensor.size())
        dtypes[output] = output_tensor.dtype

# Build eval_tensor_shapes and convert all shapes to tuples
eval_tensor_shapes = {}
for key in training_tensor_shapes:
    eval_tensor_shapes[key] = tuple(
        training_tensor_shapes[key])
    training_tensor_shapes[key] = tuple(
        training_tensor_shapes[key])
```
This gives the shapes and dtypes of all outputs:
```
dtypes = {dict: 13}
 'input0' = {dtype} torch.int64
 'input1' = {dtype} torch.int64
 'input2' = {dtype} torch.int64
 'target' = {dtype} torch.int64
 'target_length' = {dtype} torch.int32
 'out2' = {dtype} torch.float32
 'out1' = {dtype} torch.float32
 'out3' = {dtype} torch.float32
 'out7' = {dtype} torch.float32
 'out8' = {dtype} torch.float32
 'out9' = {dtype} torch.float32
 'out10' = {dtype} torch.float32
 'out12' = {dtype} torch.float32
 __len__ = {int} 13
training_tensor_shapes = {dict: 13}
 'input0' = {tuple: 2} (50, 128)
 'input1' = {tuple: 1} 128
 'input2' = {tuple: 2} (50, 128)
 'target' = {tuple: 1} 6400
 'target_length' = {tuple: 1} 128
 'out2' = {tuple: 3} (50, 128, 1024)
 'out1' = {tuple: 3} (50, 128, 1024)
 'out3' = {tuple: 3} (50, 128, 1024)
 'out7' = {tuple: 3} (50, 128, 1024)
 'out8' = {tuple: 3} (50, 128, 1024)
 'out9' = {tuple: 3} (50, 128, 1024)
 'out10' = {tuple: 3} (50, 128, 1024)
 'out12' = {tuple: 3} (50, 128, 32320)
 __len__ = {int} 13
eval_tensor_shapes = {dict: 13}
 'input0' = {tuple: 2} (50, 128)
 'input1' = {tuple: 1} 128
 'input2' = {tuple: 2} (50, 128)
 'target' = {tuple: 1} 6400
 'target_length' = {tuple: 1} 128
 'out2' = {tuple: 3} (50, 128, 1024)
 'out1' = {tuple: 3} (50, 128, 1024)
 'out3' = {tuple: 3} (50, 128, 1024)
 'out7' = {tuple: 3} (50, 128, 1024)
 'out8' = {tuple: 3} (50, 128, 1024)
 'out9' = {tuple: 3} (50, 128, 1024)
 'out10' = {tuple: 3} (50, 128, 1024)
 'out12' = {tuple: 3} (50, 128, 32320)
 __len__ = {int} 13
```
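Shapes of intermediate tensors such as `out2` are never written down by hand. The loop obtains them by actually running each stage on dummy inputs. Below is a torch-free sketch of the same loop structure. `FakeTensor` is a hypothetical stand-in that carries only a shape and a dtype, and the two toy stages only imitate the signatures of `Stage0`/`Stage1`.

```python
from dataclasses import dataclass

@dataclass
class FakeTensor:
    """Hypothetical stand-in for a tensor: only shape and dtype are tracked."""
    shape: tuple
    dtype: str

# Toy stages (hypothetical): they map input FakeTensors to output FakeTensors.
def stage0(input0, input1):
    seq, batch = input0.shape
    hidden = FakeTensor((seq, batch, 1024), "float32")
    return hidden, hidden  # like Stage0 -> (out2, out1)

def stage1(out2, input1, input2, out1):
    return FakeTensor(out2.shape, "float32"), FakeTensor(out1.shape, "float32")

model = [
    (stage0, ["input0", "input1"], ["out2", "out1"]),
    (stage1, ["out2", "input1", "input2", "out1"], ["out3", "out7"]),
    (None, ["out7"], ["loss"]),  # loss layer, skipped below as in model[:-1]
]

max_len, batch = 50, 128
shapes = {"input0": (max_len, batch), "input1": (batch,), "input2": (max_len, batch)}
dtypes = {"input0": "int64", "input1": "int64", "input2": "int64"}

# Same structure as main(): build synthetic inputs from the known shapes,
# run the stage, then record the shape/dtype of every named output.
for stage, inputs, outputs in model[:-1]:
    input_tensors = [FakeTensor(shapes[name], dtypes[name]) for name in inputs]
    output_tensors = stage(*input_tensors)
    if not isinstance(output_tensors, tuple):
        output_tensors = (output_tensors,)
    for name, tensor in zip(outputs, output_tensors):
        shapes[name] = tensor.shape
        dtypes[name] = tensor.dtype

print(shapes["out7"], dtypes["out7"])  # (50, 128, 1024) float32
```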
Next, the configuration file generated in the previous article is loaded:
```python
configuration_maps = {
    'module_to_stage_map': None,
    'stage_to_rank_map': None,
    'stage_to_depth_map': None
}
if args.config_path is not None:
    json_config_file = json.load(open(args.config_path, 'r'))
    configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
    configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
    configuration_maps['stage_to_rank_map'] = {
        int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
    configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)
```
The corresponding file is:
```json
{
    "module_to_stage_map": [0, 1, 2, 3, 3],
    "stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}
```
which yields:
```
configuration_maps = {dict: 3}
 'module_to_stage_map' = {list: 5} [0, 1, 2, 3, 3]
 'stage_to_rank_map' = {dict: 4} {0: [0], 1: [1], 2: [2], 3: [3]}
 'stage_to_depth_map' = {NoneType} None
 __len__ = {int} 3
```
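JSON object keys are always strings. That is why the code converts the keys of `stage_to_rank_map` back to `int`, and why `stage_to_depth_map` is later indexed with `str(self.stage)`. Below is a minimal sketch of the same parsing. It reads from a string instead of a file and, unlike the original, tolerates a missing `stage_to_rank_map`. The function name `load_configuration_maps` is ours.

```python
import json

def load_configuration_maps(text):
    """Parse a PipeDream-style stage configuration (sketch of the logic in main())."""
    cfg = json.loads(text)
    maps = {
        "module_to_stage_map": cfg.get("module_to_stage_map"),
        "stage_to_rank_map": cfg.get("stage_to_rank_map"),
        "stage_to_depth_map": cfg.get("stage_to_depth_map"),  # keys stay str
    }
    if maps["stage_to_rank_map"] is not None:
        # JSON keys are strings; convert them back to int stage ids.
        maps["stage_to_rank_map"] = {
            int(k): v for k, v in maps["stage_to_rank_map"].items()}
    return maps

mp_conf = """{
  "module_to_stage_map": [0, 1, 2, 3, 3],
  "stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}"""
maps = load_configuration_maps(mp_conf)
print(maps["stage_to_rank_map"])   # {0: [0], 1: [1], 2: [2], 3: [3]}
print(maps["stage_to_depth_map"])  # None
```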
For demonstration, we launch main_with_runtime.py with the following arguments:
```
--module translation.models.gnmt.gpus=4 --data_dir=wmt16_ende_data_bpe_clean --config_path pipedream-pipedream/runtime/translation/models/gnmt/gpus=4/mp_conf.json --local_rank 3 --rank 3 --master_addr 127.0.0.1
```
The main function builds the runtime as follows. The runtime is the execution engine, and it provides a unified, extensible infrastructure layer.
```python
r = runtime.StageRuntime(
    model=model, distributed_backend=args.distributed_backend,
    fp16=args.fp16, loss_scale=args.loss_scale,
    training_tensor_shapes=training_tensor_shapes,
    eval_tensor_shapes=eval_tensor_shapes,
    training_tensor_dtypes=dtypes,
    inputs_module_destinations=inputs_module_destinations,
    target_tensor_names=target_tensor_names,
    configuration_maps=configuration_maps,
    master_addr=args.master_addr,
    rank=args.rank, local_rank=args.local_rank,
    num_ranks_in_server=args.num_ranks_in_server,
    verbose_freq=args.verbose_frequency,
    model_type=runtime.TRANSLATION,
    enable_recompute=args.recompute)
```
StageRuntime is defined below. Its main member variables hold the metadata needed for the forward and backward passes within this stage, for example tensors, gradients, the distributed backend, the loss scale, the dtypes of the training tensors, and the tensor shapes.
```python
class StageRuntime:
    def __init__(self, model, distributed_backend, fp16, loss_scale,
                 training_tensor_shapes, eval_tensor_shapes,
                 training_tensor_dtypes, inputs_module_destinations,
                 target_tensor_names, configuration_maps, master_addr,
                 rank, local_rank, num_ranks_in_server, verbose_freq,
                 model_type, enable_recompute=False):
        # Metadata needed for forward and backward pass within this stage.
        self.tensors = []
        self.gradients = {}
        self.distributed_backend = distributed_backend
        self.fp16 = fp16
        self.loss_scale = loss_scale
        self.training_tensor_shapes = training_tensor_shapes
        self.eval_tensor_shapes = eval_tensor_shapes
        self.training_tensor_dtypes = training_tensor_dtypes
        self.model_type = model_type
        self.target_tensor_names = target_tensor_names

        self.initialize(model, inputs_module_destinations, configuration_maps,
                        master_addr, rank, local_rank, num_ranks_in_server)

        self.verbose_freq = verbose_freq
        self.forward_only = False

        self.forward_stats = runtime_utilities.RuntimeStats(forward=True)
        self.backward_stats = runtime_utilities.RuntimeStats(forward=False)

        # Enable recomputation to prevent the need to save activations
        # computed from the forward pass for the backward pass.
        self.enable_recompute = enable_recompute

        # Disable recomputation for the last stage.
        if rank == num_ranks_in_server - 1:
            self.enable_recompute = False
```
The initialization function is long, so we analyze it piece by piece.
At the start, the function iterates over the inputs and outputs of every layer of the model and assigns each tensor a tensor_tag, which is a unique tag for that tensor. The tag is passed down through several layers of calls and ends up in the recv function of distributed_c10d.py. During communication, it identifies which point-to-point message a receive should match.
```python
def recv(tensor,
         src=None,
         group=None,
         tag=0):
    """
    Receives a tensor synchronously.

    Args:
        tensor (Tensor): Tensor to fill with received data.
        src (int, optional): Source rank. Will receive from any
            process if unspecified.
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used.
        tag (int, optional): Tag to match recv with remote send

    Returns:
        Sender rank
        -1, if not part of the group

    """
    _check_single_tensor(tensor, "tensor")
    if _rank_not_in_group(group):
        return -1

    if group is None:
        pg = _get_default_group()
    else:
        pg = group

    if src is None:
        work = pg.recv_anysource([tensor], tag)
        work.wait()
        src_rank = work._source_rank()
        if group is None or group is GroupMember.WORLD:
            return src_rank
        else:
            return _get_global_rank(pg, src_rank)
    else:
        if group is None or group is GroupMember.WORLD:
            pg.recv([tensor], src, tag).wait()
        else:
            group_src_rank = _get_group_rank(pg, src)
            pg.recv([tensor], group_src_rank, tag).wait()
        return src
```
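A unique tag per tensor name matters because a stage can exchange several tensors with the same neighbour. For example, stage 2 sends out8, out9, and out10 to stage 3, and the tag lets the receiver pick the message it is waiting for. The toy below models only this matching rule, using in-process queues. `ToyMailbox` is hypothetical and is not how torch.distributed or its backends are implemented.

```python
import queue
from collections import defaultdict

class ToyMailbox:
    """Toy model of tagged point-to-point messaging (not torch.distributed).

    Each (src, dst, tag) triple gets its own FIFO channel, so a receive for
    one tag never consumes a message that was sent with another tag.
    """
    def __init__(self):
        self._channels = defaultdict(queue.Queue)

    def send(self, payload, src, dst, tag=0):
        self._channels[(src, dst, tag)].put(payload)

    def recv(self, src, dst, tag=0):
        # Blocks until a message with a matching (src, dst, tag) is available.
        return self._channels[(src, dst, tag)].get()

tensor_tags = {"out8": 8, "out9": 9, "out10": 10}
box = ToyMailbox()
# Rank 2 (stage 2) sends three activations to rank 3 (stage 3) in some order...
box.send("act-out10", src=2, dst=3, tag=tensor_tags["out10"])
box.send("act-out8", src=2, dst=3, tag=tensor_tags["out8"])
box.send("act-out9", src=2, dst=3, tag=tensor_tags["out9"])
# ...and rank 3 receives them in its own order, matched by tag.
print(box.recv(src=2, dst=3, tag=tensor_tags["out8"]))  # act-out8
print(box.recv(src=2, dst=3, tag=tensor_tags["out9"]))  # act-out9
```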
The code that assigns the tags is:
```python
def initialize(self, model, inputs_module_destinations,
               configuration_maps, master_addr, rank,
               local_rank, num_ranks_in_server):
    self.send_ranks = {}
    self.receive_ranks = {}
    self.rank = rank
    self.local_rank = local_rank
    self.stage = None
    self.tensor_tags = {}
    self.forward_minibatch_id = 0
    self.backward_minibatch_id = 0
    self.criterion_input_name = str(model[-1][1][0])

    tensor_tag = 1
    # Iterate over every layer; each entry is (_, input_tensors, output_tensors)
    for (_, input_tensors, output_tensors) in model:
        # Inputs
        for input_tensor in input_tensors:
            if input_tensor not in self.tensor_tags:
                self.tensor_tags[input_tensor] = tensor_tag
                tensor_tag += 1  # assign a tag
        # Outputs
        for output_tensor in output_tensors:
            if output_tensor not in self.tensor_tags:
                self.tensor_tags[output_tensor] = tensor_tag
                tensor_tag += 1  # assign a tag
    for target_tensor_name in sorted(self.target_tensor_names):
        self.tensor_tags[target_tensor_name] = tensor_tag
        tensor_tag += 1  # assign a tag
    self.tensor_tags["ack"] = tensor_tag
    tensor_tag += 1  # assign a tag
```
The inputs are:
```
target_tensor_names = {set: 2} {'target_length', 'target'}
 {str} 'target_length'
 {str} 'target'
 __len__ = {int} 2
model = {list: 5}
 0 = {tuple: 3}
  0 = {Stage0} Stage0(\n (layer4): Embedding(32320, 1024, padding_idx=0)\n (layer5): EmuBidirLSTM(\n (bidir): LSTM(1024, 1024, bidirectional=True)\n (layer1): LSTM(1024, 1024)\n (layer2): LSTM(1024, 1024)\n )\n (layer6): Dropout(p=0.2, inplace=False)\n (layer7): LS
  1 = {list: 2} ['input0', 'input1']
  2 = {list: 2} ['out2', 'out1']
  __len__ = {int} 3
 1 = {tuple: 3}
  0 = {Stage1} Stage1(\n (layer6): LSTM(1024, 1024)\n (layer9): Embedding(32320, 1024, padding_idx=0)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer12): LSTM(1024, 1024)\n (layer15): RecurrentAttention(\n (rnn): LSTM(1024, 1024)\n (attn): BahdanauAttention(\n (linear_q): Linear(in_features=1024, out_features=1024, bias=False)\n (linear_k): Linear(in_features=1024, out_features=1024, bias=False)\n (dropout): Dropout(p=0, inplace=False)\n )\n (dropout): Dropout(p=0, inplace=False)\n )\n)
  1 = {list: 4} ['out2', 'input1', 'input2', 'out1']
  2 = {list: 2} ['out3', 'out7']
  __len__ = {int} 3
 2 = {tuple: 3}
  0 = {Stage2} Stage2(\n (layer7): Dropout(p=0.2, inplace=False)\n (layer9): LSTM(2048, 1024)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer13): LSTM(2048, 1024)\n (layer16): Dropout(p=0.2, inplace=False)\n)
  1 = {list: 2} ['out3', 'out7']
  2 = {list: 3} ['out8', 'out9', 'out10']
  __len__ = {int} 3
 3 = {tuple: 3}
  0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
  1 = {list: 3} ['out8', 'out9', 'out10']
  2 = {list: 1} ['out12']
  __len__ = {int} 3
 4 = {tuple: 3}
  0 = {LabelSmoothing} LabelSmoothing()
  1 = {list: 1} ['out12']
  2 = {list: 1} ['loss']
  __len__ = {int} 3
 __len__ = {int} 5
```
The result is:
```
tensor_tags = {dict: 15}
 'input0' = {int} 1
 'input1' = {int} 2
 'out2' = {int} 3
 'out1' = {int} 4
 'input2' = {int} 5
 'out3' = {int} 6
 'out7' = {int} 7
 'out8' = {int} 8
 'out9' = {int} 9
 'out10' = {int} 10
 'out12' = {int} 11
 'loss' = {int} 12
 'target' = {int} 13
 'target_length' = {int} 14
 'ack' = {int} 15
 __len__ = {int} 15
```
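Every rank computes this numbering on its own, so all ranks must arrive at exactly the same tags without communicating. That holds because the model list is identical everywhere and the target names are sorted first. The sort matters because the iteration order of a set of strings can differ between processes under Python's string-hash randomization. Below is a sketch that reproduces the numbering above. The function name `assign_tensor_tags` is ours.

```python
def assign_tensor_tags(model, target_tensor_names):
    """Reproduce the tag numbering of StageRuntime.initialize (sketch)."""
    tensor_tags = {}
    tensor_tag = 1
    for _, input_tensors, output_tensors in model:
        # Inputs first, then outputs, each name tagged on first sight.
        for name in list(input_tensors) + list(output_tensors):
            if name not in tensor_tags:
                tensor_tags[name] = tensor_tag
                tensor_tag += 1
    # Sorting makes the numbering independent of set iteration order.
    for name in sorted(target_tensor_names):
        tensor_tags[name] = tensor_tag
        tensor_tag += 1
    tensor_tags["ack"] = tensor_tag
    return tensor_tags

model = [
    ("Stage0", ["input0", "input1"], ["out2", "out1"]),
    ("Stage1", ["out2", "input1", "input2", "out1"], ["out3", "out7"]),
    ("Stage2", ["out3", "out7"], ["out8", "out9", "out10"]),
    ("Stage3", ["out8", "out9", "out10"], ["out12"]),
    ("criterion", ["out12"], ["loss"]),
]
tags = assign_tensor_tags(model, {"target", "target_length"})
print(tags["input2"], tags["loss"], tags["ack"])  # 5 12 15
```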
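Recall the relevant definitions in the configuration file:
- module_to_stage_map: for each module (entry of the model list), the stage it is assigned to.
- stage_to_rank_map: for each stage, the list of ranks that run it. A stage with several ranks is replicated, i.e. trained data-parallel across those ranks.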
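As an example, the following file describes 16 ranks. Modules 2 and 3 both belong to stage 2. Stage 0 is replicated on 8 ranks, and stages 1 and 2 on 4 ranks each:
```json
{
    "module_to_stage_map": [0, 1, 2, 2],
    "stage_to_rank_map": {"0": [0, 1, 4, 5, 8, 9, 12, 13], "1": [2, 6, 10, 14], "2": [3, 7, 11, 15]}
}
```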
For the model in this article, the configuration file is:
```json
{
    "module_to_stage_map": [0, 1, 2, 3, 3],
    "stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}
```
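In memory, module_to_stage_map (as loaded) and rank_to_stage_map (derived from the file's stage_to_rank_map) are:
```
module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]
rank_to_stage_map = {dict: 4} {0: 0, 1: 1, 2: 2, 3: 3}
```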
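Lookups are also needed in the opposite direction, so the program inverts module_to_stage_map into stage_to_module_map. Together with stage_to_rank_map (loaded from the file, with its keys converted to int), this gives:
```
stage_to_module_map = {defaultdict: 4}
 default_factory = {type} <class 'list'>
 0 = {list: 1} [0]
 1 = {list: 1} [1]
 2 = {list: 1} [2]
 3 = {list: 2} [3, 4]
 __len__ = {int} 4
stage_to_rank_map = {dict: 4}
 0 = {list: 1} [0]
 1 = {list: 1} [1]
 2 = {list: 1} [2]
 3 = {list: 1} [3]
 __len__ = {int} 4
```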
Because rank and local_rank are set on the command line, the runtime then looks up its own entries in the configuration by rank and configures itself accordingly:
```python
stage_to_module_map = collections.defaultdict(list)
for module in range(len(module_to_stage_map)):
    # Record which modules each stage owns
    stage_to_module_map[module_to_stage_map[module]].append(module)

rank_to_stage_map = {}
for stage in stage_to_rank_map:
    for rank in stage_to_rank_map[stage]:
        # Record which stage each rank runs
        rank_to_stage_map[rank] = stage

# Now, use this mapping to determine the modules contained in
# each stage.
assert 0 <= self.rank < len(rank_to_stage_map)
self.num_ranks = len(rank_to_stage_map)  # world size: one training process per rank
self.num_stages = len(stage_to_module_map)  # number of stages
self.stage = rank_to_stage_map[self.rank]  # this rank's stage
self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank)  # position of this rank within its stage
self.num_ranks_in_stage = len(stage_to_rank_map[self.stage])  # replicas of this stage, i.e. its data-parallel degree
self.num_ranks_in_first_stage = len(stage_to_rank_map[0])
self.num_ranks_in_previous_stage = 0
self.ranks_in_previous_stage = []
if self.stage > 0:
    self.num_ranks_in_previous_stage = len(
        stage_to_rank_map[self.stage - 1])
    self.ranks_in_previous_stage = stage_to_rank_map[self.stage - 1]
self.num_ranks_in_next_stage = 0
self.ranks_in_next_stage = []
if self.stage < self.num_stages - 1:
    self.num_ranks_in_next_stage = len(
        stage_to_rank_map[self.stage + 1])
    self.ranks_in_next_stage = stage_to_rank_map[self.stage + 1]
modules = stage_to_module_map[self.stage]  # [3, 4] here; used later
self.modules_with_dependencies = ModulesWithDependencies(
    [model[module] for module in modules])
self.is_criterion = self.stage == (self.num_stages - 1)
if stage_to_depth_map is not None:
    self.num_warmup_minibatches = stage_to_depth_map[
        str(self.stage)]
else:
    self.num_warmup_minibatches = self.num_ranks - 1
    for i in range(self.stage):
        self.num_warmup_minibatches -= len(
            stage_to_rank_map[i])
    self.num_warmup_minibatches = self.num_warmup_minibatches // \
        self.num_ranks_in_stage
```
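The lookups above can be collected into a small function and checked against both configurations shown earlier. This is a sketch that assumes no `stage_to_depth_map` is given, so the warm-up count is computed from the rank counts. `stage_layout` is a name we made up.

```python
import collections

def stage_layout(module_to_stage_map, stage_to_rank_map, rank):
    """Derive per-rank stage information as initialize() does (sketch).

    stage_to_rank_map must already have int keys; stage_to_depth_map is
    assumed absent.
    """
    stage_to_module_map = collections.defaultdict(list)
    for module, stage in enumerate(module_to_stage_map):
        stage_to_module_map[stage].append(module)
    rank_to_stage_map = {r: s for s, ranks in stage_to_rank_map.items() for r in ranks}

    num_ranks = len(rank_to_stage_map)
    stage = rank_to_stage_map[rank]
    num_ranks_in_stage = len(stage_to_rank_map[stage])
    # (ranks in this and all later stages) - 1, split across this stage's replicas
    warmup = num_ranks - 1
    for i in range(stage):
        warmup -= len(stage_to_rank_map[i])
    warmup //= num_ranks_in_stage
    return {
        "stage": stage,
        "modules": stage_to_module_map[stage],
        "rank_in_stage": stage_to_rank_map[stage].index(rank),
        "num_ranks_in_stage": num_ranks_in_stage,
        "num_warmup_minibatches": warmup,
    }

gnmt_ranks = {0: [0], 1: [1], 2: [2], 3: [3]}
hybrid_ranks = {0: [0, 1, 4, 5, 8, 9, 12, 13], 1: [2, 6, 10, 14], 2: [3, 7, 11, 15]}
print(stage_layout([0, 1, 2, 3, 3], gnmt_ranks, 3))
# {'stage': 3, 'modules': [3, 4], 'rank_in_stage': 0, 'num_ranks_in_stage': 1, 'num_warmup_minibatches': 0}
print(stage_layout([0, 1, 2, 2], hybrid_ranks, 6)["num_warmup_minibatches"])  # 1
```

For the 4-stage GNMT configuration, this gives 3, 2, 1, and 0 warm-up minibatches for stages 0 to 3, matching `num_warmup_minibatches = 0` in the rank-3 dump below.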
The resulting variables are:
```
self = {StageRuntime}
 backward_minibatch_id = {int} 0
 criterion_input_name = {str} 'out12'
 distributed_backend = {NoneType} None
 eval_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), '
 forward_minibatch_id = {int} 0
 fp16 = {bool} False
 gradients = {dict: 0} {}
 is_criterion = {bool} True
 local_rank = {int} 3
 loss_scale = {int} 1
 model_type = {str} 'translation'
 modules_with_dependencies = {ModulesWithDependencies}
  _all_input_names = {list: 2} [['out8', 'out9', 'out10'], ['out12']]
  _all_output_names = {list: 2} [['out12'], ['loss']]
  _modules = {list: 2}
   0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
   1 = {LabelSmoothing} LabelSmoothing()
   __len__ = {int} 2
 num_ranks = {int} 4
 num_ranks_in_first_stage = {int} 1
 num_ranks_in_next_stage = {int} 0
 num_ranks_in_previous_stage = {int} 1
 num_ranks_in_stage = {int} 1
 num_stages = {int} 4
 num_warmup_minibatches = {int} 0
 rank = {int} 3
 rank_in_stage = {int} 0
 ranks_in_next_stage = {list: 0} []
 ranks_in_previous_stage = {list: 1} [2]
 receive_ranks = {dict: 0} {}
 send_ranks = {dict: 0} {}
 stage = {int} 3
 target_tensor_names = {set: 2} {'target', 'target_length'}
 tensor_tags = {dict: 15} {'input0': 1, 'input1': 2, 'out2': 3, 'out1': 4, 'input2': 5, 'out3': 6, 'out7': 7, 'out8': 8, 'out9': 9, 'out10': 10, 'out12': 11, 'loss': 12, 'target': 13, 'target_length': 14, 'ack': 15}
 tensors = {list: 0} []
 training_tensor_dtypes = {dict: 13} {'input0': torch.int64, 'input1': torch.int64, 'input2': torch.int64, 'target': torch.int64, 'target_length': torch.int32, 'out2': torch.float32, 'out1': torch.float32, 'out3': torch.float32, 'out7': torch.float32, 'out8': torch.float32, 'out9': torch.floa
 training_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), '
```
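Let us see how some of these variables are used.
First, num_ranks. It is used later in the code, for example:
```python
world_size=self.num_ranks  # world_size is derived from num_ranks
self.num_warmup_minibatches = self.num_ranks - 1  # the warm-up minibatch count starts from num_ranks
```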
Next, how is rank_in_stage used?
Earlier we had:
```python
self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank)  # position of this rank within its stage
```
rank_in_stage is passed to the communication module:
```python
self.comm_handler.initialize(
    self.receive_ranks,
    self.send_ranks,
    self.tensor_tags,
    self.target_tensor_names,
    self.training_tensor_dtypes,
    self.rank_in_stage,  # passed in here; inside the function it identifies this node (covered in detail later)
    self.num_ranks_in_stage,
    self.ranks_in_previous_stage,
    self.ranks_in_next_stage)
```
Next, the communication module is configured:
```python
# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
master_port = 12345
self.comm_handler = communication.CommunicationHandler(
    master_addr=master_addr,
    master_port=master_port,
    rank=self.rank,
    local_rank=self.local_rank,
    num_ranks_in_server=num_ranks_in_server,
    world_size=self.num_ranks,
    fp16=self.fp16,
    backend=self.distributed_backend)
```
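The configuration code below constructs a CommunicationHandler. Because this module serves the later step of setting up producers and consumers, we include that subsequent code here as well.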
```python
else:
    # ......
    # To determine where tensors should be sent and received, first
    # determine the "producing" and "consuming" module IDs of each
    # tensor. We then use the corresponding machine ranks to send
    # and receive tensors.
    master_port = 12345
    self.comm_handler = communication.CommunicationHandler(
        master_addr=master_addr,
        master_port=master_port,
        rank=self.rank,
        local_rank=self.local_rank,
        num_ranks_in_server=num_ranks_in_server,
        world_size=self.num_ranks,
        fp16=self.fp16,
        backend=self.distributed_backend)

    # Set up producers and consumers (analyzed in detail below).
    # Receive/send ranks for tensors passed between modules:
    for i in range(len(model)):  # iterate over modules
        for j in range(i+1, len(model)):  # iterate over the modules after module i
            for tensor_name in model[i][2]:  # outputs of module i
                if tensor_name in model[j][1]:  # is this output an input of module j?
                    if module_to_stage_map[i] == \
                            module_to_stage_map[j]:
                        continue
                    # For now, assume that each stage is served by only
                    # a single machine.
                    if module_to_stage_map[j] == self.stage:
                        self.receive_ranks[tensor_name] = \
                            stage_to_rank_map[module_to_stage_map[i]]
                    if module_to_stage_map[i] == self.stage:
                        self.send_ranks[tensor_name] = \
                            stage_to_rank_map[module_to_stage_map[j]]

    # Send/receive ranks for the model's raw inputs:
    for model_inputs in inputs_module_destinations.keys():
        destination_stage = module_to_stage_map[
            inputs_module_destinations[model_inputs]]
        if destination_stage > self.stage:
            self.send_ranks[model_inputs] = \
                self.ranks_in_next_stage

        if 0 < self.stage <= destination_stage:
            self.receive_ranks[model_inputs] = \
                self.ranks_in_previous_stage

        if destination_stage > 0:
            if model_inputs not in self.tensor_tags:
                self.tensor_tags[model_inputs] = tensor_tag
                tensor_tag += 1
```
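Next, the send and receive ranks are configured. For this stage, receive_ranks maps each tensor to the rank(s) it is received from, and send_ranks maps each tensor to the rank(s) it is sent to.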
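As mentioned earlier, when PipeDream was developed PyTorch did not yet offer a stable RPC framework, so PipeDream (whose paper was published in 2019) had to implement its own communication logic, in effect its own distributed computation graph. Producers and consumers are the key building blocks of that graph.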
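Logically, the rule is: if a tensor is an output of module i and an input of module j, and the two modules are assigned to different stages, then module i's stage is the producer, which sends the tensor, and module j's stage is the consumer, which receives it. The concrete code is as follows: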
```python
# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
for i in range(len(model)):  # iterate over modules
    for j in range(i+1, len(model)):  # iterate over the modules after module i
        for tensor_name in model[i][2]:  # outputs of module i
            # Is tensor_name also an input of model[j]? If it is both an output of
            # model[i] and an input of model[j], the two modules are connected.
            if tensor_name in model[j][1]:
                if module_to_stage_map[i] == \
                        module_to_stage_map[j]:
                    # Both modules are on the same stage (node): no communication needed.
                    continue
                # For now, assume that each stage is served by only
                # a single machine.
                # tensor_name is an input of model[j] and model[j] lives on this
                # stage, so this stage must receive it...
                if module_to_stage_map[j] == self.stage:
                    # ...from the rank(s) that serve module i's stage.
                    self.receive_ranks[tensor_name] = \
                        stage_to_rank_map[module_to_stage_map[i]]
                # tensor_name is an output of model[i] and model[i] lives on this
                # stage, so this stage must send it...
                if module_to_stage_map[i] == self.stage:
                    # ...to the rank(s) that serve module j's stage.
                    self.send_ranks[tensor_name] = \
                        stage_to_rank_map[module_to_stage_map[j]]

for model_inputs in inputs_module_destinations.keys():
    destination_stage = module_to_stage_map[
        inputs_module_destinations[model_inputs]]
    if destination_stage > self.stage:
        self.send_ranks[model_inputs] = \
            self.ranks_in_next_stage

    if 0 < self.stage <= destination_stage:
        self.receive_ranks[model_inputs] = \
            self.ranks_in_previous_stage

    if destination_stage > 0:
        if model_inputs not in self.tensor_tags:
            self.tensor_tags[model_inputs] = tensor_tag
            tensor_tag += 1
```
The resulting variables are:
```
num_ranks = {int} 4
num_ranks_in_first_stage = {int} 1
num_ranks_in_next_stage = {int} 0
num_ranks_in_previous_stage = {int} 1
num_ranks_in_stage = {int} 1
num_stages = {int} 4
num_warmup_minibatches = {int} 0
rank = {int} 3
rank_in_stage = {int} 0
ranks_in_next_stage = {list: 0} []
ranks_in_previous_stage = {list: 1} [2]
receive_ranks = {dict: 3}      # the rank(s) each tensor is received from
 'out8' = {list: 1} [2]
 'out9' = {list: 1} [2]
 'out10' = {list: 1} [2]
 __len__ = {int} 3
send_ranks = {dict: 0} {}      # the rank(s) each tensor is sent to
 __len__ = {int} 0
stage = {int} 3
```
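To check the producer/consumer rule outside PipeDream, the module-to-module loop can be replayed on a toy model. This is a sketch: the tensor wiring of the five modules below is hypothetical (shaped after the tensor names in the dump), `derive_ranks` is not a PipeDream function, and the second loop over the model's raw inputs (inputs_module_destinations) is omitted.

```python
from typing import Dict, List, Tuple

# Each entry mimics PipeDream's (module, input_names, output_names) triple;
# the module object itself is irrelevant here, so None stands in for it.
ModelSpec = List[Tuple[object, List[str], List[str]]]


def derive_ranks(model: ModelSpec, module_to_stage_map: List[int],
                 stage_to_rank_map: Dict[int, List[int]], stage: int):
    """Replay the producer/consumer loop for module-to-module edges only."""
    receive_ranks: Dict[str, List[int]] = {}
    send_ranks: Dict[str, List[int]] = {}
    for i in range(len(model)):
        for j in range(i + 1, len(model)):
            for tensor_name in model[i][2]:          # outputs of module i
                if tensor_name not in model[j][1]:   # not consumed by module j
                    continue
                if module_to_stage_map[i] == module_to_stage_map[j]:
                    continue                         # same stage: no communication
                if module_to_stage_map[j] == stage:  # this stage consumes it
                    receive_ranks[tensor_name] = stage_to_rank_map[module_to_stage_map[i]]
                if module_to_stage_map[i] == stage:  # this stage produces it
                    send_ranks[tensor_name] = stage_to_rank_map[module_to_stage_map[j]]
    return receive_ranks, send_ranks


# Hypothetical wiring: module i's outputs feed module i + 1.
model = [
    (None, ["input0", "input1"], ["out1", "out2"]),
    (None, ["out1", "out2", "input2"], ["out3", "out7"]),
    (None, ["out3", "out7"], ["out8", "out9", "out10"]),
    (None, ["out8", "out9", "out10"], ["out12"]),
    (None, ["out12"], ["loss"]),
]
module_to_stage_map = [0, 1, 2, 3, 3]
stage_to_rank_map = {0: [0], 1: [1], 2: [2], 3: [3]}

recv, send = derive_ranks(model, module_to_stage_map, stage_to_rank_map, stage=3)
print(recv, send)  # {'out8': [2], 'out9': [2], 'out10': [2]} {}
```

For stage 3 this reproduces the receive_ranks/send_ranks shown above: out12 flows between modules 3 and 4, which share a stage, so it never goes over the network. Running the same function with stage=2 yields the mirror image (receive out3/out7 from rank 1, send out8/out9/out10 to rank 3).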
Next come the module-related operations. We focus on the ModulesWithDependencies part.
Earlier, the code obtained the indices of the modules that belong to this stage:
```python
modules = stage_to_module_map[self.stage]  # here this yields [3, 4]; used later
```
stage_to_module_map maps each stage to its modules, so that we can look up the modules that belong to this stage.
Recall the configuration file: this stage (stage 3) corresponds to the two modules with indices 3 and 4, i.e. the two trailing 3s in:
module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]
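Since module_to_stage_map records the stage of every module, stage_to_module_map is simply its inverse. The sketch below shows one way to perform that inversion; the helper name is hypothetical and this is an illustration, not a quote of PipeDream's code.

```python
from collections import defaultdict
from typing import Dict, List


def invert_module_to_stage_map(module_to_stage_map: List[int]) -> Dict[int, List[int]]:
    """Group module indices by the stage they are assigned to."""
    stage_to_module_map: Dict[int, List[int]] = defaultdict(list)
    for module_index, stage in enumerate(module_to_stage_map):
        stage_to_module_map[stage].append(module_index)
    return dict(stage_to_module_map)


stage_to_module_map = invert_module_to_stage_map([0, 1, 2, 3, 3])
print(stage_to_module_map[3])  # [3, 4]
```

Because module indices are visited in increasing order, each stage's module list stays in model order, which matters when the modules are later executed one after another.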
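The following code then retrieves this stage's concrete modules (ModulesWithDependencies also keeps each module's input and output tensor names), moves them to the GPU, and converts them to half precision when fp16 is enabled: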
```python
modules = self.modules_with_dependencies.modules()
for i in range(len(modules)):
    modules[i] = modules[i].cuda()
    if self.fp16:
        import apex.fp16_utils as fp16_utils
        modules[i] = fp16_utils.BN_convert_float(modules[i].half())
```
After this runs, we get:
```
modules = {list: 2}
 0 = {Stage3} Stage3(
   (layer5): LSTM(2048, 1024)
   (layer8): Classifier(
     (classifier): Linear(in_features=1024, out_features=32320, bias=True)
   )
 )
 1 = {LabelSmoothing} LabelSmoothing()
 __len__ = {int} 2
```
ModulesWithDependencies is defined as follows:
```python
class ModulesWithDependencies:
    def __init__(self, modules_with_dependencies):
        self._modules = []
        self._all_input_names = []
        self._all_output_names = []
        for (module, input_names, output_names) in modules_with_dependencies:
            self._modules.append(module)
            self._all_input_names.append(input_names)
            self._all_output_names.append(output_names)

    def modules(self):
        return self._modules

    def all_input_names(self):
        return self._all_input_names

    def all_output_names(self):
        return self._all_output_names

    def is_input_tensor(self, tensor_name):
        for module_input_names in self._all_input_names:
            if tensor_name in module_input_names:
                return True
        return False
```
Next, a process group is created for each stage according to its number of data-parallel replicas.
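ranks holds the data-parallel ranks of each stage. In the configuration below, stage_to_rank_map lists the ranks of every stage, i.e. the machines that run that stage in parallel; for example, stage 0 maps to [0, 1, 2]:

```json
{
    "module_to_stage_map": [0, 1, 1],
    "stage_to_rank_map": {"0": [0, 1, 2], "1": [3]}
}
```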
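JSON object keys are always strings, so the stage ids "0" and "1" must be converted to integers before a lookup such as stage_to_rank_map[self.stage] can work. A minimal sketch of loading this configuration follows; the variable names mirror the runtime, but the loading code itself is an assumption rather than PipeDream's.

```python
import json

config_text = """
{
    "module_to_stage_map": [0, 1, 1],
    "stage_to_rank_map": {"0": [0, 1, 2], "1": [3]}
}
"""

config = json.loads(config_text)
module_to_stage_map = config["module_to_stage_map"]
# JSON keys are strings; convert stage ids back to ints for indexing by stage.
stage_to_rank_map = {int(stage): ranks
                     for stage, ranks in config["stage_to_rank_map"].items()}
num_ranks = sum(len(ranks) for ranks in stage_to_rank_map.values())
print(stage_to_rank_map, num_ranks)  # {0: [0, 1, 2], 1: [3]} 4
```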
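The code iterates over the stages and calls new_group() for each one to create a process group. new_group() creates a new process group from an arbitrary subset of all processes and returns a group handle, which can be passed as the group argument to collective functions (distributed functions that exchange information according to a specific communication pattern). torch.distributed requires every process to call new_group(), including processes that will not be members of the new group, and to create groups in the same order; this is why every worker creates the groups of all stages.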
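This is the point raised in the question at the beginning: to support data parallelism, each stage must create and manage its own process group.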
```python
# Initialize all groups in the same order on every worker.
if stage_to_rank_map is not None:
    groups = []
    for stage in range(self.num_stages):  # iterate over stages
        ranks = stage_to_rank_map[stage]  # data-parallel ranks of this stage, e.g. [0, 1, 2]
        if len(ranks) > 1:  # only replicated stages need a group (used by DDP below)
            groups.append(dist.new_group(ranks=ranks))
        else:
            groups.append(None)
    group = groups[self.stage]
else:
    group = None
```
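To see which groups get created, the loop can be replayed without torch.distributed. In this sketch (a hypothetical helper, not PipeDream code) each returned rank list stands for the argument that would be passed to dist.new_group(ranks=...). Because every worker walks the stages in the same order, all workers issue identical new_group() calls and then simply pick the entry for their own stage.

```python
from typing import Dict, List, Optional


def plan_stage_groups(stage_to_rank_map: Dict[int, List[int]],
                      num_stages: int) -> List[Optional[List[int]]]:
    """Return, in stage order, the rank list each new_group() call would receive.

    Stages served by a single rank get None, mirroring the runtime code:
    without data parallelism no process group is needed.
    """
    groups: List[Optional[List[int]]] = []
    for stage in range(num_stages):  # same order on every worker
        ranks = stage_to_rank_map[stage]
        groups.append(list(ranks) if len(ranks) > 1 else None)
    return groups


groups = plan_stage_groups({0: [0, 1, 2], 1: [3]}, num_stages=2)
print(groups)  # [[0, 1, 2], None]
# Rank 1 (stage 0) would use groups[0]; rank 3 (stage 1) would get None.
```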
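Finally, the modules are wrapped with DistributedDataParallel. The argument process_group=group is the group returned by the group setup above, so each group gets its own set of DistributedDataParallel instances. Stages served by a single rank have group = None and are not wrapped, and on the last stage the criterion (the final entry in modules) is also left unwrapped.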
```python
# self.modules_with_dependencies contains a list of PyTorch
# modules, along with a list of user-defined input and output
# tensor names. We use our module_executor.ModuleExecutor
# class to wrap these dependencies, and use run_forward and
# run_backward methods downstream.
num_parameters = 0
for i in range(len(modules)):
    if group is not None:
        if ((i < (len(modules)-1) and self.is_criterion)
                or not self.is_criterion):
            num_parameters += \
                sum(x.size()[0] * x.size()[1]
                    if len(x.size()) > 1 else x.size()[0]
                    for x in modules[i].parameters() if x.size())
            # Wrap the module for distributed data parallelism.
            modules[i] = torch.nn.parallel.DistributedDataParallel(
                modules[i],
                process_group=group,
                device_ids=[local_rank],
                output_device=local_rank)
if self.num_ranks_in_stage > 1:
    module_size = 4. * num_parameters
    print("Replicating stage: ranks=%d, module_size=%.3f" % (
        self.num_ranks_in_stage, module_size))
```
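module_size = 4. * num_parameters is an fp32 size estimate in bytes (4 bytes per parameter). The worked example below replays that arithmetic on plain shape tuples, using the Classifier from the dump above. It also shows a side effect of the formula: it multiplies only the first two dimensions, so for a 4-D convolution weight it counts fewer elements than numel() would. Since the value is only printed, this affects the log message, not training. Both helper functions are hypothetical.

```python
from math import prod
from typing import Iterable, Tuple

Shape = Tuple[int, ...]


def pipedream_param_count(shapes: Iterable[Shape]) -> int:
    """Same formula as the runtime: only the first two dims of >=2-D tensors count."""
    return sum(s[0] * s[1] if len(s) > 1 else s[0] for s in shapes if s)


def exact_param_count(shapes: Iterable[Shape]) -> int:
    """Full element count, i.e. what Tensor.numel() would return for each shape."""
    return sum(prod(s) for s in shapes)


# Classifier from the dump: Linear(in_features=1024, out_features=32320, bias=True)
linear_shapes = [(32320, 1024), (32320,)]
n = pipedream_param_count(linear_shapes)
print(n, 4.0 * n)  # 33128000 132512000.0  (parameters, fp32 bytes)

# A 4-D convolution weight: 64 output channels, 3 input channels, 7x7 kernel.
conv_shapes = [(64, 3, 7, 7)]
print(pipedream_param_count(conv_shapes), exact_param_count(conv_shapes))  # 192 9408
```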
DistributedDataParallel will be analyzed in a dedicated series later.
Finally, the communication module is initialized.
```python
if self.comm_handler is not None:
    self.comm_handler.initialize(
        self.receive_ranks,
        self.send_ranks,
        self.tensor_tags,
        self.target_tensor_names,
        self.training_tensor_dtypes,
        self.rank_in_stage,
        self.num_ranks_in_stage,
        self.ranks_in_previous_stage,
        self.ranks_in_next_stage)
```
We again use the figure from the paper as an example to look at the state of the runtime engine after initialization:
Refined with the details covered in this article, it looks like this:
```
                                  +---------------------------+
                                  | Stage 2 StageRuntime      |
                                  |                           |
                                  | CommunicationHandler      |
+---------------------------+     |                           |     +---------------------------+
| Stage 1 StageRuntime      |     | +-----------------------+ |     | Stage 3 StageRuntime      |
|                           |     | | Rank 2                | |     |                           |
| CommunicationHandler      |     | | Layer 3 +---> Layer 4 | |     | CommunicationHandler      |
|                           |     | +-----------------------+ |     |                           |
| +-----------------------+ |     |            DDP            |     | +-----------------------+ |
| | Rank 1                | |     | +-----------------------+ |     | | Rank 4                | |
| |                       | |     | | Rank 3                | |     | |                       | |
| | Layer 1 +---> Layer 2 | |---->| | Layer 3 +---> Layer 4 | |---->| | Layer 5 +---> Layer 6 | |
| |                       | |     | +-----------------------+ |     | |                       | |
| +-----------------------+ |     |            DDP            |     | +-----------------------+ |
+---------------------------+     | +-----------------------+ |     +---------------------------+
                                  | | Rank 4                | |
                                  | | Layer 3 +---> Layer 4 | |
                                  | +-----------------------+ |
                                  +---------------------------+
```
Here we only cover the basic helper functions. Several higher-level functions, such as run_forward, will be covered in the 1F1B article.
All of the following functions rely on the communication module to do their work.
receive_tensors_forward fetches tensors from the previous stage during the forward pass.
In the forward pass, tensors are stored in this instance's self.tensors.
```python
def receive_tensors_forward(self):
    if self.forward_only and len(self.tensors) > 0:
        self.tensors.pop(0)  # drop the oldest entry
    self.tensors.append({})
    if self.loader_iter is not None:  # first stage of the pipeline: load data
        input = next(self.loader_iter)  # fetch a new minibatch
        if self.model_type == TRANSLATION:
            (input, target) = input
            src, src_length = input
            tgt, tgt_length = target

            self.tensors[-1]["input0"] = src.cuda(non_blocking=True)
            self.tensors[-1]["input1"] = torch.LongTensor(src_length).cuda(
                non_blocking=True)
            self.tensors[-1]["input2"] = tgt[:-1].cuda(non_blocking=True)
            self.tensors[-1]["target"] = tgt[1:].cuda().contiguous().view(-1)
            self.tensors[-1]["target_length"] = \
                torch.tensor([int(sum(torch.LongTensor(tgt_length) - 1))],
                             dtype=torch.int).cuda()
        elif self.model_type == IMAGE_CLASSIFICATION:
            (input, target) = input
            if self.fp16:
                input = input.half()
            self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
            self.tensors[-1]["target"] = target.cuda(non_blocking=True)
        elif self.model_type == SPEECH_TO_TEXT:
            input, target, input_percentages, target_sizes = input
            input_sizes = input_percentages.mul_(int(input.size(3))).int()
            self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
            self.tensors[-1]["input1"] = input_sizes.cuda(non_blocking=True)
            self.tensors[-1]["target"] = target.cuda(non_blocking=True)
            self.tensors[-1]["target_length"] = target_sizes.cuda(
                non_blocking=True)
    else:
        # Receive all required tensors from upstream machines.
        for input_name in self.receive_ranks:  # this stage's receive ranks: fetch from upstream
            if input_name == "ack":
                continue

            self.tensors[-1][input_name] = \
                self.comm_handler.recv(
                    input_name,
                    forward_minibatch_id=self.forward_minibatch_id,
                    backward_minibatch_id=self.backward_minibatch_id,
                    backward=False)

            self.forward_stats.stats['receive_tensors_size'] += \
                (self.tensors[-1][input_name].element_size() *
                 self.tensors[-1][input_name].nelement())

        # Used to track where to receive forward from.
        self.comm_handler.increment_messaging_index(
            sending=False)
```
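For the translation model, the decoder input and the labels are the same target sequence offset by one time step (teacher forcing): input2 drops the last step, while target drops the first and is flattened. The list-based sketch below mirrors that slicing without PyTorch; the function name is hypothetical, and it assumes a time-major [seq_len][batch] layout. This is consistent with the shapes in the dump above: a target batch of 51 x 128 would yield input2 of shape (50, 128) and a flattened target of 50 * 128 = 6400 elements.

```python
from typing import List, Tuple


def split_translation_target(tgt: List[List[int]],
                             tgt_length: List[int]) -> Tuple[List[List[int]], List[int], int]:
    """Mimic the input2 / target / target_length slicing on nested lists."""
    input2 = tgt[:-1]                                   # decoder input: all but the last step
    target = [tok for row in tgt[1:] for tok in row]    # shifted labels, flattened like .view(-1)
    target_length = sum(n - 1 for n in tgt_length)     # one fewer predicted token per sentence
    return input2, target, target_length


# 3 time steps x 2 sentences of token ids.
input2, target, target_length = split_translation_target([[1, 2], [3, 4], [5, 6]], [3, 3])
print(input2, target, target_length)  # [[1, 2], [3, 4]] [3, 4, 5, 6] 4
```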
send_tensors_forward sends tensors to the next stage during the forward pass.
```python
def send_tensors_forward(self):
    # Send all required tensors downstream.
    for output_name in self.send_ranks:  # this stage's send ranks: send downstream
        if output_name == "ack":
            continue

        self.comm_handler.send(
            output_name,
            self.tensors[-1][output_name],
            forward_minibatch_id=self.forward_minibatch_id,
            backward_minibatch_id=self.backward_minibatch_id,
            backward=False)

        self.forward_stats.stats['send_tensors_size'] += \
            (self.tensors[-1][output_name].element_size() *
             self.tensors[-1][output_name].nelement())
```
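The statistics counters add element_size() * nelement() for every tensor sent or received, i.e. the payload size in bytes. As a worked example with the shapes and dtypes from the dump above, a single float32 activation such as out8 with shape (50, 128, 1024) is 50 * 128 * 1024 * 4 = 26,214,400 bytes (25 MiB) per minibatch. The small byte-size table below is a hypothetical stand-in for torch's element_size().

```python
from math import prod

# Bytes per element for a few dtypes (what Tensor.element_size() reports).
DTYPE_SIZES = {"float32": 4, "float16": 2, "int64": 8, "int32": 4}


def tensor_bytes(shape, dtype):
    """element_size() * nelement(), computed from a shape tuple and a dtype name."""
    return DTYPE_SIZES[dtype] * prod(shape)


print(tensor_bytes((50, 128, 1024), "float32"))  # 26214400
print(tensor_bytes((50, 128), "int64"))          # 51200
```

With fp16 enabled, the same activation would halve to 13,107,200 bytes, which is one reason mixed precision helps pipeline-parallel communication.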
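receive_tensors_backward fetches gradient tensors from the next (downstream) stage during the backward pass. In the backward pass, gradients are stored in self.gradients.

Note that it iterates over self.send_ranks, i.e. the ranks this stage sends to in the forward pass; in the backward pass they become the ranks it receives from.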
```python
def receive_tensors_backward(self):
    # Receive all required gradients from downstream
    # machines.
    for output_name in self.send_ranks:  # forward-pass send ranks: receive gradients from them
        if output_name in self.target_tensor_names:
            continue

        # Fetch the gradient.
        self.gradients[output_name] = \
            self.comm_handler.recv(
                output_name,
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)

        self.backward_stats.stats['receive_tensors_size'] += \
            (self.gradients[output_name].element_size() *
             self.gradients[output_name].nelement())
```
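send_tensors_backward sends gradient tensors, taken from self.gradients, to the previous (upstream) stage during the backward pass.

Note that it iterates over self.receive_ranks, i.e. the ranks this stage receives from in the forward pass; in the backward pass they become the ranks it sends to.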
```python
def send_tensors_backward(self):
    # Send all required gradients upstream.
    for input_name in self.receive_ranks:  # forward-pass receive ranks: send gradients to them
        if input_name in self.target_tensor_names:
            continue

        self.comm_handler.send(
            input_name,
            self.gradients[input_name],
            forward_minibatch_id=self.forward_minibatch_id,
            backward_minibatch_id=self.backward_minibatch_id,
            backward=True)

        self.backward_stats.stats['send_tensors_size'] += \
            (self.gradients[input_name].element_size() *
             self.gradients[input_name].nelement())

    if self.num_ranks_in_previous_stage > 0:
        # Used to track where to send tensors in the
        # backward pass.
        self.comm_handler.increment_messaging_index(
            sending=True)
```
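The role reversal between send_ranks and receive_ranks can be illustrated with a toy in-memory message bus. FakeCommHandler and its send/recv signatures are hypothetical and much simpler than PipeDream's CommunicationHandler (which also tracks minibatch ids and uses torch.distributed); the sketch assumes one rank per stage, with stage 2 on rank 2 producing out8 and stage 3 on rank 3 consuming it.

```python
from collections import defaultdict, deque


class FakeCommHandler:
    """Hypothetical in-memory stand-in for a communication handler."""

    def __init__(self):
        self.queues = defaultdict(deque)  # (name, src, dst, backward) -> pending messages

    def send(self, name, value, src, dst, backward):
        self.queues[(name, src, dst, backward)].append(value)

    def recv(self, name, src, dst, backward):
        return self.queues[(name, src, dst, backward)].popleft()


comm = FakeCommHandler()
send_ranks_2 = {"out8": [3]}     # stage 2 (rank 2) sends out8 to rank 3
receive_ranks_3 = {"out8": [2]}  # stage 3 (rank 3) receives out8 from rank 2

# Forward: the producer walks send_ranks, the consumer walks receive_ranks.
for name, dsts in send_ranks_2.items():
    for dst in dsts:
        comm.send(name, "activation", src=2, dst=dst, backward=False)
activations = {name: comm.recv(name, src=srcs[0], dst=3, backward=False)
               for name, srcs in receive_ranks_3.items()}  # one rank per stage assumed

# Backward: roles flip. The consumer sends gradients along its receive_ranks,
# and the producer receives them along its send_ranks.
for name, dsts in receive_ranks_3.items():
    for dst in dsts:
        comm.send(name, "gradient", src=3, dst=dst, backward=True)
gradients = {name: comm.recv(name, src=srcs[0], dst=2, backward=True)
             for name, srcs in send_ranks_2.items()}

print(activations, gradients)  # {'out8': 'activation'} {'out8': 'gradient'}
```

No extra routing table is needed for the backward pass: the forward-pass maps already describe every edge, and the backward pass just traverses the same edges in the opposite direction.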
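run_ack exchanges acknowledgments with the neighboring stages: it receives an ack from the next stage (unless this is the last stage) and sends an ack to the previous stage (unless this is the first stage).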
```python
def run_ack(self):
    # No need for ack if running on a single worker.
    if self.rank is None:
        return

    # Receive ack from next stage. Send ack to previous stage.
    if self.stage < (self.num_stages-1):
        self.comm_handler.recv(
            "ack",
            forward_minibatch_id=self.forward_minibatch_id,
            backward_minibatch_id=self.backward_minibatch_id,
            backward=True)
    if self.stage > 0:
        self.comm_handler.send(
            "ack",
            torch.zeros(self.tensor_shapes["ack"],
                        dtype=torch.int64).cuda(),
            forward_minibatch_id=self.forward_minibatch_id,
            backward_minibatch_id=self.backward_minibatch_id,
            backward=True)

        # Used to track where to receive forward from.
        self.comm_handler.increment_messaging_index(sending=True)

    self.backward_minibatch_id += 1
```
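Ignoring the tensor payload, run_ack forms a chain of acknowledgments from the last stage back to the first: every stage except the last waits for an ack from its successor before sending its own ack to its predecessor. A small sketch (hypothetical helper, using stage indices rather than ranks) that enumerates these steps:

```python
from typing import List, Tuple


def ack_edges(num_stages: int) -> List[Tuple[str, int, int]]:
    """List the (action, stage, peer_stage) steps that run_ack performs, stage by stage."""
    steps = []
    for stage in range(num_stages):
        if stage < num_stages - 1:
            steps.append(("recv", stage, stage + 1))  # wait for ack from the next stage
        if stage > 0:
            steps.append(("send", stage, stage - 1))  # acknowledge the previous stage
    return steps


print(ack_edges(4))
# [('recv', 0, 1), ('recv', 1, 2), ('send', 1, 0), ('recv', 2, 3), ('send', 2, 1), ('send', 3, 2)]
```

Every send is matched by exactly one recv on the neighboring stage, so no stage blocks forever; only the last stage can send immediately, which is what makes the chain start there.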
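This concludes our look at the runtime engine's static information and initialization. The next article covers the communication module.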
https://pytorch.org/docs/stable/rpc.html