handle review comments

This commit is contained in:
Hemant Kumar 2019-05-28 15:21:23 -04:00
parent 33b95ebddb
commit d119899e22
5 changed files with 103 additions and 56 deletions

View File

@ -70,12 +70,16 @@ go_test(
"//pkg/volume:go_default_library",
"//pkg/volume/awsebs:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
],

View File

@ -194,6 +194,8 @@ func (expc *expandController) processNextWorkItem() bool {
return true
}
// syncHandler performs actual expansion of volume. If an error is returned
// from this function - PVC will be requeued for resizing.
func (expc *expandController) syncHandler(key string) error {
namespace, name, err := kcache.SplitMetaNamespaceKey(key)
if err != nil {
@ -228,7 +230,7 @@ func (expc *expandController) syncHandler(key string) error {
class, err := expc.classLister.Get(claimClass)
if err != nil {
klog.V(4).Infof("volume expansion is not supported for PVC: %s;can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass)
klog.V(4).Infof("failed to expand PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err)
return nil
}
@ -245,7 +247,9 @@ func (expc *expandController) syncHandler(key string) error {
}
expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg)
return err
// If we are expecting that an external plugin will handle resizing this volume then
// is no point in requeuing this PVC.
return nil
}
if volumePlugin.IsMigratedToCSI() {
@ -263,7 +267,6 @@ func (expc *expandController) syncHandler(key string) error {
errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
return fmt.Errorf(errorMsg)
}
return nil
}

View File

@ -17,15 +17,21 @@ limitations under the License.
package expand
import (
"encoding/json"
"fmt"
"reflect"
"regexp"
"testing"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
coretesting "k8s.io/client-go/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
csitranslationplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/controller"
@ -34,24 +40,20 @@ import (
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/awsebs"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
func TestSyncHandler(t *testing.T) {
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
pvInformer := informerFactory.Core().V1().PersistentVolumes()
storageClassInformer := informerFactory.Storage().V1().StorageClasses()
tests := []struct {
name string
csiMigrationEnabled bool
expansionCalled bool
storageClass *storagev1.StorageClass
pvcKey string
pv *v1.PersistentVolume
pvc *v1.PersistentVolumeClaim
expansionCalled bool
hasError bool
expectedAnnotation map[string]string
}{
{
name: "when pvc has no PV binding",
@ -72,12 +74,13 @@ func TestSyncHandler(t *testing.T) {
pvcKey: "default/missing-sc-pvc",
},
{
name: "when pvc and pv has everything for in-tree plugin",
pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"),
pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"),
storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName),
pvcKey: "default/good-pvc",
expansionCalled: true,
name: "when pvc and pv has everything for in-tree plugin",
pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"),
pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"),
storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName),
pvcKey: "default/good-pvc",
expansionCalled: true,
expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSInTreePluginName},
},
{
name: "when csi migration is enabled for a in-tree plugin",
@ -86,17 +89,34 @@ func TestSyncHandler(t *testing.T) {
pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "resizable3", "csi-pvc-vol-4"),
storageClass: getFakeStorageClass("resizable3", csitranslationplugins.AWSEBSInTreePluginName),
pvcKey: "default/csi-pvc",
expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSDriverName},
},
{
name: "for csi plugin without migration path",
pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "ceph-csi-pvc-vol-5"),
pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "resizable4", "ceph-csi-pvc-vol-5"),
storageClass: getFakeStorageClass("resizable4", "com.csi.ceph"),
pvcKey: "default/ceph-csi-pvc",
expansionCalled: false,
hasError: false,
},
}
for _, tc := range tests {
test := tc
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
pvInformer := informerFactory.Core().V1().PersistentVolumes()
storageClassInformer := informerFactory.Storage().V1().StorageClasses()
pvc := test.pvc
if tc.pv != nil {
informerFactory.Core().V1().PersistentVolumes().Informer().GetIndexer().Add(tc.pv)
}
if tc.pvc != nil {
informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(tc.pvc)
informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(pvc)
}
allPlugins := []volume.VolumePlugin{}
allPlugins = append(allPlugins, awsebs.ProbeVolumePlugins()...)
@ -116,21 +136,57 @@ func TestSyncHandler(t *testing.T) {
var expController *expandController
expController, _ = expc.(*expandController)
var expansionCalled bool
expController.operationGenerator = operationexecutor.NewFakeOgCounter(func() (error, error) {
expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() (error, error) {
expansionCalled = true
return nil, nil
})
fakeKubeClient.AddReactor("patch", "persistentvolumeclaims", func(action coretesting.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "status" {
patchActionaction, _ := action.(coretesting.PatchAction)
pvc, err = applyPVCPatch(pvc, patchActionaction.GetPatch())
if err != nil {
return false, nil, err
}
return true, pvc, nil
}
return true, pvc, nil
})
err = expController.syncHandler(test.pvcKey)
if err != nil && !test.hasError {
t.Fatalf("for: %s; unexpected error while running handler : %v", test.name, err)
}
if err == nil && test.hasError {
t.Fatalf("for: %s; unexpected success", test.name)
}
if expansionCalled != test.expansionCalled {
t.Fatalf("for: %s; expected expansionCalled to be %v but was %v", test.name, test.expansionCalled, expansionCalled)
}
if len(test.expectedAnnotation) != 0 && !reflect.DeepEqual(test.expectedAnnotation, pvc.Annotations) {
t.Fatalf("for: %s; expected %v annotations, got %v", test.name, test.expectedAnnotation, pvc.Annotations)
}
}
}
func applyPVCPatch(originalPVC *v1.PersistentVolumeClaim, patch []byte) (*v1.PersistentVolumeClaim, error) {
pvcData, err := json.Marshal(originalPVC)
if err != nil {
return nil, fmt.Errorf("failed to marshal pvc with %v", err)
}
updated, err := strategicpatch.StrategicMergePatch(pvcData, patch, v1.PersistentVolumeClaim{})
if err != nil {
return nil, fmt.Errorf("failed to apply patch on pvc %v", err)
}
updatedPVC := &v1.PersistentVolumeClaim{}
if err := json.Unmarshal(updated, updatedPVC); err != nil {
return nil, fmt.Errorf("failed to unmarshal updated pvc : %v", err)
}
return updatedPVC, nil
}
func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{Name: volumeName},

View File

@ -1,5 +1,5 @@
/*
Copyright 2016 The Kubernetes Authors.
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -26,84 +26,84 @@ import (
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// fakeOgCounter is a simple OperationGenerator which counts number of times a function
// fakeOGCounter is a simple OperationGenerator which counts number of times a function
// has been caled
type fakeOgCounter struct {
type fakeOGCounter struct {
// calledFuncs stores name and count of functions
calledFuncs map[string]int
opFunc func() (error, error)
}
var _ OperationGenerator = &fakeOgCounter{}
var _ OperationGenerator = &fakeOGCounter{}
// NewFakeOgCounter returns a OperationGenerator
func NewFakeOgCounter(opFunc func() (error, error)) OperationGenerator {
return &fakeOgCounter{
// NewFakeOGCounter returns a OperationGenerator
func NewFakeOGCounter(opFunc func() (error, error)) OperationGenerator {
return &fakeOGCounter{
calledFuncs: map[string]int{},
opFunc: opFunc,
}
}
func (f *fakeOgCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations {
func (f *fakeOGCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations {
return f.recordFuncCall("GenerateMountVolumeFunc")
}
func (f *fakeOgCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateUnmountVolumeFunc"), nil
}
func (f *fakeOgCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
func (f *fakeOGCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
return f.recordFuncCall("GenerateAttachVolumeFunc")
}
func (f *fakeOgCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateDetachVolumeFunc"), nil
}
func (f *fakeOgCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateVolumesAreAttachedFunc"), nil
}
func (f *fakeOgCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateUnmountDeviceFunc"), nil
}
func (f *fakeOgCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateVerifyControllerAttachedVolumeFunc"), nil
}
func (f *fakeOgCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateMapVolumeFunc"), nil
}
func (f *fakeOgCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateUnmapVolumeFunc"), nil
}
func (f *fakeOgCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateUnmapDeviceFunc"), nil
}
func (f *fakeOgCounter) GetVolumePluginMgr() *volume.VolumePluginMgr {
func (f *fakeOGCounter) GetVolumePluginMgr() *volume.VolumePluginMgr {
return nil
}
func (f *fakeOgCounter) GenerateBulkVolumeVerifyFunc(
func (f *fakeOGCounter) GenerateBulkVolumeVerifyFunc(
map[types.NodeName][]*volume.Spec,
string,
map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateBulkVolumeVerifyFunc"), nil
}
func (f *fakeOgCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateExpandVolumeFunc"), nil
}
func (f *fakeOgCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
func (f *fakeOGCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateExpandVolumeFSWithoutUnmountingFunc"), nil
}
func (f *fakeOgCounter) recordFuncCall(name string) volumetypes.GeneratedOperations {
func (f *fakeOGCounter) recordFuncCall(name string) volumetypes.GeneratedOperations {
if _, ok := f.calledFuncs[name]; ok {
f.calledFuncs[name]++
}

View File

@ -82,22 +82,6 @@ func UpdatePVSize(
return nil
}
// MarkResizeInProgress marks cloudprovider resizing as in progress
func MarkResizeInProgress(
pvc *v1.PersistentVolumeClaim,
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
// Mark PVC as Resize Started
progressCondition := v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimResizing,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
}
conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
newPVC := pvc.DeepCopy()
newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
}
// MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress
// and also annotates the PVC with the name of the resizer.
func MarkResizeInProgressWithResizer(