From 242567460db0627b7800969ead29c6a755397cea Mon Sep 17 00:00:00 2001 From: markturansky Date: Tue, 14 Apr 2015 17:14:39 -0400 Subject: [PATCH 01/12] PersistentVolumeClaimBinder implementation --- docs/design/persistent-storage.md | 8 +- .../persistent-volumes/claims/claim-01.yaml | 2 +- .../persistent-volumes/claims/claim-02.yaml | 2 +- examples/persistent-volumes/volumes/gce.yaml | 5 +- .../persistent-volumes/volumes/local-01.yaml | 2 + .../persistent-volumes/volumes/local-02.yaml | 2 + .../persistent_volume_claim_binder.go | 216 ++++++++++++++ .../persistent_volume_claim_binder_test.go | 200 +++++++++++++ .../persistent_volume_index_test.go | 268 ++++++++++++++++++ pkg/volumeclaimbinder/types.go | 147 ++++++++++ 10 files changed, 845 insertions(+), 7 deletions(-) create mode 100644 pkg/volumeclaimbinder/persistent_volume_claim_binder.go create mode 100644 pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go create mode 100644 pkg/volumeclaimbinder/persistent_volume_index_test.go create mode 100644 pkg/volumeclaimbinder/types.go diff --git a/docs/design/persistent-storage.md b/docs/design/persistent-storage.md index 45ab8d4284a..fb53ad1030f 100644 --- a/docs/design/persistent-storage.md +++ b/docs/design/persistent-storage.md @@ -12,7 +12,7 @@ A `PersistentVolumeClaim` (PVC) is a user's request for a persistent volume to u One new system component: -`PersistentVolumeManager` is a singleton running in master that manages all PVs in the system, analogous to the node controller. The volume manager watches the API for newly created volumes to manage. The manager also watches for claims by users and binds them to available volumes. +`PersistentVolumeClaimBinder` is a singleton running in master that watches all PersistentVolumeClaims in the system and binds them to the closest matching available PersistentVolume. The volume manager watches the API for newly created volumes to manage. One new volume: @@ -32,7 +32,7 @@ Kubernetes makes no guarantees at runtime that the underlying storage exists or #### Describe available storage -Cluster administrators use the API to manage *PersistentVolumes*. The singleton PersistentVolumeManager watches the Kubernetes API for new volumes and adds them to its internal cache of volumes in the system. All persistent volumes are managed and made available by the volume manager. The manager also watches for new claims for storage and binds them to an available volume by matching the volume's characteristics (AccessModes and storage size) to the user's request. +Cluster administrators use the API to manage *PersistentVolumes*. A custom store ```NewPersistentVolumeOrderedIndex``` will index volumes by access modes and sort by storage capacity. The ```PersistentVolumeClaimBinder``` watches for new claims for storage and binds them to an available volume by matching the volume's characteristics (AccessModes and storage size) to the user's request. PVs are system objects and, thus, have no namespace. @@ -151,7 +151,7 @@ myclaim-1 map[] pending #### Matching and binding - The ```PersistentVolumeManager``` attempts to find an available volume that most closely matches the user's request. If one exists, they are bound by putting a reference on the PV to the PVC. Requests can go unfulfilled if a suitable match is not found. + The ```PersistentVolumeClaimBinder``` attempts to find an available volume that most closely matches the user's request. If one exists, they are bound by putting a reference on the PV to the PVC. 
Requests can go unfulfilled if a suitable match is not found. ``` @@ -209,6 +209,6 @@ cluster/kubectl.sh delete pvc myclaim-1 ``` -The ```PersistentVolumeManager``` will reconcile this by removing the claim reference from the PV and change the PVs status to 'Released'. +The ```PersistentVolumeClaimBinder``` will reconcile this by removing the claim reference from the PV and change the PVs status to 'Released'. Admins can script the recycling of released volumes. Future dynamic provisioners will understand how a volume should be recycled. diff --git a/examples/persistent-volumes/claims/claim-01.yaml b/examples/persistent-volumes/claims/claim-01.yaml index cb0a2abb342..3c69d2e1b56 100644 --- a/examples/persistent-volumes/claims/claim-01.yaml +++ b/examples/persistent-volumes/claims/claim-01.yaml @@ -7,4 +7,4 @@ spec: - ReadWriteOnce resources: requests: - storage: 3 + storage: 3Gi diff --git a/examples/persistent-volumes/claims/claim-02.yaml b/examples/persistent-volumes/claims/claim-02.yaml index 134ae4cf972..48d48070b22 100644 --- a/examples/persistent-volumes/claims/claim-02.yaml +++ b/examples/persistent-volumes/claims/claim-02.yaml @@ -7,4 +7,4 @@ spec: - ReadWriteOnce resources: requests: - storage: 8 + storage: 8Gi diff --git a/examples/persistent-volumes/volumes/gce.yaml b/examples/persistent-volumes/volumes/gce.yaml index 3e124c59c9f..da3d56ad64f 100644 --- a/examples/persistent-volumes/volumes/gce.yaml +++ b/examples/persistent-volumes/volumes/gce.yaml @@ -4,7 +4,10 @@ metadata: name: pv0003 spec: capacity: - storage: 10 + storage: 10Gi + accessModes: + - ReadWriteOnce + ReadOnlyMany gcePersistentDisk: pdName: "abc123" fsType: "ext4" diff --git a/examples/persistent-volumes/volumes/local-01.yaml b/examples/persistent-volumes/volumes/local-01.yaml index 105e4393ebb..ce0fe9fbbe2 100644 --- a/examples/persistent-volumes/volumes/local-01.yaml +++ b/examples/persistent-volumes/volumes/local-01.yaml @@ -7,5 +7,7 @@ metadata: spec: capacity: storage: 10Gi + accessModes: + - ReadWriteOnce hostPath: path: "/tmp/data01" diff --git a/examples/persistent-volumes/volumes/local-02.yaml b/examples/persistent-volumes/volumes/local-02.yaml index 1f40d7a03e4..24bca725390 100644 --- a/examples/persistent-volumes/volumes/local-02.yaml +++ b/examples/persistent-volumes/volumes/local-02.yaml @@ -7,5 +7,7 @@ metadata: spec: capacity: storage: 5Gi + accessModes: + - ReadWriteOnce hostPath: path: "/tmp/data02" diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go new file mode 100644 index 00000000000..3109531fbbf --- /dev/null +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -0,0 +1,216 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package volumeclaimbinder + +import ( + "sync" + "time" + + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" +) + +// PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims. +type PersistentVolumeClaimBinder struct { + volumeStore *persistentVolumeOrderedIndex + claimStore cache.Store + client client.Interface +} + +// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder +func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolumeClaimBinder { + volumeStore := NewPersistentVolumeOrderedIndex() + volumeReflector := cache.NewReflector( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion) + }, + }, + &api.PersistentVolume{}, + volumeStore, + 0, + ) + volumeReflector.Run() + + claimStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + claimReflector := cache.NewReflector( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return kubeClient.PersistentVolumeClaims(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion) + }, + }, + &api.PersistentVolumeClaim{}, + claimStore, + 0, + ) + claimReflector.Run() + + binder := &PersistentVolumeClaimBinder{ + volumeStore: volumeStore, + claimStore: claimStore, + client: kubeClient, + } + + return binder +} + +func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) { + glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") + go util.Forever(func() { controller.synchronize() }, period) +} + +// Synchronizer is a generic List/ProcessFunc used by the Reconcile function & reconciliation loop, +// because we're reconciling two Kinds in this component and I didn't want to dupe the loop +type Synchronizer struct { + ListFunc func() []interface{} + ReconcileFunc func(interface{}) error +} + +func (controller *PersistentVolumeClaimBinder) synchronize() { + volumeSynchronizer := Synchronizer{ + ListFunc: controller.volumeStore.List, + ReconcileFunc: controller.syncPersistentVolume, + } + + claimsSynchronizer := Synchronizer{ + ListFunc: controller.claimStore.List, + ReconcileFunc: controller.syncPersistentVolumeClaim, + } + + controller.reconcile(volumeSynchronizer, claimsSynchronizer) +} + +func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchronizer) { + for _, synchronizer := range synchronizers { + items := synchronizer.ListFunc() + if len(items) == 0 { + continue + } + wg := sync.WaitGroup{} + wg.Add(len(items)) + for ix := range items { + go func(ix int) { + defer wg.Done() + obj := items[ix] + glog.V(5).Infof("Reconciliation of %v", obj) + err := synchronizer.ReconcileFunc(obj) + if err != nil { + 
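// a failed reconciliation is only logged; the object is retried on the next periodic sync pass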
glog.Errorf("Error reconciling: %v", err) + } + }(ix) + } + wg.Wait() + } +} + +// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists. +func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error { + volume := obj.(*api.PersistentVolume) + glog.V(5).Infof("Synchronizing persistent volume: %s\n", volume.Name) + + // verify the volume is still claimed by a user + if volume.Spec.ClaimRef != nil { + if _, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil { + glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name) + } else { + //claim was deleted by user. + glog.V(3).Infof("PersistentVolumeClaim[UID=%s] unbound from PersistentVolume[UID=%s]\n", volume.Spec.ClaimRef.UID, volume.UID) + volume.Spec.ClaimRef = nil + volume.Status.Phase = api.VolumeReleased + volume, err = controller.client.PersistentVolumes().Update(volume) + if err != nil { + glog.V(3).Infof("Error updating volume: %+v\n", err) + } + } + } + return nil +} + +func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error { + claim := obj.(*api.PersistentVolumeClaim) + glog.V(5).Infof("Synchronizing persistent volume claim: %s\n", claim.Name) + + if claim.Status.VolumeRef != nil { + glog.V(5).Infof("PersistentVolumeClaim[UID=%s] is bound to PersistentVolume[UID=%s]\n", claim.Name, claim.Status.VolumeRef.Name) + return nil + } + + volume, err := controller.volumeStore.FindBestMatchForClaim(claim) + if err != nil { + return err + } + + if volume != nil { + claimRef, err := api.GetReference(claim) + if err != nil { + return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) + } + + volumeRef, err := api.GetReference(volume) + if err != nil { + return fmt.Errorf("Unexpected error getting volume reference: %v\n", err) + } + + // make a binding reference to the claim + volume.Spec.ClaimRef = claimRef + volume, err = controller.client.PersistentVolumes().Update(volume) + + if err != nil { + glog.V(3).Infof("Error updating volume: %+v\n", err) + } else { + + // all "actuals" are transferred from PV to PVC so the user knows what + // type of volume they actually got for their claim + claim.Status.Phase = api.ClaimBound + claim.Status.VolumeRef = volumeRef + claim.Status.AccessModes = volume.Spec.AccessModes + claim.Status.Capacity = volume.Spec.Capacity + + _, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) + if err != nil { + glog.V(3).Infof("Error updating claim: %+v\n", err) + + // uset ClaimRef on the pointer to make it available for binding again + volume.Spec.ClaimRef = nil + volume, err = controller.client.PersistentVolumes().Update(volume) + + // unset VolumeRef on the pointer so this claim can be processed next sync loop + claim.Status.VolumeRef = nil + } else { + glog.V(2).Infof("PersistentVolumeClaim[UID=%s] bound to PersistentVolume[UID=%s]\n", claim.UID, volume.UID) + } + } + } else { + glog.V(5).Infof("No volume match found for %s\n", claim.UID) + } + + return nil +} diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go new file mode 100644 index 00000000000..05dbe72f1f7 --- /dev/null +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -0,0 +1,200 @@ +/* +Copyright 2014 Google Inc. All rights reserved. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumeclaimbinder + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" +) + +func TestExampleObjects(t *testing.T) { + + scenarios := map[string]struct { + expected interface{} + }{ + "claims/claim-01.yaml": { + expected: &api.PersistentVolumeClaim{ + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"), + }, + }, + }, + }, + }, + "claims/claim-02.yaml": { + expected: &api.PersistentVolumeClaim{ + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"), + }, + }, + }, + }, + }, + "volumes/local-01.yaml": { + expected: &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: "/tmp/data01", + }, + }, + }, + }, + }, + "volumes/local-02.yaml": { + expected: &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: "/tmp/data02", + }, + }, + }, + }, + }, + } + + for name, scenario := range scenarios { + o := testclient.NewObjects(api.Scheme) + if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/"+name, o); err != nil { + t.Fatal(err) + } + + client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)} + + if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolumeClaim{}) { + pvc, err := client.PersistentVolumeClaims("ns").Get("doesntmatter") + if err != nil { + t.Errorf("Error retrieving object: %v", err) + } + + expected := scenario.expected.(*api.PersistentVolumeClaim) + if pvc.Spec.AccessModes[0] != expected.Spec.AccessModes[0] { + t.Errorf("Unexpected mismatch. Got %v wanted %v", pvc.Spec.AccessModes[0], expected.Spec.AccessModes[0]) + } + + aQty := pvc.Spec.Resources.Requests[api.ResourceStorage] + bQty := expected.Spec.Resources.Requests[api.ResourceStorage] + aSize := aQty.Value() + bSize := bQty.Value() + + if aSize != bSize { + t.Errorf("Unexpected mismatch. 
Got %v wanted %v", aSize, bSize) + } + } + + if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolume{}) { + pv, err := client.PersistentVolumes().Get("doesntmatter") + if err != nil { + t.Errorf("Error retrieving object: %v", err) + } + + expected := scenario.expected.(*api.PersistentVolume) + if pv.Spec.AccessModes[0] != expected.Spec.AccessModes[0] { + t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.AccessModes[0], expected.Spec.AccessModes[0]) + } + + aQty := pv.Spec.Capacity[api.ResourceStorage] + bQty := expected.Spec.Capacity[api.ResourceStorage] + aSize := aQty.Value() + bSize := bQty.Value() + + if aSize != bSize { + t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize) + } + + if pv.Spec.HostPath.Path != expected.Spec.HostPath.Path { + t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.HostPath.Path, expected.Spec.HostPath.Path) + } + } + } +} + +func TestBindingWithExamples(t *testing.T) { + + api.ForTesting_ReferencesAllowBlankSelfLinks = true + o := testclient.NewObjects(api.Scheme) + if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/claims/claim-01.yaml", o); err != nil { + t.Fatal(err) + } + if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/volumes/local-01.yaml", o); err != nil { + t.Fatal(err) + } + + client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)} + + pv, err := client.PersistentVolumes().Get("any") + if err != nil { + t.Error("Unexpected error getting PV from client: %v", err) + } + + claim, error := client.PersistentVolumeClaims("ns").Get("any") + if error != nil { + t.Error("Unexpected error getting PVC from client: %v", err) + } + + controller := NewPersistentVolumeClaimBinder(client) + err = controller.volumeStore.Add(pv) + if err != nil { + t.Error("Unexpected error: %v", err) + } + + if _, exists, _ := controller.volumeStore.Get(pv); !exists { + t.Error("Expected to find volume in the index") + } + + err = controller.syncPersistentVolumeClaim(claim) + if err != nil { + t.Error("Unexpected error: %v", err) + } + + if claim.Status.VolumeRef == nil { + t.Error("Expected claim to be bound to volume") + } + + if claim.Status.Phase != api.ClaimBound { + t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) + } + if len(claim.Status.AccessModes) != len(pv.Spec.AccessModes) { + t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) + } + if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] { + t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) + } + if claim.Status.Phase != api.ClaimBound { + t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) + } +} diff --git a/pkg/volumeclaimbinder/persistent_volume_index_test.go b/pkg/volumeclaimbinder/persistent_volume_index_test.go new file mode 100644 index 00000000000..4ee00d6d1f1 --- /dev/null +++ b/pkg/volumeclaimbinder/persistent_volume_index_test.go @@ -0,0 +1,268 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumeclaimbinder + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "testing" +) + +func TestMatchVolume(t *testing.T) { + volList := NewPersistentVolumeOrderedIndex() + for _, pv := range createTestVolumes() { + volList.Add(pv) + } + + scenarios := map[string]struct { + expectedMatch string + claim *api.PersistentVolumeClaim + }{ + "successful-match-gce-10": { + expectedMatch: "gce-pd-10", + claim: &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: "claim01", + Namespace: "myns", + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("8G"), + }, + }, + }, + }, + }, + "successful-match-nfs-5": { + expectedMatch: "nfs-5", + claim: &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: "claim01", + Namespace: "myns", + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce, api.ReadWriteMany}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5G"), + }, + }, + }, + }, + }, + "successful-skip-1g-bound-volume": { + expectedMatch: "gce-pd-5", + claim: &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: "claim01", + Namespace: "myns", + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("1G"), + }, + }, + }, + }, + }, + "successful-no-match": { + expectedMatch: "", + claim: &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: "claim01", + Namespace: "myns", + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("999G"), + }, + }, + }, + }, + }, + } + + for name, scenario := range scenarios { + volume, err := volList.FindBestMatchForClaim(scenario.claim) + if err != nil { + t.Errorf("Unexpected error matching volume by claim: %v", err) + } + if scenario.expectedMatch != "" && volume == nil { + t.Errorf("Expected match but received nil volume for scenario: %s", name) + } + if scenario.expectedMatch != "" && volume != nil && string(volume.UID) != scenario.expectedMatch { + t.Errorf("Expected %s but got volume %s instead", scenario.expectedMatch, volume.UID) + } + if scenario.expectedMatch == "" && volume != nil { + t.Errorf("Unexpected match for scenario: %s", name) + } + } +} + +func TestSort(t *testing.T) { + volList := NewPersistentVolumeOrderedIndex() + for _, pv := range createTestVolumes() { + volList.Add(pv) + } + + volumes, err := volList.ListByAccessModes([]api.AccessModeType{api.ReadWriteOnce, api.ReadOnlyMany}) + if err != nil { + t.Error("Unexpected error retrieving volumes by access modes:", err) + } + + for i, expected := range []string{"gce-pd-1", "gce-pd-5", "gce-pd-10"} { + if string(volumes[i].UID) != expected { + t.Error("Incorrect ordering of persistent volumes. 
Expected %s but got %s", expected, volumes[i].UID) + } + } + + volumes, err = volList.ListByAccessModes([]api.AccessModeType{api.ReadWriteOnce, api.ReadOnlyMany, api.ReadWriteMany}) + if err != nil { + t.Error("Unexpected error retrieving volumes by access modes:", err) + } + + for i, expected := range []string{"nfs-1", "nfs-5", "nfs-10"} { + if string(volumes[i].UID) != expected { + t.Error("Incorrect ordering of persistent volumes. Expected %s but got %s", expected, volumes[i].UID) + } + } +} + +func createTestVolumes() []*api.PersistentVolume { + // these volumes are deliberately out-of-order to test indexing and sorting + return []*api.PersistentVolume{ + { + ObjectMeta: api.ObjectMeta{ + UID: "gce-pd-10", + Name: "gce003", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("10G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{}, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "nfs-5", + Name: "nfs002", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + Glusterfs: &api.GlusterfsVolumeSource{}, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + api.ReadWriteMany, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "gce-pd-1", + Name: "gce001", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("1G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{}, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + }, + // this one we're pretending is already bound + ClaimRef: &api.ObjectReference{UID: "abc123"}, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "nfs-10", + Name: "nfs003", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("10G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + Glusterfs: &api.GlusterfsVolumeSource{}, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + api.ReadWriteMany, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "gce-pd-5", + Name: "gce002", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{}, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "nfs-1", + Name: "nfs001", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("1G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + Glusterfs: &api.GlusterfsVolumeSource{}, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + api.ReadWriteMany, + }, + }, + }, + } +} diff --git a/pkg/volumeclaimbinder/types.go b/pkg/volumeclaimbinder/types.go new file mode 100644 index 00000000000..ee4c1412798 --- /dev/null +++ b/pkg/volumeclaimbinder/types.go @@ -0,0 +1,147 @@ +/* +Copyright 2014 Google Inc. 
All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumeclaimbinder + +import ( + "fmt" + "sort" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" +) + +// persistentVolumeOrderedIndex is a cache.Store that keeps persistent volumes indexed by AccessModes and ordered by storage capacity. +type persistentVolumeOrderedIndex struct { + cache.Indexer +} + +var _ cache.Store = &persistentVolumeOrderedIndex{} // persistentVolumeOrderedIndex is a Store + +func NewPersistentVolumeOrderedIndex() *persistentVolumeOrderedIndex { + return &persistentVolumeOrderedIndex{ + cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc}), + } +} + +// accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string +func accessModesIndexFunc(obj interface{}) (string, error) { + if pv, ok := obj.(*api.PersistentVolume); ok { + modes := volume.GetAccessModesAsString(pv.Spec.AccessModes) + return modes, nil + } + return "", fmt.Errorf("object is not a persistent volume: %v", obj) +} + +// ListByAccessModes returns all volumes with the given set of AccessModeTypes *in order* of their storage capacity (low to high) +func (pvstore *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.AccessModeType) ([]*api.PersistentVolume, error) { + pv := &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + AccessModes: modes, + }, + } + + objs, err := pvstore.Index("accessmodes", pv) + if err != nil { + return nil, err + } + + volumes := make([]*api.PersistentVolume, len(objs)) + for i, obj := range objs { + volumes[i] = obj.(*api.PersistentVolume) + } + + sort.Sort(byCapacity{volumes}) + return volumes, nil +} + +// matchPredicate is a function that indicates that a persistent volume matches another +type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool + +// Find returns the nearest PV from the ordered list or nil if a match is not found +func (pvstore *persistentVolumeOrderedIndex) Find(pv *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) { + volumes, err := pvstore.ListByAccessModes(pv.Spec.AccessModes) + if err != nil { + return nil, err + } + + i := sort.Search(len(volumes), func(i int) bool { return matchPredicate(pv, volumes[i]) }) + if i < len(volumes) { + return volumes[i], nil + } + return nil, nil +} + +// FindByAccessModesAndStorageCapacity is a convenience method that calls Find w/ requisite matchPredicate for storage +func (pvstore *persistentVolumeOrderedIndex) FindByAccessModesAndStorageCapacity(modes []api.AccessModeType, qty resource.Quantity) (*api.PersistentVolume, error) { + pv := &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + AccessModes: modes, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): qty, + }, + }, + } + + return 
pvstore.Find(pv, filterBoundVolumes) +} + +// FindBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage +func (pvstore *persistentVolumeOrderedIndex) FindBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) { + return pvstore.FindByAccessModesAndStorageCapacity(claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)]) +} + +// byCapacity is used to order volumes by ascending storage size +type byCapacity struct { + volumes []*api.PersistentVolume +} + +func (c byCapacity) Less(i, j int) bool { + return matchStorageCapacity(c.volumes[i], c.volumes[j]) +} + +func (c byCapacity) Swap(i, j int) { + c.volumes[i], c.volumes[j] = c.volumes[j], c.volumes[i] +} + +func (c byCapacity) Len() int { + return len(c.volumes) +} + +// matchStorageCapacity is a matchPredicate used to sort and find volumes +func matchStorageCapacity(pvA, pvB *api.PersistentVolume) bool { + + // skip already claimed volumes + if pvA.Spec.ClaimRef != nil { + return false + } + + aQty := pvA.Spec.Capacity[api.ResourceStorage] + bQty := pvB.Spec.Capacity[api.ResourceStorage] + aSize := aQty.Value() + bSize := bQty.Value() + return aSize <= bSize +} + +// filterBoundVolumes is a matchPredicate that filters bound volumes before comparing storage capacity +func filterBoundVolumes(compareThis, toThis *api.PersistentVolume) bool { + if toThis.Spec.ClaimRef != nil || compareThis.Spec.ClaimRef != nil { + return false + } + return matchStorageCapacity(compareThis, toThis) +} From fb412e47e4ec905500b183590c980090c38e9841 Mon Sep 17 00:00:00 2001 From: markturansky Date: Thu, 16 Apr 2015 13:26:08 -0400 Subject: [PATCH 02/12] Addressed feedback, improved flow and comments --- .../app/controllermanager.go | 7 + .../persistent_volume_claim_binder.go | 188 ++++++++++-------- 2 files changed, 110 insertions(+), 85 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 6b74e2cd61f..80386764f47 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -45,6 +45,7 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/pflag" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" ) // CMServer is the main context object for the controller manager. 
@@ -58,6 +59,7 @@ type CMServer struct { NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration NamespaceSyncPeriod time.Duration + PVClaimBinderSyncPeriod time.Duration RegisterRetryCount int MachineList util.StringList SyncNodeList bool @@ -90,6 +92,7 @@ func NewCMServer() *CMServer { NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, NamespaceSyncPeriod: 5 * time.Minute, + PVClaimBinderSyncPeriod: 10 * time.Second, RegisterRetryCount: 10, PodEvictionTimeout: 5 * time.Minute, NodeMilliCPU: 1000, @@ -113,6 +116,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { "fewer calls to cloud provider, but may delay addition of new nodes to cluster.") fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system") fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates") + fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder_sync_period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims") fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.") fs.Float32Var(&s.DeletingPodsQps, "deleting_pods_qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.") fs.IntVar(&s.DeletingPodsBurst, "deleting_pods_burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.") @@ -231,6 +235,9 @@ func (s *CMServer) Run(_ []string) error { namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod) namespaceManager.Run() + pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient) + pvclaimBinder.Run(s.PVClaimBinderSyncPeriod) + select {} return nil } diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index 3109531fbbf..4fb23498fad 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -17,10 +17,10 @@ limitations under the License. package volumeclaimbinder import ( + "fmt" "sync" "time" - "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" @@ -37,6 +37,8 @@ type PersistentVolumeClaimBinder struct { volumeStore *persistentVolumeOrderedIndex claimStore cache.Store client client.Interface + // protects access to binding + lock sync.RWMutex } // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder @@ -82,6 +84,106 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu return binder } +// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists. 
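+// It also moves a volume that is still Available to Bound once it carries a ClaimRef,
+// and marks the volume Released when its bound claim has been deleted.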
+func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error { + volume := obj.(*api.PersistentVolume) + glog.V(5).Infof("Synchronizing PersistentVolume[%s]%s\n", volume.Name) + + if volume.Spec.ClaimRef != nil { + if volume.Status.Phase == api.VolumeAvailable { + volume.Status.Phase = api.VolumeBound + _, err := controller.client.PersistentVolumes().Update(volume) + if err != nil { + return fmt.Errorf("Error updating pv.status: %v\n", err) + } + } + + // verify the volume is still claimed by a user + if claim, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil { + glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name) + controller.syncPersistentVolumeClaimStatus(volume, claim) + } else { + //claim was deleted by user. + glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name) + // volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly release volume + volume.Status.Phase = api.VolumeReleased + volume, err = controller.client.PersistentVolumes().UpdateStatus(volume) + if err != nil { + return fmt.Errorf("Error updating pv: %+v\n", err) + } + } + } + return nil +} + +func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error { + claim := obj.(*api.PersistentVolumeClaim) + glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) + + if claim.Status.VolumeRef != nil { + glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name) + return nil + } + + pv, err := controller.volumeStore.FindBestMatchForClaim(claim) + if err != nil { + return err + } + + if pv != nil { + claimRef, err := api.GetReference(claim) + if err != nil { + return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) + } + + // make a binding reference to the claim + pv.Spec.ClaimRef = claimRef + pv, err = controller.client.PersistentVolumes().Update(pv) + + if err != nil { + // volume no longer bound + pv.Spec.ClaimRef = nil + return fmt.Errorf("Error updating volume: %+v\n", err) + } else { + glog.V(3).Infof("PersistentVolumeClaim[%s] bound to PersistentVolume[%s]\n", claim.Name, pv.Name) + pv.Status.Phase = api.VolumeBound + err := controller.syncPersistentVolumeClaimStatus(pv, claim) + if err != nil { + return fmt.Errorf("Error updating pvclaim.status: %v\n", err) + } + } + } else { + glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID) + } + return nil +} + +// syncPersistentVolumeClaimStatus builds and persistens a PVClaim's Status, rolling back to empty values if the update fails +func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) error { + volumeRef, err := api.GetReference(volume) + if err != nil { + return fmt.Errorf("Unexpected error getting volume reference: %v\n", err) + } + // all "actuals" are transferred from PV to PVC so the user knows what + // type of volume they actually got for their claim + claim.Status.Phase = api.ClaimBound + claim.Status.VolumeRef = volumeRef + claim.Status.AccessModes = volume.Spec.AccessModes + claim.Status.Capacity = volume.Spec.Capacity + + _, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) + + if err != nil { + claim.Status.Phase 
= api.ClaimPending + claim.Status.VolumeRef = nil + claim.Status.AccessModes = nil + claim.Status.Capacity = nil + } + + return err +} + + func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) { glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") go util.Forever(func() { controller.synchronize() }, period) @@ -130,87 +232,3 @@ func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchr wg.Wait() } } - -// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists. -func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error { - volume := obj.(*api.PersistentVolume) - glog.V(5).Infof("Synchronizing persistent volume: %s\n", volume.Name) - - // verify the volume is still claimed by a user - if volume.Spec.ClaimRef != nil { - if _, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil { - glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name) - } else { - //claim was deleted by user. - glog.V(3).Infof("PersistentVolumeClaim[UID=%s] unbound from PersistentVolume[UID=%s]\n", volume.Spec.ClaimRef.UID, volume.UID) - volume.Spec.ClaimRef = nil - volume.Status.Phase = api.VolumeReleased - volume, err = controller.client.PersistentVolumes().Update(volume) - if err != nil { - glog.V(3).Infof("Error updating volume: %+v\n", err) - } - } - } - return nil -} - -func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error { - claim := obj.(*api.PersistentVolumeClaim) - glog.V(5).Infof("Synchronizing persistent volume claim: %s\n", claim.Name) - - if claim.Status.VolumeRef != nil { - glog.V(5).Infof("PersistentVolumeClaim[UID=%s] is bound to PersistentVolume[UID=%s]\n", claim.Name, claim.Status.VolumeRef.Name) - return nil - } - - volume, err := controller.volumeStore.FindBestMatchForClaim(claim) - if err != nil { - return err - } - - if volume != nil { - claimRef, err := api.GetReference(claim) - if err != nil { - return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) - } - - volumeRef, err := api.GetReference(volume) - if err != nil { - return fmt.Errorf("Unexpected error getting volume reference: %v\n", err) - } - - // make a binding reference to the claim - volume.Spec.ClaimRef = claimRef - volume, err = controller.client.PersistentVolumes().Update(volume) - - if err != nil { - glog.V(3).Infof("Error updating volume: %+v\n", err) - } else { - - // all "actuals" are transferred from PV to PVC so the user knows what - // type of volume they actually got for their claim - claim.Status.Phase = api.ClaimBound - claim.Status.VolumeRef = volumeRef - claim.Status.AccessModes = volume.Spec.AccessModes - claim.Status.Capacity = volume.Spec.Capacity - - _, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) - if err != nil { - glog.V(3).Infof("Error updating claim: %+v\n", err) - - // uset ClaimRef on the pointer to make it available for binding again - volume.Spec.ClaimRef = nil - volume, err = controller.client.PersistentVolumes().Update(volume) - - // unset VolumeRef on the pointer so this claim can be processed next sync loop - claim.Status.VolumeRef = nil - } else { - glog.V(2).Infof("PersistentVolumeClaim[UID=%s] bound to PersistentVolume[UID=%s]\n", claim.UID, volume.UID) - } - } - } else { - glog.V(5).Infof("No volume match found for %s\n", claim.UID) - } - - return 
nil -} From 6c6aab60abf61dff2fc4eb0ca03bcd6f7e06e996 Mon Sep 17 00:00:00 2001 From: markturansky Date: Thu, 16 Apr 2015 13:28:45 -0400 Subject: [PATCH 03/12] goformatted --- cmd/kube-controller-manager/app/controllermanager.go | 2 +- pkg/volumeclaimbinder/persistent_volume_claim_binder.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 80386764f47..55d7cf66e9e 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -42,10 +42,10 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/pflag" - "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" ) // CMServer is the main context object for the controller manager. diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index 4fb23498fad..c98831e3703 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -183,7 +183,6 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(v return err } - func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) { glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") go util.Forever(func() { controller.synchronize() }, period) From f26df6a98389b8ba94b2dbb3e20c74575cae96d1 Mon Sep 17 00:00:00 2001 From: markturansky Date: Thu, 16 Apr 2015 13:50:56 -0400 Subject: [PATCH 04/12] renamed exported symbol for pvOrderedIndex, added locks, remove go routine for sync funcs --- .../persistent_volume_claim_binder.go | 5 ++++- pkg/volumeclaimbinder/types.go | 17 ++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index c98831e3703..e55ca6ed894 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -117,6 +117,9 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interfac } func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error { + controller.lock.Lock() + defer controller.lock.Unlock() + claim := obj.(*api.PersistentVolumeClaim) glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) @@ -218,7 +221,7 @@ func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchr wg := sync.WaitGroup{} wg.Add(len(items)) for ix := range items { - go func(ix int) { + func(ix int) { defer wg.Done() obj := items[ix] glog.V(5).Infof("Reconciliation of %v", obj) diff --git a/pkg/volumeclaimbinder/types.go b/pkg/volumeclaimbinder/types.go index ee4c1412798..2ffd53dcc1e 100644 --- a/pkg/volumeclaimbinder/types.go +++ b/pkg/volumeclaimbinder/types.go @@ -49,14 +49,14 @@ func accessModesIndexFunc(obj interface{}) (string, error) { } // ListByAccessModes returns all volumes with the given set of AccessModeTypes *in order* of their storage capacity (low to high) -func (pvstore *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.AccessModeType) ([]*api.PersistentVolume, error) { +func (pvIndex 
*persistentVolumeOrderedIndex) ListByAccessModes(modes []api.AccessModeType) ([]*api.PersistentVolume, error) { pv := &api.PersistentVolume{ Spec: api.PersistentVolumeSpec{ AccessModes: modes, }, } - objs, err := pvstore.Index("accessmodes", pv) + objs, err := pvIndex.Index("accessmodes", pv) if err != nil { return nil, err } @@ -74,8 +74,8 @@ func (pvstore *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.Acces type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool // Find returns the nearest PV from the ordered list or nil if a match is not found -func (pvstore *persistentVolumeOrderedIndex) Find(pv *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) { - volumes, err := pvstore.ListByAccessModes(pv.Spec.AccessModes) +func (pvIndex *persistentVolumeOrderedIndex) Find(pv *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) { + volumes, err := pvIndex.ListByAccessModes(pv.Spec.AccessModes) if err != nil { return nil, err } @@ -88,7 +88,7 @@ func (pvstore *persistentVolumeOrderedIndex) Find(pv *api.PersistentVolume, matc } // FindByAccessModesAndStorageCapacity is a convenience method that calls Find w/ requisite matchPredicate for storage -func (pvstore *persistentVolumeOrderedIndex) FindByAccessModesAndStorageCapacity(modes []api.AccessModeType, qty resource.Quantity) (*api.PersistentVolume, error) { +func (pvIndex *persistentVolumeOrderedIndex) FindByAccessModesAndStorageCapacity(modes []api.AccessModeType, qty resource.Quantity) (*api.PersistentVolume, error) { pv := &api.PersistentVolume{ Spec: api.PersistentVolumeSpec{ AccessModes: modes, @@ -98,12 +98,12 @@ func (pvstore *persistentVolumeOrderedIndex) FindByAccessModesAndStorageCapacity }, } - return pvstore.Find(pv, filterBoundVolumes) + return pvIndex.Find(pv, filterBoundVolumes) } // FindBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage -func (pvstore *persistentVolumeOrderedIndex) FindBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) { - return pvstore.FindByAccessModesAndStorageCapacity(claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)]) +func (pvIndex *persistentVolumeOrderedIndex) FindBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) { + return pvIndex.FindByAccessModesAndStorageCapacity(claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)]) } // byCapacity is used to order volumes by ascending storage size @@ -125,7 +125,6 @@ func (c byCapacity) Len() int { // matchStorageCapacity is a matchPredicate used to sort and find volumes func matchStorageCapacity(pvA, pvB *api.PersistentVolume) bool { - // skip already claimed volumes if pvA.Spec.ClaimRef != nil { return false From e1b885c9adb37775f1796466f8e6a136087254f7 Mon Sep 17 00:00:00 2001 From: markturansky Date: Thu, 16 Apr 2015 22:09:20 -0400 Subject: [PATCH 05/12] narrowed client interface to allow easier testing. added PVC deletion test case. 
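
The binder now depends on a narrow binderClient interface instead of the full client.Interface, so unit tests can swap in an in-memory fake (see mockBinderClient in the test diff below). A minimal sketch of the pattern, using hypothetical names (claimGetter, stubClaimGetter) that are not part of this patch:

```
package volumeclaimbinder

import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"

// claimGetter captures just the one API call a consumer needs,
// mirroring how binderClient narrows client.Interface below.
type claimGetter interface {
	GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
}

// stubClaimGetter satisfies claimGetter with a canned claim,
// so tests never need a running API server.
type stubClaimGetter struct {
	claim *api.PersistentVolumeClaim
}

func (s *stubClaimGetter) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
	return s.claim, nil
}
```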
--- .../persistent_volume_claim_binder.go | 77 ++++++++++++++++--- .../persistent_volume_claim_binder_test.go | 71 ++++++++++++++--- 2 files changed, 125 insertions(+), 23 deletions(-) diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index e55ca6ed894..e2a22d5b305 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -36,7 +36,8 @@ import ( type PersistentVolumeClaimBinder struct { volumeStore *persistentVolumeOrderedIndex claimStore cache.Store - client client.Interface + client binderClient + // protects access to binding lock sync.RWMutex } @@ -78,7 +79,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu binder := &PersistentVolumeClaimBinder{ volumeStore: volumeStore, claimStore: claimStore, - client: kubeClient, + client: NewBinderClient(kubeClient), } return binder @@ -87,31 +88,37 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu // syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists. func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error { volume := obj.(*api.PersistentVolume) - glog.V(5).Infof("Synchronizing PersistentVolume[%s]%s\n", volume.Name) + glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name) if volume.Spec.ClaimRef != nil { if volume.Status.Phase == api.VolumeAvailable { volume.Status.Phase = api.VolumeBound - _, err := controller.client.PersistentVolumes().Update(volume) + _, err := controller.client.UpdatePersistentVolumeStatus(volume) if err != nil { return fmt.Errorf("Error updating pv.status: %v\n", err) } } // verify the volume is still claimed by a user - if claim, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil { + if claim, err := controller.client.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil { glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name) controller.syncPersistentVolumeClaimStatus(volume, claim) } else { //claim was deleted by user. 
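			// any error from the claim lookup lands here, not only not-found; the claim is then treated as deleted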
glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name) - // volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly release volume + // volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly released volume volume.Status.Phase = api.VolumeReleased - volume, err = controller.client.PersistentVolumes().UpdateStatus(volume) + volume, err = controller.client.UpdatePersistentVolumeStatus(volume) if err != nil { return fmt.Errorf("Error updating pv: %+v\n", err) } } + } else { + volume.Status.Phase = api.VolumeAvailable + _, err := controller.client.UpdatePersistentVolumeStatus(volume) + if err != nil { + return fmt.Errorf("Error updating pv.status: %v\n", err) + } } return nil } @@ -141,7 +148,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj int // make a binding reference to the claim pv.Spec.ClaimRef = claimRef - pv, err = controller.client.PersistentVolumes().Update(pv) + pv, err = controller.client.UpdatePersistentVolume(pv) if err != nil { // volume no longer bound @@ -157,6 +164,11 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj int } } else { glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID) + claim.Status.Phase = api.ClaimPending + _, err := controller.client.UpdatePersistentVolumeClaimStatus(claim) + if err != nil { + return fmt.Errorf("Error updating pvclaim.status: %v\n", err) + } } return nil } @@ -174,7 +186,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(v claim.Status.AccessModes = volume.Spec.AccessModes claim.Status.Capacity = volume.Spec.Capacity - _, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) + _, err = controller.client.UpdatePersistentVolumeClaimStatus(claim) if err != nil { claim.Status.Phase = api.ClaimPending @@ -192,7 +204,8 @@ func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) { } // Synchronizer is a generic List/ProcessFunc used by the Reconcile function & reconciliation loop, -// because we're reconciling two Kinds in this component and I didn't want to dupe the loop +// because we're reconciling two Kinds in this component and don't want to dupe the loop +// TODO MarkT - refactor to new DeltaFifo and new controller framework type Synchronizer struct { ListFunc func() []interface{} ReconcileFunc func(interface{}) error @@ -221,7 +234,7 @@ func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchr wg := sync.WaitGroup{} wg.Add(len(items)) for ix := range items { - func(ix int) { + go func(ix int) { defer wg.Done() obj := items[ix] glog.V(5).Infof("Reconciliation of %v", obj) @@ -234,3 +247,45 @@ func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchr wg.Wait() } } + +// binderClient abstracts access to PVs and PVCs +type binderClient interface { + GetPersistentVolume(name string) (*api.PersistentVolume, error) + UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) + UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) + GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) + UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) + UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) +} + +func 
NewBinderClient(c client.Interface) binderClient { + return &realBinderClient{c} +} + +type realBinderClient struct { + client client.Interface +} + +func (c *realBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().Get(name) +} + +func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().Update(volume) +} + +func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().UpdateStatus(volume) +} + +func (c *realBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) { + return c.client.PersistentVolumeClaims(namespace).Get(name) +} + +func (c *realBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { + return c.client.PersistentVolumeClaims(claim.Namespace).Update(claim) +} + +func (c *realBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { + return c.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim) +} diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go index 05dbe72f1f7..a310e86c6fc 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -20,6 +20,7 @@ import ( "reflect" "testing" + "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" @@ -166,19 +167,27 @@ func TestBindingWithExamples(t *testing.T) { t.Error("Unexpected error getting PVC from client: %v", err) } - controller := NewPersistentVolumeClaimBinder(client) - err = controller.volumeStore.Add(pv) - if err != nil { - t.Error("Unexpected error: %v", err) + mockClient := &mockBinderClient{ + volume: pv, + claim: claim, } - if _, exists, _ := controller.volumeStore.Get(pv); !exists { - t.Error("Expected to find volume in the index") + controller := PersistentVolumeClaimBinder{ + volumeStore: NewPersistentVolumeOrderedIndex(), + client: mockClient, } - err = controller.syncPersistentVolumeClaim(claim) - if err != nil { - t.Error("Unexpected error: %v", err) + controller.volumeStore.Add(pv) + controller.syncPersistentVolume(pv) + + if pv.Status.Phase != api.VolumeAvailable { + t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) + } + + controller.syncPersistentVolumeClaim(claim) + + if pv.Status.Phase != api.VolumeBound { + t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) } if claim.Status.VolumeRef == nil { @@ -192,9 +201,47 @@ func TestBindingWithExamples(t *testing.T) { t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) } if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] { - t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) + t.Errorf("Expected access mode %s but got %s", claim.Status.AccessModes[0], pv.Spec.AccessModes[0]) } - if claim.Status.Phase != api.ClaimBound { - t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) + + // pretend the user deleted their claim + mockClient.claim = nil + + controller.syncPersistentVolume(pv) + if pv.Status.Phase != api.VolumeReleased { + 
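// the claim was removed above (mockClient.claim = nil), so the sync must have released the volume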
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase) } } + +type mockBinderClient struct { + volume *api.PersistentVolume + claim *api.PersistentVolumeClaim +} + +func (c *mockBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { + return c.volume, nil +} + +func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return volume, nil +} + +func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return volume, nil +} + +func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) { + if c.claim != nil { + return c.claim, nil + } else { + return nil, fmt.Errorf("Claim does not exist") + } +} + +func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { + return claim, nil +} + +func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { + return claim, nil +} From 49883e7d01876f35bd122d623bff154763e18290 Mon Sep 17 00:00:00 2001 From: markturansky Date: Fri, 17 Apr 2015 10:42:25 -0400 Subject: [PATCH 06/12] Edited README --- .../persistent-volumes/simpletest/README.md | 92 ++++++++++++------- pkg/volumeclaimbinder/types.go | 2 +- 2 files changed, 58 insertions(+), 36 deletions(-) diff --git a/examples/persistent-volumes/simpletest/README.md b/examples/persistent-volumes/simpletest/README.md index 026920d172b..1911c6e4fea 100644 --- a/examples/persistent-volumes/simpletest/README.md +++ b/examples/persistent-volumes/simpletest/README.md @@ -1,69 +1,91 @@ # How To Use Persistent Volumes +The purpose of this guide is to help you become familiar with Kubernetes Persistent Volumes. By the end of the guide, we'll have +nginx serving content from your persistent volume. + This guide assumes knowledge of Kubernetes fundamentals and that a user has a cluster up and running. -## Create volumes +## Provisioning -Persistent Volumes are intended for "network volumes", such as GCE Persistent Disks, NFS shares, and AWS EBS volumes. +A PersistentVolume in Kubernetes represents a real piece of underlying storage capacity in the infrastructure. Cluster administrators +must first create storage (create their GCE disks, export their NFS shares, etc.) in order for Kubernetes to mount it. -The `HostPath` VolumeSource was included in the Persistent Volumes implementation for ease of testing. - -Create persistent volumes by posting them to the API server: +PVs are intended for "network volumes" like GCE Persistent Disks, NFS shares, and AWS ElasticBlockStore volumes. `HostPath` was included +for ease of development and testing. You'll create a local `HostPath` for this example. + +``` + +// this will be nginx's webroot +mkdir /tmp/data01 +echo 'I love Kubernetes storage!' > /tmp/data01/index.html + +``` + +PVs are created by posting them to the API server. 
``` cluster/kubectl.sh create -f examples/persistent-volumes/volumes/local-01.yaml -cluster/kubectl.sh create -f examples/persistent-volumes/volumes/local-02.yaml - cluster/kubectl.sh get pv NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM -pv0001 map[] 10737418240 RWO -pv0002 map[] 5368709120 RWO - - -In the log: - -I0302 10:20:45.663225 1920 persistent_volume_manager.go:115] Managing PersistentVolume[UID=b16e91d6-c0ef-11e4-8be4-80e6500a981e] -I0302 10:20:55.667945 1920 persistent_volume_manager.go:115] Managing PersistentVolume[UID=b41f4f0e-c0ef-11e4-8be4-80e6500a981e] +pv0001 map[] 10737418240 RWO Available ``` -## Create claims +## Requesting storage + +Users of Kubernetes request persistent storage for their pods. They don't know how the underlying cluster is provisioned. +They just know they can rely on their claim to storage and they can manage its lifecycle independently from the many pods that may use it. You must be in a namespace to create claims. ``` cluster/kubectl.sh create -f examples/persistent-volumes/claims/claim-01.yaml -cluster/kubectl.sh create -f examples/persistent-volumes/claims/claim-02.yaml +cluster/kubectl.sh get pvc NAME LABELS STATUS VOLUME myclaim-1 map[] -myclaim-2 map[] + + +# A background process will attempt to match this claim to a volume. +# The eventual state of your claim will look something like this: -``` - - -## Matching and binding - -``` - -PersistentVolumeClaim[UID=f4b3d283-c0ef-11e4-8be4-80e6500a981e] bound to PersistentVolume[UID=b16e91d6-c0ef-11e4-8be4-80e6500a981e] +cluster/kubectl.sh get pvc +NAME LABELS STATUS VOLUME +myclaim-1 map[] Bound f5c3a89a-e50a-11e4-972f-80e6500a981e cluster/kubectl.sh get pv NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM -pv0001 map[] 10737418240 RWO myclaim-1 / f4b3d283-c0ef-11e4-8be4-80e6500a981e -pv0002 map[] 5368709120 RWO myclaim-2 / f70da891-c0ef-11e4-8be4-80e6500a981e - - -cluster/kubectl.sh get pvc - -NAME LABELS STATUS VOLUME -myclaim-1 map[] b16e91d6-c0ef-11e4-8be4-80e6500a981e -myclaim-2 map[] b41f4f0e-c0ef-11e4-8be4-80e6500a981e +pv0001 map[] 10737418240 RWO Bound myclaim-1 / 6bef4c40-e50b-11e4-972f-80e6500a981e ``` + +## Using your claim as a volume + +Claims are used as volumes in pods. Kubernetes uses the claim to look up its bound PV. The PV is then exposed to the pod. + +``` +k create -f examples/persistent-volumes/simpletest/pod.yaml + +cluster/kubectl.sh get pods + +POD IP CONTAINER(S) IMAGE(S) HOST LABELS STATUS CREATED +mypod 172.17.0.2 myfrontend nginx 127.0.0.1/127.0.0.1 Running 12 minutes + + +k create -f examples/persistent-volumes/simpletest/service.json +cluster/kubectl.sh get services +Running: cluster/../cluster/gce/../../cluster/../_output/local/bin/darwin/amd64/kubectl --v=5 get services +NAME LABELS SELECTOR IP PORT(S) +kubernetes component=apiserver,provider=kubernetes 10.0.0.2 443/TCP +kubernetes-ro component=apiserver,provider=kubernetes 10.0.0.1 80/TCP + + +curl 10.0.0.168:3000 +I love Kubernetes storage! 
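+# note: the IP and port above came from a different run; substitute the service IP reported by `get services`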
+``` diff --git a/pkg/volumeclaimbinder/types.go b/pkg/volumeclaimbinder/types.go index 2ffd53dcc1e..d50135c5eb3 100644 --- a/pkg/volumeclaimbinder/types.go +++ b/pkg/volumeclaimbinder/types.go @@ -139,7 +139,7 @@ func matchStorageCapacity(pvA, pvB *api.PersistentVolume) bool { // filterBoundVolumes is a matchPredicate that filters bound volumes before comparing storage capacity func filterBoundVolumes(compareThis, toThis *api.PersistentVolume) bool { - if toThis.Spec.ClaimRef != nil || compareThis.Spec.ClaimRef != nil { + if compareThis.Spec.ClaimRef != nil || toThis.Spec.ClaimRef != nil { return false } return matchStorageCapacity(compareThis, toThis) From b634f17ca705bbef2db47d6655c5ba0c01d76e00 Mon Sep 17 00:00:00 2001 From: markturansky Date: Sat, 18 Apr 2015 06:54:33 -0400 Subject: [PATCH 07/12] Refactored to DeltaFifo --- .../app/controllermanager.go | 4 +- .../persistent_volume_claim_binder.go | 253 +++++++++--------- .../persistent_volume_claim_binder_test.go | 78 +++++- 3 files changed, 190 insertions(+), 145 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 55d7cf66e9e..1a1094ad40d 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -235,8 +235,8 @@ func (s *CMServer) Run(_ []string) error { namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod) namespaceManager.Run() - pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient) - pvclaimBinder.Run(s.PVClaimBinderSyncPeriod) + pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod) + pvclaimBinder.Run() select {} return nil diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index e2a22d5b305..b655fe6a3a0 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -18,34 +18,35 @@ package volumeclaimbinder import ( "fmt" - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" + "reflect" ) // PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims. 
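// It watches PersistentVolumes and PersistentVolumeClaims with two informer-driven controllers
// and keeps an ordered index of volumes so each claim can be matched to the closest available volume.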
type PersistentVolumeClaimBinder struct { - volumeStore *persistentVolumeOrderedIndex - claimStore cache.Store - client binderClient - - // protects access to binding - lock sync.RWMutex + volumeIndex *persistentVolumeOrderedIndex + volumeController *framework.Controller + claimController *framework.Controller + client binderClient } // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder -func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolumeClaimBinder { - volumeStore := NewPersistentVolumeOrderedIndex() - volumeReflector := cache.NewReflector( +func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder { + volumeIndex := NewPersistentVolumeOrderedIndex() + binderClient := NewBinderClient(kubeClient) + + _, volumeController := framework.NewInformer( &cache.ListWatch{ ListFunc: func() (runtime.Object, error) { return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything()) @@ -55,13 +56,28 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu }, }, &api.PersistentVolume{}, - volumeStore, - 0, + syncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + volume := obj.(*api.PersistentVolume) + volumeIndex.Indexer.Add(volume) + syncVolume(binderClient, volume) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldVolume := oldObj.(*api.PersistentVolume) + newVolume := newObj.(*api.PersistentVolume) + volumeIndex.Indexer.Update(newVolume) + if updateRequired(oldVolume, newVolume) { + syncVolume(binderClient, newVolume) + } + }, + DeleteFunc: func(obj interface{}) { + volume := obj.(*api.PersistentVolume) + volumeIndex.Indexer.Delete(volume) + }, + }, ) - volumeReflector.Run() - - claimStore := cache.NewStore(cache.MetaNamespaceKeyFunc) - claimReflector := cache.NewReflector( + _, claimController := framework.NewInformer( &cache.ListWatch{ ListFunc: func() (runtime.Object, error) { return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(labels.Everything(), fields.Everything()) @@ -71,51 +87,75 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu }, }, &api.PersistentVolumeClaim{}, - claimStore, - 0, + syncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + claim := obj.(*api.PersistentVolumeClaim) + syncClaim(volumeIndex, binderClient, claim) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // oldClaim := newObj.(*api.PersistentVolumeClaim) + newClaim := newObj.(*api.PersistentVolumeClaim) + if newClaim.Status.VolumeRef == nil { + syncClaim(volumeIndex, binderClient, newClaim) + } + }, + }, ) - claimReflector.Run() binder := &PersistentVolumeClaimBinder{ - volumeStore: volumeStore, - claimStore: claimStore, - client: NewBinderClient(kubeClient), + volumeController: volumeController, + claimController: claimController, + volumeIndex: volumeIndex, + client: binderClient, } return binder } -// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists. 
-func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error { - volume := obj.(*api.PersistentVolume) +func updateRequired(oldVolume, newVolume *api.PersistentVolume) bool { + // Spec changes affect indexing and sorting volumes + if !reflect.DeepEqual(oldVolume.Spec, newVolume.Spec) { + return true + } + if !reflect.DeepEqual(oldVolume.Status, newVolume.Status) { + return true + } + return false +} + +func syncVolume(binderClient binderClient, volume *api.PersistentVolume) (err error) { glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name) if volume.Spec.ClaimRef != nil { if volume.Status.Phase == api.VolumeAvailable { volume.Status.Phase = api.VolumeBound - _, err := controller.client.UpdatePersistentVolumeStatus(volume) + _, err := binderClient.UpdatePersistentVolumeStatus(volume) if err != nil { return fmt.Errorf("Error updating pv.status: %v\n", err) } } // verify the volume is still claimed by a user - if claim, err := controller.client.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil { + if claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil { glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name) - controller.syncPersistentVolumeClaimStatus(volume, claim) + // rebuild the Claim's Status as needed + if claim.Status.VolumeRef == nil { + syncClaimStatus(binderClient, volume, claim) + } } else { //claim was deleted by user. glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name) // volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly released volume volume.Status.Phase = api.VolumeReleased - volume, err = controller.client.UpdatePersistentVolumeStatus(volume) + volume, err = binderClient.UpdatePersistentVolumeStatus(volume) if err != nil { return fmt.Errorf("Error updating pv: %+v\n", err) } } } else { volume.Status.Phase = api.VolumeAvailable - _, err := controller.client.UpdatePersistentVolumeStatus(volume) + _, err := binderClient.UpdatePersistentVolumeStatus(volume) if err != nil { return fmt.Errorf("Error updating pv.status: %v\n", err) } @@ -123,58 +163,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interfac return nil } -func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error { - controller.lock.Lock() - defer controller.lock.Unlock() - - claim := obj.(*api.PersistentVolumeClaim) - glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) - - if claim.Status.VolumeRef != nil { - glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name) - return nil - } - - pv, err := controller.volumeStore.FindBestMatchForClaim(claim) - if err != nil { - return err - } - - if pv != nil { - claimRef, err := api.GetReference(claim) - if err != nil { - return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) - } - - // make a binding reference to the claim - pv.Spec.ClaimRef = claimRef - pv, err = controller.client.UpdatePersistentVolume(pv) - - if err != nil { - // volume no longer bound - pv.Spec.ClaimRef = nil - return fmt.Errorf("Error updating volume: %+v\n", err) - } else { - glog.V(3).Infof("PersistentVolumeClaim[%s] bound to PersistentVolume[%s]\n", claim.Name, pv.Name) - pv.Status.Phase = 
api.VolumeBound - err := controller.syncPersistentVolumeClaimStatus(pv, claim) - if err != nil { - return fmt.Errorf("Error updating pvclaim.status: %v\n", err) - } - } - } else { - glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID) - claim.Status.Phase = api.ClaimPending - _, err := controller.client.UpdatePersistentVolumeClaimStatus(claim) - if err != nil { - return fmt.Errorf("Error updating pvclaim.status: %v\n", err) - } - } - return nil -} - -// syncPersistentVolumeClaimStatus builds and persistens a PVClaim's Status, rolling back to empty values if the update fails -func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) error { +func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) (err error) { volumeRef, err := api.GetReference(volume) if err != nil { return fmt.Errorf("Unexpected error getting volume reference: %v\n", err) @@ -186,7 +175,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(v claim.Status.AccessModes = volume.Spec.AccessModes claim.Status.Capacity = volume.Spec.Capacity - _, err = controller.client.UpdatePersistentVolumeClaimStatus(claim) + _, err = binderClient.UpdatePersistentVolumeClaimStatus(claim) if err != nil { claim.Status.Phase = api.ClaimPending @@ -198,54 +187,56 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(v return err } -func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) { +func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) { + glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) + + if claim.Status.VolumeRef != nil { + glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name) + return nil + } + + volume, err := volumeIndex.FindBestMatchForClaim(claim) + if err != nil { + return err + } + + if volume != nil { + claimRef, err := api.GetReference(claim) + if err != nil { + return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) + } + + // make a binding reference to the claim + volume.Spec.ClaimRef = claimRef + volume, err = binderClient.UpdatePersistentVolume(volume) + + if err != nil { + // volume no longer bound + volume.Spec.ClaimRef = nil + return fmt.Errorf("Error updating volume: %+v\n", err) + } else { + err = syncClaimStatus(binderClient, volume, claim) + if err != nil { + return fmt.Errorf("Error update claim.status: %+v\n", err) + } + } + } else { + glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID) + if claim.Status.Phase != api.ClaimPending { + claim.Status.Phase = api.ClaimPending + _, err := binderClient.UpdatePersistentVolumeClaimStatus(claim) + if err != nil { + return fmt.Errorf("Error updating pvclaim.status: %v\n", err) + } + } + } + return nil +} + +func (controller *PersistentVolumeClaimBinder) Run() { glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") - go util.Forever(func() { controller.synchronize() }, period) -} - -// Synchronizer is a generic List/ProcessFunc used by the Reconcile function & reconciliation loop, -// because we're reconciling two Kinds in this component and don't want to dupe the loop -// TODO MarkT - refactor to new DeltaFifo and new controller framework -type Synchronizer struct { - ListFunc func() []interface{} - ReconcileFunc 
func(interface{}) error -} - -func (controller *PersistentVolumeClaimBinder) synchronize() { - volumeSynchronizer := Synchronizer{ - ListFunc: controller.volumeStore.List, - ReconcileFunc: controller.syncPersistentVolume, - } - - claimsSynchronizer := Synchronizer{ - ListFunc: controller.claimStore.List, - ReconcileFunc: controller.syncPersistentVolumeClaim, - } - - controller.reconcile(volumeSynchronizer, claimsSynchronizer) -} - -func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchronizer) { - for _, synchronizer := range synchronizers { - items := synchronizer.ListFunc() - if len(items) == 0 { - continue - } - wg := sync.WaitGroup{} - wg.Add(len(items)) - for ix := range items { - go func(ix int) { - defer wg.Done() - obj := items[ix] - glog.V(5).Infof("Reconciliation of %v", obj) - err := synchronizer.ReconcileFunc(obj) - if err != nil { - glog.Errorf("Error reconciling: %v", err) - } - }(ix) - } - wg.Wait() - } + go controller.claimController.Run(make(chan struct{})) + go controller.volumeController.Run(make(chan struct{})) } // binderClient abstracts access to PVs and PVCs diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go index a310e86c6fc..4b6eab019e8 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -144,6 +144,54 @@ func TestExampleObjects(t *testing.T) { } } +func TestRequiresUpdate(t *testing.T) { + old := &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: "/tmp/data02", + }, + }, + }, + } + + new := &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: "/tmp/data02", + }, + }, + ClaimRef: &api.ObjectReference{Name: "foo"}, + }, + } + + if !updateRequired(old, new) { + t.Errorf("Update expected for the new volume with added ClaimRef") + } + + old.Spec.ClaimRef = new.Spec.ClaimRef + old.Status.Phase = api.VolumeBound + + if !updateRequired(old, new) { + t.Errorf("Update expected for the new volume with added Status") + } + + new.Status.Phase = old.Status.Phase + + if updateRequired(old, new) { + t.Errorf("No updated expected for identical objects") + } +} + func TestBindingWithExamples(t *testing.T) { api.ForTesting_ReferencesAllowBlankSelfLinks = true @@ -167,33 +215,39 @@ func TestBindingWithExamples(t *testing.T) { t.Error("Unexpected error getting PVC from client: %v", err) } + volumeIndex := NewPersistentVolumeOrderedIndex() mockClient := &mockBinderClient{ volume: pv, claim: claim, } - controller := PersistentVolumeClaimBinder{ - volumeStore: NewPersistentVolumeOrderedIndex(), - client: mockClient, - } - - controller.volumeStore.Add(pv) - controller.syncPersistentVolume(pv) + volumeIndex.Add(pv) + syncVolume(mockClient, pv) if pv.Status.Phase != api.VolumeAvailable { t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) } - controller.syncPersistentVolumeClaim(claim) - - if pv.Status.Phase != api.VolumeBound { - t.Errorf("Expected 
phase %s but got %s", api.VolumeBound, pv.Status.Phase) + if pv.Spec.ClaimRef != nil { + t.Errorf("Expected nil ClaimRef but got %+v\n", pv.Spec.ClaimRef) } + syncClaim(volumeIndex, mockClient, claim) + + if pv.Spec.ClaimRef == nil { + t.Errorf("Expected ClaimRef but got nil for volume: %+v\n", pv) + } + + syncVolume(mockClient, pv) + if claim.Status.VolumeRef == nil { t.Error("Expected claim to be bound to volume") } + if pv.Status.Phase != api.VolumeBound { + t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) + } + if claim.Status.Phase != api.ClaimBound { t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) } @@ -206,8 +260,8 @@ func TestBindingWithExamples(t *testing.T) { // pretend the user deleted their claim mockClient.claim = nil + syncVolume(mockClient, pv) - controller.syncPersistentVolume(pv) if pv.Status.Phase != api.VolumeReleased { t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase) } From 2cf4e6564bae086e1939ecb3d57caab833c3c459 Mon Sep 17 00:00:00 2001 From: markturansky Date: Sat, 18 Apr 2015 07:16:07 -0400 Subject: [PATCH 08/12] Added more to README, tweaked pod, added service, fixed validation --- .../persistent-volumes/simpletest/README.md | 53 +++++++++++++------ .../persistent-volumes/simpletest/pod.yaml | 16 +++--- .../simpletest/service.json | 9 ++++ pkg/api/validation/validation.go | 13 +++++ .../persistent_volume_claim_binder.go | 3 +- .../persistent_volume_claim_binder_test.go | 2 +- .../persistent_volume_index_test.go | 3 +- 7 files changed, 72 insertions(+), 27 deletions(-) create mode 100644 examples/persistent-volumes/simpletest/service.json diff --git a/examples/persistent-volumes/simpletest/README.md b/examples/persistent-volumes/simpletest/README.md index 1911c6e4fea..93489a40e04 100644 --- a/examples/persistent-volumes/simpletest/README.md +++ b/examples/persistent-volumes/simpletest/README.md @@ -3,15 +3,19 @@ The purpose of this guide is to help you become familiar with Kubernetes Persistent Volumes. By the end of the guide, we'll have nginx serving content from your persistent volume. -This guide assumes knowledge of Kubernetes fundamentals and that a user has a cluster up and running. +This guide assumes knowledge of Kubernetes fundamentals and that you have a cluster up and running. ## Provisioning A PersistentVolume in Kubernetes represents a real piece of underlying storage capacity in the infrastructure. Cluster administrators must first create storage (create their GCE disks, export their NFS shares, etc.) in order for Kubernetes to mount it. -PVs are intended for "network volumes" like GCE Persistent Disks, NFS shares, and AWS ElasticBlockStore volumes. `HostPath` was included -for ease of development and testing. You'll create a local `HostPath` for this example. +PVs are intended for "network volumes" like GCE Persistent Disks, NFS shares, and AWS ElasticBlockStore volumes. ```HostPath``` was included +for ease of development and testing. You'll create a local ```HostPath``` for this example. + +> IMPORTANT! For ```HostPath``` to work, you will need to run a single node cluster. Kubernetes does not +support local storage on the host at this time. There is no guarantee your pod ends up on the correct node where the ```HostPath``` resides. + ``` @@ -28,17 +32,17 @@ PVs are created by posting them to the API server. 
cluster/kubectl.sh create -f examples/persistent-volumes/volumes/local-01.yaml cluster/kubectl.sh get pv -NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM -pv0001 map[] 10737418240 RWO Available +NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM +pv0001 map[] 10737418240 RWO Available ``` ## Requesting storage Users of Kubernetes request persistent storage for their pods. They don't know how the underlying cluster is provisioned. -They just know they can rely on their claim to storage and they can manage its lifecycle independently from the many pods that may use it. +They just know they can rely on their claim to storage and can manage its lifecycle independently from the many pods that may use it. -You must be in a namespace to create claims. +Claims must be created in the same namespace as the pods that use them. ``` @@ -60,7 +64,7 @@ myclaim-1 map[] Bound f5c3a89a-e50a-11e4-972f-80e6500a981e cluster/kubectl.sh get pv -NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM +NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM pv0001 map[] 10737418240 RWO Bound myclaim-1 / 6bef4c40-e50b-11e4-972f-80e6500a981e ``` @@ -70,7 +74,8 @@ pv0001 map[] 10737418240 RWO Claims are used as volumes in pods. Kubernetes uses the claim to look up its bound PV. The PV is then exposed to the pod. ``` -k create -f examples/persistent-volumes/simpletest/pod.yaml + +cluster/kubectl.sh create -f examples/persistent-volumes/simpletest/pod.yaml cluster/kubectl.sh get pods @@ -78,14 +83,30 @@ POD IP CONTAINER(S) IMAGE(S) HOST LABELS mypod 172.17.0.2 myfrontend nginx 127.0.0.1/127.0.0.1 Running 12 minutes -k create -f examples/persistent-volumes/simpletest/service.json +cluster/kubectl.sh create -f examples/persistent-volumes/simpletest/service.json cluster/kubectl.sh get services -Running: cluster/../cluster/gce/../../cluster/../_output/local/bin/darwin/amd64/kubectl --v=5 get services -NAME LABELS SELECTOR IP PORT(S) -kubernetes component=apiserver,provider=kubernetes 10.0.0.2 443/TCP -kubernetes-ro component=apiserver,provider=kubernetes 10.0.0.1 80/TCP + +NAME LABELS SELECTOR IP PORT(S) +frontendservice name=frontendhttp 10.0.0.241 3000/TCP +kubernetes component=apiserver,provider=kubernetes 10.0.0.2 443/TCP +kubernetes-ro component=apiserver,provider=kubernetes 10.0.0.1 80/TCP -curl 10.0.0.168:3000 -I love Kubernetes storage! ``` + +## Next steps + +You should be able to query your service endpoint and see what content nginx is serving. A "forbidden" error might mean you +need to disable SELinux (setenforce 0). + +``` + +curl 10.0.0.241:3000 +I love Kubernetes storage! + +``` + +Hopefully this simple guide is enough to get you started with PersistentVolumes. If you have any questions, join +```#google-containers``` on IRC and ask! + +Enjoy! 
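
If you want to exercise the other half of the lifecycle, tear the example down. The commands below are a sketch (the object names are the ones used above); the ```Released``` behavior they describe follows the binder code in this patch series:

```

cluster/kubectl.sh delete pod mypod
cluster/kubectl.sh delete pvc myclaim-1

# the binder notices the deleted claim and moves the PV to Released.
# the PV deliberately keeps its claim reference so another process can recycle it.
cluster/kubectl.sh get pv

```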
diff --git a/examples/persistent-volumes/simpletest/pod.yaml b/examples/persistent-volumes/simpletest/pod.yaml index aea1502242b..f9bab5316ab 100644 --- a/examples/persistent-volumes/simpletest/pod.yaml +++ b/examples/persistent-volumes/simpletest/pod.yaml @@ -2,17 +2,19 @@ kind: Pod apiVersion: v1beta3 metadata: name: mypod + labels: + name: frontendhttp spec: containers: - - image: nginx - name: myfrontend + - name: myfrontend + image: dockerfile/nginx + ports: + - containerPort: 80 + name: "http-server" volumeMounts: - mountPath: "/var/www/html" name: mypd volumes: - name: mypd - source: - persistentVolumeClaim: - accessMode: ReadWriteOnce - claimRef: - name: myclaim-1 + persistentVolumeClaim: + claimName: myclaim-1 \ No newline at end of file diff --git a/examples/persistent-volumes/simpletest/service.json b/examples/persistent-volumes/simpletest/service.json new file mode 100644 index 00000000000..3fbbe5de485 --- /dev/null +++ b/examples/persistent-volumes/simpletest/service.json @@ -0,0 +1,9 @@ +{ + "apiVersion": "v1beta1", + "kind": "Service", + "id": "frontendservice", + "port": 3000, + "containerPort": "http-server", + "selector": { "name": "frontendhttp" }, + "createExternalLoadBalancer": false +} diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index d047164588c..dfe8d6c4b6e 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -319,6 +319,11 @@ func validateSource(source *api.VolumeSource) errs.ValidationErrorList { numVolumes++ allErrs = append(allErrs, validateGlusterfs(source.Glusterfs).Prefix("glusterfs")...) } + if source.PersistentVolumeClaimVolumeSource != nil { + numVolumes++ + allErrs = append(allErrs, validatePersistentClaimVolumeSource(source.PersistentVolumeClaimVolumeSource).Prefix("glusterfs")...) + } + if numVolumes != 1 { allErrs = append(allErrs, errs.NewFieldInvalid("", source, "exactly 1 volume type is required")) } @@ -394,6 +399,14 @@ func validateSecretVolumeSource(secretSource *api.SecretVolumeSource) errs.Valid return allErrs } +func validatePersistentClaimVolumeSource(claim *api.PersistentVolumeClaimVolumeSource) errs.ValidationErrorList { + allErrs := errs.ValidationErrorList{} + if claim.ClaimName == "" { + allErrs = append(allErrs, errs.NewFieldRequired("claimName")) + } + return allErrs +} + func validateNFS(nfs *api.NFSVolumeSource) errs.ValidationErrorList { allErrs := errs.ValidationErrorList{} if nfs.Server == "" { diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index b655fe6a3a0..da62b57cda8 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -18,6 +18,7 @@ package volumeclaimbinder import ( "fmt" + "reflect" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -30,7 +31,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" - "reflect" ) // PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims. 
@@ -94,7 +94,6 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time syncClaim(volumeIndex, binderClient, claim) }, UpdateFunc: func(oldObj, newObj interface{}) { - // oldClaim := newObj.(*api.PersistentVolumeClaim) newClaim := newObj.(*api.PersistentVolumeClaim) if newClaim.Status.VolumeRef == nil { syncClaim(volumeIndex, binderClient, newClaim) diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go index 4b6eab019e8..c6bc047afaa 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -17,10 +17,10 @@ limitations under the License. package volumeclaimbinder import ( + "fmt" "reflect" "testing" - "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" diff --git a/pkg/volumeclaimbinder/persistent_volume_index_test.go b/pkg/volumeclaimbinder/persistent_volume_index_test.go index 4ee00d6d1f1..4d6dccf7e3c 100644 --- a/pkg/volumeclaimbinder/persistent_volume_index_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_index_test.go @@ -17,9 +17,10 @@ limitations under the License. package volumeclaimbinder import ( + "testing" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" - "testing" ) func TestMatchVolume(t *testing.T) { From 37d7f3f4f16dc038b323a22c05ae691e168a87b5 Mon Sep 17 00:00:00 2001 From: markturansky Date: Sat, 18 Apr 2015 09:31:24 -0400 Subject: [PATCH 09/12] Added integration test, fixed a validation issue --- .../app/controllermanager.go | 2 +- .../persistent-volumes/simpletest/pod.yaml | 2 +- pkg/api/validation/validation.go | 4 + .../persistent_volume_claim_binder.go | 24 ++- .../persistent_volume_claim_binder_test.go | 23 +++ test/integration/persistent_volumes_test.go | 190 ++++++++++++++++++ 6 files changed, 241 insertions(+), 4 deletions(-) create mode 100644 test/integration/persistent_volumes_test.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 1a1094ad40d..56f11a47b97 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -41,8 +41,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" "github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" + "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/pflag" diff --git a/examples/persistent-volumes/simpletest/pod.yaml b/examples/persistent-volumes/simpletest/pod.yaml index f9bab5316ab..f7f686c0404 100644 --- a/examples/persistent-volumes/simpletest/pod.yaml +++ b/examples/persistent-volumes/simpletest/pod.yaml @@ -17,4 +17,4 @@ spec: volumes: - name: mypd persistentVolumeClaim: - claimName: myclaim-1 \ No newline at end of file + claimName: myclaim-1 diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index dfe8d6c4b6e..df82dc97a2f 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -461,6 +461,10 @@ func ValidatePersistentVolume(pv *api.PersistentVolume) errs.ValidationErrorList numVolumes++ allErrs = append(allErrs, 
validateAWSElasticBlockStoreVolumeSource(pv.Spec.AWSElasticBlockStore).Prefix("awsElasticBlockStore")...) } + if pv.Spec.Glusterfs != nil { + numVolumes++ + allErrs = append(allErrs, validateGlusterfs(pv.Spec.Glusterfs).Prefix("glusterfs")...) + } if numVolumes != 1 { allErrs = append(allErrs, errs.NewFieldInvalid("", pv.Spec.PersistentVolumeSource, "exactly 1 volume type is required")) } diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index da62b57cda8..90576f74913 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -39,6 +39,7 @@ type PersistentVolumeClaimBinder struct { volumeController *framework.Controller claimController *framework.Controller client binderClient + stopChannels map[string]chan struct{} } // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder @@ -234,8 +235,27 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli func (controller *PersistentVolumeClaimBinder) Run() { glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") - go controller.claimController.Run(make(chan struct{})) - go controller.volumeController.Run(make(chan struct{})) + if controller.stopChannels == nil { + controller.stopChannels = make(map[string]chan struct{}) + } + + if _, exists := controller.stopChannels["volumes"]; !exists { + controller.stopChannels["volumes"] = make(chan struct{}) + go controller.volumeController.Run(controller.stopChannels["volumes"]) + } + + if _, exists := controller.stopChannels["claims"]; !exists { + controller.stopChannels["claims"] = make(chan struct{}) + go controller.claimController.Run(controller.stopChannels["claims"]) + } +} + +func (controller *PersistentVolumeClaimBinder) Stop() { + glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n") + for name, stopChan := range controller.stopChannels { + close(stopChan) + delete(controller.stopChannels, name) + } } // binderClient abstracts access to PVs and PVCs diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go index c6bc047afaa..d1999a4a5dd 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" @@ -27,6 +28,28 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" ) +func TestRunStop(t *testing.T) { + o := testclient.NewObjects(api.Scheme) + client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)} + binder := NewPersistentVolumeClaimBinder(client, 1*time.Second) + + if len(binder.stopChannels) != 0 { + t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels)) + } + + binder.Run() + + if len(binder.stopChannels) != 2 { + t.Errorf("Running binder should have exactly 2 stopChannels. Got %v", len(binder.stopChannels)) + } + + binder.Stop() + + if len(binder.stopChannels) != 0 { + t.Errorf("Non-running binder should not have any stopChannels. 
Got %v", len(binder.stopChannels)) + } +} + func TestExampleObjects(t *testing.T) { scenarios := map[string]struct { diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go new file mode 100644 index 00000000000..87be82de4e8 --- /dev/null +++ b/test/integration/persistent_volumes_test.go @@ -0,0 +1,190 @@ +// +build integration,!no-etcd + +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder" +) + +func init() { + requireEtcd() +} + +func TestPersistentVolumeClaimBinder(t *testing.T) { + _, s := runAMaster(t) + defer s.Close() + + deleteAllEtcdKeys() + client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()}) + + binder := volumeclaimbinder.NewPersistentVolumeClaimBinder(client, 1*time.Second) + binder.Run() + defer binder.Stop() + + for _, volume := range createTestVolumes() { + _, err := client.PersistentVolumes().Create(volume) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + volumes, err := client.PersistentVolumes().List(labels.Everything(), fields.Everything()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(volumes.Items) != 2 { + t.Errorf("expected 2 PVs, got %#v", len(volumes.Items)) + } + + for _, claim := range createTestClaims() { + _, err := client.PersistentVolumeClaims(api.NamespaceDefault).Create(claim) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + claims, err := client.PersistentVolumeClaims(api.NamespaceDefault).List(labels.Everything(), fields.Everything()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(claims.Items) != 3 { + t.Errorf("expected 3 PVCs, got %#v", len(claims.Items)) + } + + // make sure the binder has caught up + time.Sleep(2 * time.Second) + + for _, claim := range createTestClaims() { + claim, err := client.PersistentVolumeClaims(api.NamespaceDefault).Get(claim.Name) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if (claim.Name == "claim01" || claim.Name == "claim02") && claim.Status.VolumeRef == nil { + t.Errorf("Expected claim to be bound: %v", claim) + } + if claim.Name == "claim03" && claim.Status.VolumeRef != nil { + t.Errorf("Expected claim03 to be unbound: %v", claim) + } + } +} + +func createTestClaims() []*api.PersistentVolumeClaim { + return []*api.PersistentVolumeClaim{ + { + ObjectMeta: api.ObjectMeta{ + Name: "claim03", + Namespace: api.NamespaceDefault, + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadWriteOnce}, + Resources: 
api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("500G"), + }, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "claim01", + Namespace: api.NamespaceDefault, + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("8G"), + }, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "claim02", + Namespace: api.NamespaceDefault, + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.AccessModeType{api.ReadOnlyMany, api.ReadWriteOnce, api.ReadWriteMany}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5G"), + }, + }, + }, + }, + } +} + +func createTestVolumes() []*api.PersistentVolume { + return []*api.PersistentVolume{ + { + ObjectMeta: api.ObjectMeta{ + UID: "gce-pd-10", + Name: "gce003", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("10G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ + PDName: "gce123123123", + FSType: "foo", + }, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "nfs-5", + Name: "nfs002", + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("5G"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + Glusterfs: &api.GlusterfsVolumeSource{ + EndpointsName: "andintheend", + Path: "theloveyoutakeisequaltotheloveyoumake", + }, + }, + AccessModes: []api.AccessModeType{ + api.ReadWriteOnce, + api.ReadOnlyMany, + api.ReadWriteMany, + }, + }, + }, + } +} From a04420e548997a2f2e8a0bef3b59c268b3d21fb2 Mon Sep 17 00:00:00 2001 From: markturansky Date: Tue, 21 Apr 2015 11:05:15 -0400 Subject: [PATCH 10/12] Added pending phase for volumes. added defaults for PV/PVC. 
refactored to better phase transitioning in control loops
---
 pkg/api/testing/fuzzer.go                   |   8 +
 pkg/api/types.go                            |   2 +
 pkg/api/v1beta1/defaults.go                 |  10 +
 pkg/api/v1beta1/defaults_test.go            |  20 ++
 pkg/api/v1beta1/types.go                    |   2 +
 pkg/api/v1beta2/defaults.go                 |  10 +
 pkg/api/v1beta2/defaults_test.go            |  20 ++
 pkg/api/v1beta2/types.go                    |   2 +
 pkg/api/v1beta3/defaults.go                 |  10 +
 pkg/api/v1beta3/defaults_test.go            |  20 ++
 pkg/api/v1beta3/types.go                    |   2 +
 .../persistentvolume/etcd/etcd_test.go      |   3 +
 .../persistentvolumeclaim/etcd/etcd_test.go |   3 +
 .../persistent_volume_claim_binder.go       | 296 +++++++++++-------
 .../persistent_volume_claim_binder_test.go  |  65 +---
 test/integration/persistent_volumes_test.go |  23 +-
 16 files changed, 318 insertions(+), 178 deletions(-)

diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go
index 774bda50e8e..e62d790a2a6 100644
--- a/pkg/api/testing/fuzzer.go
+++ b/pkg/api/testing/fuzzer.go
@@ -201,6 +201,14 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
 			c.FuzzNoCustom(s) // fuzz self without calling this function again
 			s.Type = api.SecretTypeOpaque
 		},
+		func(pv *api.PersistentVolume, c fuzz.Continue) {
+			c.FuzzNoCustom(pv) // fuzz self without calling this function again
+			pv.Status.Phase = api.VolumePending
+		},
+		func(pvc *api.PersistentVolumeClaim, c fuzz.Continue) {
+			c.FuzzNoCustom(pvc) // fuzz self without calling this function again
+			pvc.Status.Phase = api.ClaimPending
+		},
 		func(s *api.NamespaceSpec, c fuzz.Continue) {
 			s.Finalizers = []api.FinalizerName{api.FinalizerKubernetes}
 		},
diff --git a/pkg/api/types.go b/pkg/api/types.go
index ccbd8d2957d..f80f7fb3321 100644
--- a/pkg/api/types.go
+++ b/pkg/api/types.go
@@ -317,6 +317,8 @@ const (
 type PersistentVolumePhase string
 
 const (
+	// used for PersistentVolumes that are not available
+	VolumePending PersistentVolumePhase = "Pending"
 	// used for PersistentVolumes that are not yet bound
 	VolumeAvailable PersistentVolumePhase = "Available"
 	// used for PersistentVolumes that are bound
diff --git a/pkg/api/v1beta1/defaults.go b/pkg/api/v1beta1/defaults.go
index 4d8a9b9cdfe..d56ba77090d 100644
--- a/pkg/api/v1beta1/defaults.go
+++ b/pkg/api/v1beta1/defaults.go
@@ -111,6 +111,16 @@ func init() {
 				obj.Type = SecretTypeOpaque
 			}
 		},
+		func(obj *PersistentVolume) {
+			if obj.Status.Phase == "" {
+				obj.Status.Phase = VolumePending
+			}
+		},
+		func(obj *PersistentVolumeClaim) {
+			if obj.Status.Phase == "" {
+				obj.Status.Phase = ClaimPending
+			}
+		},
 		func(obj *Endpoints) {
 			if obj.Protocol == "" {
 				obj.Protocol = ProtocolTCP
diff --git a/pkg/api/v1beta1/defaults_test.go b/pkg/api/v1beta1/defaults_test.go
index 3f304d1a512..5822d69bc05 100644
--- a/pkg/api/v1beta1/defaults_test.go
+++ b/pkg/api/v1beta1/defaults_test.go
@@ -164,6 +164,26 @@ func TestSetDefaultSecret(t *testing.T) {
 	}
 }
 
+func TestSetDefaultPersistentVolume(t *testing.T) {
+	pv := &current.PersistentVolume{}
+	obj2 := roundTrip(t, runtime.Object(pv))
+	pv2 := obj2.(*current.PersistentVolume)
+
+	if pv2.Status.Phase != current.VolumePending {
+		t.Errorf("Expected volume phase %v, got %v", current.VolumePending, pv2.Status.Phase)
+	}
+}
+
+func TestSetDefaultPersistentVolumeClaim(t *testing.T) {
+	pvc := &current.PersistentVolumeClaim{}
+	obj2 := roundTrip(t, runtime.Object(pvc))
+	pvc2 := obj2.(*current.PersistentVolumeClaim)
+
+	if pvc2.Status.Phase != current.ClaimPending {
+		t.Errorf("Expected claim phase %v, got %v", current.ClaimPending, pvc2.Status.Phase)
+	}
+}
+
 // Test that we use "legacy" fields if "modern" fields are not provided.
 func TestSetDefaulEndpointsLegacy(t *testing.T) {
 	in := &current.Endpoints{
diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go
index bf977cbac70..3480b530cdf 100644
--- a/pkg/api/v1beta1/types.go
+++ b/pkg/api/v1beta1/types.go
@@ -229,6 +229,8 @@ const (
 type PersistentVolumePhase string
 
 const (
+	// used for PersistentVolumes that are not available
+	VolumePending PersistentVolumePhase = "Pending"
 	// used for PersistentVolumes that are not yet bound
 	VolumeAvailable PersistentVolumePhase = "Available"
 	// used for PersistentVolumes that are bound
diff --git a/pkg/api/v1beta2/defaults.go b/pkg/api/v1beta2/defaults.go
index 27516e4b61a..f8830ed46e5 100644
--- a/pkg/api/v1beta2/defaults.go
+++ b/pkg/api/v1beta2/defaults.go
@@ -112,6 +112,16 @@ func init() {
 				obj.Type = SecretTypeOpaque
 			}
 		},
+		func(obj *PersistentVolume) {
+			if obj.Status.Phase == "" {
+				obj.Status.Phase = VolumePending
+			}
+		},
+		func(obj *PersistentVolumeClaim) {
+			if obj.Status.Phase == "" {
+				obj.Status.Phase = ClaimPending
+			}
+		},
 		func(obj *Endpoints) {
 			if obj.Protocol == "" {
 				obj.Protocol = ProtocolTCP
diff --git a/pkg/api/v1beta2/defaults_test.go b/pkg/api/v1beta2/defaults_test.go
index 112a362c9e2..0c9b7173da5 100644
--- a/pkg/api/v1beta2/defaults_test.go
+++ b/pkg/api/v1beta2/defaults_test.go
@@ -154,6 +154,26 @@ func TestSetDefaultService(t *testing.T) {
 	}
 }
 
+func TestSetDefaultPersistentVolume(t *testing.T) {
+	pv := &current.PersistentVolume{}
+	obj2 := roundTrip(t, runtime.Object(pv))
+	pv2 := obj2.(*current.PersistentVolume)
+
+	if pv2.Status.Phase != current.VolumePending {
+		t.Errorf("Expected volume phase %v, got %v", current.VolumePending, pv2.Status.Phase)
+	}
+}
+
+func TestSetDefaultPersistentVolumeClaim(t *testing.T) {
+	pvc := &current.PersistentVolumeClaim{}
+	obj2 := roundTrip(t, runtime.Object(pvc))
+	pvc2 := obj2.(*current.PersistentVolumeClaim)
+
+	if pvc2.Status.Phase != current.ClaimPending {
+		t.Errorf("Expected claim phase %v, got %v", current.ClaimPending, pvc2.Status.Phase)
+	}
+}
+
 func TestSetDefaultSecret(t *testing.T) {
 	s := &current.Secret{}
 	obj2 := roundTrip(t, runtime.Object(s))
diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go
index 8b86646ec3c..59f2e527326 100644
--- a/pkg/api/v1beta2/types.go
+++ b/pkg/api/v1beta2/types.go
@@ -198,6 +198,8 @@ const (
 type PersistentVolumePhase string
 
 const (
+	// used for PersistentVolumes that are not available
+	VolumePending PersistentVolumePhase = "Pending"
 	// used for PersistentVolumes that are not yet bound
 	VolumeAvailable PersistentVolumePhase = "Available"
 	// used for PersistentVolumes that are bound
diff --git a/pkg/api/v1beta3/defaults.go b/pkg/api/v1beta3/defaults.go
index a5a21b61a21..9a3e859b11f 100644
--- a/pkg/api/v1beta3/defaults.go
+++ b/pkg/api/v1beta3/defaults.go
@@ -102,6 +102,16 @@ func init() {
 				obj.Type = SecretTypeOpaque
 			}
 		},
+		func(obj *PersistentVolume) {
+			if obj.Status.Phase == "" {
+				obj.Status.Phase = VolumePending
+			}
+		},
+		func(obj *PersistentVolumeClaim) {
+			if obj.Status.Phase == "" {
+				obj.Status.Phase = ClaimPending
+			}
+		},
 		func(obj *Endpoints) {
 			for i := range obj.Subsets {
 				ss := &obj.Subsets[i]
diff --git a/pkg/api/v1beta3/defaults_test.go b/pkg/api/v1beta3/defaults_test.go
index 1d5e07b89fc..bbd7d3ba818 100644
--- a/pkg/api/v1beta3/defaults_test.go
+++ b/pkg/api/v1beta3/defaults_test.go
@@ -174,6 +174,26 @@ func TestSetDefaultSecret(t *testing.T) {
 	}
 }
 
+func TestSetDefaultPersistentVolume(t *testing.T) {
+	pv := &current.PersistentVolume{}
+	obj2 := roundTrip(t, runtime.Object(pv))
+	pv2 := obj2.(*current.PersistentVolume)
+
+	if pv2.Status.Phase != current.VolumePending {
+		t.Errorf("Expected volume phase %v, got %v", current.VolumePending, pv2.Status.Phase)
+	}
+}
+
+func TestSetDefaultPersistentVolumeClaim(t *testing.T) {
+	pvc := &current.PersistentVolumeClaim{}
+	obj2 := roundTrip(t, runtime.Object(pvc))
+	pvc2 := obj2.(*current.PersistentVolumeClaim)
+
+	if pvc2.Status.Phase != current.ClaimPending {
+		t.Errorf("Expected claim phase %v, got %v", current.ClaimPending, pvc2.Status.Phase)
+	}
+}
+
 func TestSetDefaulEndpointsProtocol(t *testing.T) {
 	in := &current.Endpoints{Subsets: []current.EndpointSubset{
 		{Ports: []current.EndpointPort{{}, {Protocol: "UDP"}, {}}},
diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go
index 4c2a62a61c9..a252437c580 100644
--- a/pkg/api/v1beta3/types.go
+++ b/pkg/api/v1beta3/types.go
@@ -334,6 +334,8 @@ const (
 type PersistentVolumePhase string
 
 const (
+	// used for PersistentVolumes that are not available
+	VolumePending PersistentVolumePhase = "Pending"
 	// used for PersistentVolumes that are not yet bound
 	VolumeAvailable PersistentVolumePhase = "Available"
 	// used for PersistentVolumes that are bound
diff --git a/pkg/registry/persistentvolume/etcd/etcd_test.go b/pkg/registry/persistentvolume/etcd/etcd_test.go
index 8a839222058..5ad2ebf6d69 100644
--- a/pkg/registry/persistentvolume/etcd/etcd_test.go
+++ b/pkg/registry/persistentvolume/etcd/etcd_test.go
@@ -58,6 +58,9 @@ func validNewPersistentVolume(name string) *api.PersistentVolume {
 				HostPath: &api.HostPathVolumeSource{Path: "/foo"},
 			},
 		},
+		Status: api.PersistentVolumeStatus{
+			Phase: api.VolumePending,
+		},
 	}
 	return pv
 }
diff --git a/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go b/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go
index e3c44ce4ee3..4307724d028 100644
--- a/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go
+++ b/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go
@@ -59,6 +59,9 @@ func validNewPersistentVolumeClaim(name, ns string) *api.PersistentVolumeClaim {
 			},
 		},
 	},
+		Status: api.PersistentVolumeClaimStatus{
+			Phase: api.ClaimPending,
+		},
 	}
 	return pv
 }
diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go
index 90576f74913..bc097f260f5 100644
--- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go
+++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go
@@ -18,10 +18,11 @@ package volumeclaimbinder
 
 import (
 	"fmt"
-	"reflect"
+	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
@@ -40,12 +41,17 @@ type PersistentVolumeClaimBinder struct {
 	claimController *framework.Controller
 	client          binderClient
 	stopChannels    map[string]chan struct{}
+	lock            sync.RWMutex
 }
 
 // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
 func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
 	volumeIndex := NewPersistentVolumeOrderedIndex()
 	binderClient := NewBinderClient(kubeClient)
+	binder := &PersistentVolumeClaimBinder{
+		volumeIndex: volumeIndex,
+		client:      binderClient,
+	}
 
 	_, volumeController := framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func() (runtime.Object, error) {
 				return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything())
@@ -59,23 +65,9 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
&api.PersistentVolume{}, syncPeriod, framework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - volume := obj.(*api.PersistentVolume) - volumeIndex.Indexer.Add(volume) - syncVolume(binderClient, volume) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - oldVolume := oldObj.(*api.PersistentVolume) - newVolume := newObj.(*api.PersistentVolume) - volumeIndex.Indexer.Update(newVolume) - if updateRequired(oldVolume, newVolume) { - syncVolume(binderClient, newVolume) - } - }, - DeleteFunc: func(obj interface{}) { - volume := obj.(*api.PersistentVolume) - volumeIndex.Indexer.Delete(volume) - }, + AddFunc: binder.addVolume, + UpdateFunc: binder.updateVolume, + DeleteFunc: binder.deleteVolume, }, ) _, claimController := framework.NewInformer( @@ -90,75 +82,186 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time &api.PersistentVolumeClaim{}, syncPeriod, framework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - claim := obj.(*api.PersistentVolumeClaim) - syncClaim(volumeIndex, binderClient, claim) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - newClaim := newObj.(*api.PersistentVolumeClaim) - if newClaim.Status.VolumeRef == nil { - syncClaim(volumeIndex, binderClient, newClaim) - } - }, + AddFunc: binder.addClaim, + UpdateFunc: binder.updateClaim, }, ) - binder := &PersistentVolumeClaimBinder{ - volumeController: volumeController, - claimController: claimController, - volumeIndex: volumeIndex, - client: binderClient, - } + binder.claimController = claimController + binder.volumeController = volumeController return binder } -func updateRequired(oldVolume, newVolume *api.PersistentVolume) bool { - // Spec changes affect indexing and sorting volumes - if !reflect.DeepEqual(oldVolume.Spec, newVolume.Spec) { - return true - } - if !reflect.DeepEqual(oldVolume.Status, newVolume.Status) { - return true - } - return false +func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + volume := obj.(*api.PersistentVolume) + syncVolume(binder.volumeIndex, binder.client, volume) } -func syncVolume(binderClient binderClient, volume *api.PersistentVolume) (err error) { +func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + newVolume := newObj.(*api.PersistentVolume) + binder.volumeIndex.Update(newVolume) + syncVolume(binder.volumeIndex, binder.client, newVolume) +} + +func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + volume := obj.(*api.PersistentVolume) + binder.volumeIndex.Delete(volume) +} + +func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + claim := obj.(*api.PersistentVolumeClaim) + syncClaim(binder.volumeIndex, binder.client, claim) +} + +func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + newClaim := newObj.(*api.PersistentVolumeClaim) + syncClaim(binder.volumeIndex, binder.client, newClaim) +} + +func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) { glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name) - if volume.Spec.ClaimRef != nil { - if volume.Status.Phase == api.VolumeAvailable { - volume.Status.Phase = api.VolumeBound - _, err := 
+func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
 	glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name)
 
-	if volume.Spec.ClaimRef != nil {
-		if volume.Status.Phase == api.VolumeAvailable {
-			volume.Status.Phase = api.VolumeBound
-			_, err := binderClient.UpdatePersistentVolumeStatus(volume)
-			if err != nil {
-				return fmt.Errorf("Error updating pv.status: %v\n", err)
+	// volumes can be in one of the following states:
+	//
+	// VolumePending -- default value -- not bound to a claim and not yet processed through this controller.
+	// VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex.
+	// VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil.  Claim status may not be correct.
+	// VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user.
+	currentPhase := volume.Status.Phase
+	nextPhase := currentPhase
+
+	switch currentPhase {
+	// pending volumes are available only after indexing in order to be matched to claims.
+	case api.VolumePending:
+		_, exists, err := volumeIndex.Get(volume)
+		if err != nil {
+			return err
+		}
+		if !exists {
+			volumeIndex.Add(volume)
+		}
+		glog.V(5).Infof("PersistentVolume[%s] is now available\n", volume.Name)
+		nextPhase = api.VolumeAvailable
+
+	// available volumes await a claim
+	case api.VolumeAvailable:
+		if volume.Spec.ClaimRef != nil {
+			_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
+			if err == nil {
+				// change of phase will trigger an update event with the newly bound volume
+				glog.V(5).Infof("PersistentVolume[%s] is now bound\n", volume.Name)
+				nextPhase = api.VolumeBound
+			} else {
+				if errors.IsNotFound(err) {
+					nextPhase = api.VolumeReleased
+				}
 			}
 		}
+	// bound volumes require verification of their bound claims
+	case api.VolumeBound:
+		if volume.Spec.ClaimRef == nil {
+			return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
+		} else {
+			claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
+			if err == nil {
+				// bound and active. Build claim status as needed.
+				if claim.Status.VolumeRef == nil {
+					syncClaimStatus(binderClient, volume, claim)
+				}
+			} else {
+				if errors.IsNotFound(err) {
+					nextPhase = api.VolumeReleased
+				} else {
+					return err
+				}
+			}
+		}
+	// released volumes require recycling
+	case api.VolumeReleased:
+		if volume.Spec.ClaimRef == nil {
+			return fmt.Errorf("PersistentVolume[%s] expected to be released but found nil claimRef: %+v", volume.Name, volume)
+		} else {
+			// TODO: implement Recycle method on plugins
+		}
+	}
+
+	if currentPhase != nextPhase {
+		volume.Status.Phase = nextPhase
+		volume, err := binderClient.UpdatePersistentVolumeStatus(volume)
+		if err != nil {
+			// Rollback to previous phase
+			volume.Status.Phase = currentPhase
+		}
+		volumeIndex.Update(volume)
+	}
+
+	return nil
+}
+
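+// syncClaim reconciles a single PersistentVolumeClaim: a pending claim is
+// matched against the ordered volume index and bound by writing its
+// reference into volume.Spec.ClaimRef; syncVolume then builds the claim's
+// status once it observes the newly bound volume.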
+func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
+	glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
+
+	// claims can be in one of the following states:
+	//
+	// ClaimPending -- default value -- not yet bound to a volume.  A volume that matches the claim may not exist.
+	// ClaimBound -- bound to a volume.  claim.Status.VolumeRef != nil
+	currentPhase := claim.Status.Phase
+	nextPhase := currentPhase
+
+	switch currentPhase {
+
+	// pending claims await a matching volume
+	case api.ClaimPending:
+		volume, err := volumeIndex.FindBestMatchForClaim(claim)
+		if err != nil {
+			return err
+		}
+		if volume == nil {
+			return fmt.Errorf("A volume match does not exist for persistent claim: %s", claim.Name)
+		}
 
-	// verify the volume is still claimed by a user
-	if claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil {
-		glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name)
-		// rebuild the Claim's Status as needed
-		if claim.Status.VolumeRef == nil {
-			syncClaimStatus(binderClient, volume, claim)
-		}
-	} else {
-		//claim was deleted by user.
-		glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name)
-		// volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly released volume
-		volume.Status.Phase = api.VolumeReleased
-		volume, err = binderClient.UpdatePersistentVolumeStatus(volume)
-		if err != nil {
-			return fmt.Errorf("Error updating pv: %+v\n", err)
-		}
-	}
-	} else {
-		volume.Status.Phase = api.VolumeAvailable
-		_, err := binderClient.UpdatePersistentVolumeStatus(volume)
+		claimRef, err := api.GetReference(claim)
 		if err != nil {
-			return fmt.Errorf("Error updating pv.status: %v\n", err)
+			return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
 		}
+
+		// make a binding reference to the claim.
+		// triggers update of the volume in this controller, which builds claim status
+		volume.Spec.ClaimRef = claimRef
+		if _, err := binderClient.UpdatePersistentVolume(volume); err != nil {
+			// Rollback by unsetting the ClaimRef on the indexed volume pointer;
+			// the volume in the index is unbound again and ready to be matched.
+			volume.Spec.ClaimRef = nil
+			return fmt.Errorf("Error updating volume: %+v\n", err)
+		}
+		nextPhase = api.ClaimBound
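+
+		// NOTE: claim.Status.VolumeRef is intentionally not set here; it is
+		// filled in by syncVolume (via syncClaimStatus) when the binder
+		// observes the volume's new ClaimRef, keeping binding volume-centric.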
+
+	// bound claims require no maintenance. Deletion by the user is the last lifecycle phase.
+	case api.ClaimBound:
+		// This is the end of a claim's lifecycle.
+		// After claim deletion, a volume is recycled when it verifies its claim is unbound
+		glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name)
+	}
+
+	if currentPhase != nextPhase {
+		claim.Status.Phase = nextPhase
+		if _, err := binderClient.UpdatePersistentVolumeClaimStatus(claim); err != nil {
+			// Rollback to the previous phase on a failed status update
+			claim.Status.Phase = currentPhase
+		}
+	}
 	return nil
 }
@@ -168,6 +271,7 @@ func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, cl
 	if err != nil {
 		return fmt.Errorf("Unexpected error getting volume reference: %v\n", err)
 	}
+
 	// all "actuals" are transferred from PV to PVC so the user knows what
 	// type of volume they actually got for their claim
 	claim.Status.Phase = api.ClaimBound
@@ -176,63 +280,16 @@ func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, cl
 	claim.Status.Capacity = volume.Spec.Capacity
 
 	_, err = binderClient.UpdatePersistentVolumeClaimStatus(claim)
-
 	if err != nil {
 		claim.Status.Phase = api.ClaimPending
 		claim.Status.VolumeRef = nil
 		claim.Status.AccessModes = nil
 		claim.Status.Capacity = nil
 	}
-
 	return err
 }
 
-func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
-	glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
-
-	if claim.Status.VolumeRef != nil {
-		glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name)
-		return nil
-	}
-
-	volume, err := volumeIndex.FindBestMatchForClaim(claim)
-	if err != nil {
-		return err
-	}
-
-	if volume != nil {
-		claimRef, err := api.GetReference(claim)
-		if err != nil {
-			return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
-		}
-
-		// make a binding reference to the claim
-		volume.Spec.ClaimRef = claimRef
-		volume, err = binderClient.UpdatePersistentVolume(volume)
-
-		if err != nil {
-			// volume no longer bound
-			volume.Spec.ClaimRef = nil
-			return fmt.Errorf("Error updating volume: %+v\n", err)
-		} else {
-			err = syncClaimStatus(binderClient, volume, claim)
-			if err != nil {
-				return fmt.Errorf("Error update claim.status: %+v\n", err)
-			}
-		}
-	} else {
-		glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID)
-		if claim.Status.Phase != api.ClaimPending {
-			claim.Status.Phase = api.ClaimPending
-			_, err := binderClient.UpdatePersistentVolumeClaimStatus(claim)
-			if err != nil {
-				return fmt.Errorf("Error updating pvclaim.status: %v\n", err)
-			}
-		}
-	}
-	return nil
-}
-
+// Run starts all of this binder's control loops
 func (controller *PersistentVolumeClaimBinder) Run() {
 	glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")
 	if controller.stopChannels == nil {
@@ -250,6 +307,7 @@ func (controller *PersistentVolumeClaimBinder) Run() {
 	}
 }
 
+// Stop gracefully shuts down this binder
 func (controller *PersistentVolumeClaimBinder) Stop() {
 	glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n")
 	for name, stopChan := range controller.stopChannels {
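Reviewer note: `syncVolume` and `syncClaim` reach the API server only through the small `binderClient` seam, which is defined elsewhere in this package and not shown in this excerpt. Inferred from the call sites above, so a sketch rather than the committed definition, the interface is roughly:

```go
// Sketch inferred from call sites in persistent_volume_claim_binder.go;
// the committed interface in pkg/volumeclaimbinder may differ in detail.
type binderClient interface {
	GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
	UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
	UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
	UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
}
```

This seam is what lets the unit tests below substitute a `mockBinderClient` for a real API client.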
diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go
index d1999a4a5dd..985ec257db0 100644
--- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go
+++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go
@@ -17,12 +17,12 @@ limitations under the License.
 package volumeclaimbinder
 
 import (
-	"fmt"
 	"reflect"
 	"testing"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
@@ -51,7 +51,6 @@ func TestRunStop(t *testing.T) {
 }
 
 func TestExampleObjects(t *testing.T) {
-
 	scenarios := map[string]struct {
 		expected interface{}
 	}{
@@ -167,56 +166,7 @@ func TestExampleObjects(t *testing.T) {
 	}
 }
 
-func TestRequiresUpdate(t *testing.T) {
-	old := &api.PersistentVolume{
-		Spec: api.PersistentVolumeSpec{
-			AccessModes: []api.AccessModeType{api.ReadWriteOnce},
-			Capacity: api.ResourceList{
-				api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"),
-			},
-			PersistentVolumeSource: api.PersistentVolumeSource{
-				HostPath: &api.HostPathVolumeSource{
-					Path: "/tmp/data02",
-				},
-			},
-		},
-	}
-
-	new := &api.PersistentVolume{
-		Spec: api.PersistentVolumeSpec{
-			AccessModes: []api.AccessModeType{api.ReadWriteOnce},
-			Capacity: api.ResourceList{
-				api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"),
-			},
-			PersistentVolumeSource: api.PersistentVolumeSource{
-				HostPath: &api.HostPathVolumeSource{
-					Path: "/tmp/data02",
-				},
-			},
-			ClaimRef: &api.ObjectReference{Name: "foo"},
-		},
-	}
-
-	if !updateRequired(old, new) {
-		t.Errorf("Update expected for the new volume with added ClaimRef")
-	}
-
-	old.Spec.ClaimRef = new.Spec.ClaimRef
-	old.Status.Phase = api.VolumeBound
-
-	if !updateRequired(old, new) {
-		t.Errorf("Update expected for the new volume with added Status")
-	}
-
-	new.Status.Phase = old.Status.Phase
-
-	if updateRequired(old, new) {
-		t.Errorf("No updated expected for identical objects")
-	}
-}
-
 func TestBindingWithExamples(t *testing.T) {
-
 	api.ForTesting_ReferencesAllowBlankSelfLinks = true
 	o := testclient.NewObjects(api.Scheme)
 	if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/claims/claim-01.yaml", o); err != nil {
@@ -245,7 +195,7 @@ func TestBindingWithExamples(t *testing.T) {
 	}
 
 	volumeIndex.Add(pv)
-	syncVolume(mockClient, pv)
+	syncVolume(volumeIndex, mockClient, pv)
 
 	if pv.Status.Phase != api.VolumeAvailable {
 		t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
@@ -261,10 +211,13 @@ func TestBindingWithExamples(t *testing.T) {
 		t.Errorf("Expected ClaimRef but got nil for volume: %+v\n", pv)
 	}
 
-	syncVolume(mockClient, pv)
+	// first sync verifies the new bound claim, advances state, triggering update
+	syncVolume(volumeIndex, mockClient, pv)
+	// second sync verifies claim, sees missing claim status and builds it
+	syncVolume(volumeIndex, mockClient, pv)
 
 	if claim.Status.VolumeRef == nil {
-		t.Error("Expected claim to be bound to volume")
+		t.Fatalf("Expected claim to be bound to volume")
 	}
 
 	if pv.Status.Phase != api.VolumeBound {
@@ -283,7 +236,7 @@ func TestBindingWithExamples(t *testing.T) {
 
 	// pretend the user deleted their claim
 	mockClient.claim = nil
-	syncVolume(mockClient, pv)
+	syncVolume(volumeIndex, mockClient, pv)
 
 	if pv.Status.Phase != api.VolumeReleased {
 		t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
@@ -311,7 +264,7 @@ func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*ap
 	if c.claim != nil {
 		return c.claim, nil
 	} else {
-		return nil, fmt.Errorf("Claim does not exist")
+		return nil, errors.NewNotFound("persistentVolumeClaim", name)
 	}
 }
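As a reading aid for the lifecycle exercised by `TestBindingWithExamples` (Available, then Bound, then Released), the decision table in `syncVolume` condenses to a few lines. A minimal runnable sketch using stand-in phase constants rather than the api package's types:

```go
package main

import "fmt"

// Phase is a stand-in for api.PersistentVolumePhase.
type Phase string

const (
	Pending   Phase = "Pending"
	Available Phase = "Available"
	Bound     Phase = "Bound"
	Released  Phase = "Released"
)

// next mirrors the switch in syncVolume: hasClaimRef models
// volume.Spec.ClaimRef != nil, and claimExists models a successful
// GetPersistentVolumeClaim for that reference.
func next(current Phase, hasClaimRef, claimExists bool) Phase {
	switch current {
	case Pending:
		// pending volumes become available once they are indexed
		return Available
	case Available:
		if hasClaimRef && claimExists {
			return Bound
		}
		if hasClaimRef && !claimExists {
			return Released
		}
	case Bound:
		if !claimExists {
			return Released
		}
	}
	// released volumes wait for recycling; otherwise no change
	return current
}

func main() {
	fmt.Println(next(Pending, false, false)) // Available
	fmt.Println(next(Available, true, true)) // Bound
	fmt.Println(next(Bound, true, false))    // Released
}
```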
diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go
index 87be82de4e8..6801b5ad406 100644
--- a/test/integration/persistent_volumes_test.go
+++ b/test/integration/persistent_volumes_test.go
@@ -76,8 +76,25 @@ func TestPersistentVolumeClaimBinder(t *testing.T) {
 		t.Errorf("expected 3 PVCs, got %#v", len(claims.Items))
 	}
 
-	// make sure the binder has caught up
-	time.Sleep(2 * time.Second)
+	// the binder will eventually catch up and set status on Claims
+	watch, err := client.PersistentVolumeClaims(api.NamespaceDefault).Watch(labels.Everything(), fields.Everything(), "0")
+	if err != nil {
+		t.Fatalf("Couldn't subscribe to PersistentVolumeClaims: %v", err)
+	}
+	defer watch.Stop()
+
+	boundCount := 0
+	expectedBoundCount := 2
+	for {
+		event := <-watch.ResultChan()
+		claim := event.Object.(*api.PersistentVolumeClaim)
+		if claim.Status.VolumeRef != nil {
+			boundCount++
+		}
+		if boundCount == expectedBoundCount {
+			break
+		}
+	}
 
 	for _, claim := range createTestClaims() {
 		claim, err := client.PersistentVolumeClaims(api.NamespaceDefault).Get(claim.Name)
@@ -86,7 +103,7 @@ func TestPersistentVolumeClaimBinder(t *testing.T) {
 		}
 
 		if (claim.Name == "claim01" || claim.Name == "claim02") && claim.Status.VolumeRef == nil {
-			t.Errorf("Expected claim to be bound: %v", claim)
+			t.Errorf("Expected claim to be bound: %+v", claim)
 		}
 		if claim.Name == "claim03" && claim.Status.VolumeRef != nil {
 			t.Errorf("Expected claim03 to be unbound: %v", claim)

From 2fe8ed53583ea3280e7f24c60336414c5f534553 Mon Sep 17 00:00:00 2001
From: markturansky
Date: Sat, 25 Apr 2015 04:35:27 -0400
Subject: [PATCH 11/12] fixed prefix name in validation

---
 pkg/api/validation/validation.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go
index df82dc97a2f..1d85ac9176d 100644
--- a/pkg/api/validation/validation.go
+++ b/pkg/api/validation/validation.go
@@ -321,7 +321,7 @@ func validateSource(source *api.VolumeSource) errs.ValidationErrorList {
 	}
 	if source.PersistentVolumeClaimVolumeSource != nil {
 		numVolumes++
-		allErrs = append(allErrs, validatePersistentClaimVolumeSource(source.PersistentVolumeClaimVolumeSource).Prefix("glusterfs")...)
+		allErrs = append(allErrs, validatePersistentClaimVolumeSource(source.PersistentVolumeClaimVolumeSource).Prefix("persistentVolumeClaim")...)
 	}
 
 	if numVolumes != 1 {

From beacd8722af7d1c7e6a72360fdbee00b75e03d4b Mon Sep 17 00:00:00 2001
From: markturansky
Date: Mon, 27 Apr 2015 14:57:07 -0400
Subject: [PATCH 12/12] addressed feedback. added opt-in cmd line flag
---
 cmd/kube-controller-manager/app/controllermanager.go    | 8 ++++++--
 pkg/api/testing/fuzzer.go                               | 6 ++++--
 pkg/volumeclaimbinder/persistent_volume_claim_binder.go | 8 +++++++-
 3 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 56f11a47b97..bb985ea14b2 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -60,6 +60,7 @@ type CMServer struct {
 	ResourceQuotaSyncPeriod time.Duration
 	NamespaceSyncPeriod     time.Duration
 	PVClaimBinderSyncPeriod time.Duration
+	EnablePVCClaimBinder    bool
 	RegisterRetryCount      int
 	MachineList             util.StringList
 	SyncNodeList            bool
@@ -117,6 +118,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
 	fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
 	fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder_sync_period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims")
+	fs.BoolVar(&s.EnablePVCClaimBinder, "enable_alpha_pvclaimbinder", s.EnablePVCClaimBinder, "Optionally enable persistent volume claim binding. This feature is experimental and expected to change.")
 	fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace peroid for deleting pods on failed nodes.")
 	fs.Float32Var(&s.DeletingPodsQps, "deleting_pods_qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
 	fs.IntVar(&s.DeletingPodsBurst, "deleting_pods_burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
@@ -235,8 +237,10 @@ func (s *CMServer) Run(_ []string) error {
 	namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod)
 	namespaceManager.Run()
 
-	pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
-	pvclaimBinder.Run()
+	if s.EnablePVCClaimBinder {
+		pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
+		pvclaimBinder.Run()
+	}
 
 	select {}
 	return nil
diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go
index e62d790a2a6..340656b5348 100644
--- a/pkg/api/testing/fuzzer.go
+++ b/pkg/api/testing/fuzzer.go
@@ -203,11 +203,13 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
 		},
 		func(pv *api.PersistentVolume, c fuzz.Continue) {
 			c.FuzzNoCustom(pv) // fuzz self without calling this function again
-			pv.Status.Phase = api.VolumePending
+			types := []api.PersistentVolumePhase{api.VolumePending, api.VolumeBound, api.VolumeReleased, api.VolumeAvailable}
+			pv.Status.Phase = types[c.Rand.Intn(len(types))]
 		},
 		func(pvc *api.PersistentVolumeClaim, c fuzz.Continue) {
 			c.FuzzNoCustom(pvc) // fuzz self without calling this function again
-			pvc.Status.Phase = api.ClaimPending
+			types := []api.PersistentVolumeClaimPhase{api.ClaimBound, api.ClaimPending}
+			pvc.Status.Phase = types[c.Rand.Intn(len(types))]
 		},
 		func(s *api.NamespaceSpec, c fuzz.Continue) {
 			s.Finalizers = []api.FinalizerName{api.FinalizerKubernetes}
diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go
index bc097f260f5..8cef18fc45c 100644
--- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go
+++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go
@@ -84,6 +84,8 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
 		framework.ResourceEventHandlerFuncs{
 			AddFunc:    binder.addClaim,
 			UpdateFunc: binder.updateClaim,
+			// no DeleteFunc needed. a claim requires no clean-up;
+			// the missing claim itself is the release of the resource.
 		},
 	)
 
@@ -177,6 +179,8 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
 			if err == nil {
 				// bound and active. Build claim status as needed.
 				if claim.Status.VolumeRef == nil {
+					// syncClaimStatus sets VolumeRef, attempts to persist claim status,
+					// and does a rollback as needed on claim.Status
 					syncClaimStatus(binderClient, volume, claim)
 				}
 			} else {
@@ -198,6 +202,9 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
 
 	if currentPhase != nextPhase {
 		volume.Status.Phase = nextPhase
+
+		// a change in phase will trigger another update event through this controller;
+		// each pass evaluates the current phase and decides whether to advance to the next one
 		volume, err := binderClient.UpdatePersistentVolumeStatus(volume)
 		if err != nil {
 			// Rollback to previous phase
@@ -220,7 +227,6 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli
 	nextPhase := currentPhase
 
 	switch currentPhase {
-
 	// pending claims await a matching volume
 	case api.ClaimPending:
 		volume, err := volumeIndex.FindBestMatchForClaim(claim)