mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Implement controller for resizing volumes
This commit is contained in:
parent
e78d433150
commit
cd2a68473a
@ -72,6 +72,7 @@ go_library(
|
||||
"//pkg/controller/statefulset:go_default_library",
|
||||
"//pkg/controller/ttl:go_default_library",
|
||||
"//pkg/controller/volume/attachdetach:go_default_library",
|
||||
"//pkg/controller/volume/expand:go_default_library",
|
||||
"//pkg/controller/volume/persistentvolume:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/quota/install:go_default_library",
|
||||
|
@ -357,6 +357,7 @@ func NewControllerInitializers() map[string]InitFunc {
|
||||
controllers["route"] = startRouteController
|
||||
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
|
||||
controllers["attachdetach"] = startAttachDetachController
|
||||
controllers["persistentvolume-expander"] = startVolumeExpandController
|
||||
|
||||
return controllers
|
||||
}
|
||||
|
@ -51,6 +51,7 @@ import (
|
||||
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
|
||||
ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/expand"
|
||||
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
quotainstall "k8s.io/kubernetes/pkg/quota/install"
|
||||
@ -189,6 +190,24 @@ func startAttachDetachController(ctx ControllerContext) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func startVolumeExpandController(ctx ControllerContext) (bool, error) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
|
||||
expandController, expandControllerErr := expand.NewExpandController(
|
||||
ctx.ClientBuilder.ClientOrDie("expand-controller"),
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
ctx.Cloud,
|
||||
ProbeExpandableVolumePlugins(ctx.Options.VolumeConfiguration))
|
||||
|
||||
if expandControllerErr != nil {
|
||||
return true, fmt.Errorf("Failed to start volume expand controller : %v", expandControllerErr)
|
||||
}
|
||||
go expandController.Run(ctx.Stop)
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func startEndpointController(ctx ControllerContext) (bool, error) {
|
||||
go endpointcontroller.NewEndpointController(
|
||||
ctx.InformerFactory.Core().V1().Pods(),
|
||||
|
@ -88,6 +88,25 @@ func GetDynamicPluginProber(config componentconfig.VolumeConfiguration) volume.D
|
||||
return flexvolume.GetDynamicPluginProber(config.FlexVolumePluginDir)
|
||||
}
|
||||
|
||||
// ProbeExpandableVolumePlugins returns volume plugins which are expandable
|
||||
func ProbeExpandableVolumePlugins(config componentconfig.VolumeConfiguration) []volume.VolumePlugin {
|
||||
allPlugins := []volume.VolumePlugin{}
|
||||
|
||||
allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, photon_pd.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, storageos.ProbeVolumePlugins()...)
|
||||
allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)
|
||||
return allPlugins
|
||||
}
|
||||
|
||||
// ProbeControllerVolumePlugins collects all persistent volume plugins into an
|
||||
// easy to use list. Only volume plugins that implement any of
|
||||
// provisioner/recycler/deleter interface should be returned.
|
||||
|
60
pkg/controller/volume/expand/BUILD
Normal file
60
pkg/controller/volume/expand/BUILD
Normal file
@ -0,0 +1,60 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"expand_controller.go",
|
||||
"pvc_populator.go",
|
||||
"sync_volume_resize.go",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/volume/expand/cache:go_default_library",
|
||||
"//pkg/controller/volume/expand/util:go_default_library",
|
||||
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
|
||||
"//pkg/util/io:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/util/operationexecutor:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [
|
||||
":package-srcs",
|
||||
"//pkg/controller/volume/expand/cache:all-srcs",
|
||||
"//pkg/controller/volume/expand/util:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
)
|
4
pkg/controller/volume/expand/OWNERS
Normal file
4
pkg/controller/volume/expand/OWNERS
Normal file
@ -0,0 +1,4 @@
|
||||
approvers:
|
||||
- saad-ali
|
||||
- jsafrane
|
||||
- gnufied
|
53
pkg/controller/volume/expand/cache/BUILD
vendored
Normal file
53
pkg/controller/volume/expand/cache/BUILD
vendored
Normal file
@ -0,0 +1,53 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["volume_resize_map.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/controller/volume/expand/util:go_default_library",
|
||||
"//pkg/util/strings:go_default_library",
|
||||
"//pkg/volume/util/types:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["volume_resize_map_test.go"],
|
||||
library = ":go_default_library",
|
||||
deps = [
|
||||
"//pkg/volume/util/types:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
],
|
||||
)
|
211
pkg/controller/volume/expand/cache/volume_resize_map.go
vendored
Normal file
211
pkg/controller/volume/expand/cache/volume_resize_map.go
vendored
Normal file
@ -0,0 +1,211 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
commontypes "k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/expand/util"
|
||||
"k8s.io/kubernetes/pkg/util/strings"
|
||||
"k8s.io/kubernetes/pkg/volume/util/types"
|
||||
)
|
||||
|
||||
// VolumeResizeMap defines an interface that serves as a cache for holding pending resizing requests
|
||||
type VolumeResizeMap interface {
|
||||
// AddPVCUpdate adds pvc for resizing
|
||||
AddPVCUpdate(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume)
|
||||
// DeletePVC deletes pvc that is scheduled for resizing
|
||||
DeletePVC(pvc *v1.PersistentVolumeClaim)
|
||||
// GetPVCsWithResizeRequest returns all pending pvc resize requests
|
||||
GetPVCsWithResizeRequest() []*PVCWithResizeRequest
|
||||
// MarkAsResized marks a pvc as fully resized
|
||||
MarkAsResized(*PVCWithResizeRequest, resource.Quantity) error
|
||||
// UpdatePVSize updates just pv size after cloudprovider resizing is successful
|
||||
UpdatePVSize(*PVCWithResizeRequest, resource.Quantity) error
|
||||
}
|
||||
|
||||
type volumeResizeMap struct {
|
||||
// map of unique pvc name and resize requests that are pending or inflight
|
||||
pvcrs map[types.UniquePVCName]*PVCWithResizeRequest
|
||||
// kube client for making API calls
|
||||
kubeClient clientset.Interface
|
||||
// for guarding access to pvcrs map
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
// PVCWithResizeRequest struct defines data structure that stores state needed for
|
||||
// performing file system resize
|
||||
type PVCWithResizeRequest struct {
|
||||
// PVC that needs to be resized
|
||||
PVC *v1.PersistentVolumeClaim
|
||||
// persistentvolume
|
||||
PersistentVolume *v1.PersistentVolume
|
||||
// Current volume size
|
||||
CurrentSize resource.Quantity
|
||||
// Expended volume size
|
||||
ExpectedSize resource.Quantity
|
||||
}
|
||||
|
||||
// UniquePVCKey returns unique key of the PVC based on its UID
|
||||
func (pvcr *PVCWithResizeRequest) UniquePVCKey() types.UniquePVCName {
|
||||
return types.UniquePVCName(pvcr.PVC.UID)
|
||||
}
|
||||
|
||||
// QualifiedName returns namespace and name combination of the PVC
|
||||
func (pvcr *PVCWithResizeRequest) QualifiedName() string {
|
||||
return strings.JoinQualifiedName(pvcr.PVC.Namespace, pvcr.PVC.Name)
|
||||
}
|
||||
|
||||
// NewVolumeResizeMap returns new VolumeResizeMap which acts as a cache
|
||||
// for holding pending resize requests.
|
||||
func NewVolumeResizeMap(kubeClient clientset.Interface) VolumeResizeMap {
|
||||
resizeMap := &volumeResizeMap{}
|
||||
resizeMap.pvcrs = make(map[types.UniquePVCName]*PVCWithResizeRequest)
|
||||
resizeMap.kubeClient = kubeClient
|
||||
return resizeMap
|
||||
}
|
||||
|
||||
// AddPVCUpdate adds pvc for resizing
|
||||
func (resizeMap *volumeResizeMap) AddPVCUpdate(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
|
||||
if pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.Name != pv.Spec.ClaimRef.Name {
|
||||
glog.V(4).Infof("Persistent Volume is not mapped to PVC being updated : %s", util.ClaimToClaimKey(pvc))
|
||||
return
|
||||
}
|
||||
|
||||
if pvc.Status.Phase != v1.ClaimBound {
|
||||
return
|
||||
}
|
||||
|
||||
resizeMap.Lock()
|
||||
defer resizeMap.Unlock()
|
||||
|
||||
pvcSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
|
||||
pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage]
|
||||
|
||||
if pvcStatusSize.Cmp(pvcSize) >= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Adding pvc %s with Size %s/%s for resizing", util.ClaimToClaimKey(pvc), pvcSize.String(), pvcStatusSize.String())
|
||||
|
||||
pvcRequest := &PVCWithResizeRequest{
|
||||
PVC: pvc,
|
||||
CurrentSize: pvcStatusSize,
|
||||
ExpectedSize: pvcSize,
|
||||
PersistentVolume: pv,
|
||||
}
|
||||
resizeMap.pvcrs[types.UniquePVCName(pvc.UID)] = pvcRequest
|
||||
}
|
||||
|
||||
// GetPVCsWithResizeRequest returns all pending pvc resize requests
|
||||
func (resizeMap *volumeResizeMap) GetPVCsWithResizeRequest() []*PVCWithResizeRequest {
|
||||
resizeMap.Lock()
|
||||
defer resizeMap.Unlock()
|
||||
|
||||
pvcrs := []*PVCWithResizeRequest{}
|
||||
for _, pvcr := range resizeMap.pvcrs {
|
||||
pvcrs = append(pvcrs, pvcr)
|
||||
}
|
||||
// Empty out pvcrs map, we will add back failed resize requests later
|
||||
resizeMap.pvcrs = map[types.UniquePVCName]*PVCWithResizeRequest{}
|
||||
return pvcrs
|
||||
}
|
||||
|
||||
// DeletePVC removes given pvc object from list of pvcs that needs resizing.
|
||||
// deleting a pvc in this map doesn't affect operations that are already inflight.
|
||||
func (resizeMap *volumeResizeMap) DeletePVC(pvc *v1.PersistentVolumeClaim) {
|
||||
resizeMap.Lock()
|
||||
defer resizeMap.Unlock()
|
||||
pvcUniqueName := types.UniquePVCName(pvc.UID)
|
||||
glog.V(5).Infof("Removing PVC %v from resize map", pvcUniqueName)
|
||||
delete(resizeMap.pvcrs, pvcUniqueName)
|
||||
}
|
||||
|
||||
// MarkAsResized marks a pvc as fully resized
|
||||
func (resizeMap *volumeResizeMap) MarkAsResized(pvcr *PVCWithResizeRequest, newSize resource.Quantity) error {
|
||||
resizeMap.Lock()
|
||||
defer resizeMap.Unlock()
|
||||
|
||||
emptyCondition := []v1.PersistentVolumeClaimCondition{}
|
||||
|
||||
err := resizeMap.updatePVCCapacityAndConditions(pvcr, newSize, emptyCondition)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("Error updating PV spec capacity for volume %q with : %v", pvcr.QualifiedName(), err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdatePVSize updates just pv size after cloudprovider resizing is successful
|
||||
func (resizeMap *volumeResizeMap) UpdatePVSize(pvcr *PVCWithResizeRequest, newSize resource.Quantity) error {
|
||||
resizeMap.Lock()
|
||||
defer resizeMap.Unlock()
|
||||
|
||||
oldPv := pvcr.PersistentVolume
|
||||
pvClone := oldPv.DeepCopy()
|
||||
|
||||
oldData, err := json.Marshal(pvClone)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unexpected error marshaling PV : %q with error %v", pvClone.Name, err)
|
||||
}
|
||||
|
||||
pvClone.Spec.Capacity[v1.ResourceStorage] = newSize
|
||||
|
||||
newData, err := json.Marshal(pvClone)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unexpected error marshaling PV : %q with error %v", pvClone.Name, err)
|
||||
}
|
||||
|
||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvClone)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error Creating two way merge patch for PV : %q with error %v", pvClone.Name, err)
|
||||
}
|
||||
|
||||
_, updateErr := resizeMap.kubeClient.CoreV1().PersistentVolumes().Patch(pvClone.Name, commontypes.StrategicMergePatchType, patchBytes)
|
||||
|
||||
if updateErr != nil {
|
||||
glog.V(4).Infof("Error updating pv %q with error : %v", pvClone.Name, updateErr)
|
||||
return updateErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (resizeMap *volumeResizeMap) updatePVCCapacityAndConditions(pvcr *PVCWithResizeRequest, newSize resource.Quantity, pvcConditions []v1.PersistentVolumeClaimCondition) error {
|
||||
|
||||
claimClone := pvcr.PVC.DeepCopy()
|
||||
|
||||
claimClone.Status.Capacity[v1.ResourceStorage] = newSize
|
||||
claimClone.Status.Conditions = pvcConditions
|
||||
|
||||
_, updateErr := resizeMap.kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).UpdateStatus(claimClone)
|
||||
if updateErr != nil {
|
||||
glog.V(4).Infof("updating PersistentVolumeClaim[%s] status: failed: %v", pvcr.QualifiedName(), updateErr)
|
||||
return updateErr
|
||||
}
|
||||
return nil
|
||||
}
|
88
pkg/controller/volume/expand/cache/volume_resize_map_test.go
vendored
Normal file
88
pkg/controller/volume/expand/cache/volume_resize_map_test.go
vendored
Normal file
@ -0,0 +1,88 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/kubernetes/pkg/volume/util/types"
|
||||
)
|
||||
|
||||
func Test_AddValidPvcUpdate(t *testing.T) {
|
||||
resizeMap := createTestVolumeResizeMap()
|
||||
claim1 := testVolumeClaim("foo", "ns", v1.PersistentVolumeClaimSpec{
|
||||
AccessModes: []v1.PersistentVolumeAccessMode{
|
||||
v1.ReadWriteOnce,
|
||||
v1.ReadOnlyMany,
|
||||
},
|
||||
Resources: v1.ResourceRequirements{
|
||||
Requests: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G"),
|
||||
},
|
||||
},
|
||||
VolumeName: "foo",
|
||||
})
|
||||
|
||||
claimClone := claim1.DeepCopy()
|
||||
claimClone.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("12G")
|
||||
pv := getPersistentVolume("foo", resource.MustParse("10G"), claim1)
|
||||
resizeMap.AddPVCUpdate(claimClone, pv)
|
||||
pvcr := resizeMap.GetPVCsWithResizeRequest()
|
||||
if len(pvcr) != 1 {
|
||||
t.Fatalf("Expected 1 pvc resize request got 0")
|
||||
}
|
||||
assert.Equal(t, resource.MustParse("12G"), pvcr[0].ExpectedSize)
|
||||
assert.Equal(t, 0, len(resizeMap.pvcrs))
|
||||
}
|
||||
|
||||
func createTestVolumeResizeMap() *volumeResizeMap {
|
||||
fakeClient := &fake.Clientset{}
|
||||
resizeMap := &volumeResizeMap{}
|
||||
resizeMap.pvcrs = make(map[types.UniquePVCName]*PVCWithResizeRequest)
|
||||
resizeMap.kubeClient = fakeClient
|
||||
return resizeMap
|
||||
}
|
||||
|
||||
func testVolumeClaim(name string, namespace string, spec v1.PersistentVolumeClaimSpec) *v1.PersistentVolumeClaim {
|
||||
return &v1.PersistentVolumeClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
|
||||
Spec: spec,
|
||||
Status: v1.PersistentVolumeClaimStatus{
|
||||
Phase: v1.ClaimBound,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func getPersistentVolume(volumeName string, capacity resource.Quantity, pvc *v1.PersistentVolumeClaim) *v1.PersistentVolume {
|
||||
return &v1.PersistentVolume{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: volumeName},
|
||||
Spec: v1.PersistentVolumeSpec{
|
||||
Capacity: v1.ResourceList{
|
||||
v1.ResourceName(v1.ResourceStorage): capacity,
|
||||
},
|
||||
ClaimRef: &v1.ObjectReference{
|
||||
Namespace: pvc.Namespace,
|
||||
Name: pvc.Name,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
268
pkg/controller/volume/expand/expand_controller.go
Normal file
268
pkg/controller/volume/expand/expand_controller.go
Normal file
@ -0,0 +1,268 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package expand implements interfaces that attempt to resize a pvc
|
||||
// by adding pvc to a volume resize map from which PVCs are picked and
|
||||
// resized
|
||||
package expand
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
kcache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/expand/cache"
|
||||
"k8s.io/kubernetes/pkg/util/io"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
)
|
||||
|
||||
const (
|
||||
// How often resizing loop runs
|
||||
syncLoopPeriod time.Duration = 30 * time.Second
|
||||
// How often pvc populator runs
|
||||
populatorLoopPeriod time.Duration = 2 * time.Minute
|
||||
)
|
||||
|
||||
// ExpandController expands the pvs
|
||||
type ExpandController interface {
|
||||
Run(stopCh <-chan struct{})
|
||||
}
|
||||
|
||||
type expandController struct {
|
||||
// kubeClient is the kube API client used by volumehost to communicate with
|
||||
// the API server.
|
||||
kubeClient clientset.Interface
|
||||
|
||||
// pvcLister is the shared PVC lister used to fetch and store PVC
|
||||
// objects from the API server. It is shared with other controllers and
|
||||
// therefore the PVC objects in its store should be treated as immutable.
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
pvcsSynced kcache.InformerSynced
|
||||
|
||||
pvLister corelisters.PersistentVolumeLister
|
||||
pvSynced kcache.InformerSynced
|
||||
|
||||
// cloud provider used by volume host
|
||||
cloud cloudprovider.Interface
|
||||
|
||||
// volumePluginMgr used to initialize and fetch volume plugins
|
||||
volumePluginMgr volume.VolumePluginMgr
|
||||
|
||||
// recorder is used to record events in the API server
|
||||
recorder record.EventRecorder
|
||||
|
||||
// Volume resize map of volumes that needs resizing
|
||||
resizeMap cache.VolumeResizeMap
|
||||
|
||||
// Worker goroutine to process resize requests from resizeMap
|
||||
syncResize SyncVolumeResize
|
||||
|
||||
// Operation executor
|
||||
opExecutor operationexecutor.OperationExecutor
|
||||
|
||||
// populator for periodically polling all PVCs
|
||||
pvcPopulator PVCPopulator
|
||||
}
|
||||
|
||||
func NewExpandController(
|
||||
kubeClient clientset.Interface,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
pvInformer coreinformers.PersistentVolumeInformer,
|
||||
cloud cloudprovider.Interface,
|
||||
plugins []volume.VolumePlugin) (ExpandController, error) {
|
||||
|
||||
expc := &expandController{
|
||||
kubeClient: kubeClient,
|
||||
cloud: cloud,
|
||||
pvcLister: pvcInformer.Lister(),
|
||||
pvcsSynced: pvcInformer.Informer().HasSynced,
|
||||
pvLister: pvInformer.Lister(),
|
||||
pvSynced: pvInformer.Informer().HasSynced,
|
||||
}
|
||||
|
||||
if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil {
|
||||
return nil, fmt.Errorf("Could not initialize volume plugins for Expand Controller : %+v", err)
|
||||
}
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
|
||||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})
|
||||
|
||||
expc.opExecutor = operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
||||
kubeClient,
|
||||
&expc.volumePluginMgr,
|
||||
recorder,
|
||||
false))
|
||||
|
||||
expc.resizeMap = cache.NewVolumeResizeMap(expc.kubeClient)
|
||||
|
||||
pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: expc.pvcUpdate,
|
||||
DeleteFunc: expc.deletePVC,
|
||||
})
|
||||
|
||||
expc.syncResize = NewSyncVolumeResize(syncLoopPeriod, expc.opExecutor, expc.resizeMap, kubeClient)
|
||||
expc.pvcPopulator = NewPVCPopulator(
|
||||
populatorLoopPeriod,
|
||||
expc.resizeMap,
|
||||
expc.pvcLister,
|
||||
expc.pvLister,
|
||||
kubeClient)
|
||||
return expc, nil
|
||||
}
|
||||
|
||||
func (expc *expandController) Run(stopCh <-chan struct{}) {
|
||||
defer runtime.HandleCrash()
|
||||
glog.Infof("Starting expand controller")
|
||||
defer glog.Infof("Shutting down expand controller")
|
||||
|
||||
if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
// Run volume sync work goroutine
|
||||
go expc.syncResize.Run(stopCh)
|
||||
// Start the pvc populator loop
|
||||
go expc.pvcPopulator.Run(stopCh)
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (expc *expandController) deletePVC(obj interface{}) {
|
||||
pvc, ok := obj.(*v1.PersistentVolumeClaim)
|
||||
|
||||
if pvc == nil || !ok {
|
||||
return
|
||||
}
|
||||
|
||||
expc.resizeMap.DeletePVC(pvc)
|
||||
}
|
||||
|
||||
func (expc *expandController) pvcUpdate(oldObj, newObj interface{}) {
|
||||
oldPvc, ok := oldObj.(*v1.PersistentVolumeClaim)
|
||||
|
||||
if oldPvc == nil || !ok {
|
||||
return
|
||||
}
|
||||
|
||||
newPVC, ok := newObj.(*v1.PersistentVolumeClaim)
|
||||
|
||||
if newPVC == nil || !ok {
|
||||
return
|
||||
}
|
||||
pv, err := getPersistentVolume(newPVC, expc.pvLister)
|
||||
if err != nil {
|
||||
glog.V(5).Infof("Error getting Persistent Volume for pvc %q : %v", newPVC.UID, err)
|
||||
return
|
||||
}
|
||||
expc.resizeMap.AddPVCUpdate(newPVC, pv)
|
||||
}
|
||||
|
||||
func getPersistentVolume(pvc *v1.PersistentVolumeClaim, pvLister corelisters.PersistentVolumeLister) (*v1.PersistentVolume, error) {
|
||||
volumeName := pvc.Spec.VolumeName
|
||||
pv, err := pvLister.Get(volumeName)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to find PV %q in PV informer cache with error : %v", volumeName, err)
|
||||
}
|
||||
|
||||
return pv.DeepCopy(), nil
|
||||
}
|
||||
|
||||
// Implementing VolumeHost interface
|
||||
func (expc *expandController) GetPluginDir(pluginName string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (expc *expandController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (expc *expandController) GetPodPluginDir(podUID types.UID, pluginName string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (expc *expandController) GetKubeClient() clientset.Interface {
|
||||
return expc.kubeClient
|
||||
}
|
||||
|
||||
func (expc *expandController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
|
||||
return nil, fmt.Errorf("NewWrapperMounter not supported by expand controller's VolumeHost implementation")
|
||||
}
|
||||
|
||||
func (expc *expandController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
|
||||
return nil, fmt.Errorf("NewWrapperUnmounter not supported by expand controller's VolumeHost implementation")
|
||||
}
|
||||
|
||||
func (expc *expandController) GetCloudProvider() cloudprovider.Interface {
|
||||
return expc.cloud
|
||||
}
|
||||
|
||||
func (expc *expandController) GetMounter(pluginName string) mount.Interface {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (expc *expandController) GetExec(pluginName string) mount.Exec {
|
||||
return mount.NewOsExec()
|
||||
}
|
||||
|
||||
func (expc *expandController) GetWriter() io.Writer {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (expc *expandController) GetHostName() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (expc *expandController) GetHostIP() (net.IP, error) {
|
||||
return nil, fmt.Errorf("GetHostIP not supported by expand controller's VolumeHost implementation")
|
||||
}
|
||||
|
||||
func (expc *expandController) GetNodeAllocatable() (v1.ResourceList, error) {
|
||||
return v1.ResourceList{}, nil
|
||||
}
|
||||
|
||||
func (expc *expandController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
|
||||
return func(_, _ string) (*v1.Secret, error) {
|
||||
return nil, fmt.Errorf("GetSecret unsupported in expandController")
|
||||
}
|
||||
}
|
||||
|
||||
func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return func(_, _ string) (*v1.ConfigMap, error) {
|
||||
return nil, fmt.Errorf("GetConfigMap unsupported in expandController")
|
||||
}
|
||||
}
|
||||
|
||||
func (expc *expandController) GetNodeLabels() (map[string]string, error) {
|
||||
return nil, fmt.Errorf("GetNodeLabels unsupported in expandController")
|
||||
}
|
85
pkg/controller/volume/expand/pvc_populator.go
Normal file
85
pkg/controller/volume/expand/pvc_populator.go
Normal file
@ -0,0 +1,85 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package reconciler implements interfaces that attempt to reconcile the
|
||||
// desired state of the with the actual state of the world by triggering
|
||||
// actions.
|
||||
|
||||
package expand
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/expand/cache"
|
||||
)
|
||||
|
||||
// PVCPopulator iterates through PVCs and checks if for bound PVCs
|
||||
// their size doesn't match with Persistent Volume size
|
||||
type PVCPopulator interface {
|
||||
Run(stopCh <-chan struct{})
|
||||
}
|
||||
|
||||
type pvcPopulator struct {
|
||||
loopPeriod time.Duration
|
||||
resizeMap cache.VolumeResizeMap
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
pvLister corelisters.PersistentVolumeLister
|
||||
kubeClient clientset.Interface
|
||||
}
|
||||
|
||||
func NewPVCPopulator(
|
||||
loopPeriod time.Duration,
|
||||
resizeMap cache.VolumeResizeMap,
|
||||
pvcLister corelisters.PersistentVolumeClaimLister,
|
||||
pvLister corelisters.PersistentVolumeLister,
|
||||
kubeClient clientset.Interface) PVCPopulator {
|
||||
populator := &pvcPopulator{
|
||||
loopPeriod: loopPeriod,
|
||||
pvcLister: pvcLister,
|
||||
pvLister: pvLister,
|
||||
resizeMap: resizeMap,
|
||||
kubeClient: kubeClient,
|
||||
}
|
||||
return populator
|
||||
}
|
||||
|
||||
func (populator *pvcPopulator) Run(stopCh <-chan struct{}) {
|
||||
wait.Until(populator.Sync, populator.loopPeriod, stopCh)
|
||||
}
|
||||
|
||||
func (populator *pvcPopulator) Sync() {
|
||||
pvcs, err := populator.pvcLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("Listing PVCs failed in populator : %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, pvc := range pvcs {
|
||||
pv, err := getPersistentVolume(pvc, populator.pvLister)
|
||||
|
||||
if err != nil {
|
||||
glog.V(5).Infof("Error getting persistent volume for pvc %q : %v", pvc.UID, err)
|
||||
continue
|
||||
}
|
||||
populator.resizeMap.AddPVCUpdate(pvc, pv)
|
||||
}
|
||||
}
|
101
pkg/controller/volume/expand/sync_volume_resize.go
Normal file
101
pkg/controller/volume/expand/sync_volume_resize.go
Normal file
@ -0,0 +1,101 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package expand
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/expand/cache"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/expand/util"
|
||||
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
||||
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
|
||||
)
|
||||
|
||||
type SyncVolumeResize interface {
|
||||
Run(stopCh <-chan struct{})
|
||||
}
|
||||
|
||||
type syncResize struct {
|
||||
loopPeriod time.Duration
|
||||
resizeMap cache.VolumeResizeMap
|
||||
opsExecutor operationexecutor.OperationExecutor
|
||||
kubeClient clientset.Interface
|
||||
}
|
||||
|
||||
// NewSyncVolumeResize returns actual volume resize handler
|
||||
func NewSyncVolumeResize(
|
||||
loopPeriod time.Duration,
|
||||
opsExecutor operationexecutor.OperationExecutor,
|
||||
resizeMap cache.VolumeResizeMap,
|
||||
kubeClient clientset.Interface) SyncVolumeResize {
|
||||
rc := &syncResize{
|
||||
loopPeriod: loopPeriod,
|
||||
opsExecutor: opsExecutor,
|
||||
resizeMap: resizeMap,
|
||||
kubeClient: kubeClient,
|
||||
}
|
||||
return rc
|
||||
}
|
||||
|
||||
func (rc *syncResize) Run(stopCh <-chan struct{}) {
|
||||
wait.Until(rc.Sync, rc.loopPeriod, stopCh)
|
||||
}
|
||||
|
||||
func (rc *syncResize) Sync() {
|
||||
// Resize PVCs that require resize
|
||||
for _, pvcWithResizeRequest := range rc.resizeMap.GetPVCsWithResizeRequest() {
|
||||
uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey())
|
||||
updatedClaim, err := markPVCResizeInProgress(pvcWithResizeRequest, rc.kubeClient)
|
||||
if err != nil {
|
||||
glog.V(5).Infof("Error setting PVC %s in progress with error : %v", pvcWithResizeRequest.QualifiedName(), err)
|
||||
continue
|
||||
}
|
||||
if updatedClaim != nil {
|
||||
pvcWithResizeRequest.PVC = updatedClaim
|
||||
}
|
||||
|
||||
if rc.opsExecutor.IsOperationPending(uniqueVolumeKey, "") {
|
||||
glog.V(10).Infof("Operation for PVC %v is already pending", pvcWithResizeRequest.QualifiedName())
|
||||
continue
|
||||
}
|
||||
glog.V(5).Infof("Starting opsExecutor.ExpandVolume for volume %s", pvcWithResizeRequest.QualifiedName())
|
||||
growFuncError := rc.opsExecutor.ExpandVolume(pvcWithResizeRequest, rc.resizeMap)
|
||||
if growFuncError != nil && !exponentialbackoff.IsExponentialBackoff(growFuncError) {
|
||||
glog.Errorf("Error growing pvc %s with %v", pvcWithResizeRequest.QualifiedName(), growFuncError)
|
||||
}
|
||||
if growFuncError == nil {
|
||||
glog.V(5).Infof("Started opsExecutor.ExpandVolume for volume %s", pvcWithResizeRequest.QualifiedName())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func markPVCResizeInProgress(pvcWithResizeRequest *cache.PVCWithResizeRequest, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
|
||||
// Mark PVC as Resize Started
|
||||
progressCondition := v1.PersistentVolumeClaimCondition{
|
||||
Type: v1.PersistentVolumeClaimResizing,
|
||||
Status: v1.ConditionTrue,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
}
|
||||
conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
|
||||
|
||||
return util.UpdatePVCCondition(pvcWithResizeRequest.PVC, conditions, kubeClient)
|
||||
}
|
26
pkg/controller/volume/expand/util/BUILD
Normal file
26
pkg/controller/volume/expand/util/BUILD
Normal file
@ -0,0 +1,26 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["util.go"],
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
46
pkg/controller/volume/expand/util/util.go
Normal file
46
pkg/controller/volume/expand/util/util.go
Normal file
@ -0,0 +1,46 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// ClaimToClaimKey return namespace/name string for pvc
|
||||
func ClaimToClaimKey(claim *v1.PersistentVolumeClaim) string {
|
||||
return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
|
||||
}
|
||||
|
||||
// UpdatePVCCondition updates pvc with given condition status
|
||||
func UpdatePVCCondition(pvc *v1.PersistentVolumeClaim,
|
||||
pvcConditions []v1.PersistentVolumeClaimCondition,
|
||||
kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
|
||||
|
||||
claimClone := pvc.DeepCopy()
|
||||
claimClone.Status.Conditions = pvcConditions
|
||||
updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(claimClone.Namespace).UpdateStatus(claimClone)
|
||||
if updateErr != nil {
|
||||
glog.V(4).Infof("updating PersistentVolumeClaim[%s] status: failed: %v", ClaimToClaimKey(pvc), updateErr)
|
||||
return nil, updateErr
|
||||
}
|
||||
return updatedClaim, nil
|
||||
}
|
@ -127,6 +127,10 @@ func (plugin *glusterfsPlugin) SupportsBulkVolumeVerification() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (plugin *glusterfsPlugin) RequiresFSResize() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (plugin *glusterfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
|
||||
return []v1.PersistentVolumeAccessMode{
|
||||
v1.ReadWriteOnce,
|
||||
@ -1046,3 +1050,49 @@ func parseClassParameters(params map[string]string, kubeClient clientset.Interfa
|
||||
}
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
func (plugin *glusterfsPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) {
|
||||
pvSpec := spec.PersistentVolume.Spec
|
||||
glog.V(2).Infof("Request to expand volume: %s ", pvSpec.Glusterfs.Path)
|
||||
volumeName := pvSpec.Glusterfs.Path
|
||||
|
||||
// Fetch the volume for expansion.
|
||||
volumeID := dstrings.TrimPrefix(volumeName, volPrefix)
|
||||
|
||||
//Get details of SC.
|
||||
class, err := volutil.GetClassForVolume(plugin.host.GetKubeClient(), spec.PersistentVolume)
|
||||
if err != nil {
|
||||
return oldSize, err
|
||||
}
|
||||
cfg, err := parseClassParameters(class.Parameters, plugin.host.GetKubeClient())
|
||||
if err != nil {
|
||||
return oldSize, err
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Expanding volume %q with configuration %+v", volumeID, cfg)
|
||||
|
||||
//Create REST server connection
|
||||
cli := gcli.NewClient(cfg.url, cfg.user, cfg.secretValue)
|
||||
if cli == nil {
|
||||
glog.Errorf("failed to create glusterfs rest client")
|
||||
return oldSize, fmt.Errorf("failed to create glusterfs rest client, REST server authentication failed")
|
||||
}
|
||||
|
||||
// Find out delta size
|
||||
expansionSize := (newSize.Value() - oldSize.Value())
|
||||
expansionSizeGB := int(volume.RoundUpSize(expansionSize, 1024*1024*1024))
|
||||
|
||||
// Make volume expansion request
|
||||
volumeExpandReq := &gapi.VolumeExpandRequest{Size: expansionSizeGB}
|
||||
|
||||
// Expand the volume
|
||||
volumeInfoRes, err := cli.VolumeExpand(volumeID, volumeExpandReq)
|
||||
if err != nil {
|
||||
glog.Errorf("error when expanding the volume :%v", err)
|
||||
return oldSize, err
|
||||
}
|
||||
|
||||
glog.V(2).Infof("volume %s expanded to new size %d successfully", volumeName, volumeInfoRes.Size)
|
||||
newVolumeSize := resource.MustParse(fmt.Sprintf("%dG", volumeInfoRes.Size))
|
||||
return newVolumeSize, nil
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
@ -201,6 +202,14 @@ type AttachableVolumePlugin interface {
|
||||
GetDeviceMountRefs(deviceMountPath string) ([]string, error)
|
||||
}
|
||||
|
||||
// ExpandableVolumePlugin is an extended interface of VolumePlugin and is used for volumes that can be
|
||||
// expanded
|
||||
type ExpandableVolumePlugin interface {
|
||||
VolumePlugin
|
||||
ExpandVolumeDevice(spec *Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error)
|
||||
RequiresFSResize() bool
|
||||
}
|
||||
|
||||
// VolumeHost is an interface that plugins can use to access the kubelet.
|
||||
type VolumeHost interface {
|
||||
// GetPluginDir returns the absolute path to a directory under which
|
||||
@ -642,6 +651,32 @@ func (pm *VolumePluginMgr) FindAttachablePluginByName(name string) (AttachableVo
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// FindExpandablePluginBySpec fetches a persistent volume plugin by spec.
|
||||
func (pm *VolumePluginMgr) FindExpandablePluginBySpec(spec *Spec) (ExpandableVolumePlugin, error) {
|
||||
volumePlugin, err := pm.FindPluginBySpec(spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if expandableVolumePlugin, ok := volumePlugin.(ExpandableVolumePlugin); ok {
|
||||
return expandableVolumePlugin, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// FindExpandablePluginBySpec fetches a persistent volume plugin by name.
|
||||
func (pm *VolumePluginMgr) FindExpandablePluginByName(name string) (ExpandableVolumePlugin, error) {
|
||||
volumePlugin, err := pm.FindPluginByName(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if expandableVolumePlugin, ok := volumePlugin.(ExpandableVolumePlugin); ok {
|
||||
return expandableVolumePlugin, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler
|
||||
// pod. By default, a recycler pod simply runs "rm -rf" on a volume and tests
|
||||
// for emptiness. Most attributes of the template will be correct for most
|
||||
|
@ -13,6 +13,7 @@ go_library(
|
||||
"operation_generator.go",
|
||||
],
|
||||
deps = [
|
||||
"//pkg/controller/volume/expand/cache:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/kubelet/events:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
@ -37,6 +38,7 @@ go_test(
|
||||
srcs = ["operation_executor_test.go"],
|
||||
library = ":go_default_library",
|
||||
deps = [
|
||||
"//pkg/controller/volume/expand/cache:go_default_library",
|
||||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/util/types:go_default_library",
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
@ -119,6 +120,8 @@ type OperationExecutor interface {
|
||||
// IsOperationPending returns true if an operation for the given volumeName and podName is pending,
|
||||
// otherwise it returns false
|
||||
IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
|
||||
// Expand Volume will grow size available to PVC
|
||||
ExpandVolume(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) error
|
||||
}
|
||||
|
||||
// NewOperationExecutor returns a new instance of OperationExecutor.
|
||||
@ -719,6 +722,17 @@ func (oe *operationExecutor) UnmountDevice(
|
||||
deviceToDetach.VolumeName, "" /* podName */, unmountDeviceFunc, opCompleteFunc)
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) ExpandVolume(pvcWithResizeRequest *expandcache.PVCWithResizeRequest, resizeMap expandcache.VolumeResizeMap) error {
|
||||
expandFunc, pluginName, err := oe.operationGenerator.GenerateExpandVolumeFunc(pvcWithResizeRequest, resizeMap)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uniqueVolumeKey := v1.UniqueVolumeName(pvcWithResizeRequest.UniquePVCKey())
|
||||
opCompleteFunc := util.OperationCompleteHook(pluginName, "expand_volume")
|
||||
return oe.pendingOperations.Run(uniqueVolumeKey, "", expandFunc, opCompleteFunc)
|
||||
}
|
||||
|
||||
func (oe *operationExecutor) VerifyControllerAttachedVolume(
|
||||
volumeToMount VolumeToMount,
|
||||
nodeName types.NodeName,
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
|
||||
@ -282,6 +283,14 @@ func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(v
|
||||
}, "", nil
|
||||
}
|
||||
|
||||
func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvcWithResizeRequest *expandcache.PVCWithResizeRequest,
|
||||
resizeMap expandcache.VolumeResizeMap) (func() error, string, error) {
|
||||
return func() error {
|
||||
startOperationAndBlock(fopg.ch, fopg.quit)
|
||||
return nil
|
||||
}, "", nil
|
||||
}
|
||||
|
||||
func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
|
||||
pluginNodeVolumes map[types.NodeName][]*volume.Spec,
|
||||
pluginNane string,
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/record"
|
||||
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
kevents "k8s.io/kubernetes/pkg/kubelet/events"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
@ -100,6 +101,8 @@ type OperationGenerator interface {
|
||||
map[types.NodeName][]*volume.Spec,
|
||||
string,
|
||||
map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (func() error, error)
|
||||
|
||||
GenerateExpandVolumeFunc(*expandcache.PVCWithResizeRequest, expandcache.VolumeResizeMap) (func() error, string, error)
|
||||
}
|
||||
|
||||
func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
|
||||
@ -726,6 +729,59 @@ func (og *operationGenerator) verifyVolumeIsSafeToDetach(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (og *operationGenerator) GenerateExpandVolumeFunc(
|
||||
pvcWithResizeRequest *expandcache.PVCWithResizeRequest,
|
||||
resizeMap expandcache.VolumeResizeMap) (func() error, string, error) {
|
||||
|
||||
volumeSpec := volume.NewSpecFromPersistentVolume(pvcWithResizeRequest.PersistentVolume, false)
|
||||
|
||||
volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
|
||||
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("Error finding plugin for expanding volume: %q with error %v", pvcWithResizeRequest.QualifiedName(), err)
|
||||
}
|
||||
|
||||
expandFunc := func() error {
|
||||
newSize := pvcWithResizeRequest.ExpectedSize
|
||||
pvSize := pvcWithResizeRequest.PersistentVolume.Spec.Capacity[v1.ResourceStorage]
|
||||
if pvSize.Cmp(newSize) < 0 {
|
||||
updatedSize, expandErr := volumePlugin.ExpandVolumeDevice(
|
||||
volumeSpec,
|
||||
pvcWithResizeRequest.ExpectedSize,
|
||||
pvcWithResizeRequest.CurrentSize)
|
||||
|
||||
if expandErr != nil {
|
||||
glog.Errorf("Error expanding volume %q of plugin %s : %v", pvcWithResizeRequest.QualifiedName(), volumePlugin.GetPluginName(), expandErr)
|
||||
og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, expandErr.Error())
|
||||
return expandErr
|
||||
}
|
||||
newSize = updatedSize
|
||||
updateErr := resizeMap.UpdatePVSize(pvcWithResizeRequest, newSize)
|
||||
|
||||
if updateErr != nil {
|
||||
glog.V(4).Infof("Error updating PV spec capacity for volume %q with : %v", pvcWithResizeRequest.QualifiedName(), updateErr)
|
||||
og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, updateErr.Error())
|
||||
return updateErr
|
||||
}
|
||||
}
|
||||
|
||||
// No Cloudprovider resize needed, lets mark resizing as done
|
||||
if !volumePlugin.RequiresFSResize() {
|
||||
glog.V(4).Infof("Controller resizing done for PVC %s", pvcWithResizeRequest.QualifiedName())
|
||||
err := resizeMap.MarkAsResized(pvcWithResizeRequest, newSize)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("Error marking pvc %s as resized : %v", pvcWithResizeRequest.QualifiedName(), err)
|
||||
og.recorder.Eventf(pvcWithResizeRequest.PVC, v1.EventTypeWarning, kevents.VolumeResizeFailed, err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
return expandFunc, volumePlugin.GetPluginName(), nil
|
||||
}
|
||||
|
||||
func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error {
|
||||
mountOptions := volume.MountOptionFromSpec(volumeToMount.VolumeSpec)
|
||||
|
||||
|
@ -21,3 +21,6 @@ import "k8s.io/apimachinery/pkg/types"
|
||||
|
||||
// UniquePodName defines the type to key pods off of
|
||||
type UniquePodName types.UID
|
||||
|
||||
// UniquePVCName defines the type to key pvc off
|
||||
type UniquePVCName types.UID
|
||||
|
Loading…
Reference in New Issue
Block a user