PersistentVolumeClaimBinder implementation

This commit is contained in:
markturansky
2015-04-14 17:14:39 -04:00
parent 832973c09e
commit 242567460d
10 changed files with 845 additions and 7 deletions

View File

@@ -0,0 +1,216 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"sync"
"time"
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims.
type PersistentVolumeClaimBinder struct {
volumeStore *persistentVolumeOrderedIndex
claimStore cache.Store
client client.Interface
}
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolumeClaimBinder {
volumeStore := NewPersistentVolumeOrderedIndex()
volumeReflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.PersistentVolume{},
volumeStore,
0,
)
volumeReflector.Run()
claimStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
claimReflector := cache.NewReflector(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.PersistentVolumeClaims(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.PersistentVolumeClaim{},
claimStore,
0,
)
claimReflector.Run()
binder := &PersistentVolumeClaimBinder{
volumeStore: volumeStore,
claimStore: claimStore,
client: kubeClient,
}
return binder
}
func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) {
glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")
go util.Forever(func() { controller.synchronize() }, period)
}
// Synchronizer is a generic List/ProcessFunc used by the Reconcile function & reconciliation loop,
// because we're reconciling two Kinds in this component and I didn't want to dupe the loop
type Synchronizer struct {
ListFunc func() []interface{}
ReconcileFunc func(interface{}) error
}
func (controller *PersistentVolumeClaimBinder) synchronize() {
volumeSynchronizer := Synchronizer{
ListFunc: controller.volumeStore.List,
ReconcileFunc: controller.syncPersistentVolume,
}
claimsSynchronizer := Synchronizer{
ListFunc: controller.claimStore.List,
ReconcileFunc: controller.syncPersistentVolumeClaim,
}
controller.reconcile(volumeSynchronizer, claimsSynchronizer)
}
func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchronizer) {
for _, synchronizer := range synchronizers {
items := synchronizer.ListFunc()
if len(items) == 0 {
continue
}
wg := sync.WaitGroup{}
wg.Add(len(items))
for ix := range items {
go func(ix int) {
defer wg.Done()
obj := items[ix]
glog.V(5).Infof("Reconciliation of %v", obj)
err := synchronizer.ReconcileFunc(obj)
if err != nil {
glog.Errorf("Error reconciling: %v", err)
}
}(ix)
}
wg.Wait()
}
}
// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists.
func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error {
volume := obj.(*api.PersistentVolume)
glog.V(5).Infof("Synchronizing persistent volume: %s\n", volume.Name)
// verify the volume is still claimed by a user
if volume.Spec.ClaimRef != nil {
if _, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil {
glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name)
} else {
//claim was deleted by user.
glog.V(3).Infof("PersistentVolumeClaim[UID=%s] unbound from PersistentVolume[UID=%s]\n", volume.Spec.ClaimRef.UID, volume.UID)
volume.Spec.ClaimRef = nil
volume.Status.Phase = api.VolumeReleased
volume, err = controller.client.PersistentVolumes().Update(volume)
if err != nil {
glog.V(3).Infof("Error updating volume: %+v\n", err)
}
}
}
return nil
}
func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error {
claim := obj.(*api.PersistentVolumeClaim)
glog.V(5).Infof("Synchronizing persistent volume claim: %s\n", claim.Name)
if claim.Status.VolumeRef != nil {
glog.V(5).Infof("PersistentVolumeClaim[UID=%s] is bound to PersistentVolume[UID=%s]\n", claim.Name, claim.Status.VolumeRef.Name)
return nil
}
volume, err := controller.volumeStore.FindBestMatchForClaim(claim)
if err != nil {
return err
}
if volume != nil {
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
volumeRef, err := api.GetReference(volume)
if err != nil {
return fmt.Errorf("Unexpected error getting volume reference: %v\n", err)
}
// make a binding reference to the claim
volume.Spec.ClaimRef = claimRef
volume, err = controller.client.PersistentVolumes().Update(volume)
if err != nil {
glog.V(3).Infof("Error updating volume: %+v\n", err)
} else {
// all "actuals" are transferred from PV to PVC so the user knows what
// type of volume they actually got for their claim
claim.Status.Phase = api.ClaimBound
claim.Status.VolumeRef = volumeRef
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
if err != nil {
glog.V(3).Infof("Error updating claim: %+v\n", err)
// uset ClaimRef on the pointer to make it available for binding again
volume.Spec.ClaimRef = nil
volume, err = controller.client.PersistentVolumes().Update(volume)
// unset VolumeRef on the pointer so this claim can be processed next sync loop
claim.Status.VolumeRef = nil
} else {
glog.V(2).Infof("PersistentVolumeClaim[UID=%s] bound to PersistentVolume[UID=%s]\n", claim.UID, volume.UID)
}
}
} else {
glog.V(5).Infof("No volume match found for %s\n", claim.UID)
}
return nil
}

View File

