Merge pull request #77994 from gnufied/csi-resize-migration

Handle CSI volume resize migration.
This commit is contained in:
Kubernetes Prow Robot 2019-05-29 21:28:43 -07:00 committed by GitHub
commit 05df640f33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 523 additions and 29 deletions

View File

@ -244,6 +244,7 @@ func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, err
ctx.ClientBuilder.ClientOrDie("expand-controller"), ctx.ClientBuilder.ClientOrDie("expand-controller"),
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(), ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.InformerFactory.Storage().V1().StorageClasses(),
ctx.Cloud, ctx.Cloud,
ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)) ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration))

View File

@ -1,6 +1,6 @@
package(default_visibility = ["//visibility:public"]) package(default_visibility = ["//visibility:public"])
load("@io_bazel_rules_go//go:def.bzl", "go_library") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
@ -10,6 +10,7 @@ go_library(
], ],
importpath = "k8s.io/kubernetes/pkg/controller/volume/expand", importpath = "k8s.io/kubernetes/pkg/controller/volume/expand",
deps = [ deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/controller/volume/events:go_default_library", "//pkg/controller/volume/events:go_default_library",
"//pkg/controller/volume/expand/cache:go_default_library", "//pkg/controller/volume/expand/cache:go_default_library",
@ -27,14 +28,17 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],
) )
@ -54,3 +58,29 @@ filegroup(
], ],
tags = ["automanaged"], tags = ["automanaged"],
) )
go_test(
name = "go_default_test",
srcs = ["expand_controller_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/awsebs:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
],
)

View File

