Horovod is an AllReduce-based distributed training framework. Thanks to its support for mainstream deep learning frameworks such as TensorFlow and PyTorch, together with its communication optimizations, Horovod is widely used for data-parallel training.
This is the final article on Horovod on Kubernetes. It looks at how MPI-Operator could be improved, and it does so mainly by reading the source code along the lines of the blog post written by the Elastic Training Operator team, so the article is heavy on source code.
Links to the other articles in this series:
[源码解析] 深度学习分布式训练框架 Horovod (1) --- 基础知识
[源码解析] 深度学习分布式训练框架 horovod (2) --- 从使用者角度切入
[源码解析] 深度学习分布式训练框架 horovod (3) --- Horovodrun背后做了什么
[源码解析] 深度学习分布式训练框架 horovod (4) --- 网络基础 & Driver
[源码解析] 深度学习分布式训练框架 horovod (5) --- 融合框架
[源码解析] 深度学习分布式训练框架 horovod (6) --- 后台线程架构
[源码解析] 深度学习分布式训练框架 horovod (7) --- DistributedOptimizer
[源码解析] 深度学习分布式训练框架 horovod (8) --- on spark
[源码解析] 深度学习分布式训练框架 horovod (9) --- 启动 on spark
[源码解析] 深度学习分布式训练框架 horovod (10) --- run on spark
[源码解析] 深度学习分布式训练框架 horovod (11) --- on spark --- GLOO 方案
[源码解析] 深度学习分布式训练框架 horovod (12) --- 弹性训练总体架构
[源码解析] 深度学习分布式训练框架 horovod (13) --- 弹性训练之 Driver
[源码解析] 深度学习分布式训练框架 horovod (14) --- 弹性训练发现节点 & State
[源码解析] 深度学习分布式训练框架 horovod (15) --- 广播 & 通知
[源码解析] 深度学习分布式训练框架 horovod (16) --- 弹性训练之Worker生命周期
[源码解析] 深度学习分布式训练框架 horovod (17) --- 弹性训练之容错
[源码解析] 深度学习分布式训练框架 horovod (18) --- kubeflow tf-operator
[源码解析] 深度学习分布式训练框架 horovod (19) --- kubeflow MPI-operator
The next two sections are based on the Elastic Training Operator team's blog post, which is well worth reading.
Kubernetes and cloud computing provide agility and scalability. With components such as cluster-autoscaler we can attach elastic policies to training jobs and use Kubernetes' elasticity to create resources on demand and reduce GPU idling.
However, this scaling model still falls short for offline workloads such as training jobs.
Making training jobs themselves elastic is the key to better cost-effectiveness. Distributed frameworks such as Horovod have recently added support for Elastic Training, i.e., the ability for a training job to scale its workers out or in while it is running, without interrupting the job. Only small code changes are needed to adapt a training script; see https://horovod.readthedocs.io/en/stable/elastic_include.html.
In mpi-operator, the workers taking part in training are designed and maintained as static resources. Supporting an elastic training mode makes jobs more flexible, but it also brings challenges to the operations layer, for example keeping the worker list that the running job sees consistent as workers are added and removed.
To address these problems we designed and developed et-operator, which provides a TrainingJob CRD to describe a training job and ScaleOut / ScaleIn CRDs to describe scale-out and scale-in operations. Combined, they make training jobs truly elastic. We have open-sourced the solution and welcome requirements, discussion, and feedback.
Open-source repository: https://github.com/AliyunContainerService/et-operator
The TrainingJob controller is responsible for reconciling a TrainingJob CR into the underlying Kubernetes resources and for tracking the job's lifecycle. As the code walkthrough below shows, its sub-resources are created in order: the worker Services and Pods first, then, once the workers are running, the launcher's ServiceAccount, Role and RoleBinding, the hostfile ConfigMap, and finally the launcher Pod. The resources associated with a TrainingJob therefore include the launcher Pod, the worker Pods with their headless Services, a ConfigMap, and, in SSH mode, a Secret.
A TrainingJob CR is configured in two parts, Launcher and Worker. The Launcher section specifies the job's image and entry command. By default et-operator generates a hostfile and a discover_host script according to how workers are allocated; the discover_host script is mounted into the Launcher at /etc/edl/discover_hosts.sh and is passed to horovodrun in the entry script through the --host-discovery-script argument. The Worker section specifies the worker image and GPU usage, and maxReplicas / minReplicas bound the allowed number of worker replicas.
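To make this concrete, a TrainingJob CR looks roughly like the sketch below. This is a hedged example: it follows the structures referenced in this article (ETReplicaSpecs with Launcher and Worker sections, maxReplicas/minReplicas, the /etc/edl/discover_hosts.sh mount), but the exact field names and the image and script used here are assumptions that should be checked against the CRD and the examples in the et-operator repository.

```yaml
apiVersion: kai.alibabacloud.com/v1alpha1
kind: TrainingJob
metadata:
  name: elastic-training
  namespace: default
spec:
  # Field names below follow the structures referenced in this article; verify against the CRD.
  etReplicaSpecs:
    launcher:
      replicas: 1
      template:
        spec:
          containers:
          - name: elastic-training
            image: horovod-elastic:example    # hypothetical image
            command: ["sh", "-c", "horovodrun -np 2 --min-np 1 --max-np 9 --host-discovery-script /etc/edl/discover_hosts.sh python /examples/elastic_train.py"]  # hypothetical script path
    worker:
      replicas: 2
      minReplicas: 1
      maxReplicas: 9
      template:
        spec:
          containers:
          - name: elastic-training
            image: horovod-elastic:example    # hypothetical image
            resources:
              limits:
                nvidia.com/gpu: "1"
```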
Studying et-operator really comes down to studying how it scales out and scales in, but to get there we first need to walk through the overall program logic; the flow is summarized piece by piece in the diagrams below. Readers who are not yet familiar with Kubernetes can also use this walkthrough to see how a CRD is used in practice.
The entry point is the main function in main.go, and it already tells us a lot:
func main() {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:             scheme,
		MetricsBindAddress: metricsAddr,
		LeaderElection:     enableLeaderElection,
		Port:               9443,
	})

	const jobPollInterval = "5s"

	if err = controllers.NewReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}

	if err = controllers.NewScaleOutReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}

	if err = controllers.NewScaleInReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		os.Exit(1)
	}
}
This setup registers the reconcilers, i.e., it declares which CRs each controller responds to.
Besides TrainingJob, et-operator supports two further CRDs, ScaleOut and ScaleIn, which are used to issue scale-out and scale-in operations against a training job.
When a ScaleOut CR is submitted, the ScaleOutController triggers a Reconcile. The work here is simple: based on the Selector field in the ScaleOut CR, it finds the TrainingJob that the scaler targets and records it in the CR's OwnerReferences.
The TrainingJobController then notices that a ScaleOut CR owned by the TrainingJob has been updated, which triggers the TrainingJob's Reconcile. It walks the ScaleIn and ScaleOut CRs whose OwnerReference points to the TrainingJob and decides, based on creation time and status time, which scale-out or scale-in to execute.
When scaling in, the workers to remove can be specified through the spec.toDelete.count or spec.toDelete.podNames field of the ScaleIn CR; with count, workers are removed by index from the highest index downward.
func (r *ScaleInReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&kaiv1alpha1.ScaleIn{}).
		Complete(r)
}

func (r *ScaleOutReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&kaiv1alpha1.ScaleOut{}).
		Complete(r)
}

func (r *TrainingJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&kaiv1alpha1.TrainingJob{}).
		Owns(&kaiv1alpha1.ScaleIn{}).
		Owns(&kaiv1alpha1.ScaleOut{}).
		Owns(&corev1.Pod{}).
		Owns(&corev1.Service{}).
		Owns(&corev1.ConfigMap{}).
		Owns(&corev1.Secret{}).
		// Ignore status-only and metadata-only updates
		//WithEventFilter(predicate.GenerationChangedPredicate{}).
		Complete(r)
}
Let's follow the code and look for the finer points of the design.
In a Kubernetes operator, the Reconcile method is driven by a continuous watch: whenever a watched resource changes, Reconcile is triggered, so in principle it runs once per change.
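As a minimal illustration of this watch-and-reconcile pattern (illustration only, not et-operator code), a controller-runtime reconciler generally looks like the sketch below. The signature matches the controller-runtime version used in this article; newer versions also take a context.Context as the first argument.

```go
// Minimal sketch of the watch -> Reconcile pattern (not et-operator code).
package controllers

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

type DemoReconciler struct {
	client.Client
}

// Reconcile is invoked for every change to a watched object (and for requeues).
// It should be idempotent: read the current state, move it toward the desired state,
// and report whether it wants to be requeued.
func (r *DemoReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	// Fetch the object named by req.NamespacedName, compare desired vs. actual state,
	// create/update/delete child resources, then decide whether to requeue.
	return ctrl.Result{}, nil
}
```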
When such an event arrives, the Reconcile method is invoked.
func (r *TrainingJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	// Fetch latest training job instance.
	sharedTrainingJob := &kaiv1alpha1.TrainingJob{}
	err := r.Get(context.Background(), req.NamespacedName, sharedTrainingJob)

	trainingJob := sharedTrainingJob.DeepCopy()
	// Check reconcile is required.
	// No need to do reconcile or job has been deleted.
	r.Scheme.Default(trainingJob)
	return r.ReconcileJobs(trainingJob)
}
Because the phase carried in this event is still the empty string, initializeJob runs and then reconcileResource is called.
func (r *TrainingJobReconciler) ReconcileJobs(job *kaiv1alpha1.TrainingJob) (result reconcile.Result, err error) {
	oldJobStatus := job.Status.DeepCopy()

	defer func() {
		latestJob := &kaiv1alpha1.TrainingJob{}
		err := r.Get(context.Background(), types.NamespacedName{
			Name:      job.Name,
			Namespace: job.Namespace,
		}, latestJob)
		if err == nil {
			if latestJob.ObjectMeta.ResourceVersion != job.ObjectMeta.ResourceVersion {
				latestJob.Status = job.Status
				job = latestJob
			}
		}
		r.updateObjectStatus(job, oldJobStatus)
	}()

	switch job.Status.Phase {
	case commonv1.JobSucceeded, commonv1.JobFailed:
		err = r.cleanup(job)
	case "", commonv1.JobCreated:
		// If the phase is empty or JobCreated, initialize the job
		r.initializeJob(job)
		err = r.reconcileResource(job)
	case commonv1.JobRunning:
		err = r.reconcileJobRunning(job)
	case commonv1.Scaling:
		err = r.executeScaling(job)
	}

	if err != nil {
		if IsRequeueError(err) {
			return RequeueAfterInterval(r.PollInterval, nil)
		}
		return RequeueAfterInterval(r.PollInterval, err)
	}

	return NoRequeue()
}
reconcileResource simply calls doSteps, i.e., it hands off to a small state machine that continues the initialization.
func (r *TrainingJobReconciler) reconcileResource(job *kaiv1alpha1.TrainingJob) error {
	steps := r.newSteps()
	err := r.doSteps(job, steps)
	return err
}
newSteps builds that simple state machine: an ordered list of initialization steps, while doSteps branches on the job's current conditions. A few points are worth noting: every step is guarded by a job condition, doSteps skips the steps whose condition has already been recorded, executes only the first pending step, and then breaks out of the loop, so the job advances by at most one step per reconcile pass. The code is as follows:
func (r *TrainingJobReconciler) newSteps() []Step {
	return []Step{
		Step{
			JobCondition: commonv1.WorkersCreated,
			Action:       r.createTrainingJobWorkers,
		},
		Step{
			JobCondition: commonv1.WorkersReady,
			Action:       r.waitWorkersRunning,
		},
		Step{
			JobCondition: commonv1.LauncherCreated,
			Action:       r.createLauncher,
		},
		Step{
			JobCondition: commonv1.JobRunning,
			Action:       r.syncLauncherState,
		},
	}
}

func (r *TrainingJobReconciler) doSteps(job *kaiv1alpha1.TrainingJob, steps []Step) error {
	for _, step := range steps {
		if hasCondition(*job.GetJobStatus(), step.JobCondition) {
			continue
		}
		err := step.Action(job)
		break
	}
	return nil
}
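A toy sketch of this condition-gated step pattern (independent of the et-operator types) shows why several reconcile passes are needed to walk the job through all conditions:

```go
// Toy sketch of the condition-gated step pattern used by doSteps (not et-operator code).
package main

import "fmt"

type step struct {
	condition string
	action    func() // records its condition when it succeeds
}

func main() {
	done := map[string]bool{}
	steps := []step{
		{"WorkersCreated", func() { done["WorkersCreated"] = true }},
		{"WorkersReady", func() { done["WorkersReady"] = true }},
		{"LauncherCreated", func() { done["LauncherCreated"] = true }},
	}

	// Each "reconcile" pass executes only the first step whose condition is not yet set.
	for pass := 1; pass <= 3; pass++ {
		for _, s := range steps {
			if done[s.condition] {
				continue
			}
			s.action()
			fmt.Printf("pass %d ran step %s\n", pass, s.condition)
			break
		}
	}
}
```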
The flow so far is:
K8S --Request("")--> Reconcile
                         |
                         v
                   ReconcileJobs
                    |         |          |
          "",JobCreated    JobRunning   Scaling
                    |
                    v
           reconcileResource
                    |
                    v
                 doSteps
                    |-- WorkersCreated  --> createTrainingJobWorkers
                    |-- WorkersReady    --> waitWorkersRunning
                    |-- LauncherCreated --> createLauncher
                    |-- JobRunning      --> syncLauncherState
Within doSteps the first Action reached is createTrainingJobWorkers, which finishes by recording the WorkersCreated condition on the job.
func (r *TrainingJobReconciler) createTrainingJobWorkers(job *kaiv1alpha1.TrainingJob) error {
	if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
		if cm, err := r.GetOrCreateSecret(job); cm == nil || err != nil {
			updateStatus(job.GetJobStatus(), common.JobFailed, trainingJobFailedReason, msg)
			return nil
		}
	}

	workers := getJobReplicasWorkers(job)
	job.Status.TargetWorkers = workers

	// Create the workers
	if err := r.CreateWorkers(job, workers); err != nil {
		updateStatus(job.GetJobStatus(), common.JobFailed, trainingJobFailedReason, msg)
		return nil
	}

	// Record the new WorkersCreated condition
	updateJobConditions(job.GetJobStatus(), common.WorkersCreated, "", msg)
	return nil
}
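getJobReplicasWorkers produces the list of target worker names. Judging from getWorkerName(job.Name, index) used below and the pod name elastic-training-worker-1 that appears in the ScaleIn example later on, the names appear to follow a "jobName-worker-index" pattern; the sketch below is an assumption based on those hints, not code copied from the repository.

```go
// Hedged sketch of the worker naming convention implied by getWorkerName and by the
// "elastic-training-worker-1" pod name used later in this article (assumption only).
package main

import "fmt"

func workerNames(jobName string, replicas int) []string {
	names := make([]string, 0, replicas)
	for i := 0; i < replicas; i++ {
		names = append(names, fmt.Sprintf("%s-worker-%d", jobName, i))
	}
	return names
}

func main() {
	fmt.Println(workerNames("elastic-training", 2)) // [elastic-training-worker-0 elastic-training-worker-1]
}
```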
CreateWorkers creates the workers. As described earlier, a worker consists of a Service and a Pod, so concretely the creation:
calls the lower-case createWorkers, which indirectly creates each worker's Service;
calls newWorker to build each worker's Pod.
func (r *TrainingJobReconciler) CreateWorkers(job *kaiv1alpha1.TrainingJob, workers []string) error {
	return r.createWorkers(job, workers, func(name string, index string) *corev1.Pod {
		worker := newWorker(job, name, index)
		return worker
	})
}
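Note the closure passed as the last argument: createWorkers takes a PodTplGenerator, so the same creation path can be reused with different pod templates. The type is presumably shaped like the sketch below; treat the exact definition as an assumption inferred from its call sites, not a copy of the source.

```go
// Presumed shape of the PodTplGenerator type used above (assumption, inferred from call sites).
package controllers

import corev1 "k8s.io/api/core/v1"

type PodTplGenerator func(name string, index string) *corev1.Pod
```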
createWorkers loops over the worker list and calls createWorker for each entry, producing the series of workers described by the configuration.
func (r *TrainingJobReconciler) createWorkers(job *kaiv1alpha1.TrainingJob, workers []string, newPod PodTplGenerator) error {
	// Iterate over the worker names and create each one
	for _, podName := range workers {
		index, err := getWorkerIndex(job.Name, podName)
		if err != nil {
			return err
		}
		_, err = r.createWorker(job, int32(index), newPod)
		if err != nil {
			return err
		}
	}
	return nil
}
createWorker checks whether the worker Pod already exists and, if it does not, creates that particular worker.
func (r *TrainingJobReconciler) createWorker(job *kaiv1alpha1.TrainingJob, index int32, workerPodTempl PodTplGenerator) (*corev1.Pod, error) {
	name := getWorkerName(job.Name, int(index))
	indexStr := strconv.Itoa(int(index))
	pod := &corev1.Pod{}
	nsn := types.NamespacedName{
		Name:      name,
		Namespace: job.Namespace,
	}
	err := r.Get(context.Background(), nsn, pod)

	if err != nil {
		// If the worker Pod doesn't exist, we'll create it.
		if errors.IsNotFound(err) {
			// The Pod is missing, so build and create it here
			worker := workerPodTempl(name, indexStr)
			if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
				util.MountRsaKey(worker, job.Name)
			}
			if err = r.Create(context.Background(), worker); err != nil {
				return nil, err
			}
		}
	}

	service := &corev1.Service{}
	err = r.Get(context.Background(), nsn, service)
	if errors.IsNotFound(err) {
		// Call newService to actually build and create the Service
		err = r.Create(context.Background(), newService(job, name, indexStr))
	}

	return nil, nil
}
Only here do we finally reach the code that actually creates the Service; it takes quite a few hops to get there.
func newService(obj interface{}, name string, index string) *corev1.Service {
	job, _ := obj.(*kaiv1alpha1.TrainingJob)
	labels := GenLabels(job.Name)
	labels[labelTrainingRoleType] = worker
	labels[replicaIndexLabel] = index

	// Build the headless Service for this worker
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: job.Namespace,
			Labels:    labels,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
			},
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: "None",
			Selector:  labels,
			Ports: []corev1.ServicePort{
				{
					Name: "ssh-port",
					Port: 22,
				},
			},
		},
	}
}
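For worker index 0 of a job named elastic-training, the Service built by newService would look roughly like the sketch below, rendered by hand from the code above. The concrete label keys come from GenLabels / labelTrainingRoleType / replicaIndexLabel, constants whose string values are not shown in this article, so the selector here is only a placeholder.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: elastic-training-worker-0
  namespace: default
  ownerReferences:
  - apiVersion: kai.alibabacloud.com/v1alpha1
    kind: TrainingJob
    name: elastic-training
    controller: true
spec:
  clusterIP: None              # headless: gives each worker a stable DNS name
  selector:
    training-role: worker      # illustrative placeholder for the real label keys
  ports:
  - name: ssh-port
    port: 22
```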
newWorker builds the Pod, following the usual pattern.
func newWorker(obj interface{}, name string, index string) *corev1.Pod {
	job, _ := obj.(*kaiv1alpha1.TrainingJob)
	labels := GenLabels(job.Name)
	labels[labelTrainingRoleType] = worker
	labels[replicaIndexLabel] = index
	podSpec := job.Spec.ETReplicaSpecs.Worker.Template.DeepCopy()

	// keep the labels which are set in PodTemplate
	if len(podSpec.Labels) == 0 {
		podSpec.Labels = make(map[string]string)
	}

	for key, value := range labels {
		podSpec.Labels[key] = value
	}

	// RestartPolicy=Never
	setRestartPolicy(podSpec)

	container := podSpec.Spec.Containers[0]

	// if we want to use ssh, will start sshd service firstly.
	if len(container.Command) == 0 {
		if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
			container.Command = []string{"sh", "-c", "/usr/sbin/sshd && sleep 365d"}
		} else {
			container.Command = []string{"sh", "-c", "sleep 365d"}
		}
	}

	podSpec.Spec.Containers[0] = container

	// Build the worker Pod
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Namespace:   job.Namespace,
			Labels:      podSpec.Labels,
			Annotations: podSpec.Annotations,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
			},
		},
		Spec: podSpec.Spec,
	}
}
The logic so far:
K8S --Request("")--> Reconcile
                         |
                         v
                   ReconcileJobs
                    |         |          |
          "",JobCreated    JobRunning   Scaling
                    |
                    v
           reconcileResource
                    |
                    v
                 doSteps
                    |-- WorkersCreated  --> createTrainingJobWorkers --> CreateWorkers --> createWorkers --> createWorker --> newService
                    |                                                                                                    \--> newWorker
                    |                       (then the WorkersCreated condition is recorded)
                    |-- WorkersReady    --> waitWorkersRunning
                    |-- LauncherCreated --> createLauncher
                    |-- JobRunning      --> syncLauncherState
Once the workers have been created, it is the launcher's turn, so execution continues with createLauncher.
func (r *TrainingJobReconciler) createLauncher(job *kaiv1alpha1.TrainingJob) error {
	if _, err := r.GetOrCreateLauncherServiceAccount(job); err != nil {
		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
		return nil
	}
	if _, err := r.GetOrCreateLauncherRole(job, 0); err != nil {
		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
		return nil
	}
	if _, err := r.GetLauncherRoleBinding(job); err != nil {
		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
		return nil
	}
	if cm, err := r.CreateHostConfigMap(job); cm == nil || err != nil {
		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
		return nil
	}

	launcher, err := r.GetLauncherJob(job)
	if launcher == nil {
		if _, err := r.CreateLauncher(job); err != nil {
			updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
			return nil
		}
	}

	updateJobConditions(job.GetJobStatus(), commonv1.LauncherCreated, "", msg)
	return nil
}
Two steps deserve a closer look. The first, CreateHostConfigMap, obtains the host-related configuration (the hostfile ConfigMap).
func (r *TrainingJobReconciler) CreateHostConfigMap(job *kaiv1alpha1.TrainingJob) (*corev1.ConfigMap, error) {
	return r.createConfigMap(job, newHostfileConfigMap)
}

func (r *TrainingJobReconciler) createConfigMap(job *kaiv1alpha1.TrainingJob, newCm func(job *kaiv1alpha1.TrainingJob) *corev1.ConfigMap) (*corev1.ConfigMap, error) {
	cm := &corev1.ConfigMap{}
	name := ctrl.Request{}
	name.NamespacedName.Namespace = job.GetNamespace()
	name.NamespacedName.Name = job.GetName() + configSuffix
	err := r.Get(context.Background(), name.NamespacedName, cm)
	if errors.IsNotFound(err) {
		if err = r.Create(context.Background(), newCm(job)); err != nil {
			return cm, err
		}
	}
	return cm, nil
}
The second is the creation of the launcher Pod itself.
func (r *TrainingJobReconciler) CreateLauncher(obj interface{}) (*corev1.Pod, error) {
	job, ok := obj.(*kaiv1alpha1.TrainingJob)
	launcher := newLauncher(job) // build the launcher Pod
	if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
		util.MountRsaKey(launcher, job.Name)
	}
	err := r.Create(context.Background(), launcher)
	return launcher, nil
}
newLauncher does the actual Pod construction.
func newLauncher(obj interface{}) *corev1.Pod {
	job, _ := obj.(*kaiv1alpha1.TrainingJob)
	launcherName := job.Name + launcherSuffix
	labels := GenLabels(job.Name)
	labels[labelTrainingRoleType] = launcher
	podSpec := job.Spec.ETReplicaSpecs.Launcher.Template.DeepCopy()

	// copy the labels and annotations to pod from PodTemplate
	if len(podSpec.Labels) == 0 {
		podSpec.Labels = make(map[string]string)
	}
	for key, value := range labels {
		podSpec.Labels[key] = value
	}

	podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, initContainer(job))
	//podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, kubedeliveryContainer())

	container := podSpec.Spec.Containers[0]
	container.VolumeMounts = append(container.VolumeMounts,
		corev1.VolumeMount{
			Name:      hostfileVolumeName,
			MountPath: hostfileMountPath,
		},
		corev1.VolumeMount{
			Name:      configVolumeName,
			MountPath: configMountPath,
		},
		corev1.VolumeMount{
			Name:      kubectlVolumeName,
			MountPath: kubectlMountPath,
		})

	if job.GetAttachMode() == kaiv1alpha1.AttachModeKubexec {
		container.Env = append(container.Env, corev1.EnvVar{
			Name:  "OMPI_MCA_plm_rsh_agent",
			Value: getKubexecPath(),
		})
	}

	podSpec.Spec.Containers[0] = container
	podSpec.Spec.ServiceAccountName = launcherName
	setRestartPolicy(podSpec)

	hostfileMode := int32(0444)
	scriptMode := int32(0555)
	podSpec.Spec.Volumes = append(podSpec.Spec.Volumes,
		corev1.Volume{
			Name: hostfileVolumeName,
			VolumeSource: corev1.VolumeSource{
				EmptyDir: &corev1.EmptyDirVolumeSource{},
			},
		},
		corev1.Volume{
			Name: kubectlVolumeName,
			VolumeSource: corev1.VolumeSource{
				EmptyDir: &corev1.EmptyDirVolumeSource{},
			},
		},
		corev1.Volume{
			Name: configVolumeName,
			VolumeSource: corev1.VolumeSource{
				ConfigMap: &corev1.ConfigMapVolumeSource{
					LocalObjectReference: corev1.LocalObjectReference{
						Name: job.Name + configSuffix,
					},
					Items: []corev1.KeyToPath{
						{
							Key:  hostfileName,
							Path: hostfileName,
							Mode: &hostfileMode,
						},
						{
							Key:  discoverHostName,
							Path: discoverHostName,
							Mode: &hostfileMode,
						},
						{
							Key:  kubexeclFileName,
							Path: kubexeclFileName,
							Mode: &scriptMode,
						},
					},
				},
			},
		})

	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        launcherName,
			Namespace:   job.Namespace,
			Labels:      podSpec.Labels,
			Annotations: podSpec.Annotations,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
			},
		},
		Spec: podSpec.Spec,
	}
}
At this point a new training job is up and running. The logic, extended:
K8S --Request("")--> Reconcile
                         |
                         v
                   ReconcileJobs
                    |         |          |
          "",JobCreated    JobRunning   Scaling
                    |
                    v
           reconcileResource
                    |
                    v
                 doSteps
                    |-- WorkersCreated  --> createTrainingJobWorkers --> CreateWorkers --> createWorkers --> createWorker --> newService
                    |                                                                                                    \--> newWorker
                    |-- WorkersReady    --> waitWorkersRunning
                    |-- LauncherCreated --> createLauncher --> CreateHostConfigMap --> CreateLauncher --> newLauncher
                    |-- JobRunning      --> syncLauncherState
With job creation covered, we can turn to the key techniques of this article: ScaleOut and ScaleIn. Starting with ScaleOut: when a ScaleOut CR is submitted, the ScaleOutController triggers a Reconcile. The work here is simple: based on the Selector field in the ScaleOut CR, it finds the TrainingJob the scaler targets and records it in the CR's OwnerReferences. Take the following ScaleOut operation as an example:
- apiVersion: kai.alibabacloud.com/v1alpha1
  kind: ScaleOut
  metadata:
    creationTimestamp: "2020-11-04T13:54:26Z"
    name: scaleout-ptfnk
    namespace: default
    ownerReferences:
    - apiVersion: kai.alibabacloud.com/v1alpha1
      blockOwnerDeletion: true
      controller: true
      kind: TrainingJob
      name: elastic-training          # points to the TrainingJob being scaled out
      uid: 075b9c4a-22f9-40ce-83c7-656b329a2b9e
  spec:
    selector:
      name: elastic-training
    toAdd:
      count: 2
When the ScaleOut CR is submitted, the ScaleOutController's Reconcile fires; its main job is to call setScalingOwner.
func (r *ScaleOutReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	scaleOut, err := getScaleOut(req.NamespacedName, r.Client)
	if err != nil {
		// Error reading the object - requeue the request.
		return RequeueImmediately()
	}
	if scaleOut == nil || scaleOut.DeletionTimestamp != nil {
		return NoRequeue()
	}
	if isScaleFinished(*scaleOut.GetJobStatus()) {
		return NoRequeue()
	}
	return setScalingOwner(r, scaleOut, r.PollInterval)
}
setScalingOwner is one of the key pieces. It handles the case where the ScaleOut CR does not yet have OwnerReferences: it looks up the TrainingJob named by the CR's Selector field and sets it as the CR's owner.
func setScalingOwner(r client.Client, scaler Scaler, pollInterval time.Duration) (ctrl.Result, error) {
	ownerRefs := scaler.GetOwnerReferences()
	if len(ownerRefs) == 0 {
		trainingJob := &kaiv1alpha1.TrainingJob{}
		nsn := types.NamespacedName{}
		nsn.Namespace = scaler.GetNamespace()
		nsn.Name = scaler.GetSelector().Name
		err := r.Get(context.Background(), nsn, trainingJob)

		gvk := kaiv1alpha1.SchemeGroupVersionKind
		ownerRefs = append(ownerRefs, *metav1.NewControllerRef(trainingJob,
			schema.GroupVersionKind{Group: gvk.Group, Version: gvk.Version, Kind: gvk.Kind}))
		scaler.SetOwnerReferences(ownerRefs)

		initializeJobStatus(scaler.GetJobStatus())
		updateJobConditions(scaler.GetJobStatus(), v1.JobCreated, "", msg)
		err = r.Status().Update(context.Background(), scaler)
		err = r.Update(context.Background(), scaler)
	}
	return NoRequeue()
}

// RequeueAfterInterval requeues after a duration when duration > 0 is specified.
func RequeueAfterInterval(interval time.Duration, err error) (ctrl.Result, error) {
	return ctrl.Result{RequeueAfter: interval}, err
}
The TrainingJobController then notices that a ScaleOut CR owned by the TrainingJob has been updated, which triggers the TrainingJob's Reconcile. It walks the ScaleIn and ScaleOut CRs whose OwnerReference points to the TrainingJob and decides, based on creation time and status time, which scaling operation to execute.
func (r *TrainingJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	rlog := r.Log.WithValues("trainingjob", req.NamespacedName)
	// Fetch latest training job instance.
	sharedTrainingJob := &kaiv1alpha1.TrainingJob{}
	err := r.Get(context.Background(), req.NamespacedName, sharedTrainingJob)

	trainingJob := sharedTrainingJob.DeepCopy()
	// Check reconcile is required.
	// No need to do reconcile or job has been deleted.
	r.Scheme.Default(trainingJob)
	return r.ReconcileJobs(trainingJob)
}
func (r *TrainingJobReconciler) ReconcileJobs(job *kaiv1alpha1.TrainingJob) (result reconcile.Result, err error) {
	oldJobStatus := job.Status.DeepCopy()
	logger.Infof("jobName: %v, phase %s", job.Name, job.Status.Phase)

	defer func() {
		latestJob := &kaiv1alpha1.TrainingJob{}
		err := r.Get(context.Background(), types.NamespacedName{
			Name:      job.Name,
			Namespace: job.Namespace,
		}, latestJob)
		if err == nil {
			if latestJob.ObjectMeta.ResourceVersion != job.ObjectMeta.ResourceVersion {
				latestJob.Status = job.Status
				job = latestJob
			}
		}
		r.updateObjectStatus(job, oldJobStatus)
	}()

	switch job.Status.Phase {
	case commonv1.JobSucceeded, commonv1.JobFailed:
		err = r.cleanup(job)
	case "", commonv1.JobCreated:
		r.initializeJob(job)
		err = r.reconcileResource(job)
	case commonv1.JobRunning:
		err = r.reconcileJobRunning(job)
	case commonv1.Scaling:
		err = r.executeScaling(job)
	default:
		logger.Warnf("job %s unknown status %s", job.Name, job.Status.Phase)
	}

	if err != nil {
		if IsRequeueError(err) {
			return RequeueAfterInterval(r.PollInterval, nil)
		}
		return RequeueAfterInterval(r.PollInterval, err)
	}

	return NoRequeue()
}
From here on, depending on the job's current phase, the handling runs along two legs: first JobRunning, then Scaling, and finally back to JobRunning. Let's take them in turn, starting with the JobRunning phase.
func (r *TrainingJobReconciler) reconcileJobRunning(job *kaiv1alpha1.TrainingJob) error {
	if err := r.syncLauncherState(job); err != nil {
		return err
	}
	if err := r.syncWorkersState(job); err != nil {
		return err
	}

	if job.Status.Phase == commonv1.JobRunning {
		return r.setTrainingJobScaler(job) // the job is running, so a scaler can now be applied
	}

	return nil
}
setTrainingJobScaler first gathers the candidate scalers via availableScaleOutList and availableScaleInList, then applies the update.
func (r *TrainingJobReconciler) setTrainingJobScaler(job *kaiv1alpha1.TrainingJob) error {
	scaleOut, err := r.availableScaleOutList(job) // collect pending ScaleOut CRs
	scaleIn, err := r.availableScaleInList(job)   // collect pending ScaleIn CRs
	scalerList := append(scaleOut, scaleIn...)    // merge the two lists
	// Select the latest scaling job
	r.updateLatestScaler(job, scalerList)
	return nil
}
updateLatestScaler selects the most recent scaler based on creation time and status time.
func (r *TrainingJobReconciler) updateLatestScaler(job *kaiv1alpha1.TrainingJob, scalers []Scaler) error {
	var latestScaler Scaler
	if len(scalers) == 0 {
		return nil
	}
	for i, _ := range scalers {
		scalerItem := scalers[i]
		// Pick the scaler with the latest creation timestamp
		if latestScaler == nil || latestScaler.GetCreationTimestamp().Time.Before(scalerItem.GetCreationTimestamp().Time) {
			latestScaler = scalerItem
		}
	}
	return r.updateCurrentScaler(job, latestScaler)
}
updateCurrentScaler then applies the selected scaler to the job.
func (r *TrainingJobReconciler) updateCurrentScaler(job *kaiv1alpha1.TrainingJob, scaleItem Scaler) error {
	job.Status.CurrentScaler = scaleItem.GetFullName()
	msg := fmt.Sprintf("trainingJobob(%s/%s) execute %s", job.Namespace, job.Name, scaleItem.GetFullName())
	// Update the scaler and job status
	r.updateScalerState(scaleItem, job, newCondition(common.Scaling, scalingStartReason, msg))
	if err := r.updateObjectStatus(scaleItem, nil); err != nil {
		return err
	}
	return nil
}
At this point the phase is set to common.Scaling, so the next reconcile takes the Scaling branch.
func (r *TrainingJobReconciler) updateScalerState(scaleObj Scaler, trainingJob *kaiv1alpha1.TrainingJob, condition common.JobCondition) error {
	jobPhase := common.Scaling // set common.Scaling so the next reconcile takes the Scaling branch
	currentJob := scaleObj.GetFullName()
	if condition.Type == common.ScaleSucceeded || condition.Type == common.ScaleFailed {
		jobPhase = common.JobRunning
		currentJob = ""
	}

	setCondition(trainingJob.GetJobStatus(), condition)
	updateStatusPhase(trainingJob.GetJobStatus(), jobPhase)
	updateTrainingJobCurrentScaler(trainingJob.GetJobStatus(), currentJob)

	setCondition(scaleObj.GetJobStatus(), condition)
	updateStatusPhase(scaleObj.GetJobStatus(), condition.Type)
	return nil
}
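Putting the two legs together, the TrainingJob phase cycles between JobRunning and Scaling. The sketch below only summarizes the transitions visible in ReconcileJobs and updateScalerState above; it is an illustration, not a function that exists in the et-operator source.

```go
// Illustrative summary of the phase transitions driven by ReconcileJobs / updateScalerState.
package main

import "fmt"

func nextPhase(phase string, scalerPending bool, scaleFinished bool) string {
	switch phase {
	case "", "JobCreated":
		return "JobRunning" // once doSteps has created the workers and the launcher
	case "JobRunning":
		if scalerPending {
			return "Scaling" // updateScalerState sets common.Scaling
		}
		return "JobRunning"
	case "Scaling":
		if scaleFinished {
			return "JobRunning" // ScaleSucceeded / ScaleFailed switch the job back
		}
		return "Scaling"
	}
	return phase
}

func main() {
	fmt.Println(nextPhase("JobRunning", true, false)) // Scaling
}
```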
The logic so far:
1 Request("")   K8S --> Reconcile <----------------------------+
2 ScaleOut CR   K8S --------^                                   |
                            |                                   |
                            v                                   |
                      ReconcileJobs                             |
                 1 |         2 |            3 |                 |
                   v           v              v                 |
         "",JobCreated     JobRunning       Scaling             |
                   |           |                                |
                   v           v                                |
      reconcileResource  reconcileJobRunning                    |
                   |           |                                |
                   v           v                                |
               doSteps   setTrainingJobScaler                   |
                               |                                |
                               v                                |
                        updateScalerState                       |
                               |                                |
                               v                                |
                         common.Scaling ------------------------+
executeScaling dispatches on the scaler type and handles scale-out and scale-in differently.
func (r *TrainingJobReconciler) executeScaling(job *kaiv1alpha1.TrainingJob) error {
	if err := r.syncLauncherState(job); err != nil {
		return err
	}
	if job.Status.CurrentScaler == "" {
		updateStatusPhase(job.GetJobStatus(), common.JobRunning)
		return nil
	}
	if isFinished(*job.GetJobStatus()) {
		return nil
	}

	scalerType, scalerName := getScalerName(job.Status.CurrentScaler)

	// Dispatch on ScaleIn vs ScaleOut
	if scalerType == "ScaleIn" {
		scaleIn, err := getScaleIn(scalerName, r)
		if scaleIn == nil || isScaleFinished(*scaleIn.GetJobStatus()) {
			finishTrainingScaler(job.GetJobStatus())
			return nil
		}
		oldStatus := scaleIn.Status.DeepCopy()
		defer r.updateObjectStatus(scaleIn, oldStatus)

		// Perform the actual scale-in
		if err = r.executeScaleIn(job, scaleIn); err != nil {
			return err
		}
	} else if scalerType == "ScaleOut" {
		scaleOut, err := getScaleOut(scalerName, r)
		if scaleOut == nil || isScaleFinished(*scaleOut.GetJobStatus()) {
			finishTrainingScaler(job.GetJobStatus())
			return nil
		}
		oldStatus := scaleOut.Status.DeepCopy()
		defer r.updateObjectStatus(scaleOut, oldStatus)

		// Perform the actual scale-out
		if err = r.executeScaleOut(job, scaleOut); err != nil {
		}
	}
	return nil
}
executeScaleOut carries out the scale-out.
func (r *TrainingJobReconciler) executeScaleOut(job *kaiv1alpha1.TrainingJob, scaleOut *kaiv1alpha1.ScaleOut) error {
	initializeJobStatus(scaleOut.GetJobStatus())

	if err := r.validateScaleOut(scaleOut); err != nil {
		r.updateScalerFailed(scaleOut, job, err.Error())
		return err
	}

	if err := r.setScaleOutWorkers(job, scaleOut); err != nil {
		return err
	}

	err := r.ScaleOutWorkers(job, scaleOut)
	if err != nil {
		msg := fmt.Sprintf("%s create scaleout workers failed, error: %v", scaleOut.GetFullName(), err)
		r.ScaleOutFailed(job, scaleOut, msg)
		return err
	}

	scaleOutWorkers, err := r.getScalerOutWorkers(job, scaleOut)

	workerStatuses, _ := r.workerReplicasStatus(scaleOut.GetJobStatus(), scaleOutWorkers)
	if workerStatuses.Active < *scaleOut.Spec.ToAdd.Count {
		if IsScaleOutTimeout(scaleOut) {
			msg := fmt.Sprintf("scaleout job %s execution timeout", scaleOut.GetFullName())
			r.ScaleOutFailed(job, scaleOut, msg)
		}
		return NewRequeueError(fmt.Errorf("wait for workers running"))
	}

	hostWorkers := r.workersAfterScaler(job.Status.CurrentWorkers, scaleOut)

	// Execute the scale script on the launcher
	if err := r.executeScaleScript(job, scaleOut, hostWorkers); err != nil {
		msg := fmt.Sprintf("%s execute script failed, error: %v", scaleOut.GetFullName(), err)
		r.ScaleOutFailed(job, scaleOut, msg)
		return err
	} else {
		job.Status.TargetWorkers = r.workersAfterScaler(job.Status.TargetWorkers, scaleOut)
		r.updateScalerSuccessd(scaleOut, job)
	}

	return nil
}
It calls hostfileUpdateScript to build a command that rewrites the hostfile, and finally calls executeOnLauncher to run that script on the launcher.
func (r *TrainingJobReconciler) executeScaleScript(trainingJob *kaiv1alpha1.TrainingJob, scaler Scaler, workers []string) error {
	if isScriptExecuted(*scaler.GetJobStatus()) {
		return nil
	}
	msg := fmt.Sprintf("trainingjob(%s/%s): execute script on launcher for %s", trainingJob.Namespace, trainingJob.Name, scaler.GetFullName())

	slots := getSlots(trainingJob)
	scriptSpec := scaler.GetScriptSpec()

	// Build the script to run
	var script string
	if scriptSpec.Script != "" {
		script = scalerScript(scriptSpec.GetTimeout(), scriptSpec.Env, scriptSpec.Script, scaler.GetPodNames(), slots)
	} else {
		hostfilePath := getHostfilePath(trainingJob)
		script = hostfileUpdateScript(hostfilePath, workers, slots)
	}

	// Run the script on the launcher pod
	_, _, err := r.executeOnLauncher(trainingJob, script)

	updateJobConditions(scaler.GetJobStatus(), common.ScriptExecuted, "", msg)
	return nil
}
hostfileUpdateScript produces the final script string.
func hostfileUpdateScript(hostfile string, workers []string, slot int) string {
	return fmt.Sprintf(
		`echo '%s' > %s`, getHostfileContent(workers, slot), hostfile)
}
getHostfileContent renders the hostfile content.
func getHostfileContent(workers []string, slot int) string {
	var buffer bytes.Buffer
	for _, worker := range workers {
		buffer.WriteString(fmt.Sprintf("%s:%d\n", worker, slot))
	}
	return buffer.String()
}
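Concretely, for three workers with one slot each, the generated command overwrites the hostfile that the launcher's discover_hosts.sh reads, which is how horovodrun learns about the new worker set. The sketch below reproduces the same formatting on sample data; the /etc/edl/hostfile path is an assumption based on the /etc/edl/discover_hosts.sh mount mentioned earlier, not a path taken from the source.

```go
// Reproduces the formatting of getHostfileContent / hostfileUpdateScript on sample data.
package main

import (
	"bytes"
	"fmt"
)

func main() {
	workers := []string{"elastic-training-worker-0", "elastic-training-worker-1", "elastic-training-worker-2"}
	slot := 1

	var buf bytes.Buffer
	for _, w := range workers {
		buf.WriteString(fmt.Sprintf("%s:%d\n", w, slot))
	}
	// The hostfile path is an assumption for illustration.
	script := fmt.Sprintf(`echo '%s' > %s`, buf.String(), "/etc/edl/hostfile")
	fmt.Println(script)
	// Output:
	// echo 'elastic-training-worker-0:1
	// elastic-training-worker-1:1
	// elastic-training-worker-2:1
	// ' > /etc/edl/hostfile
}
```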
executeOnLauncher runs the script on the launcher Pod.
func (r *TrainingJobReconciler) executeOnLauncher(trainingJob *kaiv1alpha1.TrainingJob, script string) (string, string, error) {
	var err error
	var launcherPod *corev1.Pod
	if launcherPod, err = r.GetLauncherJob(trainingJob); err != nil {
	}

	if launcherPod != nil {
		stdOut, stdErr, err := kubectlOnPod(launcherPod, script)
		return stdOut, stdErr, nil
	}

	return "", "", nil
}
kubectlOnPod executes the command inside the launcher container, which is how the new workers get pulled into the running job.
func kubectlOnPod(pod *corev1.Pod, cmd string) (string, string, error) {
	cmds := []string{
		"/bin/sh",
		"-c",
		cmd,
	}
	stdout, stderr, err := util.ExecCommandInContainerWithFullOutput(pod.Name, pod.Spec.Containers[0].Name, pod.Namespace, cmds)
	if err != nil {
		return stdout, stderr, err
	}
	return stdout, stderr, nil
}
The logic now looks like this:
1 Request("")   K8S --> Reconcile <----------------------------+
2 ScaleOut CR   K8S --------^                                   |
                            |                                   |
                            v                                   |
                      ReconcileJobs                             |
                 1 |         2 |            3 |                 |
                   v           v              v                 |
         "",JobCreated     JobRunning       Scaling             |
                   |           |                                |
                   v           v                                |
      reconcileResource  reconcileJobRunning                    |
                   |           |                                |
                   v           v                                |
               doSteps   setTrainingJobScaler                   |
                               |                                |
                               v                                |
                        updateScalerState                       |
                               |                                |
                               v                                |
                         common.Scaling ------------------------+

3 (Scaling) --> executeScaling --> executeScaleOut --> executeScaleScript --> hostfileUpdateScript --> executeOnLauncher --> kubectlOnPod
Now for ScaleIn. The workers to remove can be specified through the spec.toDelete.count or spec.toDelete.podNames field of the ScaleIn CR; with count, the operator computes which workers to remove by index, from the highest index downward. For example, the following ScaleIn CR removes one worker:
apiVersion: kai.alibabacloud.com/v1alpha1
kind: ScaleIn
metadata:
  name: scalein-workers
spec:
  selector:
    name: elastic-training
  toDelete:
    count: 1
To scale in specific workers, configure podNames instead:
apiVersion: kai.alibabacloud.com/v1alpha1
kind: ScaleIn
metadata:
  name: scalein-workers
spec:
  selector:
    name: elastic-training
  toDelete:
    podNames:
    - elastic-training-worker-1
To run a scale-in example that removes one worker by count:
kubectl create -f examples/scale_in_count.yaml
When a ScaleIn CR is submitted, the controller triggers a Reconcile; again the main work is calling setScalingOwner.
func (r *ScaleInReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	//silog := r.Log.WithValues("scalein", req.NamespacedName)
	scaleIn, err := getScaleIn(req.NamespacedName, r.Client)
	if isScaleFinished(*scaleIn.GetJobStatus()) {
		return NoRequeue()
	}
	// The code above is mostly validation
	return setScalingOwner(r, scaleIn, r.PollInterval)
}
setScalingOwner is once more the key piece. It handles the case where the ScaleIn CR has no OwnerReferences yet: it looks up the TrainingJob named by the CR's Selector field and sets it as the owner. The listing below has most of the error handling removed.
func setScalingOwner(r client.Client, scaler Scaler, pollInterval time.Duration) (ctrl.Result, error) {
	ownerRefs := scaler.GetOwnerReferences()
	if len(ownerRefs) == 0 {
		trainingJob := &kaiv1alpha1.TrainingJob{}
		nsn := types.NamespacedName{}
		nsn.Namespace = scaler.GetNamespace()
		nsn.Name = scaler.GetSelector().Name
		err := r.Get(context.Background(), nsn, trainingJob)

		gvk := kaiv1alpha1.SchemeGroupVersionKind
		ownerRefs = append(ownerRefs, *metav1.NewControllerRef(trainingJob,
			schema.GroupVersionKind{Group: gvk.Group, Version: gvk.Version, Kind: gvk.Kind}))
		scaler.SetOwnerReferences(ownerRefs)

		initializeJobStatus(scaler.GetJobStatus())
		updateJobConditions(scaler.GetJobStatus(), v1.JobCreated, "", msg)
		err = r.Status().Update(context.Background(), scaler)
		err = r.Update(context.Background(), scaler)
	}
	return NoRequeue()
}
The JobRunning handling is the same as for ScaleOut, so we skip it and go straight to executeScaleIn.
As noted above, the workers to remove are given by the ScaleIn CR's spec.toDelete.count or spec.toDelete.podNames field; with count, workers are removed by index from the highest index downward. Mapped onto the code, executeScaleIn does three things:
setsSaleInToDelete decides which workers should be deleted;
executeScaleScript runs the update script on the launcher;
DeleteWorkers deletes the workers.
func (r *TrainingJobReconciler) executeScaleIn(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
	if scaleIn.DeletionTimestamp != nil || isScaleFinished(*scaleIn.GetJobStatus()) {
		logger.Info("reconcile cancelled, scalein does not need to do reconcile or has been deleted")
		return nil
	}

	initializeJobStatus(scaleIn.GetJobStatus())

	//TODO: Validate the scalein count for minSize
	err := r.setsSaleInToDelete(job, scaleIn)

	currentWorkers := r.workersAfterScaler(job.Status.CurrentWorkers, scaleIn)

	// execute scalein script
	if err := r.executeScaleScript(job, scaleIn, currentWorkers); err != nil {
		msg := fmt.Sprintf("%s execute script failed, error: %v", scaleIn.GetFullName(), err)
		r.updateScalerFailed(scaleIn, job, msg)
		return nil
	}

	toDeleteWorkers := scaleIn.GetPodNames()
	remainWorkers := false
	if scaleIn.Spec.Script == "" {
		if shutdownWorkers, err := r.checkWorkerShutdown(job, toDeleteWorkers); err != nil {
			return err
		} else {
			if len(toDeleteWorkers) != len(shutdownWorkers) {
				remainWorkers = true
				toDeleteWorkers = shutdownWorkers
			}
		}
	}

	if err := r.DeleteWorkers(job, toDeleteWorkers); err != nil {
		msg := fmt.Sprintf("%s delete resource failed, error: %v", scaleIn.GetFullName(), err)
		r.updateScalerFailed(scaleIn, job, msg)
		return nil
	}

	// wait pods deleted
	deleted, _ := r.isWorkersDeleted(job.Namespace, scaleIn.GetPodNames())
	if deleted {
		job.Status.TargetWorkers = r.workersAfterScaler(job.Status.TargetWorkers, scaleIn)
		job.Status.CurrentWorkers = currentWorkers
		r.updateScalerSuccessd(scaleIn, job)
		return nil
	}

	if remainWorkers {
		msg := "wait for workers process shutdown"
		logger.Info(msg)
		return NewRequeueError(fmt.Errorf(msg))
	}

	return nil
}
setsSaleInToDelete resolves spec.toDelete.count / spec.toDelete.podNames into the concrete list of workers to remove.
func (r *TrainingJobReconciler) setsSaleInToDelete(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
	podNames := scaleIn.Status.ToDeletePods
	if len(podNames) != 0 {
		return /*filterPodNames(workers, podNames, false), */ nil
	}

	workers, err := r.GetWorkerPods(job)

	toDelete := scaleIn.Spec.ToDelete
	if toDelete.PodNames != nil {
		workers = filterPodNames(workers, toDelete.PodNames, false)
	} else if toDelete.Count > 0 {
		if toDelete.Count < len(workers) {
			allPodNames := getSortPodNames(job.Name, workers)
			deletePodNames := allPodNames[len(workers)-toDelete.Count:]
			workers = filterPodNames(workers, deletePodNames, false)
		}
	}

	for _, worker := range workers {
		scaleIn.Status.ToDeletePods = append(scaleIn.Status.ToDeletePods, worker.Name)
	}

	return nil
}
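To make the count-based branch concrete: with five workers and toDelete.count = 2, getSortPodNames orders the names by index and the slice expression allPodNames[len(workers)-toDelete.Count:] keeps the last two, so the highest-index workers are the ones torn down. The toy reproduction below uses sample names and only mirrors that slice logic; it does not call the repository's helper functions.

```go
// Toy reproduction of the count-based selection in setsSaleInToDelete (sample data only).
package main

import "fmt"

func main() {
	allPodNames := []string{
		"elastic-training-worker-0",
		"elastic-training-worker-1",
		"elastic-training-worker-2",
		"elastic-training-worker-3",
		"elastic-training-worker-4",
	}
	count := 2
	toDelete := allPodNames[len(allPodNames)-count:]
	fmt.Println(toDelete) // [elastic-training-worker-3 elastic-training-worker-4]
}
```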
DeleteWorkers deletes each worker's Service and Pod.
func (r *TrainingJobReconciler) DeleteWorkers(trainingJob *kaiv1alpha1.TrainingJob, workers []string) error {
	if err := r.DeleteWorkerServices(trainingJob, workers); err != nil {
		return fmt.Errorf("delete services failed: %++v", err)
	}

	if err := r.DeleteWorkerPods(trainingJob, workers); err != nil {
		return fmt.Errorf("delete pods failed: %++v", err)
	}
	return nil
}
DeleteWorkerPods deletes the Pods.
func (r *TrainingJobReconciler) DeleteWorkerPods(job *kaiv1alpha1.TrainingJob, pods []string) error {
	workerPods, err := r.GetWorkerPods(job)
	if pods != nil {
		workerPods = filterPodNames(workerPods, pods, false)
	}

	for _, pod := range workerPods {
		deleteOptions := &client.DeleteOptions{GracePeriodSeconds: utilpointer.Int64Ptr(0)}
		if err := r.Delete(context.Background(), &pod, deleteOptions); err != nil && !errors.IsNotFound(err) {
			r.recorder.Eventf(job, corev1.EventTypeWarning, trainingJobFailedReason, "Error deleting worker %s: %v", pod.Name, err)
			//return err
		}
		r.recorder.Eventf(job, corev1.EventTypeNormal, trainingJobSucceededReason, "Deleted pod %s", pod.Name)
	}
	return nil
}
The complete logic:
1 Request("")   K8S --> Reconcile <----------------------------+
2 ScaleOut CR   K8S --------^                                   |
                            |                                   |
                            v                                   |
                      ReconcileJobs                             |
                 1 |         2 |            3 |                 |
                   v           v              v                 |
         "",JobCreated     JobRunning       Scaling             |
                   |           |                                |
                   v           v                                |
      reconcileResource  reconcileJobRunning                    |
                   |           |                                |
                   v           v                                |
               doSteps   setTrainingJobScaler                   |
                               |                                |
                               v                                |
                        updateScalerState                       |
                               |                                |
                               v                                |
                         common.Scaling ------------------------+

3 (Scaling, ScaleOut) --> executeScaling --> executeScaleOut --> executeScaleScript --> hostfileUpdateScript --> executeOnLauncher --> kubectlOnPod
4 (Scaling, ScaleIn)  --> executeScaling --> executeScaleIn  --> executeScaleScript --> DeleteWorkers --> DeleteWorkerPods --> Delete
This concludes the Horovod series. The next series will look at parameter servers; stay tuned.
ElasticDL 分析
在 Kubernetes 上弹性深度学习训练利器 -- Elastic Training Operator