Implement controller and kubelet changes for recovery from resize

failures
This commit is contained in:
Hemant Kumar 2021-11-12 11:06:40 -05:00
parent 9684763568
commit 1ddd598d31
10 changed files with 877 additions and 91 deletions

View File

@ -23,7 +23,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -148,6 +148,10 @@ func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cac
oldService := oldObj.(*v1.Service)
newService := newObj.(*v1.Service)
notifyUpdate = core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService)
case schema.GroupResource{Resource: "persistentvolumeclaims"}:
oldPVC := oldObj.(*v1.PersistentVolumeClaim)
newPVC := newObj.(*v1.PersistentVolumeClaim)
notifyUpdate = core.RequiresQuotaReplenish(newPVC, oldPVC)
}
if notifyUpdate {
event := &event{

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@ -44,12 +45,14 @@ import (
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/features"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/subpath"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
@ -302,19 +305,31 @@ func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.Persi
return util.DeleteAnnPreResizeCapacity(pv, expc.GetKubeClient())
}
pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
if err != nil {
klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return err
var generatedOptions volumetypes.GeneratedOperations
var err error
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
generatedOptions, err = expc.operationGenerator.GenerateExpandAndRecoverVolumeFunc(pvc, pv, resizerName)
if err != nil {
klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return err
}
} else {
pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
if err != nil {
klog.Errorf("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return err
}
generatedOptions, err = expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv)
if err != nil {
klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return err
}
}
generatedOperations, err := expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv)
if err != nil {
klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return err
}
klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
_, detailedErr := generatedOperations.Run()
_, detailedErr := generatedOptions.Run()
return detailedErr
}
@ -357,8 +372,15 @@ func (expc *expandController) getPersistentVolume(ctx context.Context, pvc *v1.P
// isNodeExpandComplete returns true if pvc.Status.Capacity >= pv.Spec.Capacity
func (expc *expandController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
klog.V(4).Infof("pv %q capacity = %v, pvc %s capacity = %v", pv.Name, pv.Spec.Capacity[v1.ResourceStorage], pvc.ObjectMeta.Name, pvc.Status.Capacity[v1.ResourceStorage])
pvcCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage]
return pvcCap.Cmp(pvCap) >= 0
pvcSpecCap := pvc.Spec.Resources.Requests.Storage()
pvcStatusCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage]
// since we allow shrinking volumes, we must compare both pvc status and capacity
// with pv spec capacity.
if pvcStatusCap.Cmp(*pvcSpecCap) >= 0 && pvcStatusCap.Cmp(pvCap) >= 0 {
return true
}
return false
}
// Implementing VolumeHost interface

View File

@ -347,8 +347,12 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp
resp, err := nodeClient.NodeExpandVolume(ctx, req)
if err != nil {
if !isFinalError(err) {
return opts.newSize, volumetypes.NewUncertainProgressError(err.Error())
}
return opts.newSize, err
}
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
return *updatedQuantity, nil
}

View File