@@ -0,0 +1,200 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
)
func TestExampleObjects(t *testing.T) {
scenarios := map[string]struct {
expected interface{}
}{
"claims/claim-01.yaml": {
expected: &api.PersistentVolumeClaim{
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
},
},
"claims/claim-02.yaml": {
expected: &api.PersistentVolumeClaim{
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"),
},
},
},
},
},
"volumes/local-01.yaml": {
expected: &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data01",
},
},
},
},
},
"volumes/local-02.yaml": {
expected: &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data02",
},
},
},
},
},
}
for name, scenario := range scenarios {
o := testclient.NewObjects(api.Scheme)
if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/"+name, o); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)}
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolumeClaim{}) {
pvc, err := client.PersistentVolumeClaims("ns").Get("doesntmatter")
if err != nil {
t.Errorf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolumeClaim)
if pvc.Spec.AccessModes[0] != expected.Spec.AccessModes[0] {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pvc.Spec.AccessModes[0], expected.Spec.AccessModes[0])
}
aQty := pvc.Spec.Resources.Requests[api.ResourceStorage]
bQty := expected.Spec.Resources.Requests[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
if aSize != bSize {
t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize)
}
}
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolume{}) {
pv, err := client.PersistentVolumes().Get("doesntmatter")
if err != nil {
t.Errorf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolume)
if pv.Spec.AccessModes[0] != expected.Spec.AccessModes[0] {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.AccessModes[0], expected.Spec.AccessModes[0])
}
aQty := pv.Spec.Capacity[api.ResourceStorage]
bQty := expected.Spec.Capacity[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
if aSize != bSize {
t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize)
}
if pv.Spec.HostPath.Path != expected.Spec.HostPath.Path {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.HostPath.Path, expected.Spec.HostPath.Path)
}
}
}
}
func TestBindingWithExamples(t *testing.T) {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
o := testclient.NewObjects(api.Scheme)
if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/claims/claim-01.yaml", o); err != nil {
t.Fatal(err)
}
if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/volumes/local-01.yaml", o); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)}
pv, err := client.PersistentVolumes().Get("any")
if err != nil {
t.Error("Unexpected error getting PV from client: %v", err)
}
claim, error := client.PersistentVolumeClaims("ns").Get("any")
if error != nil {
t.Error("Unexpected error getting PVC from client: %v", err)
}
controller := NewPersistentVolumeClaimBinder(client)
err = controller.volumeStore.Add(pv)
if err != nil {
t.Error("Unexpected error: %v", err)
}
if _, exists, _ := controller.volumeStore.Get(pv); !exists {
t.Error("Expected to find volume in the index")
}
err = controller.syncPersistentVolumeClaim(claim)
if err != nil {
t.Error("Unexpected error: %v", err)
}
if claim.Status.VolumeRef == nil {
t.Error("Expected claim to be bound to volume")
}
if claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if len(claim.Status.AccessModes) != len(pv.Spec.AccessModes) {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
}

View File

@@ -0,0 +1,268 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"testing"
)
func TestMatchVolume(t *testing.T) {
volList := NewPersistentVolumeOrderedIndex()
for _, pv := range createTestVolumes() {
volList.Add(pv)
}
scenarios := map[string]struct {
expectedMatch string
claim *api.PersistentVolumeClaim
}{
"successful-match-gce-10": {
expectedMatch: "gce-pd-10",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8G"),
},
},
},
},
},
"successful-match-nfs-5": {
expectedMatch: "nfs-5",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce, api.ReadWriteMany},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
},
},
},
},
"successful-skip-1g-bound-volume": {
expectedMatch: "gce-pd-5",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
},
},
},
},
"successful-no-match": {
expectedMatch: "",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("999G"),
},
},
},
},
},
}
for name, scenario := range scenarios {
volume, err := volList.FindBestMatchForClaim(scenario.claim)
if err != nil {
t.Errorf("Unexpected error matching volume by claim: %v", err)
}
if scenario.expectedMatch != "" && volume == nil {
t.Errorf("Expected match but received nil volume for scenario: %s", name)
}
if scenario.expectedMatch != "" && volume != nil && string(volume.UID) != scenario.expectedMatch {
t.Errorf("Expected %s but got volume %s instead", scenario.expectedMatch, volume.UID)
}
if scenario.expectedMatch == "" && volume != nil {
t.Errorf("Unexpected match for scenario: %s", name)
}
}
}
func TestSort(t *testing.T) {
volList := NewPersistentVolumeOrderedIndex()
for _, pv := range createTestVolumes() {
volList.Add(pv)
}
volumes, err := volList.ListByAccessModes([]api.AccessModeType{api.ReadWriteOnce, api.ReadOnlyMany})
if err != nil {
t.Error("Unexpected error retrieving volumes by access modes:", err)
}
for i, expected := range []string{"gce-pd-1", "gce-pd-5", "gce-pd-10"} {
if string(volumes[i].UID) != expected {
t.Error("Incorrect ordering of persistent volumes. Expected %s but got %s", expected, volumes[i].UID)
}
}
volumes, err = volList.ListByAccessModes([]api.AccessModeType{api.ReadWriteOnce, api.ReadOnlyMany, api.ReadWriteMany})
if err != nil {
t.Error("Unexpected error retrieving volumes by access modes:", err)
}
for i, expected := range []string{"nfs-1", "nfs-5", "nfs-10"} {
if string(volumes[i].UID) != expected {
t.Error("Incorrect ordering of persistent volumes. Expected %s but got %s", expected, volumes[i].UID)
}
}
}
func createTestVolumes() []*api.PersistentVolume {
// these volumes are deliberately out-of-order to test indexing and sorting
return []*api.PersistentVolume{
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-10",
Name: "gce003",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-5",
Name: "nfs002",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-1",
Name: "gce001",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
// this one we're pretending is already bound
ClaimRef: &api.ObjectReference{UID: "abc123"},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-10",
Name: "nfs003",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-5",
Name: "gce002",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-1",
Name: "nfs001",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
}
}

