From cd2a68473a5a5966fa79f455415cb3269a3f7462 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 4 Sep 2017 09:02:34 +0200 Subject: [PATCH] Implement controller for resizing volumes --- cmd/kube-controller-manager/app/BUILD | 1 + .../app/controllermanager.go | 1 + cmd/kube-controller-manager/app/core.go | 19 ++ cmd/kube-controller-manager/app/plugins.go | 19 ++ pkg/controller/volume/expand/BUILD | 60 ++++ pkg/controller/volume/expand/OWNERS | 4 + pkg/controller/volume/expand/cache/BUILD | 53 ++++ .../volume/expand/cache/volume_resize_map.go | 211 ++++++++++++++ .../expand/cache/volume_resize_map_test.go | 88 ++++++ .../volume/expand/expand_controller.go | 268 ++++++++++++++++++ pkg/controller/volume/expand/pvc_populator.go | 85 ++++++ .../volume/expand/sync_volume_resize.go | 101 +++++++ pkg/controller/volume/expand/util/BUILD | 26 ++ pkg/controller/volume/expand/util/util.go | 46 +++ pkg/volume/glusterfs/glusterfs.go | 50 ++++ pkg/volume/plugins.go | 35 +++ pkg/volume/util/operationexecutor/BUILD | 2 + .../operationexecutor/operation_executor.go | 14 + .../operation_executor_test.go | 9 + .../operationexecutor/operation_generator.go | 56 ++++ pkg/volume/util/types/types.go | 3 + 21 files changed, 1151 insertions(+) create mode 100644 pkg/controller/volume/expand/BUILD create mode 100644 pkg/controller/volume/expand/OWNERS create mode 100644 pkg/controller/volume/expand/cache/BUILD create mode 100644 pkg/controller/volume/expand/cache/volume_resize_map.go create mode 100644 pkg/controller/volume/expand/cache/volume_resize_map_test.go create mode 100644 pkg/controller/volume/expand/expand_controller.go create mode 100644 pkg/controller/volume/expand/pvc_populator.go create mode 100644 pkg/controller/volume/expand/sync_volume_resize.go create mode 100644 pkg/controller/volume/expand/util/BUILD create mode 100644 pkg/controller/volume/expand/util/util.go diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 683c45450c1..a9d8968191e 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -72,6 +72,7 @@ go_library( "//pkg/controller/statefulset:go_default_library", "//pkg/controller/ttl:go_default_library", "//pkg/controller/volume/attachdetach:go_default_library", + "//pkg/controller/volume/expand:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/features:go_default_library", "//pkg/quota/install:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 0221f58ab98..ba781a9a6e4 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -357,6 +357,7 @@ func NewControllerInitializers() map[string]InitFunc { controllers["route"] = startRouteController controllers["persistentvolume-binder"] = startPersistentVolumeBinderController controllers["attachdetach"] = startAttachDetachController + controllers["persistentvolume-expander"] = startVolumeExpandController return controllers } diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 9b8be77460d..15443874360 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -51,6 +51,7 @@ import ( serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl" "k8s.io/kubernetes/pkg/controller/volume/attachdetach" + "k8s.io/kubernetes/pkg/controller/volume/expand" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/pkg/features" quotainstall "k8s.io/kubernetes/pkg/quota/install" @@ -189,6 +190,24 @@ func startAttachDetachController(ctx ControllerContext) (bool, error) { return true, nil } +func startVolumeExpandController(ctx ControllerContext) (bool, error) { + if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { + expandController, expandControllerErr := expand.NewExpandController( + ctx.ClientBuilder.ClientOrDie("expand-controller"), + ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), + ctx.InformerFactory.Core().V1().PersistentVolumes(), + ctx.Cloud, + ProbeExpandableVolumePlugins(ctx.Options.VolumeConfiguration)) + + if expandControllerErr != nil { + return true, fmt.Errorf("Failed to start volume expand controller : %v", expandControllerErr) + } + go expandController.Run(ctx.Stop) + return true, nil + } + return false, nil +} + func startEndpointController(ctx ControllerContext) (bool, error) { go endpointcontroller.NewEndpointController( ctx.InformerFactory.Core().V1().Pods(), diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index f26ab264cef..bf9c9fe1299 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -88,6 +88,25 @@ func GetDynamicPluginProber(config componentconfig.VolumeConfiguration) volume.D return flexvolume.GetDynamicPluginProber(config.FlexVolumePluginDir) } +// ProbeExpandableVolumePlugins returns volume plugins which are expandable +func ProbeExpandableVolumePlugins(config componentconfig.VolumeConfiguration) []volume.VolumePlugin { + allPlugins := []volume.VolumePlugin{} + + allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, photon_pd.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) + return allPlugins +} + // ProbeControllerVolumePlugins collects all persistent volume plugins into an // easy to use list. Only volume plugins that implement any of // provisioner/recycler/deleter interface should be returned. diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD new file mode 100644 index 00000000000..b68223bee84 --- /dev/null +++ b/pkg/controller/volume/expand/BUILD @@ -0,0 +1,60 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "expand_controller.go", + "pvc_populator.go", + "sync_volume_resize.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/cloudprovider:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/volume/expand/cache:go_default_library", + "//pkg/controller/volume/expand/util:go_default_library", + "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", + "//pkg/util/io:go_default_library", + "//pkg/util/mount:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/record:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/controller/volume/expand/cache:all-srcs", + "//pkg/controller/volume/expand/util:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/controller/volume/expand/OWNERS b/pkg/controller/volume/expand/OWNERS new file mode 100644 index 00000000000..f9e1482748f --- /dev/null +++ b/pkg/controller/volume/expand/OWNERS @@ -0,0 +1,4 @@ +approvers: +- saad-ali +- jsafrane +- gnufied diff --git a/pkg/controller/volume/expand/cache/BUILD b/pkg/controller/volume/expand/cache/BUILD new file mode 100644 index 00000000000..f07d34a020a --- /dev/null +++ b/pkg/controller/volume/expand/cache/BUILD @@ -0,0 +1,53 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = ["volume_resize_map.go"], + tags = ["automanaged"], + deps = [ + "//pkg/controller/volume/expand/util:go_default_library", + "//pkg/util/strings:go_default_library", + "//pkg/volume/util/types:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) + +go_test( + name = "go_default_test", + srcs = ["volume_resize_map_test.go"], + library = ":go_default_library", + deps = [ + "//pkg/volume/util/types:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + ], +) diff --git a/pkg/controller/volume/expand/cache/volume_resize_map.go b/pkg/controller/volume/expand/cache/volume_resize_map.go new file mode 100644 index 00000000000..b04bf9f254e --- /dev/null +++ b/pkg/controller/volume/expand/cache/volume_resize_map.go @@ -0,0 +1,211 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + commontypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/controller/volume/expand/util" + "k8s.io/kubernetes/pkg/util/strings" + "k8s.io/kubernetes/pkg/volume/util/types" +) + +// VolumeResizeMap defines an interface that serves as a cache for holding pending resizing requests +type VolumeResizeMap interface { + // AddPVCUpdate adds pvc for resizing + AddPVCUpdate(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) + // DeletePVC deletes pvc that is scheduled for resizing + DeletePVC(pvc *v1.PersistentVolumeClaim) + // GetPVCsWithResizeRequest returns all pending pvc resize requests + GetPVCsWithResizeRequest() []*PVCWithResizeRequest + // MarkAsResized marks a pvc as fully resized + MarkAsResized(*PVCWithResizeRequest, resource.Quantity) error + // UpdatePVSize updates just pv size after cloudprovider resizing is successful + UpdatePVSize(*PVCWithResizeRequest, resource.Quantity) error +} + +type volumeResizeMap struct { + // map of unique pvc name and resize requests that are pending or inflight + pvcrs map[types.UniquePVCName]*PVCWithResizeRequest + // kube client for making API calls + kubeClient clientset.Interface + // for guarding access to pvcrs map + sync.RWMutex +} + +// PVCWithResizeRequest struct defines data structure that stores state needed for +// performing file system resize +type PVCWithResizeRequest struct { + // PVC that needs to be resized + PVC *v1.PersistentVolumeClaim + // persistentvolume + PersistentVolume *v1.PersistentVolume + // Current volume size + CurrentSize resource.Quantity + // Expended volume size + ExpectedSize resource.Quantity +} + +// UniquePVCKey returns unique key of the PVC based on its UID +func (pvcr *PVCWithResizeRequest) UniquePVCKey() types.UniquePVCName { + return types.UniquePVCName(pvcr.PVC.UID) +} + +// QualifiedName returns namespace and name combination of the PVC +func (pvcr *PVCWithResizeRequest) QualifiedName() string { + return strings.JoinQualifiedName(pvcr.PVC.Namespace, pvcr.PVC.Name) +} + +// NewVolumeResizeMap returns new VolumeResizeMap which acts as a cache +// for holding pending resize requests. +func NewVolumeResizeMap(kubeClient clientset.Interface) VolumeResizeMap { + resizeMap := &volumeResizeMap{} + resizeMap.pvcrs = make(map[types.UniquePVCName]*PVCWithResizeRequest) + resizeMap.kubeClient = kubeClient + return resizeMap +} + +// AddPVCUpdate adds pvc for resizing +func (resizeMap *volumeResizeMap) AddPVCUpdate(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) { + if pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.Name != pv.Spec.ClaimRef.Name { + glog.V(4).Infof("Persistent Volume is not mapped to PVC being updated : %s", util.ClaimToClaimKey(pvc)) + return + } + + if pvc.Status.Phase != v1.ClaimBound { + return + } + + resizeMap.Lock() + defer resizeMap.Unlock() + + pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage] + pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage] + + if pvcStatusSize.Cmp(pvcSize) >= 0 { + return + } + + glog.V(4).Infof("Adding pvc %s with Size %s/%s for resizing", util.ClaimToClaimKey(pvc), pvcSize.String(), pvcStatusSize.String()) + + pvcRequest := &PVCWithResizeRequest{ + PVC: pvc, + CurrentSize: pvcStatusSize, + ExpectedSize: pvcSize, + PersistentVolume: pv, + } + resizeMap.pvcrs[types.UniquePVCName(pvc.UID)] = pvcRequest +} + +// GetPVCsWithResizeRequest returns all pending pvc resize requests +func (resizeMap *volumeResizeMap) GetPVCsWithResizeRequest() []*PVCWithResizeRequest { + resizeMap.Lock() + defer resizeMap.Unlock() + + pvcrs := []*PVCWithResizeRequest{} + for _, pvcr := range resizeMap.pvcrs { + pvcrs = append(pvcrs, pvcr) + } + // Empty out pvcrs map, we will add back failed resize requests later + resizeMap.pvcrs = map[types.UniquePVCName]*PVCWithResizeRequest{} + return pvcrs +} + +// DeletePVC removes given pvc object from list of pvcs that needs resizing. +// deleting a pvc in this map doesn't affect operations that are already inflight. +func (resizeMap *volumeResizeMap) DeletePVC(pvc *v1.PersistentVolumeClaim) { + resizeMap.Lock() + defer resizeMap.Unlock() + pvcUniqueName := types.UniquePVCName(pvc.UID) + glog.V(5).Infof("Removing PVC %v from resize map", pvcUniqueName) + delete(resizeMap.pvcrs, pvcUniqueName) +} + +// MarkAsResized marks a pvc as fully resized +func (resizeMap *volumeResizeMap) MarkAsResized(pvcr *PVCWithResizeRequest, newSize resource.Quantity) error { + resizeMap.Lock() + defer resizeMap.Unlock() + + emptyCondition := []v1.PersistentVolumeClaimCondition{} + + err := resizeMap.updatePVCCapacityAndConditions(pvcr, newSize, emptyCondition) + if err != nil { + glog.V(4).Infof("Error updating PV spec capacity for volume %q with : %v", pvcr.QualifiedName(), err) + return err + } + return nil +} + +// UpdatePVSize updates just pv size after cloudprovider resizing is successful +func (resizeMap *volumeResizeMap) UpdatePVSize(pvcr *PVCWithResizeRequest, newSize resource.Quantity) error { + resizeMap.Lock() + defer resizeMap.Unlock() + + oldPv := pvcr.PersistentVolume + pvClone := oldPv.DeepCopy() + + oldData, err := json.Marshal(pvClone) + + if err != nil { + return fmt.Errorf("Unexpected error marshaling PV : %q with error %v", pvClone.Name, err) + } + + pvClone.Spec.Capacity[v1.ResourceStorage] = newSize + + newData, err := json.Marshal(pvClone) + + if err != nil { + return fmt.Errorf("Unexpected error marshaling PV : %q with error %v", pvClone.Name, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvClone) + + if err != nil { + return fmt.Errorf("Error Creating two way merge patch for PV : %q with error %v", pvClone.Name, err) + } + + _, updateErr := resizeMap.kubeClient.CoreV1().PersistentVolumes().Patch(pvClone.Name, commontypes.StrategicMergePatchType, patchBytes) + + if updateErr != nil { + glog.V(4).Infof("Error updating pv %q with error : %v", pvClone.Name, updateErr) + return updateErr + } + return nil +} + +func (resizeMap *volumeResizeMap) updatePVCCapacityAndConditions(pvcr *PVCWithResizeRequest, newSize resource.Quantity, pvcConditions []v1.PersistentVolumeClaimCondition) error { + + claimClone := pvcr.PVC.DeepCopy() + + claimClone.Status.Capacity[v1.ResourceStorage] = newSize + claimClone.Status.Conditions = pvcConditions + + _, updateErr := resizeMap.kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).UpdateStatus(claimClone) + if updateErr != nil { + glog.V(4).Infof("updating PersistentVolumeClaim[%s] status: failed: %v", pvcr.QualifiedName(), updateErr) + return updateErr + } + return nil +} diff --git a/pkg/controller/volume/expand/cache/volume_resize_map_test.go b/pkg/controller/volume/expand/cache/volume_resize_map_test.go new file mode 100644 index 00000000000..8a52df1544c --- /dev/null +++ b/pkg/controller/volume/expand/cache/volume_resize_map_test.go @@ -0,0 +1,88 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/volume/util/types" +) + +func Test_AddValidPvcUpdate(t *testing.T) { + resizeMap := createTestVolumeResizeMap() + claim1 := testVolumeClaim("foo", "ns", v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + v1.ReadOnlyMany, + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G"), + }, + }, + VolumeName: "foo", + }) + + claimClone := claim1.DeepCopy() + claimClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("12G") + pv := getPersistentVolume("foo", resource.MustParse("10G"), claim1) + resizeMap.AddPVCUpdate(claimClone, pv) + pvcr := resizeMap.GetPVCsWithResizeRequest() + if len(pvcr) != 1 { + t.Fatalf("Expected 1 pvc resize request got 0") + } + assert.Equal(t, resource.MustParse("12G"), pvcr[0].ExpectedSize) + assert.Equal(t, 0, len(resizeMap.pvcrs)) +} + +func createTestVolumeResizeMap() *volumeResizeMap { + fakeClient := &fake.Clientset{} + resizeMap := &volumeResizeMap{} + resizeMap.pvcrs = make(map[types.UniquePVCName]*PVCWithResizeRequest) + resizeMap.kubeClient = fakeClient + return resizeMap +} + +func testVolumeClaim(name string, namespace string, spec v1.PersistentVolumeClaimSpec) *v1.PersistentVolumeClaim { + return &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}, + Spec: spec, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + }, + } +} + +func getPersistentVolume(volumeName string, capacity resource.Quantity, pvc *v1.PersistentVolumeClaim) *v1.PersistentVolume { + return &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: volumeName}, + Spec: v1.PersistentVolumeSpec{ + Capacity: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): capacity, + }, + ClaimRef: &v1.ObjectReference{ + Namespace: pvc.Namespace, + Name: pvc.Name, + }, + }, + } +} diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go new file mode 100644 index 00000000000..1c71528ef91 --- /dev/null +++ b/pkg/controller/volume/expand/expand_controller.go @@ -0,0 +1,268 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package expand implements interfaces that attempt to resize a pvc +// by adding pvc to a volume resize map from which PVCs are picked and +// resized +package expand + +import ( + "fmt" + "net" + "time" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/runtime" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" + kcache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/volume/expand/cache" + "k8s.io/kubernetes/pkg/util/io" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" +) + +const ( + // How often resizing loop runs + syncLoopPeriod time.Duration = 30 * time.Second + // How often pvc populator runs + populatorLoopPeriod time.Duration = 2 * time.Minute +) + +// ExpandController expands the pvs +type ExpandController interface { + Run(stopCh <-chan struct{}) +} + +type expandController struct { + // kubeClient is the kube API client used by volumehost to communicate with + // the API server. + kubeClient clientset.Interface + + // pvcLister is the shared PVC lister used to fetch and store PVC + // objects from the API server. It is shared with other controllers and + // therefore the PVC objects in its store should be treated as immutable. + pvcLister corelisters.PersistentVolumeClaimLister + pvcsSynced kcache.InformerSynced + + pvLister corelisters.PersistentVolumeLister + pvSynced kcache.InformerSynced + + // cloud provider used by volume host + cloud cloudprovider.Interface + + // volumePluginMgr used to initialize and fetch volume plugins + volumePluginMgr volume.VolumePluginMgr + + // recorder is used to record events in the API server + recorder record.EventRecorder + + // Volume resize map of volumes that needs resizing + resizeMap cache.VolumeResizeMap + + // Worker goroutine to process resize requests from resizeMap + syncResize SyncVolumeResize + + // Operation executor + opExecutor operationexecutor.OperationExecutor + + // populator for periodically polling all PVCs + pvcPopulator PVCPopulator +} + +func NewExpandController( + kubeClient clientset.Interface, + pvcInformer coreinformers.PersistentVolumeClaimInformer, + pvInformer coreinformers.PersistentVolumeInformer, + cloud cloudprovider.Interface, + plugins []volume.VolumePlugin) (ExpandController, error) { + + expc := &expandController{ + kubeClient: kubeClient, + cloud: cloud, + pvcLister: pvcInformer.Lister(), + pvcsSynced: pvcInformer.Informer().HasSynced, + pvLister: pvInformer.Lister(), + pvSynced: pvInformer.Informer().HasSynced, + } + + if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil { + return nil, fmt.Errorf("Could not initialize volume plugins for Expand Controller : %+v", err) + } + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"}) + + expc.opExecutor = operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + &expc.volumePluginMgr, + recorder, + false)) + + expc.resizeMap = cache.NewVolumeResizeMap(expc.kubeClient) + + pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ + UpdateFunc: expc.pvcUpdate, + DeleteFunc: expc.deletePVC, + }) + + expc.syncResize = NewSyncVolumeResize(syncLoopPeriod, expc.opExecutor, expc.resizeMap, kubeClient) + expc.pvcPopulator = NewPVCPopulator( + populatorLoopPeriod, + expc.resizeMap, + expc.pvcLister, + expc.pvLister, + kubeClient) + return expc, nil +} + +func (expc *expandController) Run(stopCh <-chan struct{}) { + defer runtime.HandleCrash() + glog.Infof("Starting expand controller") + defer glog.Infof("Shutting down expand controller") + + if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced) { + return + } + + // Run volume sync work goroutine + go expc.syncResize.Run(stopCh) + // Start the pvc populator loop + go expc.pvcPopulator.Run(stopCh) + <-stopCh +} + +func (expc *expandController) deletePVC(obj interface{}) { + pvc, ok := obj.(*v1.PersistentVolumeClaim) + + if pvc == nil || !ok { + return + } + + expc.resizeMap.DeletePVC(pvc) +} + +func (expc *expandController) pvcUpdate(oldObj, newObj interface{}) { + oldPvc, ok := oldObj.(*v1.PersistentVolumeClaim) + + if oldPvc == nil || !ok { + return + } + + newPVC, ok := newObj.(*v1.PersistentVolumeClaim) + + if newPVC == nil || !ok { + return + } + pv, err := getPersistentVolume(newPVC, expc.pvLister) + if err != nil { + glog.V(5).Infof("Error getting Persistent Volume for pvc %q : %v", newPVC.UID, err) + return + } + expc.resizeMap.AddPVCUpdate(newPVC, pv) +} + +func getPersistentVolume(pvc *v1.PersistentVolumeClaim, pvLister corelisters.PersistentVolumeLister) (*v1.PersistentVolume, error) { + volumeName := pvc.Spec.VolumeName + pv, err := pvLister.Get(volumeName) + + if err != nil { + return nil, fmt.Errorf("failed to find PV %q in PV informer cache with error : %v", volumeName, err) + } + + return pv.DeepCopy(), nil +} + +// Implementing VolumeHost interface +func (expc *expandController) GetPluginDir(pluginName string) string { + return "" +} + +func (expc *expandController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string { + return "" +} + +func (expc *expandController) GetPodPluginDir(podUID types.UID, pluginName string) string { + return "" +} + +func (expc *expandController) GetKubeClient() clientset.Interface { + return expc.kubeClient +} + +func (expc *expandController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) { + return nil, fmt.Errorf("NewWrapperMounter not supported by expand controller's VolumeHost implementation") +} + +func (expc *expandController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) { + return nil, fmt.Errorf("NewWrapperUnmounter not supported by expand controller's VolumeHost implementation") +} + +func (expc *expandController) GetCloudProvider() cloudprovider.Interface { + return expc.cloud +} + +func (expc *expandController) GetMounter(pluginName string) mount.Interface { + return nil +} + +func (expc *expandController) GetExec(pluginName string) mount.Exec { + return mount.NewOsExec() +} + +func (expc *expandController) GetWriter() io.Writer { + return nil +} + +func (expc *expandController) GetHostName() string { + return "" +} + +func (expc *expandController) GetHostIP() (net.IP, error) { + return nil, fmt.Errorf("GetHostIP not supported by expand controller's VolumeHost implementation") +} + +func (expc *expandController) GetNodeAllocatable() (v1.ResourceList, error) { + return v1.ResourceList{}, nil +} + +func (expc *expandController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) { + return func(_, _ string) (*v1.Secret, error) { + return nil, fmt.Errorf("GetSecret unsupported in expandController") + } +} + +func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) { + return func(_, _ string) (*v1.ConfigMap, error) { + return nil, fmt.Errorf("GetConfigMap unsupported in expandController") + } +} + +func (expc *expandController) GetNodeLabels() (map[string]string, error) { + return nil, fmt.Errorf("GetNodeLabels unsupported in expandController") +} diff --git a/pkg/controller/volume/expand/pvc_populator.go b/pkg/controller/volume/expand/pvc_populator.go new file mode 100644 index 00000000000..220e1092f7d --- /dev/null +++ b/pkg/controller/volume/expand/pvc_populator.go @@ -0,0 +1,85 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package reconciler implements interfaces that attempt to reconcile the +// desired state of the with the actual state of the world by triggering +// actions. + +package expand + +import ( + "time" + + "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/kubernetes/pkg/controller/volume/expand/cache" +) + +// PVCPopulator iterates through PVCs and checks if for bound PVCs +// their size doesn't match with Persistent Volume size +type PVCPopulator interface { + Run(stopCh <-chan struct{}) +} + +type pvcPopulator struct { + loopPeriod time.Duration + resizeMap cache.VolumeResizeMap + pvcLister corelisters.PersistentVolumeClaimLister + pvLister corelisters.PersistentVolumeLister + kubeClient clientset.Interface +} + +func NewPVCPopulator( + loopPeriod time.Duration, + resizeMap cache.VolumeResizeMap, + pvcLister corelisters.PersistentVolumeClaimLister, + pvLister corelisters.PersistentVolumeLister, + kubeClient clientset.Interface) PVCPopulator { + populator := &pvcPopulator{ + loopPeriod: loopPeriod, + pvcLister: pvcLister, + pvLister: pvLister, + resizeMap: resizeMap, + kubeClient: kubeClient, + } + return populator +} + +func (populator *pvcPopulator) Run(stopCh <-chan struct{}) { + wait.Until(populator.Sync, populator.loopPeriod, stopCh) +} + +func (populator *pvcPopulator) Sync() { + pvcs, err := populator.pvcLister.List(labels.Everything()) + if err != nil { + glog.Errorf("Listing PVCs failed in populator : %v", err) + return + } + + for _, pvc := range pvcs { + pv, err := getPersistentVolume(pvc, populator.pvLister) + + if err != nil { + glog.V(5).Infof("Error getting persistent volume for pvc %q : %v", pvc.UID, err) + continue + } + populator.resizeMap.AddPVCUpdate(pvc, pv) + } +} diff --git a/pkg/controller/volume/expand/sync_volume_resize.go b/pkg/controller/volume/expand/sync_volume_resize.go new file mode 100644 index 00000000000..06565775e63 --- /dev/null +++ b/pkg/controller/volume/expand/sync_volume_resize.go @@ -0,0 +1,101 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package expand + +import ( + "time" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/controller/volume/expand/cache" + "k8s.io/kubernetes/pkg/controller/volume/expand/util" + "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" +) + +type SyncVolumeResize interface { + Run(stopCh <-chan struct{}) +} + +type syncResize struct { + loopPeriod time.Duration + resizeMap cache.VolumeResizeMap + opsExecutor operationexecutor.OperationExecutor + kubeClient clientset.Interface +} + +// NewSyncVolumeResize returns actual volume resize handler +func NewSyncVolumeResize( + loopPeriod time.Duration, + opsExecutor operationexecutor.OperationExecutor, + resizeMap cache.VolumeResizeMap, + kubeClient clientset.Interface) SyncVolumeResize { + rc := &syncResize{ + loopPeriod: loopPeriod, + opsExecutor: opsExecutor, + resizeMap: resizeMap, + kubeClient: kubeClient, + } + return rc +} + +func (rc *syncResize) Run(stopCh <-chan struct{}) { + wait.Until(rc.Sync, rc.loopPeriod, stopCh) +} + +func (rc *syncResize) Sync() { + // Resize PVCs that require resize + for _, pvcWithResizeRequest := range rc.resizeMap.GetPVCsWithResizeRequest() { + uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey()) + updatedClaim, err := markPVCResizeInProgress(pvcWithResizeRequest, rc.kubeClient) + if err != nil { + glog.V(5).Infof("Error setting PVC %s in progress with error : %v", pvcWithResizeRequest.QualifiedName(), err) + continue + } + if updatedClaim != nil { + pvcWithResizeRequest.PVC = updatedClaim + } + + if rc.opsExecutor.IsOperationPending(uniqueVolumeKey, "") { + glog.V(10).Infof("Operation for PVC %v is already pending", pvcWithResizeRequest.QualifiedName()) + continue + } + glog.V(5).Infof("Starting opsExecutor.ExpandVolume for volume %s", pvcWithResizeRequest.QualifiedName()) + growFuncError := rc.opsExecutor.ExpandVolume(pvcWithResizeRequest, rc.resizeMap) + if growFuncError != nil && !exponentialbackoff.IsExponentialBackoff(growFuncError) { + glog.Errorf("Error growing pvc %s with %v", pvcWithResizeRequest.QualifiedName(), growFuncError) + } + if growFuncError == nil { + glog.V(5).Infof("Started opsExecutor.ExpandVolume for volume %s", pvcWithResizeRequest.QualifiedName()) + } + } +} + +func markPVCResizeInProgress(pvcWithResizeRequest *cache.PVCWithResizeRequest, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + // Mark PVC as Resize Started + progressCondition := v1.PersistentVolumeClaimCondition{ + Type: v1.PersistentVolumeClaimResizing, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + } + conditions := []v1.PersistentVolumeClaimCondition{progressCondition} + + return util.UpdatePVCCondition(pvcWithResizeRequest.PVC, conditions, kubeClient) +} diff --git a/pkg/controller/volume/expand/util/BUILD b/pkg/controller/volume/expand/util/BUILD new file mode 100644 index 00000000000..ce155342a61 --- /dev/null +++ b/pkg/controller/volume/expand/util/BUILD @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["util.go"], + visibility = ["//visibility:public"], + deps = [ + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/controller/volume/expand/util/util.go b/pkg/controller/volume/expand/util/util.go new file mode 100644 index 00000000000..bc6795c4fa7 --- /dev/null +++ b/pkg/controller/volume/expand/util/util.go @@ -0,0 +1,46 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + clientset "k8s.io/client-go/kubernetes" +) + +// ClaimToClaimKey return namespace/name string for pvc +func ClaimToClaimKey(claim *v1.PersistentVolumeClaim) string { + return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name) +} + +// UpdatePVCCondition updates pvc with given condition status +func UpdatePVCCondition(pvc *v1.PersistentVolumeClaim, + pvcConditions []v1.PersistentVolumeClaimCondition, + kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) { + + claimClone := pvc.DeepCopy() + claimClone.Status.Conditions = pvcConditions + updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).UpdateStatus(claimClone) + if updateErr != nil { + glog.V(4).Infof("updating PersistentVolumeClaim[%s] status: failed: %v", ClaimToClaimKey(pvc), updateErr) + return nil, updateErr + } + return updatedClaim, nil +} diff --git a/pkg/volume/glusterfs/glusterfs.go b/pkg/volume/glusterfs/glusterfs.go index 2145fa63333..a3225e16a44 100644 --- a/pkg/volume/glusterfs/glusterfs.go +++ b/pkg/volume/glusterfs/glusterfs.go @@ -127,6 +127,10 @@ func (plugin *glusterfsPlugin) SupportsBulkVolumeVerification() bool { return false } +func (plugin *glusterfsPlugin) RequiresFSResize() bool { + return false +} + func (plugin *glusterfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode { return []v1.PersistentVolumeAccessMode{ v1.ReadWriteOnce, @@ -1046,3 +1050,49 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa } return &cfg, nil } + +func (plugin *glusterfsPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) { + pvSpec := spec.PersistentVolume.Spec + glog.V(2).Infof("Request to expand volume: %s ", pvSpec.Glusterfs.Path) + volumeName := pvSpec.Glusterfs.Path + + // Fetch the volume for expansion. + volumeID := dstrings.TrimPrefix(volumeName, volPrefix) + + //Get details of SC. + class, err := volutil.GetClassForVolume(plugin.host.GetKubeClient(), spec.PersistentVolume) + if err != nil { + return oldSize, err + } + cfg, err := parseClassParameters(class.Parameters, plugin.host.GetKubeClient()) + if err != nil { + return oldSize, err + } + + glog.V(4).Infof("Expanding volume %q with configuration %+v", volumeID, cfg) + + //Create REST server connection + cli := gcli.NewClient(cfg.url, cfg.user, cfg.secretValue) + if cli == nil { + glog.Errorf("failed to create glusterfs rest client") + return oldSize, fmt.Errorf("failed to create glusterfs rest client, REST server authentication failed") + } + + // Find out delta size + expansionSize := (newSize.Value() - oldSize.Value()) + expansionSizeGB := int(volume.RoundUpSize(expansionSize, 1024*1024*1024)) + + // Make volume expansion request + volumeExpandReq := &gapi.VolumeExpandRequest{Size: expansionSizeGB} + + // Expand the volume + volumeInfoRes, err := cli.VolumeExpand(volumeID, volumeExpandReq) + if err != nil { + glog.Errorf("error when expanding the volume :%v", err) + return oldSize, err + } + + glog.V(2).Infof("volume %s expanded to new size %d successfully", volumeName, volumeInfoRes.Size) + newVolumeSize := resource.MustParse(fmt.Sprintf("%dG", volumeInfoRes.Size)) + return newVolumeSize, nil +} diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index b97f54af33b..eb2f8149ac4 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -201,6 +202,14 @@ type AttachableVolumePlugin interface { GetDeviceMountRefs(deviceMountPath string) ([]string, error) } +// ExpandableVolumePlugin is an extended interface of VolumePlugin and is used for volumes that can be +// expanded +type ExpandableVolumePlugin interface { + VolumePlugin + ExpandVolumeDevice(spec *Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) + RequiresFSResize() bool +} + // VolumeHost is an interface that plugins can use to access the kubelet. type VolumeHost interface { // GetPluginDir returns the absolute path to a directory under which @@ -642,6 +651,32 @@ func (pm *VolumePluginMgr) FindAttachablePluginByName(name string) (AttachableVo return nil, nil } +// FindExpandablePluginBySpec fetches a persistent volume plugin by spec. +func (pm *VolumePluginMgr) FindExpandablePluginBySpec(spec *Spec) (ExpandableVolumePlugin, error) { + volumePlugin, err := pm.FindPluginBySpec(spec) + if err != nil { + return nil, err + } + + if expandableVolumePlugin, ok := volumePlugin.(ExpandableVolumePlugin); ok { + return expandableVolumePlugin, nil + } + return nil, nil +} + +// FindExpandablePluginBySpec fetches a persistent volume plugin by name. +func (pm *VolumePluginMgr) FindExpandablePluginByName(name string) (ExpandableVolumePlugin, error) { + volumePlugin, err := pm.FindPluginByName(name) + if err != nil { + return nil, err + } + + if expandableVolumePlugin, ok := volumePlugin.(ExpandableVolumePlugin); ok { + return expandableVolumePlugin, nil + } + return nil, nil +} + // NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler // pod. By default, a recycler pod simply runs "rm -rf" on a volume and tests // for emptiness. Most attributes of the template will be correct for most diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index dae1131fc3a..647fd569d33 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -13,6 +13,7 @@ go_library( "operation_generator.go", ], deps = [ + "//pkg/controller/volume/expand/cache:go_default_library", "//pkg/features:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/util/mount:go_default_library", @@ -37,6 +38,7 @@ go_test( srcs = ["operation_executor_test.go"], library = ":go_default_library", deps = [ + "//pkg/controller/volume/expand/cache:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util/types:go_default_library", diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index 21e8eecbdc7..8b11358436a 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -29,6 +29,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" @@ -119,6 +120,8 @@ type OperationExecutor interface { // IsOperationPending returns true if an operation for the given volumeName and podName is pending, // otherwise it returns false IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool + // Expand Volume will grow size available to PVC + ExpandVolume(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) error } // NewOperationExecutor returns a new instance of OperationExecutor. @@ -719,6 +722,17 @@ func (oe *operationExecutor) UnmountDevice( deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc, opCompleteFunc) } +func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, resizeMap expandcache.VolumeResizeMap) error { + expandFunc, pluginName, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap) + + if err != nil { + return err + } + uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey()) + opCompleteFunc := util.OperationCompleteHook(pluginName, "expand_volume") + return oe.pendingOperations.Run(uniqueVolumeKey, "", expandFunc, opCompleteFunc) +} + func (oe *operationExecutor) VerifyControllerAttachedVolume( volumeToMount VolumeToMount, nodeName types.NodeName, diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index 94118674ecc..35b7064c0d1 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" + expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" @@ -282,6 +283,14 @@ func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(v }, "", nil } +func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, + resizeMap expandcache.VolumeResizeMap) (func() error, string, error) { + return func() error { + startOperationAndBlock(fopg.ch, fopg.quit) + return nil + }, "", nil +} + func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc( pluginNodeVolumes map[types.NodeName][]*volume.Spec, pluginNane string, diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 73445ac3a0f..93b05ae6dd1 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -28,6 +28,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" + expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache" "k8s.io/kubernetes/pkg/features" kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/mount" @@ -100,6 +101,8 @@ type OperationGenerator interface { map[types.NodeName][]*volume.Spec, string, map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (func() error, error) + + GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (func() error, string, error) } func (og *operationGenerator) GenerateVolumesAreAttachedFunc( @@ -726,6 +729,59 @@ func (og *operationGenerator) verifyVolumeIsSafeToDetach( return nil } +func (og *operationGenerator) GenerateExpandVolumeFunc( + pvcWithResizeRequest *expandcache.PVCWithResizeRequest, + resizeMap expandcache.VolumeResizeMap) (func() error, string, error) { + + volumeSpec := volume.NewSpecFromPersistentVolume(pvcWithResizeRequest.PersistentVolume, false) + + volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) + + if err != nil { + return nil, "", fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", pvcWithResizeRequest.QualifiedName(), err) + } + + expandFunc := func() error { + newSize := pvcWithResizeRequest.ExpectedSize + pvSize := pvcWithResizeRequest.PersistentVolume.Spec.Capacity[v1.ResourceStorage] + if pvSize.Cmp(newSize) < 0 { + updatedSize, expandErr := volumePlugin.ExpandVolumeDevice( + volumeSpec, + pvcWithResizeRequest.ExpectedSize, + pvcWithResizeRequest.CurrentSize) + + if expandErr != nil { + glog.Errorf("Error expanding volume %q of plugin %s : %v", pvcWithResizeRequest.QualifiedName(), volumePlugin.GetPluginName(), expandErr) + og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, expandErr.Error()) + return expandErr + } + newSize = updatedSize + updateErr := resizeMap.UpdatePVSize(pvcWithResizeRequest, newSize) + + if updateErr != nil { + glog.V(4).Infof("Error updating PV spec capacity for volume %q with : %v", pvcWithResizeRequest.QualifiedName(), updateErr) + og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, updateErr.Error()) + return updateErr + } + } + + // No Cloudprovider resize needed, lets mark resizing as done + if !volumePlugin.RequiresFSResize() { + glog.V(4).Infof("Controller resizing done for PVC %s", pvcWithResizeRequest.QualifiedName()) + err := resizeMap.MarkAsResized(pvcWithResizeRequest, newSize) + + if err != nil { + glog.Errorf("Error marking pvc %s as resized : %v", pvcWithResizeRequest.QualifiedName(), err) + og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, err.Error()) + return err + } + } + return nil + + } + return expandFunc, volumePlugin.GetPluginName(), nil +} + func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error { mountOptions := volume.MountOptionFromSpec(volumeToMount.VolumeSpec) diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 90bbdc4e21f..9375ad6750c 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -21,3 +21,6 @@ import "k8s.io/apimachinery/pkg/types" // UniquePodName defines the type to key pods off of type UniquePodName types.UID + +// UniquePVCName defines the type to key pvc off +type UniquePVCName types.UID