diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index 185fcb75ca1..7034d0de45c 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -66,6 +66,17 @@ go_test( deps = [ "//pkg/controller:go_default_library", "//pkg/controller/volume/attachdetach/testing:go_default_library", + "//pkg/features:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/awsebs:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", + "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", ], ) diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 29997d26eb9..af278693b8b 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -228,7 +228,7 @@ func (expc *expandController) syncHandler(key string) error { class, err := expc.classLister.Get(claimClass) if err != nil { - klog.V(4).Infof("volume expansion is not supported for PVC: %s, can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass) + klog.V(4).Infof("volume expansion is not supported for PVC: %s;can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass) return nil } @@ -297,7 +297,7 @@ func (expc *expandController) Run(stopCh <-chan struct{}) { klog.Infof("Starting expand controller") defer klog.Infof("Shutting down expand controller") - if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced) { + if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) { return } diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go index c01d4f69f72..517e23ac4c0 100644 --- a/pkg/controller/volume/expand/expand_controller_test.go +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -17,11 +17,23 @@ limitations under the License. package expand import ( + "regexp" "testing" + "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" + featuregatetesting "k8s.io/component-base/featuregate/testing" + csitranslationplugins "k8s.io/csi-translation-lib/plugins" "k8s.io/kubernetes/pkg/controller" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/awsebs" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) func TestSyncHandler(t *testing.T) { @@ -31,17 +43,140 @@ func TestSyncHandler(t *testing.T) { pvInformer := informerFactory.Core().V1().PersistentVolumes() storageClassInformer := informerFactory.Storage().V1().StorageClasses() - expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, nil) - if err != nil { - t.Fatalf("error creating expand controller : %v", err) + tests := []struct { + name string + csiMigrationEnabled bool + expansionCalled bool + storageClass *storagev1.StorageClass + pvcKey string + pv *v1.PersistentVolume + pvc *v1.PersistentVolumeClaim + hasError bool + }{ + { + name: "when pvc has no PV binding", + pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", "", ""), + pvcKey: "default/no-pv-pvc", + hasError: true, + }, + { + name: "when pvc has no storageclass", + pv: getFakePersistentVolume("vol-1", csitranslationplugins.AWSEBSInTreePluginName, "no-sc-pvc-vol-1"), + pvc: getFakePersistentVolumeClaim("no-sc-pvc", "vol-1", "", "no-sc-pvc-vol-1"), + pvcKey: "default/no-sc-pvc", + }, + { + name: "when pvc storageclass is missing", + pv: getFakePersistentVolume("vol-2", csitranslationplugins.AWSEBSInTreePluginName, "missing-sc-pvc-vol-2"), + pvc: getFakePersistentVolumeClaim("missing-sc-pvc", "vol-2", "resizable", "missing-sc-pvc-vol-2"), + pvcKey: "default/missing-sc-pvc", + }, + { + name: "when pvc and pv has everything for in-tree plugin", + pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"), + pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"), + storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName), + pvcKey: "default/good-pvc", + expansionCalled: true, + }, + { + name: "when csi migration is enabled for a in-tree plugin", + csiMigrationEnabled: true, + pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "csi-pvc-vol-4"), + pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "resizable3", "csi-pvc-vol-4"), + storageClass: getFakeStorageClass("resizable3", csitranslationplugins.AWSEBSInTreePluginName), + pvcKey: "default/csi-pvc", + }, } - var expController *expandController - expController, _ = expc.(*expandController) + for _, tc := range tests { + test := tc + if tc.pv != nil { + informerFactory.Core().V1().PersistentVolumes().Informer().GetIndexer().Add(tc.pv) + } - err = expController.syncHandler("default/foo") - if err != nil { - t.Fatalf("error running sync handler : %v", err) + if tc.pvc != nil { + informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(tc.pvc) + } + allPlugins := []volume.VolumePlugin{} + allPlugins = append(allPlugins, awsebs.ProbeVolumePlugins()...) + if tc.storageClass != nil { + informerFactory.Storage().V1().StorageClasses().Informer().GetIndexer().Add(tc.storageClass) + } + expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, allPlugins) + if err != nil { + t.Fatalf("error creating expand controller : %v", err) + } + + if test.csiMigrationEnabled { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)() + } + + var expController *expandController + expController, _ = expc.(*expandController) + var expansionCalled bool + expController.operationGenerator = operationexecutor.NewFakeOgCounter(func() (error, error) { + expansionCalled = true + return nil, nil + }) + + err = expController.syncHandler(test.pvcKey) + if err != nil && !test.hasError { + t.Fatalf("for: %s; unexpected error while running handler : %v", test.name, err) + } + if expansionCalled != test.expansionCalled { + t.Fatalf("for: %s; expected expansionCalled to be %v but was %v", test.name, test.expansionCalled, expansionCalled) + } + } +} + +func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: volumeName}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{}, + ClaimRef: &v1.ObjectReference{ + Namespace: "default", + }, + }, + } + if pvcUID != "" { + pv.Spec.ClaimRef.UID = pvcUID + } + + if matched, _ := regexp.MatchString(`csi`, pluginName); matched { + pv.Spec.PersistentVolumeSource.CSI = &v1.CSIPersistentVolumeSource{ + Driver: pluginName, + VolumeHandle: volumeName, + } + } else { + pv.Spec.PersistentVolumeSource.AWSElasticBlockStore = &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: volumeName, + FSType: "ext4", + } + } + return pv +} + +func getFakePersistentVolumeClaim(pvcName, volumeName, scName string, uid types.UID) *v1.PersistentVolumeClaim { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default", UID: uid}, + Spec: v1.PersistentVolumeClaimSpec{}, + } + if volumeName != "" { + pvc.Spec.VolumeName = volumeName + } + + if scName != "" { + pvc.Spec.StorageClassName = &scName + } + return pvc +} + +func getFakeStorageClass(scName, pluginName string) *storagev1.StorageClass { + return &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{Name: scName}, + Provisioner: pluginName, } - } diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index 8ae0ce393bc..8c12938b53c 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -9,6 +9,7 @@ load( go_library( name = "go_default_library", srcs = [ + "fakegenerator.go", "operation_executor.go", "operation_generator.go", ], diff --git a/pkg/volume/util/operationexecutor/fakegenerator.go b/pkg/volume/util/operationexecutor/fakegenerator.go new file mode 100644 index 00000000000..03cf02e2aeb --- /dev/null +++ b/pkg/volume/util/operationexecutor/fakegenerator.go @@ -0,0 +1,115 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operationexecutor + +import ( + "time" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" +) + +// fakeOgCounter is a simple OperationGenerator which counts number of times a function +// has been caled +type fakeOgCounter struct { + // calledFuncs stores name and count of functions + calledFuncs map[string]int + opFunc func() (error, error) +} + +var _ OperationGenerator = &fakeOgCounter{} + +// NewFakeOgCounter returns a OperationGenerator +func NewFakeOgCounter(opFunc func() (error, error)) OperationGenerator { + return &fakeOgCounter{ + calledFuncs: map[string]int{}, + opFunc: opFunc, + } +} + +func (f *fakeOgCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { + return f.recordFuncCall("GenerateMountVolumeFunc") +} + +func (f *fakeOgCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmountVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { + return f.recordFuncCall("GenerateAttachVolumeFunc") +} + +func (f *fakeOgCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateDetachVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateVolumesAreAttachedFunc"), nil +} + +func (f *fakeOgCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmountDeviceFunc"), nil +} + +func (f *fakeOgCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateVerifyControllerAttachedVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateMapVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmapVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateUnmapDeviceFunc"), nil +} + +func (f *fakeOgCounter) GetVolumePluginMgr() *volume.VolumePluginMgr { + return nil +} + +func (f *fakeOgCounter) GenerateBulkVolumeVerifyFunc( + map[types.NodeName][]*volume.Spec, + string, + map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateBulkVolumeVerifyFunc"), nil +} + +func (f *fakeOgCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateExpandVolumeFunc"), nil +} + +func (f *fakeOgCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + return f.recordFuncCall("GenerateExpandVolumeFSWithoutUnmountingFunc"), nil +} + +func (f *fakeOgCounter) recordFuncCall(name string) volumetypes.GeneratedOperations { + if _, ok := f.calledFuncs[name]; ok { + f.calledFuncs[name]++ + } + ops := volumetypes.GeneratedOperations{ + OperationName: name, + OperationFunc: f.opFunc, + } + return ops +} diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 7300d843992..9ae816bffd9 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -683,7 +683,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( // resizeFileSystem will resize the file system if user has requested a resize of // underlying persistent volume and is allowed to do so. - resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName) + resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError) @@ -721,7 +721,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( // - Volume does not support DeviceMounter interface. // - In case of CSI the volume does not have node stage_unstage capability. if !resizeDone { - resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName) + resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions) if resizeError != nil { klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError) return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) @@ -760,7 +760,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } } -func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions, pluginName string) (bool, error) { +func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions) (bool, error) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName) return true, nil @@ -1577,6 +1577,18 @@ func (og *operationGenerator) GenerateExpandVolumeFunc( func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { + // Need to translate the spec here if the plugin is migrated so that the metrics + // emitted show the correct (migrated) plugin + if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) { + csiSpec, err := translateSpec(volumeToMount.VolumeSpec) + if err == nil { + volumeToMount.VolumeSpec = csiSpec + } + // If we have an error here we ignore it, the metric emitted will then be for the + // in-tree plugin. This error case(skipped one) will also trigger an error + // while the generated function is executed. And those errors will be handled during the execution of the generated + // function with a back off policy. + } volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) if err != nil || volumePlugin == nil { @@ -1603,7 +1615,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( return volumeToMount.GenerateError("VolumeFSResize.GetDeviceMountPath failed", err) } resizeOptions.DeviceMountPath = dmp - resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName()) + resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) if simpleErr != nil || detailedErr != nil { return simpleErr, detailedErr } @@ -1623,7 +1635,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( resizeOptions.DeviceMountPath = volumeMounter.GetPath() resizeOptions.CSIVolumePhase = volume.CSIVolumePublished - resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName()) + resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions) if simpleErr != nil || detailedErr != nil { return simpleErr, detailedErr } @@ -1651,9 +1663,8 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, - resizeOptions volume.NodeResizeOptions, - pluginName string) (bool, error, error) { - resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions, pluginName) + resizeOptions volume.NodeResizeOptions) (bool, error, error) { + resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions) if err != nil { klog.Errorf("VolumeFSResize.resizeFileSystem failed : %v", err) e1, e2 := volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed", err)