From 405d33eae4ebc4b7d1fe18ee0f34ffae8e6da7e5 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Thu, 16 May 2019 17:42:16 -0400 Subject: [PATCH] Add code to handle in-tree to CSI migration for resizing --- cmd/kube-controller-manager/app/core.go | 1 + pkg/controller/volume/expand/BUILD | 17 ++++- .../volume/expand/expand_controller.go | 67 ++++++++++++++++--- .../volume/expand/expand_controller_test.go | 47 +++++++++++++ pkg/volume/util/resize_util.go | 38 +++++++++++ pkg/volume/util/types/types.go | 7 ++ 6 files changed, 166 insertions(+), 11 deletions(-) create mode 100644 pkg/controller/volume/expand/expand_controller_test.go diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 99a4ac166ac..b21feca75ef 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -244,6 +244,7 @@ func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, err ctx.ClientBuilder.ClientOrDie("expand-controller"), ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), ctx.InformerFactory.Core().V1().PersistentVolumes(), + ctx.InformerFactory.Storage().V1().StorageClasses(), ctx.Cloud, ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)) diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index 5f3a8b7b5df..185fcb75ca1 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -1,6 +1,6 @@ package(default_visibility = ["//visibility:public"]) -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -10,6 +10,7 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/controller/volume/expand", deps = [ + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/volume/events:go_default_library", "//pkg/controller/volume/expand/cache:go_default_library", @@ -27,14 +28,17 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", + "//staging/src/k8s.io/csi-translation-lib:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) @@ -54,3 +58,14 @@ filegroup( ], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["expand_controller_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/controller:go_default_library", + "//pkg/controller/volume/attachdetach/testing:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + ], +) diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 2332f322fad..29997d26eb9 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -30,14 +30,19 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" + storageclassinformer "k8s.io/client-go/informers/storage/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" + storagelisters "k8s.io/client-go/listers/storage/v1" + "k8s.io/client-go/tools/cache" kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" + csitranslation "k8s.io/csi-translation-lib" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/events" "k8s.io/kubernetes/pkg/util/mount" @@ -72,6 +77,10 @@ type expandController struct { pvLister corelisters.PersistentVolumeLister pvSynced kcache.InformerSynced + // storageClass lister for fetching provisioner name + classLister storagelisters.StorageClassLister + classListerSynced cache.InformerSynced + // cloud provider used by volume host cloud cloudprovider.Interface @@ -90,17 +99,20 @@ func NewExpandController( kubeClient clientset.Interface, pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, + scInformer storageclassinformer.StorageClassInformer, cloud cloudprovider.Interface, plugins []volume.VolumePlugin) (ExpandController, error) { expc := &expandController{ - kubeClient: kubeClient, - cloud: cloud, - pvcLister: pvcInformer.Lister(), - pvcsSynced: pvcInformer.Informer().HasSynced, - pvLister: pvInformer.Lister(), - pvSynced: pvInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"), + kubeClient: kubeClient, + cloud: cloud, + pvcLister: pvcInformer.Lister(), + pvcsSynced: pvcInformer.Informer().HasSynced, + pvLister: pvInformer.Lister(), + pvSynced: pvInformer.Informer().HasSynced, + classLister: scInformer.Lister(), + classListerSynced: scInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"), } if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil { @@ -201,14 +213,29 @@ func (expc *expandController) syncHandler(key string) error { klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err) return err } + if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID { err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc)) klog.V(4).Infof("%v", err) return err } + claimClass := v1helper.GetPersistentVolumeClaimClass(pvc) + if claimClass == "" { + klog.V(4).Infof("volume expansion is disabled for PVC without StorageClasses: %s", util.ClaimToClaimKey(pvc)) + return nil + } + + class, err := expc.classLister.Get(claimClass) + if err != nil { + klog.V(4).Infof("volume expansion is not supported for PVC: %s, can not find StorageClass %s", util.ClaimToClaimKey(pvc), claimClass) + return nil + } + volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) + volumeResizerName := class.Provisioner + if err != nil || volumePlugin == nil { msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " + "waiting for an external controller to process this PVC") @@ -221,11 +248,31 @@ func (expc *expandController) syncHandler(key string) error { return err } - return expc.expand(pvc, pv) + if volumePlugin.IsMigratedToCSI() { + msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName) + expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg) + csiResizerName, err := csitranslation.GetCSINameFromInTreeName(class.Provisioner) + if err != nil { + errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) + expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) + return fmt.Errorf(errorMsg) + } + + pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient) + if err != nil { + errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) + expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) + return fmt.Errorf(errorMsg) + + } + return nil + } + + return expc.expand(pvc, pv, volumeResizerName) } -func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error { - pvc, err := util.MarkResizeInProgress(pvc, expc.kubeClient) +func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error { + pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient) if err != nil { klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) return err diff --git a/pkg/controller/volume/expand/expand_controller_test.go b/pkg/controller/volume/expand/expand_controller_test.go new file mode 100644 index 00000000000..c01d4f69f72 --- /dev/null +++ b/pkg/controller/volume/expand/expand_controller_test.go @@ -0,0 +1,47 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package expand + +import ( + "testing" + + "k8s.io/client-go/informers" + "k8s.io/kubernetes/pkg/controller" + controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" +) + +func TestSyncHandler(t *testing.T) { + fakeKubeClient := controllervolumetesting.CreateTestClient() + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + pvInformer := informerFactory.Core().V1().PersistentVolumes() + storageClassInformer := informerFactory.Storage().V1().StorageClasses() + + expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, nil) + if err != nil { + t.Fatalf("error creating expand controller : %v", err) + } + + var expController *expandController + expController, _ = expc.(*expandController) + + err = expController.syncHandler("default/foo") + if err != nil { + t.Fatalf("error running sync handler : %v", err) + } + +} diff --git a/pkg/volume/util/resize_util.go b/pkg/volume/util/resize_util.go index 52398412e5e..f3ac3678015 100644 --- a/pkg/volume/util/resize_util.go +++ b/pkg/volume/util/resize_util.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/resizefs" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) var ( @@ -97,6 +98,43 @@ func MarkResizeInProgress( return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) } +// MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress +// and also annotates the PVC with the name of the resizer. +func MarkResizeInProgressWithResizer( + pvc *v1.PersistentVolumeClaim, + resizerName string, + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + // Mark PVC as Resize Started + progressCondition := v1.PersistentVolumeClaimCondition{ + Type: v1.PersistentVolumeClaimResizing, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + } + conditions := []v1.PersistentVolumeClaimCondition{progressCondition} + newPVC := pvc.DeepCopy() + newPVC = MergeResizeConditionOnPVC(newPVC, conditions) + newPVC = setResizer(newPVC, resizerName) + return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) +} + +// SetClaimResizer sets resizer annotation on PVC +func SetClaimResizer( + pvc *v1.PersistentVolumeClaim, + resizerName string, + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + newPVC := pvc.DeepCopy() + newPVC = setResizer(newPVC, resizerName) + return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient) +} + +func setResizer(pvc *v1.PersistentVolumeClaim, resizerName string) *v1.PersistentVolumeClaim { + if val, ok := pvc.Annotations[volumetypes.VolumeResizerKey]; ok && val == resizerName { + return pvc + } + metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, volumetypes.VolumeResizerKey, resizerName) + return pvc +} + // MarkForFSResize marks file system resizing as pending func MarkForFSResize( pvc *v1.PersistentVolumeClaim, diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 8afea9bac29..2811cd1dc6c 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -50,3 +50,10 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { defer runtime.RecoverFromPanic(&detailedErr) return o.OperationFunc() } + +const ( + // VolumeResizerKey is key that will be used to store resizer used + // for resizing PVC. The generated key/value pair will be added + // as a annotation to the PVC. + VolumeResizerKey = "volume.kubernetes.io/storage-resizer" +)