View File

@@ -0,0 +1,147 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"fmt"
"sort"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
)
// persistentVolumeOrderedIndex is a cache.Store that keeps persistent volumes indexed by AccessModes and ordered by storage capacity.
type persistentVolumeOrderedIndex struct {
cache.Indexer
}
var _ cache.Store = &persistentVolumeOrderedIndex{} // persistentVolumeOrderedIndex is a Store
func NewPersistentVolumeOrderedIndex() *persistentVolumeOrderedIndex {
return &persistentVolumeOrderedIndex{
cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc}),
}
}
// accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string
func accessModesIndexFunc(obj interface{}) (string, error) {
if pv, ok := obj.(*api.PersistentVolume); ok {
modes := volume.GetAccessModesAsString(pv.Spec.AccessModes)
return modes, nil
}
return "", fmt.Errorf("object is not a persistent volume: %v", obj)
}
// ListByAccessModes returns all volumes with the given set of AccessModeTypes *in order* of their storage capacity (low to high)
func (pvstore *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.AccessModeType) ([]*api.PersistentVolume, error) {
pv := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: modes,
},
}
objs, err := pvstore.Index("accessmodes", pv)
if err != nil {
return nil, err
}
volumes := make([]*api.PersistentVolume, len(objs))
for i, obj := range objs {
volumes[i] = obj.(*api.PersistentVolume)
}
sort.Sort(byCapacity{volumes})
return volumes, nil
}
// matchPredicate is a function that indicates that a persistent volume matches another
type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool
// Find returns the nearest PV from the ordered list or nil if a match is not found
func (pvstore *persistentVolumeOrderedIndex) Find(pv *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) {
volumes, err := pvstore.ListByAccessModes(pv.Spec.AccessModes)
if err != nil {
return nil, err
}
i := sort.Search(len(volumes), func(i int) bool { return matchPredicate(pv, volumes[i]) })
if i < len(volumes) {
return volumes[i], nil
}
return nil, nil
}
// FindByAccessModesAndStorageCapacity is a convenience method that calls Find w/ requisite matchPredicate for storage
func (pvstore *persistentVolumeOrderedIndex) FindByAccessModesAndStorageCapacity(modes []api.AccessModeType, qty resource.Quantity) (*api.PersistentVolume, error) {
pv := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: modes,
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): qty,
},
},
}
return pvstore.Find(pv, filterBoundVolumes)
}
// FindBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvstore *persistentVolumeOrderedIndex) FindBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) {
return pvstore.FindByAccessModesAndStorageCapacity(claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)])
}
// byCapacity is used to order volumes by ascending storage size
type byCapacity struct {
volumes []*api.PersistentVolume
}
func (c byCapacity) Less(i, j int) bool {
return matchStorageCapacity(c.volumes[i], c.volumes[j])
}
func (c byCapacity) Swap(i, j int) {
c.volumes[i], c.volumes[j] = c.volumes[j], c.volumes[i]
}
func (c byCapacity) Len() int {
return len(c.volumes)
}
// matchStorageCapacity is a matchPredicate used to sort and find volumes
func matchStorageCapacity(pvA, pvB *api.PersistentVolume) bool {
// skip already claimed volumes
if pvA.Spec.ClaimRef != nil {
return false
}
aQty := pvA.Spec.Capacity[api.ResourceStorage]
bQty := pvB.Spec.Capacity[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
return aSize <= bSize
}
// filterBoundVolumes is a matchPredicate that filters bound volumes before comparing storage capacity
func filterBoundVolumes(compareThis, toThis *api.PersistentVolume) bool {
if toThis.Spec.ClaimRef != nil || compareThis.Spec.ClaimRef != nil {
return false
}
return matchStorageCapacity(compareThis, toThis)
}