@ -30,14 +30,19 @@ import (
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
storageclassinformer "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
kcache "k8s.io/client-go/tools/cache" kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider" cloudprovider "k8s.io/cloud-provider"
csitranslation "k8s.io/csi-translation-lib"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/events" "k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
@ -72,6 +77,10 @@ type expandController struct {
pvLister corelisters.PersistentVolumeLister pvLister corelisters.PersistentVolumeLister
pvSynced kcache.InformerSynced pvSynced kcache.InformerSynced
// storageClass lister for fetching provisioner name
classLister storagelisters.StorageClassLister
classListerSynced cache.InformerSynced
// cloud provider used by volume host // cloud provider used by volume host
cloud cloudprovider.Interface cloud cloudprovider.Interface
@ -90,6 +99,7 @@ func NewExpandController(
kubeClient clientset.Interface, kubeClient clientset.Interface,
pvcInformer coreinformers.PersistentVolumeClaimInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer, pvInformer coreinformers.PersistentVolumeInformer,
scInformer storageclassinformer.StorageClassInformer,
cloud cloudprovider.Interface, cloud cloudprovider.Interface,
plugins []volume.VolumePlugin) (ExpandController, error) { plugins []volume.VolumePlugin) (ExpandController, error) {
@ -100,6 +110,8 @@ func NewExpandController(
pvcsSynced: pvcInformer.Informer().HasSynced, pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(), pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced, pvSynced: pvInformer.Informer().HasSynced,
classLister: scInformer.Lister(),
classListerSynced: scInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
} }
@ -182,6 +194,8 @@ func (expc *expandController) processNextWorkItem() bool {
return true return true
} }
// syncHandler performs actual expansion of volume. If an error is returned
// from this function - PVC will be requeued for resizing.
func (expc *expandController) syncHandler(key string) error { func (expc *expandController) syncHandler(key string) error {
namespace, name, err := kcache.SplitMetaNamespaceKey(key) namespace, name, err := kcache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {
@ -201,14 +215,29 @@ func (expc *expandController) syncHandler(key string) error {
klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err) klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
return err return err
} }
if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID { if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID {
err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc)) err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc))
klog.V(4).Infof("%v", err) klog.V(4).Infof("%v", err)
return err return err
} }
claimClass := v1helper.GetPersistentVolumeClaimClass(pvc)
if claimClass == "" {
klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s", util.ClaimToClaimKey(pvc))
return nil
}
class, err := expc.classLister.Get(claimClass)
if err != nil {
klog.V(4).Infof("failed to expand PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err)
return nil
}
volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
volumeResizerName := class.Provisioner
if err != nil || volumePlugin == nil { if err != nil || volumePlugin == nil {
msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " + msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
"waiting for an external controller to process this PVC") "waiting for an external controller to process this PVC")
@ -218,14 +247,35 @@ func (expc *expandController) syncHandler(key string) error {
} }
expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg)) expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg) klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg)
return err // If we are expecting that an external plugin will handle resizing this volume then
// is no point in requeuing this PVC.
return nil
} }
return expc.expand(pvc, pv) if volumePlugin.IsMigratedToCSI() {
msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName)
expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg)
csiResizerName, err := csitranslation.GetCSINameFromInTreeName(class.Provisioner)
if err != nil {
errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
return fmt.Errorf(errorMsg)
} }
func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error { pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient)
pvc, err := util.MarkResizeInProgress(pvc, expc.kubeClient) if err != nil {
errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
return fmt.Errorf(errorMsg)
}
return nil
}
return expc.expand(pvc, pv, volumeResizerName)
}
func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error {
pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
if err != nil { if err != nil {
klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
return err return err
@ -250,7 +300,7 @@ func (expc *expandController) Run(stopCh <-chan struct{}) {
klog.Infof("Starting expand controller") klog.Infof("Starting expand controller")
defer klog.Infof("Shutting down expand controller") defer klog.Infof("Shutting down expand controller")
if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced) { if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) {
return return
} }

View File

@ -0,0 +1,241 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package expand
import (
"encoding/json"
"fmt"
"reflect"
"regexp"
"testing"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
coretesting "k8s.io/client-go/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
csitranslationplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/controller"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/awsebs"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
func TestSyncHandler(t *testing.T) {
tests := []struct {
name string
csiMigrationEnabled bool
storageClass *storagev1.StorageClass
pvcKey string
pv *v1.PersistentVolume
pvc *v1.PersistentVolumeClaim
expansionCalled bool
hasError bool
expectedAnnotation map[string]string
}{
{
name: "when pvc has no PV binding",
pvc: getFakePersistentVolumeClaim("no-pv-pvc", "", "", ""),
pvcKey: "default/no-pv-pvc",
hasError: true,
},
{
name: "when pvc has no storageclass",
pv: getFakePersistentVolume("vol-1", csitranslationplugins.AWSEBSInTreePluginName, "no-sc-pvc-vol-1"),
pvc: getFakePersistentVolumeClaim("no-sc-pvc", "vol-1", "", "no-sc-pvc-vol-1"),
pvcKey: "default/no-sc-pvc",
},
{
name: "when pvc storageclass is missing",
pv: getFakePersistentVolume("vol-2", csitranslationplugins.AWSEBSInTreePluginName, "missing-sc-pvc-vol-2"),
pvc: getFakePersistentVolumeClaim("missing-sc-pvc", "vol-2", "resizable", "missing-sc-pvc-vol-2"),
pvcKey: "default/missing-sc-pvc",
},
{
name: "when pvc and pv has everything for in-tree plugin",
pv: getFakePersistentVolume("vol-3", csitranslationplugins.AWSEBSInTreePluginName, "good-pvc-vol-3"),
pvc: getFakePersistentVolumeClaim("good-pvc", "vol-3", "resizable2", "good-pvc-vol-3"),
storageClass: getFakeStorageClass("resizable2", csitranslationplugins.AWSEBSInTreePluginName),
pvcKey: "default/good-pvc",
expansionCalled: true,
expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSInTreePluginName},
},
{
name: "when csi migration is enabled for a in-tree plugin",
csiMigrationEnabled: true,
pv: getFakePersistentVolume("vol-4", csitranslationplugins.AWSEBSInTreePluginName, "csi-pvc-vol-4"),
pvc: getFakePersistentVolumeClaim("csi-pvc", "vol-4", "resizable3", "csi-pvc-vol-4"),
storageClass: getFakeStorageClass("resizable3", csitranslationplugins.AWSEBSInTreePluginName),
pvcKey: "default/csi-pvc",
expectedAnnotation: map[string]string{volumetypes.VolumeResizerKey: csitranslationplugins.AWSEBSDriverName},
},
{
name: "for csi plugin without migration path",
pv: getFakePersistentVolume("vol-5", "com.csi.ceph", "ceph-csi-pvc-vol-5"),
pvc: getFakePersistentVolumeClaim("ceph-csi-pvc", "vol-5", "resizable4", "ceph-csi-pvc-vol-5"),
storageClass: getFakeStorageClass("resizable4", "com.csi.ceph"),
pvcKey: "default/ceph-csi-pvc",
expansionCalled: false,
hasError: false,
},
}
for _, tc := range tests {
test := tc
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
pvInformer := informerFactory.Core().V1().PersistentVolumes()
storageClassInformer := informerFactory.Storage().V1().StorageClasses()
pvc := test.pvc
if tc.pv != nil {
informerFactory.Core().V1().PersistentVolumes().Informer().GetIndexer().Add(tc.pv)
}
if tc.pvc != nil {
informerFactory.Core().V1().PersistentVolumeClaims().Informer().GetIndexer().Add(pvc)
}
allPlugins := []volume.VolumePlugin{}
allPlugins = append(allPlugins, awsebs.ProbeVolumePlugins()...)
if tc.storageClass != nil {
informerFactory.Storage().V1().StorageClasses().Informer().GetIndexer().Add(tc.storageClass)
}
expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, allPlugins)
if err != nil {
t.Fatalf("error creating expand controller : %v", err)
}
if test.csiMigrationEnabled {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, true)()
} else {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, false)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationAWS, false)()
}
var expController *expandController
expController, _ = expc.(*expandController)
var expansionCalled bool
expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() (error, error) {
expansionCalled = true
return nil, nil
})
fakeKubeClient.AddReactor("patch", "persistentvolumeclaims", func(action coretesting.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "status" {
patchActionaction, _ := action.(coretesting.PatchAction)
pvc, err = applyPVCPatch(pvc, patchActionaction.GetPatch())
if err != nil {
return false, nil, err
}
return true, pvc, nil
}
return true, pvc, nil
})
err = expController.syncHandler(test.pvcKey)
if err != nil && !test.hasError {
t.Fatalf("for: %s; unexpected error while running handler : %v", test.name, err)
}
if err == nil && test.hasError {
t.Fatalf("for: %s; unexpected success", test.name)
}
if expansionCalled != test.expansionCalled {
t.Fatalf("for: %s; expected expansionCalled to be %v but was %v", test.name, test.expansionCalled, expansionCalled)
}
if len(test.expectedAnnotation) != 0 && !reflect.DeepEqual(test.expectedAnnotation, pvc.Annotations) {
t.Fatalf("for: %s; expected %v annotations, got %v", test.name, test.expectedAnnotation, pvc.Annotations)
}
}
}
func applyPVCPatch(originalPVC *v1.PersistentVolumeClaim, patch []byte) (*v1.PersistentVolumeClaim, error) {
pvcData, err := json.Marshal(originalPVC)
if err != nil {
return nil, fmt.Errorf("failed to marshal pvc with %v", err)
}
updated, err := strategicpatch.StrategicMergePatch(pvcData, patch, v1.PersistentVolumeClaim{})
if err != nil {
return nil, fmt.Errorf("failed to apply patch on pvc %v", err)
}
updatedPVC := &v1.PersistentVolumeClaim{}
if err := json.Unmarshal(updated, updatedPVC); err != nil {
return nil, fmt.Errorf("failed to unmarshal updated pvc : %v", err)
}
return updatedPVC, nil
}
func getFakePersistentVolume(volumeName, pluginName string, pvcUID types.UID) *v1.PersistentVolume {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{Name: volumeName},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{},
ClaimRef: &v1.ObjectReference{
Namespace: "default",
},
},
}
if pvcUID != "" {
pv.Spec.ClaimRef.UID = pvcUID
}
if matched, _ := regexp.MatchString(`csi`, pluginName); matched {
pv.Spec.PersistentVolumeSource.CSI = &v1.CSIPersistentVolumeSource{
Driver: pluginName,
VolumeHandle: volumeName,
}
} else {
pv.Spec.PersistentVolumeSource.AWSElasticBlockStore = &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: volumeName,
FSType: "ext4",
}
}
return pv
}
func getFakePersistentVolumeClaim(pvcName, volumeName, scName string, uid types.UID) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: "default", UID: uid},
Spec: v1.PersistentVolumeClaimSpec{},
}
if volumeName != "" {
pvc.Spec.VolumeName = volumeName
}
if scName != "" {
pvc.Spec.StorageClassName = &scName
}
return pvc
}
func getFakeStorageClass(scName, pluginName string) *storagev1.StorageClass {
return &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{Name: scName},
Provisioner: pluginName,
}
}

