From d119899e2294c66ebb917e1f1306c635640064e0 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Tue, 28 May 2019 15:21:23 -0400 Subject: [PATCH] handle review comments --- pkg/controller/volume/expand/BUILD | 4 + .../volume/expand/expand_controller.go | 9 +- .../volume/expand/expand_controller_test.go | 86 +++++++++++++++---- .../util/operationexecutor/fakegenerator.go | 44 +++++----- pkg/volume/util/resize_util.go | 16 ---- 5 files changed, 103 insertions(+), 56 deletions(-) diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index 7034d0de45c..38c33353a3f 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -70,12 +70,16 @@ go_test( "//pkg/volume:go_default_library", "//pkg/volume/awsebs:go_default_library", "//pkg/volume/util/operationexecutor:go_default_library", + "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", ], diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index af278693b8b..4c305ff1644 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -194,6 +194,8 @@ func (expc *expandController) processNextWorkItem() bool { return true } +// syncHandler performs actual expansion of volume. If an error is returned +// from this function - PVC will be requeued for resizing. func (expc *expandController) syncHandler(key string) error { namespace, name, err := kcache.SplitMetaNamespaceKey(key) if err != nil { @@ -228,7 +230,7 @@ func (expc *expandController) syncHandler(key string) error { class, err := expc.classLister.Get(claimClass) if err != nil { - klog.V(4).Infof("volume expansion is not supported for PVC: %s;can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass) + klog.V(4).Infof("failed to expand PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err) return nil } @@ -245,7 +247,9 @@ func (expc *expandController) syncHandler(key string) error { } expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg)) klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg) - return err + // If we are expecting that an external plugin will handle resizing this volume then + // is no point in requeuing this PVC. + return nil } if volumePlugin.IsMigratedToCSI() { @@ -263,7 +267,6 @@ func (expc *expandController) syncHandler(key string) error { errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) return fmt.Errorf(errorMsg) - } return nil } diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go index 517e23ac4c0..15cd524f30b 100644 --- a/pkg/controller/volume/expand/expand_controller_test.go +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -17,15 +17,21 @@ limitations under the License. package expand import ( + "encoding/json" + "fmt" + "reflect" "regexp" "testing" "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" + coretesting "k8s.io/client-go/testing" featuregatetesting "k8s.io/component-base/featuregate/testing" csitranslationplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/controller" @@ -34,24 +40,20 @@ import ( "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/awsebs" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) func TestSyncHandler(t *testing.T) { - fakeKubeClient := controllervolumetesting.CreateTestClient() - informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - pvInformer := informerFactory.Core().V1().PersistentVolumes() - storageClassInformer := informerFactory.Storage().V1().StorageClasses() - tests := []struct { name string csiMigrationEnabled bool - expansionCalled bool storageClass *storagev1.StorageClass pvcKey string pv *v1.PersistentVolume pvc *v1.PersistentVolumeClaim + expansionCalled bool hasError bool + expectedAnnotation map[string]string }{ { name: "when pvc has no PV binding", @@ -72,12 +74,13 @@ func TestSyncHandler(t *testing.T) { pvcKey: "default/missing-sc-pvc", }, { - name: "when pvc and pv has everything for in-tree plugin", - pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"), - pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"), - storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName), - pvcKey: "default/good-pvc", - expansionCalled: true, + name: "when pvc and pv has everything for in-tree plugin", + pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"), + pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"), + storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName), + pvcKey: "default/good-pvc", + expansionCalled: true, + expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSInTreePluginName}, }, { name: "when csi migration is enabled for a in-tree plugin", @@ -86,17 +89,34 @@ func TestSyncHandler(t *testing.T) { pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "resizable3", "csi-pvc-vol-4"), storageClass: getFakeStorageClass("resizable3", csitranslationplugins.AWSEBSInTreePluginName), pvcKey: "default/csi-pvc", + expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSDriverName}, + }, + { + name: "for csi plugin without migration path", + pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "ceph-csi-pvc-vol-5"), + pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "resizable4", "ceph-csi-pvc-vol-5"), + storageClass: getFakeStorageClass("resizable4", "com.csi.ceph"), + pvcKey: "default/ceph-csi-pvc", + expansionCalled: false, + hasError: false, }, } for _, tc := range tests { test := tc + fakeKubeClient := controllervolumetesting.CreateTestClient() + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + pvInformer := informerFactory.Core().V1().PersistentVolumes() + storageClassInformer := informerFactory.Storage().V1().StorageClasses() + + pvc := test.pvc if tc.pv != nil { informerFactory.Core().V1().PersistentVolumes().Informer().GetIndexer().Add(tc.pv) } if tc.pvc != nil { - informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(tc.pvc) + informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(pvc) } allPlugins := []volume.VolumePlugin{} allPlugins = append(allPlugins, awsebs.ProbeVolumePlugins()...) @@ -116,21 +136,57 @@ func TestSyncHandler(t *testing.T) { var expController *expandController expController, _ = expc.(*expandController) var expansionCalled bool - expController.operationGenerator = operationexecutor.NewFakeOgCounter(func() (error, error) { + expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() (error, error) { expansionCalled = true return nil, nil }) + fakeKubeClient.AddReactor("patch", "persistentvolumeclaims", func(action coretesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + patchActionaction, _ := action.(coretesting.PatchAction) + pvc, err = applyPVCPatch(pvc, patchActionaction.GetPatch()) + if err != nil { + return false, nil, err + } + return true, pvc, nil + } + return true, pvc, nil + }) + err = expController.syncHandler(test.pvcKey) if err != nil && !test.hasError { t.Fatalf("for: %s; unexpected error while running handler : %v", test.name, err) } + + if err == nil && test.hasError { + t.Fatalf("for: %s; unexpected success", test.name) + } if expansionCalled != test.expansionCalled { t.Fatalf("for: %s; expected expansionCalled to be %v but was %v", test.name, test.expansionCalled, expansionCalled) } + + if len(test.expectedAnnotation) != 0 && !reflect.DeepEqual(test.expectedAnnotation, pvc.Annotations) { + t.Fatalf("for: %s; expected %v annotations, got %v", test.name, test.expectedAnnotation, pvc.Annotations) + } } } +func applyPVCPatch(originalPVC *v1.PersistentVolumeClaim, patch []byte) (*v1.PersistentVolumeClaim, error) { + pvcData, err := json.Marshal(originalPVC) + if err != nil { + return nil, fmt.Errorf("failed to marshal pvc with %v", err) + } + updated, err := strategicpatch.StrategicMergePatch(pvcData, patch, v1.PersistentVolumeClaim{}) + if err != nil { + return nil, fmt.Errorf("failed to apply patch on pvc %v", err) + } + updatedPVC := &v1.PersistentVolumeClaim{} + if err := json.Unmarshal(updated, updatedPVC); err != nil { + return nil, fmt.Errorf("failed to unmarshal updated pvc : %v", err) + } + return updatedPVC, nil +} + func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume { pv := &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{Name: volumeName}, diff --git a/pkg/volume/util/operationexecutor/fakegenerator.go b/pkg/volume/util/operationexecutor/fakegenerator.go index 03cf02e2aeb..0dc2c9e4143 100644 --- a/pkg/volume/util/operationexecutor/fakegenerator.go +++ b/pkg/volume/util/operationexecutor/fakegenerator.go @@ -1,5 +1,5 @@ /* -Copyright 2016 The Kubernetes Authors. +Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -26,84 +26,84 @@ import ( volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) -// fakeOgCounter is a simple OperationGenerator which counts number of times a function +// fakeOGCounter is a simple OperationGenerator which counts number of times a function // has been caled -type fakeOgCounter struct { +type fakeOGCounter struct { // calledFuncs stores name and count of functions calledFuncs map[string]int opFunc func() (error, error) } -var _ OperationGenerator = &fakeOgCounter{} +var _ OperationGenerator = &fakeOGCounter{} -// NewFakeOgCounter returns a OperationGenerator -func NewFakeOgCounter(opFunc func() (error, error)) OperationGenerator { - return &fakeOgCounter{ +// NewFakeOGCounter returns a OperationGenerator +func NewFakeOGCounter(opFunc func() (error, error)) OperationGenerator { + return &fakeOGCounter{ calledFuncs: map[string]int{}, opFunc: opFunc, } } -func (f *fakeOgCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { +func (f *fakeOGCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { return f.recordFuncCall("GenerateMountVolumeFunc") } -func (f *fakeOgCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateUnmountVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { +func (f *fakeOGCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { return f.recordFuncCall("GenerateAttachVolumeFunc") } -func (f *fakeOgCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateDetachVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateVolumesAreAttachedFunc"), nil } -func (f *fakeOgCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateUnmountDeviceFunc"), nil } -func (f *fakeOgCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateVerifyControllerAttachedVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateMapVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateUnmapVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateUnmapDeviceFunc"), nil } -func (f *fakeOgCounter) GetVolumePluginMgr() *volume.VolumePluginMgr { +func (f *fakeOGCounter) GetVolumePluginMgr() *volume.VolumePluginMgr { return nil } -func (f *fakeOgCounter) GenerateBulkVolumeVerifyFunc( +func (f *fakeOGCounter) GenerateBulkVolumeVerifyFunc( map[types.NodeName][]*volume.Spec, string, map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateBulkVolumeVerifyFunc"), nil } -func (f *fakeOgCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateExpandVolumeFunc"), nil } -func (f *fakeOgCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { +func (f *fakeOGCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { return f.recordFuncCall("GenerateExpandVolumeFSWithoutUnmountingFunc"), nil } -func (f *fakeOgCounter) recordFuncCall(name string) volumetypes.GeneratedOperations { +func (f *fakeOGCounter) recordFuncCall(name string) volumetypes.GeneratedOperations { if _, ok := f.calledFuncs[name]; ok { f.calledFuncs[name]++ } diff --git a/pkg/volume/util/resize_util.go b/pkg/volume/util/resize_util.go index f3ac3678015..b35daba380d 100644 --- a/pkg/volume/util/resize_util.go +++ b/pkg/volume/util/resize_util.go @@ -82,22 +82,6 @@ func UpdatePVSize( return nil } -// MarkResizeInProgress marks cloudprovider resizing as in progress -func MarkResizeInProgress( - pvc *v1.PersistentVolumeClaim, - kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { - // Mark PVC as Resize Started - progressCondition := v1.PersistentVolumeClaimCondition{ - Type: v1.PersistentVolumeClaimResizing, - Status: v1.ConditionTrue, - LastTransitionTime: metav1.Now(), - } - conditions := []v1.PersistentVolumeClaimCondition{progressCondition} - newPVC := pvc.DeepCopy() - newPVC = MergeResizeConditionOnPVC(newPVC, conditions) - return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) -} - // MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress // and also annotates the PVC with the name of the resizer. func MarkResizeInProgressWithResizer(