This analysis is based on external-resizer v0.5.0: https://github.com/kubernetes-csi/external-resizer/releases/tag/v0.5.0
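Volume expansion takes two steps, one on the controller side and one on the node side. The controller-side expansion (triggered by external-resizer) runs first, then the node-side expansion (triggered by kubelet); only after both is the expansion complete. When the volumeMode is Block, no node-side expansion is needed.
Controller-side expansion: expands the underlying storage. For ceph rbd, for example, the rbd image in the ceph cluster is grown.
Node-side expansion: performs the matching operation on the node where the pod runs, so that the node sees the new size. For a ceph rbd file system, for example, the file system resize command is run on the node.
Some storage types, such as cephfs, need no node-side expansion.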
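(1) Change pvc.Spec.Resources.Requests.storage to trigger the expansion.
(2) Controller-side expansion: external-resizer watches PVC objects. When it finds that pvc.Spec.Resources.Requests.storage is larger than pvc.Status.Capacity.storage, it calls the ControllerExpandVolume method of the csi plugin to expand the underlying storage, then updates pv.Spec.Capacity.storage.
(3) Node-side expansion: kubelet finds that pv.Spec.Capacity.storage is larger than pvc.Status.Capacity.storage, so it calls the csi node-side expansion to grow the file system on the node. On success, kubelet updates pvc.Status.Capacity.storage.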
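The detailed expansion flow is analyzed below, using ceph rbd as the example.
(1) The PVC object is modified to request a larger size (pvc.spec.resources.requests.storage);
(2) After the change is saved, external-resizer receives the update event for the PVC, finds that pvc.Spec.Resources.Requests.storage is larger than pvc.Status.Capacity.storage, and calls the ceph-csi component to perform the controller-side expansion;
(3) ceph-csi calls the ceph storage to expand the underlying storage;
(4) Once the underlying storage has been expanded, external-resizer updates .Spec.Capacity.storage of the PV object to the new size;
(5) During reconcile(), the volume manager of kubelet finds that pv.Spec.Capacity.storage is larger than pvc.Status.Capacity.storage, and calls the ceph-csi component to perform the node-side expansion;
(6) ceph-csi grows the file system that backs the volume on the node;
(7) After the expansion completes, kubelet updates pvc.Status.Capacity.storage to the new size.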
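The seven steps above can be read as a small state machine over three sizes. The following is a minimal sketch, not code from external-resizer or kubelet: the volumeState type and the nextStep function are hypothetical names, sizes are plain byte counts instead of resource.Quantity, and block-mode volumes and PVC conditions are ignored.

```go
package main

import "fmt"

// volumeState is a hypothetical, simplified view of the three sizes (in bytes)
// that drive an expansion.
type volumeState struct {
	pvcRequest  int64 // pvc.Spec.Resources.Requests.storage
	pvCapacity  int64 // pv.Spec.Capacity.storage
	pvcCapacity int64 // pvc.Status.Capacity.storage
}

// nextStep reports which side still has work to do.
func nextStep(s volumeState) string {
	switch {
	case s.pvCapacity < s.pvcRequest:
		return "controller" // external-resizer still has to expand the volume
	case s.pvcCapacity < s.pvCapacity:
		return "node" // kubelet still has to grow the file system
	default:
		return "done"
	}
}

func main() {
	const gi = int64(1) << 30
	s := volumeState{pvcRequest: 2 * gi, pvCapacity: 1 * gi, pvcCapacity: 1 * gi}
	fmt.Println(nextStep(s)) // controller

	s.pvCapacity = 2 * gi // steps (3) and (4)
	fmt.Println(nextStep(s)) // node

	s.pvcCapacity = 2 * gi // steps (6) and (7)
	fmt.Println(nextStep(s)) // done
}
```

The point of the sketch is only the ordering: the PV capacity catches up with the request first, and the PVC status capacity catches up with the PV last.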
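This section analyzes the controller-side expansion. The node-side expansion was analyzed in an earlier article; see "kubelet PVC volume expansion code analysis".
When pvc.Spec.Resources.Requests.storage is larger than pvc.Status.Capacity.storage, the controller-side (external-resizer) expansion logic is triggered.
The controller-side (external-resizer) expansion consists of:
(1) calling the ControllerExpandVolume method of the csi plugin to expand the storage;
(2) updating .spec.capacity.storage of the PV object to the new size;
(3) updating .Status.Conditions of the PVC object by adding a condition with type "FileSystemResizePending" and status "True". This marks the controller-side expansion of the PVC as complete; kubelet then performs the node-side expansion.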
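Run, main logic: start one goroutine per worker, each running ctrl.syncPVCs in a loop to handle PVC change events, pick out the PVCs that need expansion, and trigger the expansion.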
// pkg/controller/controller.go

// Run starts the controller.
func (ctrl *resizeController) Run(
	workers int, ctx context.Context) {
	defer ctrl.claimQueue.ShutDown()

	klog.Infof("Starting external resizer %s", ctrl.name)
	defer klog.Infof("Shutting down external resizer %s", ctrl.name)

	stopCh := ctx.Done()

	if !cache.WaitForCacheSync(stopCh, ctrl.pvSynced, ctrl.pvcSynced) {
		klog.Errorf("Cannot sync pv/pvc caches")
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(ctrl.syncPVCs, 0, stopCh)
	}

	<-stopCh
}
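syncPVCs, main logic: take a key from the queue and call ctrl.syncPVC; if it fails, put the key back on the queue with rate limiting so that it is retried later.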
// syncPVCs is the main worker.
func (ctrl *resizeController) syncPVCs() {
	key, quit := ctrl.claimQueue.Get()
	if quit {
		return
	}
	defer ctrl.claimQueue.Done(key)

	if err := ctrl.syncPVC(key.(string)); err != nil {
		// Put PVC back to the queue so that we can retry later.
		ctrl.claimQueue.AddRateLimited(key)
	} else {
		ctrl.claimQueue.Forget(key)
	}
}
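syncPVC decides whether an expansion is needed and, if so, performs it.
Main logic:
(1) get the PVC object;
(2) call ctrl.pvcNeedResize to decide, from the PVC object alone, whether an expansion is needed;
(3) get the PV object;
(4) call ctrl.pvNeedResize to compare the PVC with the PV and decide whether an expansion is needed;
(5) if an expansion is needed, call ctrl.resizePVC to perform it.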
// syncPVC checks if a pvc requests resizing, and execute the resize operation if requested.
func (ctrl *resizeController) syncPVC(key string) error {
	klog.V(4).Infof("Started PVC processing %q", key)

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.Errorf("Split meta namespace key of pvc %s failed: %v", key, err)
		return err
	}

	pvc, err := ctrl.pvcLister.PersistentVolumeClaims(namespace).Get(name)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			klog.V(3).Infof("PVC %s/%s is deleted, no need to process it", namespace, name)
			return nil
		}
		klog.Errorf("Get PVC %s/%s failed: %v", namespace, name, err)
		return err
	}

	if !ctrl.pvcNeedResize(pvc) {
		klog.V(4).Infof("No need to resize PVC %q", util.PVCKey(pvc))
		return nil
	}

	pv, err := ctrl.pvLister.Get(pvc.Spec.VolumeName)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			klog.V(3).Infof("PV %s is deleted, no need to process it", pvc.Spec.VolumeName)
			return nil
		}
		klog.Errorf("Get PV %q of pvc %q failed: %v", pvc.Spec.VolumeName, util.PVCKey(pvc), err)
		return err
	}

	if !ctrl.pvNeedResize(pvc, pv) {
		klog.V(4).Infof("No need to resize PV %q", pv.Name)
		return nil
	}

	return ctrl.resizePVC(pvc, pv)
}
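The pvcNeedResize and pvNeedResize methods are analyzed first.
pvcNeedResize returns true, meaning the PVC qualifies for expansion, when pvc.Status.Phase is Bound, pvc.Spec.VolumeName is not empty, and pvc.Spec.Resources.Requests.storage is larger than pvc.Status.Capacity.storage.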
// pvcNeedResize returns true if a pvc requests a resize operation.
func (ctrl *resizeController) pvcNeedResize(pvc *v1.PersistentVolumeClaim) bool {
	// Only Bound pvc can be expanded.
	if pvc.Status.Phase != v1.ClaimBound {
		return false
	}
	if pvc.Spec.VolumeName == "" {
		return false
	}
	actualSize := pvc.Status.Capacity[v1.ResourceStorage]
	requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	return requestSize.Cmp(actualSize) > 0
}
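pvNeedResize first returns false if the resizer does not support the PV or if the PV is not bound to this PVC. It then compares sizes. If pv.Spec.Capacity.storage is greater than or equal to pvc.Spec.Resources.Requests.storage and the PVC's .Status.Conditions contains a condition with type "FileSystemResizePending" and status "True", the controller-side expansion is already complete and the method returns false. If the PV is already large enough but that condition is absent, the method returns true so that the resize is run again and the need for a file system resize can be determined. If pv.Spec.Capacity.storage is smaller than pvc.Spec.Resources.Requests.storage, the controller-side expansion has not been done yet and the method returns true.
As a reminder, expansion has a controller side and a node side: the controller-side expansion (triggered by external-resizer) runs first, then the node-side expansion (triggered by kubelet), and only then is the expansion complete.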
// pvNeedResize returns true if a pv supports and also requests resize.
func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
	if !ctrl.resizer.CanSupport(pv, pvc) {
		klog.V(4).Infof("Resizer %q doesn't support PV %q", ctrl.name, pv.Name)
		return false
	}

	if (pv.Spec.ClaimRef == nil) || (pvc.Namespace != pv.Spec.ClaimRef.Namespace) || (pvc.UID != pv.Spec.ClaimRef.UID) {
		klog.V(4).Infof("persistent volume is not bound to PVC being updated: %s", util.PVCKey(pvc))
		return false
	}

	pvSize := pv.Spec.Capacity[v1.ResourceStorage]
	requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
	if pvSize.Cmp(requestSize) >= 0 {
		// If PV size is equal or bigger than request size, that means we have already resized PV.
		// In this case we need to check PVC's condition.
		// 1. If PVC in PersistentVolumeClaimResizing condition, we should continue to perform the
		//    resizing operation as we need to know if file system resize is required. (What's more,
		//    we hope the driver can find that the actual size already matched the request size and do nothing).
		// 2. If PVC in PersistentVolumeClaimFileSystemResizePending condition, we need to
		//    do nothing as kubelet will finish file system resizing and mark resize as finished.
		if util.HasFileSystemResizePendingCondition(pvc) {
			// This is case 2.
			return false
		}
		// This is case 1.
		return true
	}

	// PV size is smaller than request size, we need to resize the volume.
	return true
}
util.HasFileSystemResizePendingCondition returns true when the controller-side expansion is already complete. It checks whether pvc.Status.Conditions contains a condition with type "FileSystemResizePending" and status "True".
const (
	// PersistentVolumeClaimFileSystemResizePending - controller resize is finished and a file system resize is pending on node
	PersistentVolumeClaimFileSystemResizePending PersistentVolumeClaimConditionType = "FileSystemResizePending"
)

// HasFileSystemResizePendingCondition returns true if a pvc has a FileSystemResizePending condition.
// This means the controller side resize operation is finished, and kubelet side operation is in progress.
func HasFileSystemResizePendingCondition(pvc *v1.PersistentVolumeClaim) bool {
	for _, condition := range pvc.Status.Conditions {
		if condition.Type == v1.PersistentVolumeClaimFileSystemResizePending && condition.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}
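The size and condition checks of pvNeedResize reduce to a small truth table. The sketch below restates only that part, with no Kubernetes dependencies: needControllerResize is a hypothetical name, sizes are plain byte counts, and the CanSupport and ClaimRef checks are left out.

```go
package main

import "fmt"

// needControllerResize models the size comparison and the
// FileSystemResizePending check. Sizes are in bytes.
func needControllerResize(pvSize, requestSize int64, fsResizePending bool) bool {
	if pvSize >= requestSize {
		// The PV already has the requested size. Skip only when the PVC is
		// already waiting for kubelet to grow the file system.
		return !fsResizePending
	}
	// The PV is still smaller than the request.
	return true
}

func main() {
	const gi = int64(1) << 30
	fmt.Println(needControllerResize(1*gi, 2*gi, false)) // true: not resized yet
	fmt.Println(needControllerResize(2*gi, 2*gi, false)) // true: resized, outcome not recorded
	fmt.Println(needControllerResize(2*gi, 2*gi, true))  // false: kubelet takes over
}
```

The second case is the one that is easy to miss: a PV that is already large enough is resized again when the PVC does not yet carry the pending condition, on the expectation that the driver treats the repeated call as a no-op.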
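resizePVC carries out the expansion.
Main logic:
(1) call ctrl.markPVCResizeInProgress to update .Status.Conditions of the PVC object, adding a condition with type "Resizing" and status "True" to show that the PVC is being resized;
(2) call ctrl.resizeVolume to perform the expansion;
(3) after the expansion succeeds, if a file system resize on the node is still required, call ctrl.markPVCAsFSResizeRequired to add a condition with type "FileSystemResizePending" and status "True" to .Status.Conditions of the PVC object, which marks the controller-side expansion as complete; otherwise call ctrl.markPVCResizeFinished to mark the resize as finished.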
// resizePVC will:
// 1. Mark pvc as resizing.
// 2. Resize the volume and the pv object.
// 3. Mark pvc as resizing finished(no error, no need to resize fs), need resizing fs or resize failed.
func (ctrl *resizeController) resizePVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
	if updatedPVC, err := ctrl.markPVCResizeInProgress(pvc); err != nil {
		klog.Errorf("Mark pvc %q as resizing failed: %v", util.PVCKey(pvc), err)
		return err
	} else if updatedPVC != nil {
		pvc = updatedPVC
	}

	// Record an event to indicate that external resizer is resizing this volume.
	ctrl.eventRecorder.Event(pvc, v1.EventTypeNormal, util.VolumeResizing,
		fmt.Sprintf("External resizer is resizing volume %s", pv.Name))

	err := func() error {
		newSize, fsResizeRequired, err := ctrl.resizeVolume(pvc, pv)
		if err != nil {
			return err
		}

		if fsResizeRequired {
			// Resize volume succeeded and need to resize file system by kubelet, mark it as file system resizing required.
			return ctrl.markPVCAsFSResizeRequired(pvc)
		}
		// Resize volume succeeded and no need to resize file system by kubelet, mark it as resizing finished.
		return ctrl.markPVCResizeFinished(pvc, newSize)
	}()

	if err != nil {
		// Record an event to indicate that resize operation is failed.
		ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeResizeFailed, err.Error())
	}

	return err
}
resizeVolume, main logic:
(1) call ctrl.resizer.Resize to expand the storage;
(2) call util.UpdatePVCapacity to update .spec.capacity.storage of the PV.
// resizeVolume resize the volume to request size, and update PV's capacity if succeeded.
func (ctrl *resizeController) resizeVolume(
	pvc *v1.PersistentVolumeClaim,
	pv *v1.PersistentVolume) (resource.Quantity, bool, error) {
	requestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]

	newSize, fsResizeRequired, err := ctrl.resizer.Resize(pv, requestSize)
	if err != nil {
		klog.Errorf("Resize volume %q by resizer %q failed: %v", pv.Name, ctrl.name, err)
		return newSize, fsResizeRequired, fmt.Errorf("resize volume %s failed: %v", pv.Name, err)
	}
	klog.V(4).Infof("Resize volume succeeded for volume %q, start to update PV's capacity", pv.Name)

	if err := util.UpdatePVCapacity(pv, newSize, ctrl.kubeClient); err != nil {
		klog.Errorf("Update capacity of PV %q to %s failed: %v", pv.Name, newSize.String(), err)
		return newSize, fsResizeRequired, err
	}
	klog.V(4).Infof("Update capacity of PV %q to %s succeeded", pv.Name, newSize.String())

	return newSize, fsResizeRequired, nil
}
ctrl.resizer.Resize: builds the request and calls r.client.Expand to expand the storage (which in turn calls the ControllerExpandVolume method of the csi plugin).
// Resize resizes the persistence volume given request size
// It supports both CSI volume and migrated in-tree volume
func (r *csiResizer) Resize(pv *v1.PersistentVolume, requestSize resource.Quantity) (resource.Quantity, bool, error) {
	oldSize := pv.Spec.Capacity[v1.ResourceStorage]

	var volumeID string
	var source *v1.CSIPersistentVolumeSource
	var pvSpec v1.PersistentVolumeSpec
	if pv.Spec.CSI != nil {
		// handle CSI volume
		source = pv.Spec.CSI
		volumeID = source.VolumeHandle
		pvSpec = pv.Spec
	} else {
		if csitranslationlib.IsMigratedCSIDriverByName(r.name) {
			// handle migrated in-tree volume
			csiPV, err := csitranslationlib.TranslateInTreePVToCSI(pv)
			if err != nil {
				return oldSize, false, fmt.Errorf("failed to translate persistent volume: %v", err)
			}
			source = csiPV.Spec.CSI
			pvSpec = csiPV.Spec
			volumeID = source.VolumeHandle
		} else {
			// non-migrated in-tree volume
			return oldSize, false, fmt.Errorf("volume %v is not migrated to CSI", pv.Name)
		}
	}

	if len(volumeID) == 0 {
		return oldSize, false, errors.New("empty volume handle")
	}

	var secrets map[string]string
	secreRef := source.ControllerExpandSecretRef
	if secreRef != nil {
		var err error
		secrets, err = getCredentials(r.k8sClient, secreRef)
		if err != nil {
			return oldSize, false, err
		}
	}
	// secrets is still nil when no expand secret is configured; allocate it
	// before writing, because assigning to a nil map panics.
	if secrets == nil {
		secrets = make(map[string]string)
	}
	secrets[pvCephMountPathKey] = pv.Annotations[pvCephMountPathKey]

	capability, err := GetVolumeCapabilities(pvSpec)
	if err != nil {
		return oldSize, false, fmt.Errorf("failed to get capabilities of volume %s with %v", pv.Name, err)
	}

	ctx, cancel := timeoutCtx(r.timeout)
	defer cancel()
	newSizeBytes, nodeResizeRequired, err := r.client.Expand(ctx, volumeID, requestSize.Value(), secrets, capability)
	if err != nil {
		return oldSize, nodeResizeRequired, err
	}

	return *resource.NewQuantity(newSizeBytes, resource.BinarySI), nodeResizeRequired, err
}
// pkg/csi/client.go

func (c *client) Expand(
	ctx context.Context,
	volumeID string,
	requestBytes int64,
	secrets map[string]string,
	capability *csi.VolumeCapability) (int64, bool, error) {
	req := &csi.ControllerExpandVolumeRequest{
		Secrets:          secrets,
		VolumeId:         volumeID,
		CapacityRange:    &csi.CapacityRange{RequiredBytes: requestBytes},
		VolumeCapability: capability,
	}
	resp, err := c.ctrlClient.ControllerExpandVolume(ctx, req)
	if err != nil {
		return 0, false, err
	}
	return resp.CapacityBytes, resp.NodeExpansionRequired, nil
}
util.UpdatePVCapacity: updates .spec.capacity.storage of the PV object to the size after expansion.
// UpdatePVCapacity updates PV capacity with requested size.
func UpdatePVCapacity(pv *v1.PersistentVolume, newCapacity resource.Quantity, kubeClient kubernetes.Interface) error {
	newPV := pv.DeepCopy()
	newPV.Spec.Capacity[v1.ResourceStorage] = newCapacity
	patchBytes, err := getPatchData(pv, newPV)
	if err != nil {
		return fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", pv.Name, err)
	}
	_, updateErr := kubeClient.CoreV1().PersistentVolumes().Patch(pv.Name, types.StrategicMergePatchType, patchBytes)
	if updateErr != nil {
		return fmt.Errorf("update capacity of PV %s failed: %v", pv.Name, updateErr)
	}
	return nil
}
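The body of getPatchData is not shown above. As a rough illustration of what a patch that changes only the PV capacity could look like, the sketch below builds such a body with the standard library. capacityPatch is a hypothetical helper and not the external-resizer implementation, which derives the patch from the old and new PV objects.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// capacityPatch builds a merge-patch style body that touches only
// spec.capacity.storage. It is a hypothetical stand-in for getPatchData.
func capacityPatch(newSize string) ([]byte, error) {
	patch := map[string]interface{}{
		"spec": map[string]interface{}{
			"capacity": map[string]string{"storage": newSize},
		},
	}
	return json.Marshal(patch)
}

func main() {
	body, err := capacityPatch("2Gi")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"spec":{"capacity":{"storage":"2Gi"}}}
}
```

Sending a patch rather than the whole object keeps the update limited to the capacity field, so other fields changed concurrently on the PV are left alone.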
This concludes the analysis of the expansion logic in external-resizer.
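To summarize: volume expansion takes two steps, one on the controller side and one on the node side. The controller-side expansion (triggered by external-resizer) runs first, then the node-side expansion (triggered by kubelet); only after both is the expansion complete. When the volumeMode is Block, no node-side expansion is needed.
Controller-side expansion: expands the underlying storage. For ceph rbd, for example, the rbd image in the ceph cluster is grown.
Node-side expansion: performs the matching operation on the node where the pod runs, so that the node sees the new size. For a ceph rbd file system, for example, the file system resize command is run on the node.
Some storage types, such as cephfs, need no node-side expansion.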
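(1) Change pvc.Spec.Resources.Requests.storage to trigger the expansion.
(2) Controller-side expansion: external-resizer watches PVC objects. When it finds that pvc.Spec.Resources.Requests.storage is larger than pvc.Status.Capacity.storage, it calls the ControllerExpandVolume method of the csi plugin to expand the underlying storage, then updates pv.Spec.Capacity.storage.
(3) Node-side expansion: kubelet finds that pv.Spec.Capacity.storage is larger than pvc.Status.Capacity.storage, so it calls the csi node-side expansion to grow the file system on the node. On success, kubelet updates pvc.Status.Capacity.storage.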
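The main controller-side (external-resizer) expansion operations are:
(1) calling the ControllerExpandVolume method of the csi plugin to expand the storage;
(2) updating .spec.capacity.storage of the PV object to the new size;
(3) updating .Status.Conditions of the PVC object by adding a condition with type "FileSystemResizePending" and status "True". This marks the controller-side expansion of the PVC as complete; kubelet then performs the node-side expansion.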
Related article: kubelet PVC volume expansion code analysis