View File

@ -9,6 +9,7 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"fakegenerator.go",
"operation_executor.go", "operation_executor.go",
"operation_generator.go", "operation_generator.go",
], ],

View File

@ -0,0 +1,115 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operationexecutor
import (
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
// fakeOGCounter is a simple OperationGenerator which counts number of times a function
// has been caled
type fakeOGCounter struct {
// calledFuncs stores name and count of functions
calledFuncs map[string]int
opFunc func() (error, error)
}
var _ OperationGenerator = &fakeOGCounter{}
// NewFakeOGCounter returns a OperationGenerator
func NewFakeOGCounter(opFunc func() (error, error)) OperationGenerator {
return &fakeOGCounter{
calledFuncs: map[string]int{},
opFunc: opFunc,
}
}
func (f *fakeOGCounter) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations {
return f.recordFuncCall("GenerateMountVolumeFunc")
}
func (f *fakeOGCounter) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateUnmountVolumeFunc"), nil
}
func (f *fakeOGCounter) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
return f.recordFuncCall("GenerateAttachVolumeFunc")
}
func (f *fakeOGCounter) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateDetachVolumeFunc"), nil
}
func (f *fakeOGCounter) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateVolumesAreAttachedFunc"), nil
}
func (f *fakeOGCounter) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateUnmountDeviceFunc"), nil
}
func (f *fakeOGCounter) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateVerifyControllerAttachedVolumeFunc"), nil
}
func (f *fakeOGCounter) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateMapVolumeFunc"), nil
}
func (f *fakeOGCounter) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateUnmapVolumeFunc"), nil
}
func (f *fakeOGCounter) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateUnmapDeviceFunc"), nil
}
func (f *fakeOGCounter) GetVolumePluginMgr() *volume.VolumePluginMgr {
return nil
}
func (f *fakeOGCounter) GenerateBulkVolumeVerifyFunc(
map[types.NodeName][]*volume.Spec,
string,
map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateBulkVolumeVerifyFunc"), nil
}
func (f *fakeOGCounter) GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateExpandVolumeFunc"), nil
}
func (f *fakeOGCounter) GenerateExpandVolumeFSWithoutUnmountingFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
return f.recordFuncCall("GenerateExpandVolumeFSWithoutUnmountingFunc"), nil
}
func (f *fakeOGCounter) recordFuncCall(name string) volumetypes.GeneratedOperations {
if _, ok := f.calledFuncs[name]; ok {
f.calledFuncs[name]++
}
ops := volumetypes.GeneratedOperations{
OperationName: name,
OperationFunc: f.opFunc,
}
return ops
}