@ -85,6 +85,8 @@ const (
FailVolumeExpansion = "fail-expansion-test"
AlwaysFailNodeExpansion = "always-fail-node-expansion"
deviceNotMounted = "deviceNotMounted"
deviceMountUncertain = "deviceMountUncertain"
deviceMounted = "deviceMounted"
@ -178,6 +180,7 @@ type FakeVolumePlugin struct {
LimitKey string
ProvisionDelaySeconds int
SupportsRemount bool
DisableNodeExpansion bool
// default to false which means it is attachable by default
NonAttachable bool
@ -464,13 +467,17 @@ func (plugin *FakeVolumePlugin) ExpandVolumeDevice(spec *Spec, newSize resource.
}
func (plugin *FakeVolumePlugin) RequiresFSResize() bool {
return true
return !plugin.DisableNodeExpansion
}
func (plugin *FakeVolumePlugin) NodeExpand(resizeOptions NodeResizeOptions) (bool, error) {
if resizeOptions.VolumeSpec.Name() == FailWithInUseVolumeName {
return false, volumetypes.NewFailedPreconditionError("volume-in-use")
}
if resizeOptions.VolumeSpec.Name() == AlwaysFailNodeExpansion {
return false, fmt.Errorf("Test failure: NodeExpand")
}
// Set up fakeVolumePlugin not support STAGE_UNSTAGE for testing the behavior
// so as volume can be node published before we can resize
if resizeOptions.CSIVolumePhase == volume.CSIVolumeStaged {

View File

@ -104,6 +104,10 @@ func (f *fakeOGCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.
return f.recordFuncCall("GenerateExpandVolumeFunc"), nil
}
func (f *fakeOGCounter) GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateExpandVolumeFunc"), nil
}
func (f *fakeOGCounter) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateExpandInUseVolumeFunc"), nil
}

View File

@ -658,6 +658,16 @@ func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentV
}, nil
}
func (fopg *fakeOperationGenerator) GenerateExpandAndRecoverVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) (volumetypes.GeneratedOperations, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)
return volumetypes.NewOperationContext(nil, nil, false)
}
return volumetypes.GeneratedOperations{
OperationFunc: opFunc,
}, nil
}
func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
opFunc := func() volumetypes.OperationContext {
startOperationAndBlock(fopg.ch, fopg.quit)

View File

@ -25,6 +25,8 @@ import (
"strings"
"time"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -150,10 +152,50 @@ type OperationGenerator interface {
GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error)
GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error)
// Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume.
GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
}
type inTreeResizeOpts struct {
resizerName string
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
volumeSpec *volume.Spec
volumePlugin volume.ExpandableVolumePlugin
}
type inTreeResizeResponse struct {
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
err error
// Indicates whether kubelet should assume resize operation as finished.
// For kubelet - resize operation could be assumed as finished even if
// actual resizing is *not* finished. This can happen, because certain prechecks
// are failing and kubelet should not retry expansion, or it could happen
// because resize operation is genuinely finished.
assumeResizeOpAsFinished bool
// indicates that resize operation was called on underlying volume driver
// mainly useful for testing.
resizeCalled bool
// indicates whether entire volume expansion is finished or not
// only used from nodeExpansion calls. Mainly used for testing.
resizeFinished bool
}
type nodeResizeOperationOpts struct {
vmt VolumeToMount
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
pluginResizeOpts volume.NodeResizeOptions
volumePlugin volume.NodeExpandableVolumePlugin
actualStateOfWorld ActualStateOfWorldMounterUpdater
}
func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
attachedVolumes []AttachedVolume,
nodeName types.NodeName,
@ -1595,7 +1637,6 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
}
expandVolumeFunc := func() volumetypes.OperationContext {
migrated := false
newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
@ -1617,7 +1658,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
// k8s doesn't have transactions, we can't guarantee that after updating PV - updating PVC will be
// successful, that is why all PVCs for which pvc.Spec.Size > pvc.Status.Size must be reprocessed
// until they reflect user requested size in pvc.Status.Size
updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient)
_, updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient)
if updateErr != nil {
detailedErr := fmt.Errorf("error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr)
return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
@ -1632,7 +1673,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
// reflects user requested size.
if !volumePlugin.RequiresFSResize() || !fsVolume {
klog.V(4).Infof("Controller resizing done for PVC %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
err := util.MarkResizeFinished(pvc, newSize, og.kubeClient)
_, err := util.MarkResizeFinished(pvc, newSize, og.kubeClient)
if err != nil {
detailedErr := fmt.Errorf("error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
@ -1640,7 +1681,7 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
} else {
err := util.MarkForFSResize(pvc, og.kubeClient)
_, err := util.MarkForFSResize(pvc, og.kubeClient)
if err != nil {
detailedErr := fmt.Errorf("error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
klog.Warning(detailedErr)
@ -1672,6 +1713,210 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
}, nil
}
func (og *operationGenerator) GenerateExpandAndRecoverVolumeFunc(
pvc *v1.PersistentVolumeClaim,
pv *v1.PersistentVolume, resizerName string) (volumetypes.GeneratedOperations, error) {
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, fmt.Errorf("error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
}
if volumePlugin == nil {
return volumetypes.GeneratedOperations{}, fmt.Errorf("can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc))
}
expandVolumeFunc := func() volumetypes.OperationContext {
resizeOpts := inTreeResizeOpts{
pvc: pvc,
pv: pv,
resizerName: resizerName,
volumePlugin: volumePlugin,
volumeSpec: volumeSpec,
}
migrated := false
resp := og.expandAndRecoverFunction(resizeOpts)
if resp.err != nil {
return volumetypes.NewOperationContext(resp.err, resp.err, migrated)
}
return volumetypes.NewOperationContext(nil, nil, migrated)
}
eventRecorderFunc := func(err *error) {
if *err != nil {
og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
}
}
return volumetypes.GeneratedOperations{
OperationName: "expand_volume",
OperationFunc: expandVolumeFunc,
EventRecorderFunc: eventRecorderFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeSpec), "expand_volume"),
}, nil
}
func (og *operationGenerator) expandAndRecoverFunction(resizeOpts inTreeResizeOpts) inTreeResizeResponse {
pvc := resizeOpts.pvc
pv := resizeOpts.pv
resizerName := resizeOpts.resizerName
volumePlugin := resizeOpts.volumePlugin
volumeSpec := resizeOpts.volumeSpec
pvcSpecSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage]
pvSize := pv.Spec.Capacity[v1.ResourceStorage]
resizeResponse := inTreeResizeResponse{
pvc: pvc,
pv: pv,
resizeCalled: false,
}
// by default we are expanding to full-fill size requested in pvc.Spec.Resources
newSize := pvcSpecSize
resizeStatus := v1.PersistentVolumeClaimNoExpansionInProgress
if pvc.Status.ResizeStatus != nil {
resizeStatus = *pvc.Status.ResizeStatus
}
var allocatedSize *resource.Quantity
t, ok := pvc.Status.AllocatedResources[v1.ResourceStorage]
if ok {
allocatedSize = &t
}
var err error
if pvSize.Cmp(pvcSpecSize) < 0 {
// pv is not of requested size yet and hence will require expanding
switch resizeStatus {
case v1.PersistentVolumeClaimControllerExpansionInProgress:
case v1.PersistentVolumeClaimNodeExpansionPending:
case v1.PersistentVolumeClaimNodeExpansionInProgress:
case v1.PersistentVolumeClaimNodeExpansionFailed:
if allocatedSize != nil {
newSize = *allocatedSize
}
default:
newSize = pvcSpecSize
}
} else {
// PV has already been expanded and hence we can be here for following reasons:
// 1. If expansion is pending on the node and this was just a spurious update event
// we don't need to do anything and let kubelet handle it.
// 2. It could be that - although we successfully expanded the volume, we failed to
// record our work in API objects, in which case - we should resume resizing operation
// and let API objects be updated.
// 3. Controller successfully expanded the volume, but expansion is failing on the node
// and before kubelet can retry failed node expansion - controller must verify if it is
// safe to do so.
// 4. While expansion was still pending on the node, user reduced the pvc size.
switch resizeStatus {
case v1.PersistentVolumeClaimNodeExpansionInProgress:
case v1.PersistentVolumeClaimNodeExpansionPending:
// we don't need to do any work. We could be here because of a spurious update event.
// This is case #1
return resizeResponse
case v1.PersistentVolumeClaimNodeExpansionFailed:
// This is case#3
pvc, err = og.markForPendingNodeExpansion(pvc, pv)
resizeResponse.pvc = pvc
resizeResponse.err = err
return resizeResponse
case v1.PersistentVolumeClaimControllerExpansionInProgress:
case v1.PersistentVolumeClaimControllerExpansionFailed:
case v1.PersistentVolumeClaimNoExpansionInProgress:
// This is case#2 or it could also be case#4 when user manually shrunk the PVC
// after expanding it.
if allocatedSize != nil {
newSize = *allocatedSize
}
default:
// It is impossible for ResizeStatus to be nil and allocatedSize to be not nil but somehow
// if we do end up in this state, it is safest to resume expansion to last recorded size in
// allocatedSize variable.
if pvc.Status.ResizeStatus == nil && allocatedSize != nil {
newSize = *allocatedSize
} else {
newSize = pvcSpecSize
}
}
}
pvc, err = util.MarkControllerReisizeInProgress(pvc, resizerName, newSize, og.kubeClient)
if err != nil {
msg := fmt.Errorf("error updating pvc %s with resize in progress: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
resizeResponse.err = msg
resizeResponse.pvc = pvc
return resizeResponse
}
updatedSize, err := volumePlugin.ExpandVolumeDevice(volumeSpec, newSize, pvcStatusSize)
resizeResponse.resizeCalled = true
if err != nil {
msg := fmt.Errorf("error expanding pvc %s: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
resizeResponse.err = msg
resizeResponse.pvc = pvc
return resizeResponse
}
// update PV size
var updateErr error
pv, updateErr = util.UpdatePVSize(pv, updatedSize, og.kubeClient)
// if updating PV failed, we are going to leave the PVC in ControllerExpansionInProgress state, so as expansion can be retried to previously set allocatedSize value.
if updateErr != nil {
msg := fmt.Errorf("error updating pv for pvc %s: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr)
resizeResponse.err = msg
return resizeResponse
}
resizeResponse.pv = pv
fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec)
if !volumePlugin.RequiresFSResize() || !fsVolume {
pvc, err = util.MarkResizeFinished(pvc, updatedSize, og.kubeClient)
if err != nil {
msg := fmt.Errorf("error marking pvc %s as resized: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
resizeResponse.err = msg
return resizeResponse
}
resizeResponse.pvc = pvc
successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
} else {
pvc, err = og.markForPendingNodeExpansion(pvc, pv)
resizeResponse.pvc = pvc
if err != nil {
msg := fmt.Errorf("error marking pvc %s for node expansion: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
resizeResponse.err = msg
return resizeResponse
}
}
return resizeResponse
}
func (og *operationGenerator) markForPendingNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) {
var err error
pvc, err = util.MarkForFSResize(pvc, og.kubeClient)
if err != nil {
msg := fmt.Errorf("error marking pvc %s for node expansion: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return pvc, msg
}
// store old PVC capacity in pv, so as if PVC gets deleted while node expansion was pending
// we can restore size of pvc from PV annotation and still perform expansion on the node
oldCapacity := pvc.Status.Capacity[v1.ResourceStorage]
err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient)
if err != nil {
detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, util.AnnPreResizeCapacity, oldCapacity.String(), err)
klog.Warning(detailedErr)
return pvc, detailedErr
}
return pvc, nil
}
func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
@ -1825,6 +2070,7 @@ func (og *operationGenerator) nodeExpandVolume(
if expandableVolumePlugin != nil &&
expandableVolumePlugin.RequiresFSResize() &&
volumeToMount.VolumeSpec.PersistentVolume != nil {
pv := volumeToMount.VolumeSpec.PersistentVolume
pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil {
@ -1832,56 +2078,200 @@ func (og *operationGenerator) nodeExpandVolume(
return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err)
}
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
if pvcStatusCap.Cmp(pvSpecCap) < 0 {
// File system resize was requested, proceed
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
if volumeToMount.VolumeSpec.ReadOnly {
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system")
klog.Warningf(detailedMsg)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
return true, nil
}
rsOpts.VolumeSpec = volumeToMount.VolumeSpec
rsOpts.NewSize = pvSpecCap
rsOpts.OldSize = pvcStatusCap
resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
if resizeErr != nil {
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
return true, nil
}
return false, resizeErr
}
// Volume resizing is not done but it did not error out. This could happen if a CSI volume
// does not have node stage_unstage capability but was asked to resize the volume before
// node publish. In which case - we must retry resizing after node publish.
if !resizeDone {
return false, nil
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
// File system resize succeeded, now update the PVC's Capacity to match the PV's
err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
if err != nil {
// On retry, NodeExpandVolume will be called again but do nothing
return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
}
if volumeToMount.VolumeSpec.ReadOnly {
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system")
klog.Warningf(detailedMsg)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
return true, nil
}
resizeOp := nodeResizeOperationOpts{
vmt: volumeToMount,
pvc: pvc,
pv: pv,
pluginResizeOpts: rsOpts,
volumePlugin: expandableVolumePlugin,
actualStateOfWorld: actualStateOfWorld,
}
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
resizeResponse := og.callNodeExpandOnPlugin(resizeOp)
return resizeResponse.assumeResizeOpAsFinished, resizeResponse.err
} else {
return og.legacyCallNodeExpandOnPlugin(resizeOp)
}
}
return true, nil
}
// callNodeExpandOnPlugin is newer version of calling node expansion on plugins, which does support
// recovery from volume expansion failure.
func (og *operationGenerator) callNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) inTreeResizeResponse {
pvc := resizeOp.pvc
pv := resizeOp.pv
volumeToMount := resizeOp.vmt
rsOpts := resizeOp.pluginResizeOpts
actualStateOfWorld := resizeOp.actualStateOfWorld
expandableVolumePlugin := resizeOp.volumePlugin
var err error
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
resizeResponse := inTreeResizeResponse{
pvc: pvc,
pv: pv,
}
if permitNodeExpansion(pvc, pv) {
// File system resize was requested, proceed
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
rsOpts.VolumeSpec = volumeToMount.VolumeSpec
rsOpts.NewSize = pvSpecCap
rsOpts.OldSize = pvcStatusCap
pvc, err = util.MarkNodeExpansionInProgress(pvc, og.kubeClient)
if err != nil {
msg := volumeToMount.GenerateErrorDetailed("MountVolume.NodeExpandVolume failed to mark node expansion in progress: %v", err)
klog.Errorf(msg.Error())
resizeResponse.err = msg
return resizeResponse
}
resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
resizeResponse.resizeCalled = true
if resizeErr != nil {
if volumetypes.IsOperationFinishedError(resizeErr) {
var markFailedError error
pvc, markFailedError = util.MarkNodeExpansionFailed(pvc, og.kubeClient)
// update the pvc with node expansion object
resizeResponse.pvc = pvc
resizeResponse.assumeResizeOpAsFinished = true
if markFailedError != nil {
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountMount.NodeExpandVolume failed to mark node expansion as failed: %v", err).Error())
}
}
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
resizeResponse.assumeResizeOpAsFinished = true
return resizeResponse
}
resizeResponse.err = resizeErr
return resizeResponse
}
resizeResponse.resizeFinished = resizeDone
// Volume resizing is not done but it did not error out. This could happen if a CSI volume
// does not have node stage_unstage capability but was asked to resize the volume before
// node publish. In which case - we must retry resizing after node publish.
if !resizeDone {
return resizeResponse
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
// File system resize succeeded, now update the PVC's Capacity to match the PV's
pvc, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
resizeResponse.pvc = pvc
if err != nil {
resizeResponse.err = fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
// On retry, NodeExpandVolume will be called again but do nothing
return resizeResponse
}
resizeResponse.assumeResizeOpAsFinished = true
return resizeResponse
}
// somehow a resize operation was queued, but we can not perform any resizing because
// prechecks required for node expansion failed. Kubelet should not retry expanding the volume.
resizeResponse.assumeResizeOpAsFinished = true
return resizeResponse
}
// legacyCallNodeExpandOnPlugin is old version of calling node expansion on plugin, which does not support
// recovery from volume expansion failure
func (og *operationGenerator) legacyCallNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) (bool, error) {
pvc := resizeOp.pvc
pv := resizeOp.pv
volumeToMount := resizeOp.vmt
rsOpts := resizeOp.pluginResizeOpts
actualStateOfWorld := resizeOp.actualStateOfWorld
expandableVolumePlugin := resizeOp.volumePlugin
var err error
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
if pvcStatusCap.Cmp(pvSpecCap) < 0 {
// File system resize was requested, proceed
klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
rsOpts.VolumeSpec = volumeToMount.VolumeSpec
rsOpts.NewSize = pvSpecCap
rsOpts.OldSize = pvcStatusCap
resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
if resizeErr != nil {
// if driver returned FailedPrecondition error that means
// volume expansion should not be retried on this node but
// expansion operation should not block mounting
if volumetypes.IsFailedPreconditionError(resizeErr) {
actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed with %v", resizeErr).Error())
return true, nil
}
return false, resizeErr
}
// Volume resizing is not done but it did not error out. This could happen if a CSI volume
// does not have node stage_unstage capability but was asked to resize the volume before
// node publish. In which case - we must retry resizing after node publish.
if !resizeDone {
return false, nil
}
simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
// File system resize succeeded, now update the PVC's Capacity to match the PV's
_, err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
if err != nil {
// On retry, NodeExpandVolume will be called again but do nothing
return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
}
return true, nil
}
return true, nil
}
func permitNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
// if pvc.Status.Cap is >= pv.Spec.Cap then volume is already expanded
if pvcStatusCap.Cmp(pvSpecCap) >= 0 {
return false
}
resizeStatus := pvc.Status.ResizeStatus
// if resizestatus is nil or NodeExpansionInProgress or NodeExpansionPending then we should allow volume expansion on
// the node to proceed. We are making an exception for resizeStatus being nil because it will support use cases where
// resizeStatus may not be set (old control-plane expansion controller etc).
if resizeStatus == nil || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionPending || *resizeStatus == v1.PersistentVolumeClaimNodeExpansionInProgress {
return true
} else {
klog.Infof("volume %s/%s can not be expanded because resizeStaus is: %s", pvc.Namespace, pvc.Name, *resizeStatus)
return false
}
}
func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error {
mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec)

View File

@ -17,6 +17,12 @@ limitations under the License.
package operationexecutor
import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"os"
"testing"
@ -27,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
fakeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/volume"
@ -111,6 +116,267 @@ func TestOperationGenerator_GenerateUnmapVolumeFunc_PluginName(t *testing.T) {
}
}
func TestOperationGenerator_GenerateExpandAndRecoverVolumeFunc(t *testing.T) {
var tests = []struct {
name string
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
recoverFeatureGate bool
disableNodeExpansion bool
// expectations of test
expectedResizeStatus v1.PersistentVolumeClaimResizeStatus
expectedAllocatedSize resource.Quantity
expectResizeCall bool
}{
{
name: "pvc.spec.size > pv.spec.size, recover_expansion=on",
pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNoExpansionInProgress),
pv: getTestPV("test-vol0", "1G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionPending,
expectedAllocatedSize: resource.MustParse("2G"),
expectResizeCall: true,
},
{
name: "pvc.spec.size = pv.spec.size, recover_expansion=on",
pvc: getTestPVC("test-vol0", "1G", "1G", "", v1.PersistentVolumeClaimNoExpansionInProgress),
pv: getTestPV("test-vol0", "1G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionPending,
expectedAllocatedSize: resource.MustParse("1G"),
expectResizeCall: true,
},
{
name: "pvc.spec.size = pv.spec.size, recover_expansion=on",
pvc: getTestPVC("test-vol0", "1G", "1G", "1G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV("test-vol0", "1G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionPending,
expectedAllocatedSize: resource.MustParse("1G"),
expectResizeCall: false,
},
{
name: "pvc.spec.size > pv.spec.size, recover_expansion=on, disable_node_expansion=true",
pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNoExpansionInProgress),
pv: getTestPV("test-vol0", "1G"),
disableNodeExpansion: true,
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
expectedAllocatedSize: resource.MustParse("2G"),
expectResizeCall: true,
},
{
name: "pv.spec.size >= pvc.spec.size, recover_expansion=on, resize_status=node_expansion_failed",
pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionFailed),
pv: getTestPV("test-vol0", "2G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionPending,
expectedAllocatedSize: resource.MustParse("2G"),
expectResizeCall: false,
},
}
for i := range tests {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)()
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
fakePlugin.DisableNodeExpansion = test.disableNodeExpansion
pvc := test.pvc
pv := test.pv
og := getTestOperationGenerator(volumePluginMgr, pvc, pv)
rsOpts := inTreeResizeOpts{
pvc: pvc,
pv: pv,
resizerName: fakePlugin.GetPluginName(),
volumePlugin: fakePlugin,
}
ogInstance, _ := og.(*operationGenerator)
expansionResponse := ogInstance.expandAndRecoverFunction(rsOpts)
if expansionResponse.err != nil {
t.Fatalf("GenerateExpandAndRecoverVolumeFunc failed: %v", expansionResponse.err)
}
updatedPVC := expansionResponse.pvc
assert.Equal(t, *updatedPVC.Status.ResizeStatus, test.expectedResizeStatus)
actualAllocatedSize := updatedPVC.Status.AllocatedResources.Storage()
if test.expectedAllocatedSize.Cmp(*actualAllocatedSize) != 0 {
t.Fatalf("GenerateExpandAndRecoverVolumeFunc failed: expected allocated size %s, got %s", test.expectedAllocatedSize.String(), actualAllocatedSize.String())
}
if test.expectResizeCall != expansionResponse.resizeCalled {
t.Fatalf("GenerateExpandAndRecoverVolumeFunc failed: expected resize called %t, got %t", test.expectResizeCall, expansionResponse.resizeCalled)
}
})
}
}
func TestOperationGenerator_callNodeExpansionOnPlugin(t *testing.T) {
var tests = []struct {
name string
pvc *v1.PersistentVolumeClaim
pv *v1.PersistentVolume
recoverFeatureGate bool
// expectations of test
expectedResizeStatus v1.PersistentVolumeClaimResizeStatus
expectedStatusSize resource.Quantity
expectResizeCall bool
assumeResizeOpAsFinished bool
expectError bool
}{
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_failed",
pvc: getTestPVC("test-vol0", "2G", "1G", "", v1.PersistentVolumeClaimNodeExpansionFailed),
pv: getTestPV("test-vol0", "2G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: false,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending",
pvc: getTestPVC("test-vol0", "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV("test-vol0", "2G"),
recoverFeatureGate: true,
expectedResizeStatus: v1.PersistentVolumeClaimNoExpansionInProgress,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("2G"),
},
{
name: "pv.spec.cap > pvc.status.cap, resizeStatus=node_expansion_pending, reize_op=failing",
pvc: getTestPVC(volumetesting.AlwaysFailNodeExpansion, "2G", "1G", "2G", v1.PersistentVolumeClaimNodeExpansionPending),
pv: getTestPV(volumetesting.AlwaysFailNodeExpansion, "2G"),
recoverFeatureGate: true,
expectError: true,
expectedResizeStatus: v1.PersistentVolumeClaimNodeExpansionFailed,
expectResizeCall: true,
assumeResizeOpAsFinished: true,
expectedStatusSize: resource.MustParse("1G"),
},
}
for i := range tests {
test := tests[i]
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RecoverVolumeExpansionFailure, test.recoverFeatureGate)()
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
pvc := test.pvc
pv := test.pv
pod := getTestPod("test-pod", pvc.Name)
og := getTestOperationGenerator(volumePluginMgr, pvc, pv)
vmt := VolumeToMount{
Pod: pod,
VolumeName: v1.UniqueVolumeName(pv.Name),
VolumeSpec: volume.NewSpecFromPersistentVolume(pv, false),
}
resizeOp := nodeResizeOperationOpts{
pvc: pvc,
pv: pv,
volumePlugin: fakePlugin,
vmt: vmt,
actualStateOfWorld: nil,
}
ogInstance, _ := og.(*operationGenerator)
expansionResponse := ogInstance.callNodeExpandOnPlugin(resizeOp)
pvc = expansionResponse.pvc
pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
if !test.expectError && expansionResponse.err != nil {
t.Errorf("For test %s, expected no error got: %v", test.name, expansionResponse.err)
}
if test.expectError && expansionResponse.err == nil {
t.Errorf("For test %s, expected error but got none", test.name)
}
if test.expectResizeCall != expansionResponse.resizeCalled {
t.Errorf("For test %s, expected resize called %t, got %t", test.name, test.expectResizeCall, expansionResponse.resizeCalled)
}
if test.assumeResizeOpAsFinished != expansionResponse.assumeResizeOpAsFinished {
t.Errorf("For test %s, expected assumeResizeOpAsFinished %t, got %t", test.name, test.assumeResizeOpAsFinished, expansionResponse.assumeResizeOpAsFinished)
}
if test.expectedResizeStatus != *pvc.Status.ResizeStatus {
t.Errorf("For test %s, expected resizeStatus %v, got %v", test.name, test.expectedResizeStatus, *pvc.Status.ResizeStatus)
}
if pvcStatusCap.Cmp(test.expectedStatusSize) != 0 {
t.Errorf("For test %s, expected status size %s, got %s", test.name, test.expectedStatusSize.String(), pvcStatusCap.String())
}
})
}
}
func getTestPod(podName, pvcName string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
UID: "test-pod-uid",
Namespace: "ns",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: pvcName,
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcName,
},
},
},
},
},
}
}
func getTestPVC(volumeName string, specSize, statusSize, allocatedSize string, resizeStatus v1.PersistentVolumeClaimResizeStatus) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "claim01",
Namespace: "ns",
UID: "test-uid",
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Resources: v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse(specSize)}},
VolumeName: volumeName,
},
Status: v1.PersistentVolumeClaimStatus{
Phase: v1.ClaimBound,
},
}
if len(statusSize) > 0 {
pvc.Status.Capacity = v1.ResourceList{v1.ResourceStorage: resource.MustParse(statusSize)}
}
if len(allocatedSize) > 0 {
pvc.Status.AllocatedResources = v1.ResourceList{v1.ResourceStorage: resource.MustParse(allocatedSize)}
}
if len(resizeStatus) > 0 {
pvc.Status.ResizeStatus = &resizeStatus
}
return pvc
}
func getTestPV(volumeName string, specSize string) *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: volumeName,
UID: "test-uid",
},
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Capacity: v1.ResourceList{
v1.ResourceStorage: resource.MustParse(specSize),
},
},
Status: v1.PersistentVolumeStatus{
Phase: v1.VolumeBound,
},
}
}
func findMetricWithNameAndLabels(metricFamilyName string, labelFilter map[string]string) *io_prometheus_client.Metric {
metricFamily := getMetricFamily(metricFamilyName)
if metricFamily == nil {
@ -145,8 +411,8 @@ func isLabelsMatchWithMetric(labelFilter map[string]string, metric *io_prometheu
return true
}
func getTestOperationGenerator(volumePluginMgr *volume.VolumePluginMgr) OperationGenerator {
fakeKubeClient := fakeclient.NewSimpleClientset()
func getTestOperationGenerator(volumePluginMgr *volume.VolumePluginMgr, objects ...runtime.Object) OperationGenerator {
fakeKubeClient := fakeclient.NewSimpleClientset(objects...)
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
operationGenerator := NewOperationGenerator(

View File

@ -28,7 +28,9 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/mount-utils"
@ -61,7 +63,7 @@ func ClaimToClaimKey(claim *v1.PersistentVolumeClaim) string {
func UpdatePVSize(
pv *v1.PersistentVolume,
newSize resource.Quantity,
kubeClient clientset.Interface) error {
kubeClient clientset.Interface) (*v1.PersistentVolume, error) {
pvClone := pv.DeepCopy()
pvClone.Spec.Capacity[v1.ResourceStorage] = newSize
@ -84,7 +86,8 @@ func AddAnnPreResizeCapacity(
}
pvClone.ObjectMeta.Annotations[AnnPreResizeCapacity] = oldCapacity.String()
return PatchPV(pv, pvClone, kubeClient)
_, err := PatchPV(pv, pvClone, kubeClient)
return err
}
// DeleteAnnPreResizeCapacity deletes volume.alpha.kubernetes.io/pre-resize-capacity from the pv
@ -97,35 +100,35 @@ func DeleteAnnPreResizeCapacity(
}
pvClone := pv.DeepCopy()
delete(pvClone.ObjectMeta.Annotations, AnnPreResizeCapacity)
return PatchPV(pv, pvClone, kubeClient)
_, err := PatchPV(pv, pvClone, kubeClient)
return err
}
// PatchPV creates and executes a patch for pv
func PatchPV(
oldPV *v1.PersistentVolume,
newPV *v1.PersistentVolume,
kubeClient clientset.Interface) error {
kubeClient clientset.Interface) (*v1.PersistentVolume, error) {
oldData, err := json.Marshal(oldPV)
if err != nil {
return fmt.Errorf("unexpected error marshaling old PV %q with error : %v", oldPV.Name, err)
return oldPV, fmt.Errorf("unexpected error marshaling old PV %q with error : %v", oldPV.Name, err)
}
newData, err := json.Marshal(newPV)
if err != nil {
return fmt.Errorf("unexpected error marshaling new PV %q with error : %v", newPV.Name, err)
return oldPV, fmt.Errorf("unexpected error marshaling new PV %q with error : %v", newPV.Name, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV)
if err != nil {
return fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", oldPV.Name, err)
return oldPV, fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", oldPV.Name, err)
}
_, err = kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
updatedPV, err := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("error Patching PV %q with error : %v", oldPV.Name, err)
return oldPV, fmt.Errorf("error Patching PV %q with error : %v", oldPV.Name, err)
}
return nil
return updatedPV, nil
}
// MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress
@ -147,6 +150,23 @@ func MarkResizeInProgressWithResizer(
return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
}
func MarkControllerReisizeInProgress(pvc *v1.PersistentVolumeClaim, resizerName string, newSize resource.Quantity, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
// Mark PVC as Resize Started
progressCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimResizing,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
}
controllerExpansionInProgress := v1.PersistentVolumeClaimControllerExpansionInProgress
conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
newPVC := pvc.DeepCopy()
newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
newPVC.Status.ResizeStatus = &controllerExpansionInProgress
newPVC.Status.AllocatedResources = v1.ResourceList{v1.ResourceStorage: newSize}
newPVC = setResizer(newPVC, resizerName)
return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
}
// SetClaimResizer sets resizer annotation on PVC
func SetClaimResizer(
pvc *v1.PersistentVolumeClaim,
@ -168,7 +188,7 @@ func setResizer(pvc *v1.PersistentVolumeClaim, resizerName string) *v1.Persisten
// MarkForFSResize marks file system resizing as pending
func MarkForFSResize(
pvc *v1.PersistentVolumeClaim,
kubeClient clientset.Interface) error {
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
pvcCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimFileSystemResizePending,
Status: v1.ConditionTrue,
@ -177,16 +197,20 @@ func MarkForFSResize(
}
conditions := []v1.PersistentVolumeClaimCondition{pvcCondition}
newPVC := pvc.DeepCopy()
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
expansionPendingOnNode := v1.PersistentVolumeClaimNodeExpansionPending
newPVC.Status.ResizeStatus = &expansionPendingOnNode
}
newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
_, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
return err
updatedPVC, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
return updatedPVC, err
}
// MarkResizeFinished marks all resizing as done
func MarkResizeFinished(
pvc *v1.PersistentVolumeClaim,
newSize resource.Quantity,
kubeClient clientset.Interface) error {
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
return MarkFSResizeFinished(pvc, newSize, kubeClient)
}
@ -194,12 +218,65 @@ func MarkResizeFinished(
func MarkFSResizeFinished(
pvc *v1.PersistentVolumeClaim,
newSize resource.Quantity,
kubeClient clientset.Interface) error {
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
newPVC := pvc.DeepCopy()
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
// if RecoverVolumeExpansionFailure is enabled, we need to reset ResizeStatus back to nil
if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
expansionFinished := v1.PersistentVolumeClaimNoExpansionInProgress
newPVC.Status.ResizeStatus = &expansionFinished
}
newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{})
_, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
return err
updatedPVC, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
return updatedPVC, err
}
func MarkControllerExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
expansionFailedOnController := v1.PersistentVolumeClaimControllerExpansionFailed
newPVC := pvc.DeepCopy()
newPVC.Status.ResizeStatus = &expansionFailedOnController
patchBytes, err := createPVCPatch(pvc, newPVC, false /* addResourceVersionCheck */)
if err != nil {
return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, err)
}
updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).
Patch(context.TODO(), pvc.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if updateErr != nil {
return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, updateErr)
}
return updatedClaim, nil
}
// MarkNodeExpansionFailed marks a PVC for node expansion as failed. Kubelet should not retry expansion
// of volumes which are in failed state.
func MarkNodeExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
expansionFailedOnNode := v1.PersistentVolumeClaimNodeExpansionFailed
newPVC := pvc.DeepCopy()
newPVC.Status.ResizeStatus = &expansionFailedOnNode
patchBytes, err := createPVCPatch(pvc, newPVC, false /* addResourceVersionCheck */)
if err != nil {
return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, err)
}
updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).
Patch(context.TODO(), pvc.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if updateErr != nil {
return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, updateErr)
}
return updatedClaim, nil
}
// MarkNodeExpansionInProgress marks pvc expansion in progress on node
func MarkNodeExpansionInProgress(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
nodeExpansionInProgress := v1.PersistentVolumeClaimNodeExpansionInProgress
newPVC := pvc.DeepCopy()
newPVC.Status.ResizeStatus = &nodeExpansionInProgress
updatedPVC, err := PatchPVCStatus(pvc /* oldPVC */, newPVC, kubeClient)
return updatedPVC, err
}
// PatchPVCStatus updates PVC status using PATCH verb
@ -210,22 +287,22 @@ func PatchPVCStatus(
oldPVC *v1.PersistentVolumeClaim,
newPVC *v1.PersistentVolumeClaim,
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
patchBytes, err := createPVCPatch(oldPVC, newPVC)
patchBytes, err := createPVCPatch(oldPVC, newPVC, true /* addResourceVersionCheck */)
if err != nil {
return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err)
return oldPVC, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err)
}
updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if updateErr != nil {
return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr)
return oldPVC, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr)
}
return updatedClaim, nil
}
func createPVCPatch(
oldPVC *v1.PersistentVolumeClaim,
newPVC *v1.PersistentVolumeClaim) ([]byte, error) {
newPVC *v1.PersistentVolumeClaim, addResourceVersionCheck bool) ([]byte, error) {
oldData, err := json.Marshal(oldPVC)
if err != nil {
return nil, fmt.Errorf("failed to marshal old data: %v", err)
@ -241,9 +318,11 @@ func createPVCPatch(
return nil, fmt.Errorf("failed to create 2 way merge patch: %v", err)
}
patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
if err != nil {
return nil, fmt.Errorf("failed to add resource version: %v", err)
if addResourceVersionCheck {
patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
if err != nil {
return nil, fmt.Errorf("failed to add resource version: %v", err)
}
}
return patchBytes, nil

View File

@ -155,7 +155,7 @@ func TestCreatePVCPatch(t *testing.T) {
pvc2.Status.Capacity = v1.ResourceList{
v1.ResourceName("size"): resource.MustParse("10G"),
}
patchBytes, err := createPVCPatch(pvc1, pvc2)
patchBytes, err := createPVCPatch(pvc1, pvc2, true /* addResourceVersionCheck */)
if err != nil {
t.Errorf("error creating patch bytes %v", err)
}