Merge pull request #6105 from markturansky/yoko_pv_mgr

PersistentVolumeClaimBinder
This commit is contained in:
Tim Hockin 2015-04-27 13:35:44 -07:00
commit 635c393a5a
29 changed files with 1520 additions and 53 deletions

View File

@ -41,6 +41,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
@ -58,6 +59,8 @@ type CMServer struct {
NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration
NamespaceSyncPeriod time.Duration
PVClaimBinderSyncPeriod time.Duration
EnablePVCClaimBinder bool
RegisterRetryCount int
MachineList util.StringList
SyncNodeList bool
@ -90,6 +93,7 @@ func NewCMServer() *CMServer {
NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second,
NamespaceSyncPeriod: 5 * time.Minute,
PVClaimBinderSyncPeriod: 10 * time.Second,
RegisterRetryCount: 10,
PodEvictionTimeout: 5 * time.Minute,
NodeMilliCPU: 1000,
@ -113,6 +117,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder_sync_period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims")
fs.BoolVar(&s.EnablePVCClaimBinder, "enable_alpha_pvclaimbinder", s.EnablePVCClaimBinder, "Optionally enable persistent volume claim binding. This feature is experimental and expected to change.")
fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
fs.Float32Var(&s.DeletingPodsQps, "deleting_pods_qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
fs.IntVar(&s.DeletingPodsBurst, "deleting_pods_burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
@ -231,6 +237,11 @@ func (s *CMServer) Run(_ []string) error {
namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod)
namespaceManager.Run()
if s.EnablePVCClaimBinder {
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
}
select {}
return nil
}
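
For reference, a minimal sketch of constructing and running the binder outside the controller manager. The apiserver host and API version are placeholders; `NewPersistentVolumeClaimBinder`, `Run`, and `Stop` are the functions added by this PR.
```
package main

import (
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder"
)

func main() {
	// Assumed local apiserver and API version; adjust for your cluster.
	kubeClient := client.NewOrDie(&client.Config{Host: "http://127.0.0.1:8080", Version: "v1beta3"})

	// Same wiring the controller manager performs when --enable_alpha_pvclaimbinder is set.
	binder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, 10*time.Second)
	binder.Run() // starts the volume and claim watch loops

	// Run for a while (illustrative), then shut both loops down.
	time.Sleep(10 * time.Minute)
	binder.Stop()
}
```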

View File

@ -12,7 +12,7 @@ A `PersistentVolumeClaim` (PVC) is a user's request for a persistent volume to u
One new system component:
`PersistentVolumeManager` is a singleton running in master that manages all PVs in the system, analogous to the node controller. The volume manager watches the API for newly created volumes to manage. The manager also watches for claims by users and binds them to available volumes.
`PersistentVolumeClaimBinder` is a singleton running in master that watches all PersistentVolumeClaims in the system and binds them to the closest matching available PersistentVolume. The binder also watches the API for newly created volumes to manage.
One new volume:
@ -32,7 +32,7 @@ Kubernetes makes no guarantees at runtime that the underlying storage exists or
#### Describe available storage
Cluster administrators use the API to manage *PersistentVolumes*. The singleton PersistentVolumeManager watches the Kubernetes API for new volumes and adds them to its internal cache of volumes in the system. All persistent volumes are managed and made available by the volume manager. The manager also watches for new claims for storage and binds them to an available volume by matching the volume's characteristics (AccessModes and storage size) to the user's request.
Cluster administrators use the API to manage *PersistentVolumes*. A custom store ```NewPersistentVolumeOrderedIndex``` will index volumes by access modes and sort by storage capacity. The ```PersistentVolumeClaimBinder``` watches for new claims for storage and binds them to an available volume by matching the volume's characteristics (AccessModes and storage size) to the user's request.
PVs are system objects and, thus, have no namespace.
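
A rough sketch of the matching flow described above. The helper and its inputs are illustrative, not part of this PR; `NewPersistentVolumeOrderedIndex` and `FindBestMatchForClaim` are the PR's own functions.
```
package volumeclaimbinder

import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"

// findMatch is an illustrative helper showing the lookup described above:
// volumes are indexed by access modes and sorted by capacity, and the smallest
// unclaimed volume that satisfies the claim's request is returned.
func findMatch(pvs []*api.PersistentVolume, claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) {
	index := NewPersistentVolumeOrderedIndex()
	for _, pv := range pvs {
		index.Add(pv)
	}
	// A nil result means no suitable volume exists yet and the claim stays Pending.
	return index.FindBestMatchForClaim(claim)
}
```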
@ -151,7 +151,7 @@ myclaim-1 map[] pending
#### Matching and binding
The ```PersistentVolumeManager``` attempts to find an available volume that most closely matches the user's request. If one exists, they are bound by putting a reference on the PV to the PVC. Requests can go unfulfilled if a suitable match is not found.
The ```PersistentVolumeClaimBinder``` attempts to find an available volume that most closely matches the user's request. If one exists, they are bound by putting a reference on the PV to the PVC. Requests can go unfulfilled if a suitable match is not found.
```
@ -209,6 +209,6 @@ cluster/kubectl.sh delete pvc myclaim-1
```
The ```PersistentVolumeManager``` will reconcile this by removing the claim reference from the PV and change the PVs status to 'Released'.
The ```PersistentVolumeClaimBinder``` will reconcile this by removing the claim reference from the PV and changing the PV's status to 'Released'.
Admins can script the recycling of released volumes. Future dynamic provisioners will understand how a volume should be recycled.
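
A condensed sketch of that release transition as `syncVolume` implements it. The wrapper function is illustrative; `binderClient`, `VolumeReleased`, and the claim lookup come from this PR.
```
package volumeclaimbinder

import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
)

// releaseIfClaimDeleted sketches the Bound -> Released transition: a bound
// volume whose claim can no longer be found moves to the Released phase.
func releaseIfClaimDeleted(c binderClient, pv *api.PersistentVolume) error {
	if pv.Spec.ClaimRef == nil {
		return nil // not bound, nothing to release
	}
	_, err := c.GetPersistentVolumeClaim(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
	if errors.IsNotFound(err) {
		pv.Status.Phase = api.VolumeReleased
		_, err = c.UpdatePersistentVolumeStatus(pv)
	}
	return err
}
```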

View File

@ -7,4 +7,4 @@ spec:
- ReadWriteOnce
resources:
requests:
storage: 3
storage: 3Gi

View File

@ -7,4 +7,4 @@ spec:
- ReadWriteOnce
resources:
requests:
storage: 8
storage: 8Gi

View File

@ -1,69 +1,112 @@
# How To Use Persistent Volumes
This guide assumes knowledge of Kubernetes fundamentals and that a user has a cluster up and running.
The purpose of this guide is to help you become familiar with Kubernetes Persistent Volumes. By the end of the guide, we'll have
nginx serving content from your persistent volume.
## Create volumes
This guide assumes knowledge of Kubernetes fundamentals and that you have a cluster up and running.
Persistent Volumes are intended for "network volumes", such as GCE Persistent Disks, NFS shares, and AWS EBS volumes.
## Provisioning
The `HostPath` VolumeSource was included in the Persistent Volumes implementation for ease of testing.
Create persistent volumes by posting them to the API server:
A PersistentVolume in Kubernetes represents a real piece of underlying storage capacity in the infrastructure. Cluster administrators
must first create storage (create their GCE disks, export their NFS shares, etc.) in order for Kubernetes to mount it.
PVs are intended for "network volumes" like GCE Persistent Disks, NFS shares, and AWS ElasticBlockStore volumes. ```HostPath``` was included
for ease of development and testing. You'll create a local ```HostPath``` for this example.
> IMPORTANT! For ```HostPath``` to work, you will need to run a single node cluster. Kubernetes does not
support local storage on the host at this time. There is no guarantee your pod ends up on the correct node where the ```HostPath``` resides.
```
// this will be nginx's webroot
mkdir /tmp/data01
echo 'I love Kubernetes storage!' > /tmp/data01/index.html
```
PVs are created by posting them to the API server.
```
cluster/kubectl.sh create -f examples/persistent-volumes/volumes/local-01.yaml
cluster/kubectl.sh create -f examples/persistent-volumes/volumes/local-02.yaml
cluster/kubectl.sh get pv
NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM
pv0001 map[] 10737418240 RWO
pv0002 map[] 5368709120 RWO
In the log:
I0302 10:20:45.663225 1920 persistent_volume_manager.go:115] Managing PersistentVolume[UID=b16e91d6-c0ef-11e4-8be4-80e6500a981e]
I0302 10:20:55.667945 1920 persistent_volume_manager.go:115] Managing PersistentVolume[UID=b41f4f0e-c0ef-11e4-8be4-80e6500a981e]
NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM
pv0001 map[] 10737418240 RWO Available
```
## Create claims
## Requesting storage
You must be in a namespace to create claims.
Users of Kubernetes request persistent storage for their pods. They don't know how the underlying cluster is provisioned.
They just know they can rely on their claim to storage and can manage its lifecycle independently from the many pods that may use it.
Claims must be created in the same namespace as the pods that use them.
```
cluster/kubectl.sh create -f examples/persistent-volumes/claims/claim-01.yaml
cluster/kubectl.sh create -f examples/persistent-volumes/claims/claim-02.yaml
cluster/kubectl.sh get pvc
NAME LABELS STATUS VOLUME
myclaim-1 map[]
myclaim-2 map[]
# A background process will attempt to match this claim to a volume.
# The eventual state of your claim will look something like this:
```
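
The same claim can be posted through the Go client instead of kubectl; a minimal sketch, assuming a local apiserver and the `default` namespace.
```
package main

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
)

func main() {
	// Assumed local apiserver and API version; adjust for your cluster.
	kubeClient := client.NewOrDie(&client.Config{Host: "http://127.0.0.1:8080", Version: "v1beta3"})

	claim := &api.PersistentVolumeClaim{
		ObjectMeta: api.ObjectMeta{Name: "myclaim-1", Namespace: "default"},
		Spec: api.PersistentVolumeClaimSpec{
			AccessModes: []api.AccessModeType{api.ReadWriteOnce},
			Resources: api.ResourceRequirements{
				Requests: api.ResourceList{
					// same 3Gi request as claims/claim-01.yaml
					api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
				},
			},
		},
	}

	// Claims are namespaced; create the claim where the consuming pods will run.
	if _, err := kubeClient.PersistentVolumeClaims("default").Create(claim); err != nil {
		fmt.Println("error creating claim:", err)
	}
}
```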
## Matching and binding
```
PersistentVolumeClaim[UID=f4b3d283-c0ef-11e4-8be4-80e6500a981e] bound to PersistentVolume[UID=b16e91d6-c0ef-11e4-8be4-80e6500a981e]
cluster/kubectl.sh get pvc
NAME LABELS STATUS VOLUME
myclaim-1 map[] Bound f5c3a89a-e50a-11e4-972f-80e6500a981e
cluster/kubectl.sh get pv
NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM
pv0001 map[] 10737418240 RWO myclaim-1 / f4b3d283-c0ef-11e4-8be4-80e6500a981e
pv0002 map[] 5368709120 RWO myclaim-2 / f70da891-c0ef-11e4-8be4-80e6500a981e
cluster/kubectl.sh get pvc
NAME LABELS STATUS VOLUME
myclaim-1 map[] b16e91d6-c0ef-11e4-8be4-80e6500a981e
myclaim-2 map[] b41f4f0e-c0ef-11e4-8be4-80e6500a981e
NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM
pv0001 map[] 10737418240 RWO Bound myclaim-1 / 6bef4c40-e50b-11e4-972f-80e6500a981e
```
## Using your claim as a volume
Claims are used as volumes in pods. Kubernetes uses the claim to look up its bound PV. The PV is then exposed to the pod.
```
cluster/kubectl.sh create -f examples/persistent-volumes/simpletest/pod.yaml
cluster/kubectl.sh get pods
POD IP CONTAINER(S) IMAGE(S) HOST LABELS STATUS CREATED
mypod 172.17.0.2 myfrontend nginx 127.0.0.1/127.0.0.1 <none> Running 12 minutes
cluster/kubectl.sh create -f examples/persistent-volumes/simpletest/service.json
cluster/kubectl.sh get services
NAME LABELS SELECTOR IP PORT(S)
frontendservice <none> name=frontendhttp 10.0.0.241 3000/TCP
kubernetes component=apiserver,provider=kubernetes <none> 10.0.0.2 443/TCP
kubernetes-ro component=apiserver,provider=kubernetes <none> 10.0.0.1 80/TCP
```
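
The claim-to-volume lookup described above can also be followed by hand through the Go client; a sketch, assuming the claim name and namespace from this example.
```
package example

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
)

// resolveClaim reads the claim, then follows its Status.VolumeRef to the
// PersistentVolume it is bound to.
func resolveClaim(kubeClient client.Interface) error {
	claim, err := kubeClient.PersistentVolumeClaims("default").Get("myclaim-1")
	if err != nil {
		return err
	}
	if claim.Status.VolumeRef == nil {
		return fmt.Errorf("claim %s is not bound yet", claim.Name)
	}
	pv, err := kubeClient.PersistentVolumes().Get(claim.Status.VolumeRef.Name)
	if err != nil {
		return err
	}
	fmt.Printf("claim %s is bound to volume %s\n", claim.Name, pv.Name)
	return nil
}
```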
## Next steps
You should be able to query your service endpoint and see what content nginx is serving. A "forbidden" error might mean you
need to disable SELinux (setenforce 0).
```
curl 10.0.0.241:3000
I love Kubernetes storage!
```
Hopefully this simple guide is enough to get you started with PersistentVolumes. If you have any questions, join
```#google-containers``` on IRC and ask!
Enjoy!

View File

@ -2,17 +2,19 @@ kind: Pod
apiVersion: v1beta3
metadata:
name: mypod
labels:
name: frontendhttp
spec:
containers:
- image: nginx
name: myfrontend
- name: myfrontend
image: dockerfile/nginx
ports:
- containerPort: 80
name: "http-server"
volumeMounts:
- mountPath: "/var/www/html"
name: mypd
volumes:
- name: mypd
source:
persistentVolumeClaim:
accessMode: ReadWriteOnce
claimRef:
name: myclaim-1
persistentVolumeClaim:
claimName: myclaim-1

View File

@ -0,0 +1,9 @@
{
"apiVersion": "v1beta1",
"kind": "Service",
"id": "frontendservice",
"port": 3000,
"containerPort": "http-server",
"selector": { "name": "frontendhttp" },
"createExternalLoadBalancer": false
}

View File

@ -4,7 +4,10 @@ metadata:
name: pv0003
spec:
capacity:
storage: 10
storage: 10Gi
accessModes:
- ReadWriteOnce
- ReadOnlyMany
gcePersistentDisk:
pdName: "abc123"
fsType: "ext4"

View File

@ -7,5 +7,7 @@ metadata:
spec:
capacity:
storage: 10Gi
accessModes:
- ReadWriteOnce
hostPath:
path: "/tmp/data01"

View File

@ -7,5 +7,7 @@ metadata:
spec:
capacity:
storage: 5Gi
accessModes:
- ReadWriteOnce
hostPath:
path: "/tmp/data02"

View File

@ -215,6 +215,16 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
c.FuzzNoCustom(s) // fuzz self without calling this function again
s.Type = api.SecretTypeOpaque
},
func(pv *api.PersistentVolume, c fuzz.Continue) {
c.FuzzNoCustom(pv) // fuzz self without calling this function again
types := []api.PersistentVolumePhase{api.VolumePending, api.VolumeBound, api.VolumeReleased, api.VolumeAvailable}
pv.Status.Phase = types[c.Rand.Intn(len(types))]
},
func(pvc *api.PersistentVolumeClaim, c fuzz.Continue) {
c.FuzzNoCustom(pvc) // fuzz self without calling this function again
types := []api.PersistentVolumeClaimPhase{api.ClaimBound, api.ClaimPending}
pvc.Status.Phase = types[c.Rand.Intn(len(types))]
},
func(s *api.NamespaceSpec, c fuzz.Continue) {
s.Finalizers = []api.FinalizerName{api.FinalizerKubernetes}
},

View File

@ -317,6 +317,8 @@ const (
type PersistentVolumePhase string
const (
// used for PersistentVolumes that are not available
VolumePending PersistentVolumePhase = "Pending"
// used for PersistentVolumes that are not yet bound
VolumeAvailable PersistentVolumePhase = "Available"
// used for PersistentVolumes that are bound

View File

@ -111,6 +111,16 @@ func init() {
obj.Type = SecretTypeOpaque
}
},
func(obj *PersistentVolume) {
if obj.Status.Phase == "" {
obj.Status.Phase = VolumePending
}
},
func(obj *PersistentVolumeClaim) {
if obj.Status.Phase == "" {
obj.Status.Phase = ClaimPending
}
},
func(obj *Endpoints) {
if obj.Protocol == "" {
obj.Protocol = ProtocolTCP

View File

@ -164,6 +164,26 @@ func TestSetDefaultSecret(t *testing.T) {
}
}
func TestSetDefaultPersistentVolume(t *testing.T) {
pv := &current.PersistentVolume{}
obj2 := roundTrip(t, runtime.Object(pv))
pv2 := obj2.(*current.PersistentVolume)
if pv2.Status.Phase != current.VolumePending {
t.Errorf("Expected volume phase %v, got %v", current.VolumePending, pv2.Status.Phase)
}
}
func TestSetDefaultPersistentVolumeClaim(t *testing.T) {
pvc := &current.PersistentVolumeClaim{}
obj2 := roundTrip(t, runtime.Object(pvc))
pvc2 := obj2.(*current.PersistentVolumeClaim)
if pvc2.Status.Phase != current.ClaimPending {
t.Errorf("Expected claim phase %v, got %v", current.ClaimPending, pvc2.Status.Phase)
}
}
// Test that we use "legacy" fields if "modern" fields are not provided.
func TestSetDefaulEndpointsLegacy(t *testing.T) {
in := &current.Endpoints{

View File

@ -229,6 +229,8 @@ const (
type PersistentVolumePhase string
const (
// used for PersistentVolumes that are not available
VolumePending PersistentVolumePhase = "Pending"
// used for PersistentVolumes that are not yet bound
VolumeAvailable PersistentVolumePhase = "Available"
// used for PersistentVolumes that are bound

View File

@ -112,6 +112,16 @@ func init() {
obj.Type = SecretTypeOpaque
}
},
func(obj *PersistentVolume) {
if obj.Status.Phase == "" {
obj.Status.Phase = VolumePending
}
},
func(obj *PersistentVolumeClaim) {
if obj.Status.Phase == "" {
obj.Status.Phase = ClaimPending
}
},
func(obj *Endpoints) {
if obj.Protocol == "" {
obj.Protocol = ProtocolTCP

View File

@ -154,6 +154,26 @@ func TestSetDefaultService(t *testing.T) {
}
}
func TestSetDefaultPersistentVolume(t *testing.T) {
pv := &current.PersistentVolume{}
obj2 := roundTrip(t, runtime.Object(pv))
pv2 := obj2.(*current.PersistentVolume)
if pv2.Status.Phase != current.VolumePending {
t.Errorf("Expected volume phase %v, got %v", current.VolumePending, pv2.Status.Phase)
}
}
func TestSetDefaultPersistentVolumeClaim(t *testing.T) {
pvc := &current.PersistentVolumeClaim{}
obj2 := roundTrip(t, runtime.Object(pvc))
pvc2 := obj2.(*current.PersistentVolumeClaim)
if pvc2.Status.Phase != current.ClaimPending {
t.Errorf("Expected claim phase %v, got %v", current.ClaimPending, pvc2.Status.Phase)
}
}
func TestSetDefaultSecret(t *testing.T) {
s := &current.Secret{}
obj2 := roundTrip(t, runtime.Object(s))

View File

@ -198,6 +198,8 @@ const (
type PersistentVolumePhase string
const (
// used for PersistentVolumes that are not available
VolumePending PersistentVolumePhase = "Pending"
// used for PersistentVolumes that are not yet bound
VolumeAvailable PersistentVolumePhase = "Available"
// used for PersistentVolumes that are bound

View File

@ -102,6 +102,16 @@ func init() {
obj.Type = SecretTypeOpaque
}
},
func(obj *PersistentVolume) {
if obj.Status.Phase == "" {
obj.Status.Phase = VolumePending
}
},
func(obj *PersistentVolumeClaim) {
if obj.Status.Phase == "" {
obj.Status.Phase = ClaimPending
}
},
func(obj *Endpoints) {
for i := range obj.Subsets {
ss := &obj.Subsets[i]

View File

@ -174,6 +174,26 @@ func TestSetDefaultSecret(t *testing.T) {
}
}
func TestSetDefaultPersistentVolume(t *testing.T) {
pv := &current.PersistentVolume{}
obj2 := roundTrip(t, runtime.Object(pv))
pv2 := obj2.(*current.PersistentVolume)
if pv2.Status.Phase != current.VolumePending {
t.Errorf("Expected volume phase %v, got %v", current.VolumePending, pv2.Status.Phase)
}
}
func TestSetDefaultPersistentVolumeClaim(t *testing.T) {
pvc := &current.PersistentVolumeClaim{}
obj2 := roundTrip(t, runtime.Object(pvc))
pvc2 := obj2.(*current.PersistentVolumeClaim)
if pvc2.Status.Phase != current.ClaimPending {
t.Errorf("Expected claim phase %v, got %v", current.ClaimPending, pvc2.Status.Phase)
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{Subsets: []current.EndpointSubset{
{Ports: []current.EndpointPort{{}, {Protocol: "UDP"}, {}}},

View File

@ -334,6 +334,8 @@ const (
type PersistentVolumePhase string
const (
// used for PersistentVolumes that are not available
VolumePending PersistentVolumePhase = "Pending"
// used for PersistentVolumes that are not yet bound
VolumeAvailable PersistentVolumePhase = "Available"
// used for PersistentVolumes that are bound

View File

@ -319,6 +319,11 @@ func validateSource(source *api.VolumeSource) errs.ValidationErrorList {
numVolumes++
allErrs = append(allErrs, validateGlusterfs(source.Glusterfs).Prefix("glusterfs")...)
}
if source.PersistentVolumeClaimVolumeSource != nil {
numVolumes++
allErrs = append(allErrs, validatePersistentClaimVolumeSource(source.PersistentVolumeClaimVolumeSource).Prefix("persistentVolumeClaim")...)
}
if numVolumes != 1 {
allErrs = append(allErrs, errs.NewFieldInvalid("", source, "exactly 1 volume type is required"))
}
@ -394,6 +399,14 @@ func validateSecretVolumeSource(secretSource *api.SecretVolumeSource) errs.Valid
return allErrs
}
func validatePersistentClaimVolumeSource(claim *api.PersistentVolumeClaimVolumeSource) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
if claim.ClaimName == "" {
allErrs = append(allErrs, errs.NewFieldRequired("claimName"))
}
return allErrs
}
func validateNFS(nfs *api.NFSVolumeSource) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
if nfs.Server == "" {
@ -448,6 +461,10 @@ func ValidatePersistentVolume(pv *api.PersistentVolume) errs.ValidationErrorList
numVolumes++
allErrs = append(allErrs, validateAWSElasticBlockStoreVolumeSource(pv.Spec.AWSElasticBlockStore).Prefix("awsElasticBlockStore")...)
}
if pv.Spec.Glusterfs != nil {
numVolumes++
allErrs = append(allErrs, validateGlusterfs(pv.Spec.Glusterfs).Prefix("glusterfs")...)
}
if numVolumes != 1 {
allErrs = append(allErrs, errs.NewFieldInvalid("", pv.Spec.PersistentVolumeSource, "exactly 1 volume type is required"))
}

View File

@ -58,6 +58,9 @@ func validNewPersistentVolume(name string) *api.PersistentVolume {
HostPath: &api.HostPathVolumeSource{Path: "/foo"},
},
},
Status: api.PersistentVolumeStatus{
Phase: api.VolumePending,
},
}
return pv
}

View File

@ -59,6 +59,9 @@ func validNewPersistentVolumeClaim(name, ns string) *api.PersistentVolumeClaim {
},
},
},
Status: api.PersistentVolumeClaimStatus{
Phase: api.ClaimPending,
},
}
return pv
}

View File

@ -0,0 +1,365 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"fmt"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims.
type PersistentVolumeClaimBinder struct {
volumeIndex *persistentVolumeOrderedIndex
volumeController *framework.Controller
claimController *framework.Controller
client binderClient
stopChannels map[string]chan struct{}
lock sync.RWMutex
}
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
volumeIndex := NewPersistentVolumeOrderedIndex()
binderClient := NewBinderClient(kubeClient)
binder := &PersistentVolumeClaimBinder{
volumeIndex: volumeIndex,
client: binderClient,
}
_, volumeController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.PersistentVolume{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addVolume,
UpdateFunc: binder.updateVolume,
DeleteFunc: binder.deleteVolume,
},
)
_, claimController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.PersistentVolumeClaims(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.PersistentVolumeClaim{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addClaim,
UpdateFunc: binder.updateClaim,
// no DeleteFunc needed. a claim requires no clean-up.
// the missing claim itself is the release of the resource.
},
)
binder.claimController = claimController
binder.volumeController = volumeController
return binder
}
func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
volume := obj.(*api.PersistentVolume)
syncVolume(binder.volumeIndex, binder.client, volume)
}
func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
newVolume := newObj.(*api.PersistentVolume)
binder.volumeIndex.Update(newVolume)
syncVolume(binder.volumeIndex, binder.client, newVolume)
}
func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
volume := obj.(*api.PersistentVolume)
binder.volumeIndex.Delete(volume)
}
func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
claim := obj.(*api.PersistentVolumeClaim)
syncClaim(binder.volumeIndex, binder.client, claim)
}
func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
newClaim := newObj.(*api.PersistentVolumeClaim)
syncClaim(binder.volumeIndex, binder.client, newClaim)
}
func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name)
// volumes can be in one of the following states:
//
// VolumePending -- default value -- not bound to a claim and not yet processed through this controller.
// VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex.
// VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct.
// VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user.
currentPhase := volume.Status.Phase
nextPhase := currentPhase
switch currentPhase {
// pending volumes are available only after indexing in order to be matched to claims.
case api.VolumePending:
_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
}
glog.V(5).Infof("PersistentVolume[%s] is now available\n", volume.Name)
nextPhase = api.VolumeAvailable
// available volumes await a claim
case api.VolumeAvailable:
if volume.Spec.ClaimRef != nil {
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if err == nil {
// change of phase will trigger an update event with the newly bound volume
glog.V(5).Infof("PersistentVolume[%s] is now bound\n", volume.Name)
nextPhase = api.VolumeBound
} else {
if errors.IsNotFound(err) {
nextPhase = api.VolumeReleased
}
}
}
//bound volumes require verification of their bound claims
case api.VolumeBound:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume)
} else {
claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if err == nil {
// bound and active. Build claim status as needed.
if claim.Status.VolumeRef == nil {
// syncClaimStatus sets VolumeRef, attempts to persist claim status,
// and does a rollback as needed on claim.Status
syncClaimStatus(binderClient, volume, claim)
}
} else {
if errors.IsNotFound(err) {
nextPhase = api.VolumeReleased
} else {
return err
}
}
}
// released volumes require recycling
case api.VolumeReleased:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume)
} else {
// TODO: implement Recycle method on plugins
}
}
if currentPhase != nextPhase {
volume.Status.Phase = nextPhase
// a change in state will trigger another update through this controller.
// each pass through this controller evaluates current phase and decides whether or not to change to the next phase
volume, err := binderClient.UpdatePersistentVolumeStatus(volume)
if err != nil {
// Rollback to previous phase
volume.Status.Phase = currentPhase
}
volumeIndex.Update(volume)
}
return nil
}
func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
// claims can be in one of the following states:
//
// ClaimPending -- default value -- not yet bound to a volume. A volume that matches the claim may not exist.
// ClaimBound -- bound to a volume. claim.Status.VolumeRef != nil
currentPhase := claim.Status.Phase
nextPhase := currentPhase
switch currentPhase {
// pending claims await a matching volume
case api.ClaimPending:
volume, err := volumeIndex.FindBestMatchForClaim(claim)
if err != nil {
return err
}
if volume == nil {
return fmt.Errorf("A volume match does not exist for persistent claim: %s", claim.Name)
}
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
// make a binding reference to the claim.
// triggers update of the volume in this controller, which builds claim status
volume.Spec.ClaimRef = claimRef
volume, err = binderClient.UpdatePersistentVolume(volume)
if err == nil {
nextPhase = api.ClaimBound
}
if err != nil {
// Rollback by unsetting the ClaimRef on the volume pointer.
// the volume in the index will be unbound again and ready to be matched.
volume.Spec.ClaimRef = nil
// Rollback by restoring original phase to claim pointer
nextPhase = api.ClaimPending
return fmt.Errorf("Error updating volume: %+v\n", err)
}
// bound claims require no maintenance. Deletion by the user is the last lifecycle phase.
case api.ClaimBound:
// This is the end of a claim's lifecycle.
// After claim deletion, a volume is recycled when it verifies its claim is unbound
glog.V(5).Infof("PersistentVolumeClaime[%s] is bound\n", claim.Name)
}
if currentPhase != nextPhase {
claim.Status.Phase = nextPhase
binderClient.UpdatePersistentVolumeClaimStatus(claim)
}
return nil
}
func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) (err error) {
volumeRef, err := api.GetReference(volume)
if err != nil {
return fmt.Errorf("Unexpected error getting volume reference: %v\n", err)
}
// all "actuals" are transferred from PV to PVC so the user knows what
// type of volume they actually got for their claim
claim.Status.Phase = api.ClaimBound
claim.Status.VolumeRef = volumeRef
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err = binderClient.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
claim.Status.Phase = api.ClaimPending
claim.Status.VolumeRef = nil
claim.Status.AccessModes = nil
claim.Status.Capacity = nil
}
return err
}
// Run starts all of this binder's control loops
func (controller *PersistentVolumeClaimBinder) Run() {
glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")
if controller.stopChannels == nil {
controller.stopChannels = make(map[string]chan struct{})
}
if _, exists := controller.stopChannels["volumes"]; !exists {
controller.stopChannels["volumes"] = make(chan struct{})
go controller.volumeController.Run(controller.stopChannels["volumes"])
}
if _, exists := controller.stopChannels["claims"]; !exists {
controller.stopChannels["claims"] = make(chan struct{})
go controller.claimController.Run(controller.stopChannels["claims"])
}
}
// Stop gracefully shuts down this binder
func (controller *PersistentVolumeClaimBinder) Stop() {
glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n")
for name, stopChan := range controller.stopChannels {
close(stopChan)
delete(controller.stopChannels, name)
}
}
// binderClient abstracts access to PVs and PVCs
type binderClient interface {
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
}
func NewBinderClient(c client.Interface) binderClient {
return &realBinderClient{c}
}
type realBinderClient struct {
client client.Interface
}
func (c *realBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Get(name)
}
func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Update(volume)
}
func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
}
func (c *realBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(namespace).Get(name)
}
func (c *realBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).Update(claim)
}
func (c *realBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
}

View File

@ -0,0 +1,277 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
)
func TestRunStop(t *testing.T) {
o := testclient.NewObjects(api.Scheme)
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)}
binder := NewPersistentVolumeClaimBinder(client, 1*time.Second)
if len(binder.stopChannels) != 0 {
t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels))
}
binder.Run()
if len(binder.stopChannels) != 2 {
t.Errorf("Running binder should have exactly 2 stopChannels. Got %v", len(binder.stopChannels))
}
binder.Stop()
if len(binder.stopChannels) != 0 {
t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels))
}
}
func TestExampleObjects(t *testing.T) {
scenarios := map[string]struct {
expected interface{}
}{
"claims/claim-01.yaml": {
expected: &api.PersistentVolumeClaim{
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
},
},
"claims/claim-02.yaml": {
expected: &api.PersistentVolumeClaim{
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"),
},
},
},
},
},
"volumes/local-01.yaml": {
expected: &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data01",
},
},
},
},
},
"volumes/local-02.yaml": {
expected: &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data02",
},
},
},
},
},
}
for name, scenario := range scenarios {
o := testclient.NewObjects(api.Scheme)
if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/"+name, o); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)}
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolumeClaim{}) {
pvc, err := client.PersistentVolumeClaims("ns").Get("doesntmatter")
if err != nil {
t.Errorf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolumeClaim)
if pvc.Spec.AccessModes[0] != expected.Spec.AccessModes[0] {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pvc.Spec.AccessModes[0], expected.Spec.AccessModes[0])
}
aQty := pvc.Spec.Resources.Requests[api.ResourceStorage]
bQty := expected.Spec.Resources.Requests[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
if aSize != bSize {
t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize)
}
}
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolume{}) {
pv, err := client.PersistentVolumes().Get("doesntmatter")
if err != nil {
t.Errorf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolume)
if pv.Spec.AccessModes[0] != expected.Spec.AccessModes[0] {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.AccessModes[0], expected.Spec.AccessModes[0])
}
aQty := pv.Spec.Capacity[api.ResourceStorage]
bQty := expected.Spec.Capacity[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
if aSize != bSize {
t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize)
}
if pv.Spec.HostPath.Path != expected.Spec.HostPath.Path {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.HostPath.Path, expected.Spec.HostPath.Path)
}
}
}
}
func TestBindingWithExamples(t *testing.T) {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
o := testclient.NewObjects(api.Scheme)
if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/claims/claim-01.yaml", o); err != nil {
t.Fatal(err)
}
if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/volumes/local-01.yaml", o); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)}
pv, err := client.PersistentVolumes().Get("any")
if err != nil {
t.Error("Unexpected error getting PV from client: %v", err)
}
claim, err := client.PersistentVolumeClaims("ns").Get("any")
if err != nil {
t.Errorf("Unexpected error getting PVC from client: %v", err)
}
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{
volume: pv,
claim: claim,
}
volumeIndex.Add(pv)
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
if pv.Spec.ClaimRef != nil {
t.Errorf("Expected nil ClaimRef but got %+v\n", pv.Spec.ClaimRef)
}
syncClaim(volumeIndex, mockClient, claim)
if pv.Spec.ClaimRef == nil {
t.Errorf("Expected ClaimRef but got nil for volume: %+v\n", pv)
}
// first sync verifies the new bound claim, advances state, triggering update
syncVolume(volumeIndex, mockClient, pv)
// second sync verifies claim, sees missing claim status and builds it
syncVolume(volumeIndex, mockClient, pv)
if claim.Status.VolumeRef == nil {
t.Fatalf("Expected claim to be bound to volume")
}
if pv.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
if claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if len(claim.Status.AccessModes) != len(pv.Spec.AccessModes) {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] {
t.Errorf("Expected access mode %s but got %s", claim.Status.AccessModes[0], pv.Spec.AccessModes[0])
}
// pretend the user deleted their claim
mockClient.claim = nil
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
}
}
type mockBinderClient struct {
volume *api.PersistentVolume
claim *api.PersistentVolumeClaim
}
func (c *mockBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.volume, nil
}
func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
}
func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
}
func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
if c.claim != nil {
return c.claim, nil
} else {
return nil, errors.NewNotFound("persistentVolume", name)
}
}
func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
}
func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
}

View File

@ -0,0 +1,269 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
)
func TestMatchVolume(t *testing.T) {
volList := NewPersistentVolumeOrderedIndex()
for _, pv := range createTestVolumes() {
volList.Add(pv)
}
scenarios := map[string]struct {
expectedMatch string
claim *api.PersistentVolumeClaim
}{
"successful-match-gce-10": {
expectedMatch: "gce-pd-10",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8G"),
},
},
},
},
},
"successful-match-nfs-5": {
expectedMatch: "nfs-5",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce, api.ReadWriteMany},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
},
},
},
},
"successful-skip-1g-bound-volume": {
expectedMatch: "gce-pd-5",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
},
},
},
},
"successful-no-match": {
expectedMatch: "",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("999G"),
},
},
},
},
},
}
for name, scenario := range scenarios {
volume, err := volList.FindBestMatchForClaim(scenario.claim)
if err != nil {
t.Errorf("Unexpected error matching volume by claim: %v", err)
}
if scenario.expectedMatch != "" && volume == nil {
t.Errorf("Expected match but received nil volume for scenario: %s", name)
}
if scenario.expectedMatch != "" && volume != nil && string(volume.UID) != scenario.expectedMatch {
t.Errorf("Expected %s but got volume %s instead", scenario.expectedMatch, volume.UID)
}
if scenario.expectedMatch == "" && volume != nil {
t.Errorf("Unexpected match for scenario: %s", name)
}
}
}
func TestSort(t *testing.T) {
volList := NewPersistentVolumeOrderedIndex()
for _, pv := range createTestVolumes() {
volList.Add(pv)
}
volumes, err := volList.ListByAccessModes([]api.AccessModeType{api.ReadWriteOnce, api.ReadOnlyMany})
if err != nil {
t.Error("Unexpected error retrieving volumes by access modes:", err)
}
for i, expected := range []string{"gce-pd-1", "gce-pd-5", "gce-pd-10"} {
if string(volumes[i].UID) != expected {
t.Error("Incorrect ordering of persistent volumes. Expected %s but got %s", expected, volumes[i].UID)
}
}
volumes, err = volList.ListByAccessModes([]api.AccessModeType{api.ReadWriteOnce, api.ReadOnlyMany, api.ReadWriteMany})
if err != nil {
t.Error("Unexpected error retrieving volumes by access modes:", err)
}
for i, expected := range []string{"nfs-1", "nfs-5", "nfs-10"} {
if string(volumes[i].UID) != expected {
t.Error("Incorrect ordering of persistent volumes. Expected %s but got %s", expected, volumes[i].UID)
}
}
}
func createTestVolumes() []*api.PersistentVolume {
// these volumes are deliberately out-of-order to test indexing and sorting
return []*api.PersistentVolume{
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-10",
Name: "gce003",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-5",
Name: "nfs002",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-1",
Name: "gce001",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
// this one we're pretending is already bound
ClaimRef: &api.ObjectReference{UID: "abc123"},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-10",
Name: "nfs003",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-5",
Name: "gce002",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-1",
Name: "nfs001",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
}
}

View File

@ -0,0 +1,146 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"fmt"
"sort"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
)
// persistentVolumeOrderedIndex is a cache.Store that keeps persistent volumes indexed by AccessModes and ordered by storage capacity.
type persistentVolumeOrderedIndex struct {
cache.Indexer
}
var _ cache.Store = &persistentVolumeOrderedIndex{} // persistentVolumeOrderedIndex is a Store
func NewPersistentVolumeOrderedIndex() *persistentVolumeOrderedIndex {
return &persistentVolumeOrderedIndex{
cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc}),
}
}
// accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string
func accessModesIndexFunc(obj interface{}) (string, error) {
if pv, ok := obj.(*api.PersistentVolume); ok {
modes := volume.GetAccessModesAsString(pv.Spec.AccessModes)
return modes, nil
}
return "", fmt.Errorf("object is not a persistent volume: %v", obj)
}
// ListByAccessModes returns all volumes with the given set of AccessModeTypes *in order* of their storage capacity (low to high)
func (pvIndex *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.AccessModeType) ([]*api.PersistentVolume, error) {
pv := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: modes,
},
}
objs, err := pvIndex.Index("accessmodes", pv)
if err != nil {
return nil, err
}
volumes := make([]*api.PersistentVolume, len(objs))
for i, obj := range objs {
volumes[i] = obj.(*api.PersistentVolume)
}
sort.Sort(byCapacity{volumes})
return volumes, nil
}
// matchPredicate is a function that indicates that a persistent volume matches another
type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool
// Find returns the nearest PV from the ordered list or nil if a match is not found
func (pvIndex *persistentVolumeOrderedIndex) Find(pv *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) {
volumes, err := pvIndex.ListByAccessModes(pv.Spec.AccessModes)
if err != nil {
return nil, err
}
i := sort.Search(len(volumes), func(i int) bool { return matchPredicate(pv, volumes[i]) })
if i < len(volumes) {
return volumes[i], nil
}
return nil, nil
}
// FindByAccessModesAndStorageCapacity is a convenience method that calls Find w/ requisite matchPredicate for storage
func (pvIndex *persistentVolumeOrderedIndex) FindByAccessModesAndStorageCapacity(modes []api.AccessModeType, qty resource.Quantity) (*api.PersistentVolume, error) {
pv := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: modes,
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): qty,
},
},
}
return pvIndex.Find(pv, filterBoundVolumes)
}
// FindBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvIndex *persistentVolumeOrderedIndex) FindBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) {
return pvIndex.FindByAccessModesAndStorageCapacity(claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)])
}
// byCapacity is used to order volumes by ascending storage size
type byCapacity struct {
volumes []*api.PersistentVolume
}
func (c byCapacity) Less(i, j int) bool {
return matchStorageCapacity(c.volumes[i], c.volumes[j])
}
func (c byCapacity) Swap(i, j int) {
c.volumes[i], c.volumes[j] = c.volumes[j], c.volumes[i]
}
func (c byCapacity) Len() int {
return len(c.volumes)
}
// matchStorageCapacity is a matchPredicate used to sort and find volumes
func matchStorageCapacity(pvA, pvB *api.PersistentVolume) bool {
// skip already claimed volumes
if pvA.Spec.ClaimRef != nil {
return false
}
aQty := pvA.Spec.Capacity[api.ResourceStorage]
bQty := pvB.Spec.Capacity[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
return aSize <= bSize
}
// filterBoundVolumes is a matchPredicate that filters bound volumes before comparing storage capacity
func filterBoundVolumes(compareThis, toThis *api.PersistentVolume) bool {
if compareThis.Spec.ClaimRef != nil || toThis.Spec.ClaimRef != nil {
return false
}
return matchStorageCapacity(compareThis, toThis)
}

View File

@ -0,0 +1,207 @@
// +build integration,!no-etcd
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder"
)
func init() {
requireEtcd()
}
func TestPersistentVolumeClaimBinder(t *testing.T) {
_, s := runAMaster(t)
defer s.Close()
deleteAllEtcdKeys()
client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()})
binder := volumeclaimbinder.NewPersistentVolumeClaimBinder(client, 1*time.Second)
binder.Run()
defer binder.Stop()
for _, volume := range createTestVolumes() {
_, err := client.PersistentVolumes().Create(volume)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
volumes, err := client.PersistentVolumes().List(labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(volumes.Items) != 2 {
t.Errorf("expected 2 PVs, got %#v", len(volumes.Items))
}
for _, claim := range createTestClaims() {
_, err := client.PersistentVolumeClaims(api.NamespaceDefault).Create(claim)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
claims, err := client.PersistentVolumeClaims(api.NamespaceDefault).List(labels.Everything(), fields.Everything())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(claims.Items) != 3 {
t.Errorf("expected 3 PVCs, got %#v", len(claims.Items))
}
// the binder will eventually catch up and set status on Claims
watch, err := client.PersistentVolumeClaims(api.NamespaceDefault).Watch(labels.Everything(), fields.Everything(), "0")
if err != nil {
t.Fatalf("Couldn't subscribe to PersistentVolumeClaims: %v", err)
}
defer watch.Stop()
boundCount := 0
expectedBoundCount := 2
for {
event := <-watch.ResultChan()
claim := event.Object.(*api.PersistentVolumeClaim)
if claim.Status.VolumeRef != nil {
boundCount++
}
if boundCount == expectedBoundCount {
break
}
}
for _, claim := range createTestClaims() {
claim, err := client.PersistentVolumeClaims(api.NamespaceDefault).Get(claim.Name)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if (claim.Name == "claim01" || claim.Name == "claim02") && claim.Status.VolumeRef == nil {
t.Errorf("Expected claim to be bound: %+v", claim)
}
if claim.Name == "claim03" && claim.Status.VolumeRef != nil {
t.Errorf("Expected claim03 to be unbound: %v", claim)
}
}
}
func createTestClaims() []*api.PersistentVolumeClaim {
return []*api.PersistentVolumeClaim{
{
ObjectMeta: api.ObjectMeta{
Name: "claim03",
Namespace: api.NamespaceDefault,
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("500G"),
},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: api.NamespaceDefault,
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8G"),
},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "claim02",
Namespace: api.NamespaceDefault,
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce, api.ReadWriteMany},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
},
},
},
}
}
func createTestVolumes() []*api.PersistentVolume {
return []*api.PersistentVolume{
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-10",
Name: "gce003",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "gce123123123",
FSType: "foo",
},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-5",
Name: "nfs002",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{
EndpointsName: "andintheend",
Path: "theloveyoutakeisequaltotheloveyoumake",
},
},
AccessModes: []api.AccessModeType{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
}
}