View File

@ -683,7 +683,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
// resizeFileSystem will resize the file system if user has requested a resize of // resizeFileSystem will resize the file system if user has requested a resize of
// underlying persistent volume and is allowed to do so. // underlying persistent volume and is allowed to do so.
resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName) resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions)
if resizeError != nil { if resizeError != nil {
klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError) klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError)
@ -721,7 +721,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
// - Volume does not support DeviceMounter interface. // - Volume does not support DeviceMounter interface.
// - In case of CSI the volume does not have node stage_unstage capability. // - In case of CSI the volume does not have node stage_unstage capability.
if !resizeDone { if !resizeDone {
resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions, volumePluginName) resizeDone, resizeError = og.resizeFileSystem(volumeToMount, resizeOptions)
if resizeError != nil { if resizeError != nil {
klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError) klog.Errorf("MountVolume.resizeFileSystem failed with %v", resizeError)
return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError) return volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
@ -760,7 +760,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
} }
} }
func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions, pluginName string) (bool, error) { func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName) klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName)
return true, nil return true, nil
@ -1577,13 +1577,23 @@ func (og *operationGenerator) GenerateExpandVolumeFunc(
func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc( func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(
volumeToMount VolumeToMount, volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) { actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
fsResizeFunc := func() (error, error) {
// Need to translate the spec here if the plugin is migrated so that the metrics
// emitted show the correct (migrated) plugin
if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) {
csiSpec, err := translateSpec(volumeToMount.VolumeSpec)
if err != nil {
return volumeToMount.GenerateError("VolumeFSResize.translateSpec failed", err)
}
volumeToMount.VolumeSpec = csiSpec
}
volumePlugin, err := volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
if err != nil || volumePlugin == nil { if err != nil || volumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err) return volumeToMount.GenerateError("VolumeFSResize.FindPluginBySpec failed", err)
} }
fsResizeFunc := func() (error, error) {
var resizeDone bool var resizeDone bool
var simpleErr, detailedErr error var simpleErr, detailedErr error
resizeOptions := volume.NodeResizeOptions{ resizeOptions := volume.NodeResizeOptions{
@ -1603,7 +1613,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(
return volumeToMount.GenerateError("VolumeFSResize.GetDeviceMountPath failed", err) return volumeToMount.GenerateError("VolumeFSResize.GetDeviceMountPath failed", err)
} }
resizeOptions.DeviceMountPath = dmp resizeOptions.DeviceMountPath = dmp
resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName()) resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
if simpleErr != nil || detailedErr != nil { if simpleErr != nil || detailedErr != nil {
return simpleErr, detailedErr return simpleErr, detailedErr
} }
@ -1623,7 +1633,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(
resizeOptions.DeviceMountPath = volumeMounter.GetPath() resizeOptions.DeviceMountPath = volumeMounter.GetPath()
resizeOptions.CSIVolumePhase = volume.CSIVolumePublished resizeOptions.CSIVolumePhase = volume.CSIVolumePublished
resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions, volumePlugin.GetPluginName()) resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
if simpleErr != nil || detailedErr != nil { if simpleErr != nil || detailedErr != nil {
return simpleErr, detailedErr return simpleErr, detailedErr
} }
@ -1631,7 +1641,7 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(
return nil, nil return nil, nil
} }
// This is a placeholder error - we should NEVER reach here. // This is a placeholder error - we should NEVER reach here.
err := fmt.Errorf("volume resizing failed for unknown reason") err = fmt.Errorf("volume resizing failed for unknown reason")
return volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed to resize volume", err) return volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed to resize volume", err)
} }
@ -1641,6 +1651,24 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(
} }
} }
// Need to translate the spec here if the plugin is migrated so that the metrics
// emitted show the correct (migrated) plugin
if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) {
csiSpec, err := translateSpec(volumeToMount.VolumeSpec)
if err == nil {
volumeToMount.VolumeSpec = csiSpec
}
// If we have an error here we ignore it, the metric emitted will then be for the
// in-tree plugin. This error case(skipped one) will also trigger an error
// while the generated function is executed. And those errors will be handled during the execution of the generated
// function with a back off policy.
}
volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
if err != nil || volumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VolumeFSResize.FindPluginBySpec failed", err)
}
return volumetypes.GeneratedOperations{ return volumetypes.GeneratedOperations{
OperationName: "volume_fs_resize", OperationName: "volume_fs_resize",
OperationFunc: fsResizeFunc, OperationFunc: fsResizeFunc,
@ -1651,9 +1679,8 @@ func (og *operationGenerator) GenerateExpandVolumeFSWithoutUnmountingFunc(
func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount, func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater, actualStateOfWorld ActualStateOfWorldMounterUpdater,
resizeOptions volume.NodeResizeOptions, resizeOptions volume.NodeResizeOptions) (bool, error, error) {
pluginName string) (bool, error, error) { resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions)
resizeDone, err := og.resizeFileSystem(volumeToMount, resizeOptions, pluginName)
if err != nil { if err != nil {
klog.Errorf("VolumeFSResize.resizeFileSystem failed : %v", err) klog.Errorf("VolumeFSResize.resizeFileSystem failed : %v", err)
e1, e2 := volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed", err) e1, e2 := volumeToMount.GenerateError("VolumeFSResize.resizeFileSystem failed", err)

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/resizefs" "k8s.io/kubernetes/pkg/util/resizefs"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
) )
var ( var (
@ -81,9 +82,11 @@ func UpdatePVSize(
return nil return nil
} }
// MarkResizeInProgress marks cloudprovider resizing as in progress // MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress
func MarkResizeInProgress( // and also annotates the PVC with the name of the resizer.
func MarkResizeInProgressWithResizer(
pvc *v1.PersistentVolumeClaim, pvc *v1.PersistentVolumeClaim,
resizerName string,
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
// Mark PVC as Resize Started // Mark PVC as Resize Started
progressCondition := v1.PersistentVolumeClaimCondition{ progressCondition := v1.PersistentVolumeClaimCondition{
@ -94,9 +97,28 @@ func MarkResizeInProgress(
conditions := []v1.PersistentVolumeClaimCondition{progressCondition} conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
newPVC := pvc.DeepCopy() newPVC := pvc.DeepCopy()
newPVC = MergeResizeConditionOnPVC(newPVC, conditions) newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
newPVC = setResizer(newPVC, resizerName)
return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
} }
// SetClaimResizer sets resizer annotation on PVC
func SetClaimResizer(
pvc *v1.PersistentVolumeClaim,
resizerName string,
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
newPVC := pvc.DeepCopy()
newPVC = setResizer(newPVC, resizerName)
return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
}
func setResizer(pvc *v1.PersistentVolumeClaim, resizerName string) *v1.PersistentVolumeClaim {
if val, ok := pvc.Annotations[volumetypes.VolumeResizerKey]; ok && val == resizerName {
return pvc
}
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, volumetypes.VolumeResizerKey, resizerName)
return pvc
}
// MarkForFSResize marks file system resizing as pending // MarkForFSResize marks file system resizing as pending
func MarkForFSResize( func MarkForFSResize(
pvc *v1.PersistentVolumeClaim, pvc *v1.PersistentVolumeClaim,

View File

@ -50,3 +50,10 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) {
defer runtime.RecoverFromPanic(&detailedErr) defer runtime.RecoverFromPanic(&detailedErr)
return o.OperationFunc() return o.OperationFunc()
} }
const (
// VolumeResizerKey is key that will be used to store resizer used
// for resizing PVC. The generated key/value pair will be added
// as a annotation to the PVC.
VolumeResizerKey = "volume.kubernetes.io/storage-resizer"
)