diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go index 48fff95a992..6d115073ab9 100644 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" @@ -159,44 +160,53 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl currentPhase := volume.Status.Phase nextPhase := currentPhase + _, exists, err := volumeIndex.Get(volume) + if err != nil { + return err + } + if !exists { + volumeIndex.Add(volume) + } + switch currentPhase { - // pending volumes are available only after indexing in order to be matched to claims. case api.VolumePending: + + // 3 possible states: + // 1. ClaimRef != nil and Claim exists: Prebound to claim. Make volume available for binding (it will match PVC). + // 2. ClaimRef != nil and Claim !exists: Recently recycled. Remove bind. Make volume available for new claim. + // 3. ClaimRef == nil: Neither recycled nor prebound. Make volume available for binding. + nextPhase = api.VolumeAvailable + if volume.Spec.ClaimRef != nil { - // Pending volumes that have a ClaimRef were recently recycled. The Recycler set the phase to VolumePending - // to start the volume again at the beginning of this lifecycle. - // ClaimRef is the last bind between persistent volume and claim. - // The claim has already been deleted by the user at this point - oldClaimRef := volume.Spec.ClaimRef - volume.Spec.ClaimRef = nil - _, err = binderClient.UpdatePersistentVolume(volume) - if err != nil { - // rollback on error, keep the ClaimRef until we can successfully update the volume - volume.Spec.ClaimRef = oldClaimRef - return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err) + _, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) + if errors.IsNotFound(err) { + // Pending volumes that have a ClaimRef where the claim is missing were recently recycled. + // The Recycler set the phase to VolumePending to start the volume at the beginning of this lifecycle. + // removing ClaimRef unbinds the volume + clone, err := conversion.NewCloner().DeepCopy(volume) + if err != nil { + return fmt.Errorf("Error cloning pv: %v", err) + } + volumeClone, ok := clone.(*api.PersistentVolume) + if !ok { + return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone) + } + volumeClone.Spec.ClaimRef = nil + + if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil { + return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err) + } else { + volume = updatedVolume + volumeIndex.Update(volume) + } + } else if err != nil { + return fmt.Errorf("Error getting PersistentVolumeClaim[%s/%s]: %v", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, err) } } - - _, exists, err := volumeIndex.Get(volume) - if err != nil { - return err - } - if !exists { - volumeIndex.Add(volume) - } - glog.V(5).Infof("PersistentVolume[%s] is now available\n", volume.Name) - nextPhase = api.VolumeAvailable + glog.V(5).Infof("PersistentVolume[%s] is available\n", volume.Name) // available volumes await a claim case api.VolumeAvailable: - // TODO: remove api.VolumePending phase altogether - _, exists, err := volumeIndex.Get(volume) - if err != nil { - return err - } - if !exists { - volumeIndex.Add(volume) - } if volume.Spec.ClaimRef != nil { _, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) if err == nil { @@ -265,79 +275,71 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) { glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) - // claims can be in one of the following states: - // - // ClaimPending -- default value -- not bound to a claim. A volume that matches the claim may not exist. - // ClaimBound -- bound to a volume. claim.Status.VolumeRef != nil - currentPhase := claim.Status.Phase - nextPhase := currentPhase - - switch currentPhase { - // pending claims await a matching volume + switch claim.Status.Phase { case api.ClaimPending: - volume, err := volumeIndex.FindBestMatchForClaim(claim) + volume, err := volumeIndex.findBestMatchForClaim(claim) if err != nil { return err } if volume == nil { - return fmt.Errorf("A volume match does not exist for persistent claim: %s", claim.Name) + glog.V(5).Infof("A volume match does not exist for persistent claim: %s", claim.Name) + return nil } - // make a binding reference to the claim. - // triggers update of the claim in this controller, which builds claim status - claim.Spec.VolumeName = volume.Name - // TODO: make this similar to Pod's binding both with BindingREST subresource and GuaranteedUpdate helper in etcd.go - claim, err = binderClient.UpdatePersistentVolumeClaim(claim) - if err == nil { - nextPhase = api.ClaimBound - glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name) + // create a reference to the claim and assign it to the volume being bound. + // the volume is a pointer and assigning the reference fixes a race condition where another + // claim might match this volume but before the claimRef is persistent in the next case statement + claimRef, err := api.GetReference(claim) + if err != nil { + return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) + } + + // make a binding reference to the claim and ensure to update the local index to prevent dupe bindings + clone, err := conversion.NewCloner().DeepCopy(volume) + if err != nil { + return fmt.Errorf("Error cloning pv: %v", err) + } + volumeClone, ok := clone.(*api.PersistentVolume) + if !ok { + return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone) + } + volumeClone.Spec.ClaimRef = claimRef + if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil { + return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err) } else { - // Rollback by unsetting the ClaimRef on the volume pointer. - // the volume in the index will be unbound again and ready to be matched. - claim.Spec.VolumeName = "" - // Rollback by restoring original phase to claim pointer - nextPhase = api.ClaimPending - return fmt.Errorf("Error updating volume: %+v\n", err) + volume = updatedVolume + volumeIndex.Update(updatedVolume) + } + + // the bind is persisted on the volume above and will always match the claim in a search. + // claim would remain Pending if the update fails, so processing this state is idempotent. + // this only needs to be processed once. + if claim.Spec.VolumeName != volume.Name { + claim.Spec.VolumeName = volume.Name + claim, err = binderClient.UpdatePersistentVolumeClaim(claim) + if err != nil { + return fmt.Errorf("Error updating claim with VolumeName %s: %+v\n", volume.Name, err) + } + } + + claim.Status.Phase = api.ClaimBound + claim.Status.AccessModes = volume.Spec.AccessModes + claim.Status.Capacity = volume.Spec.Capacity + _, err = binderClient.UpdatePersistentVolumeClaimStatus(claim) + if err != nil { + return fmt.Errorf("Unexpected error saving claim status: %+v", err) } case api.ClaimBound: - volume, err := binderClient.GetPersistentVolume(claim.Spec.VolumeName) - if err != nil { - return fmt.Errorf("Unexpected error getting persistent volume: %v\n", err) - } + // no-op. Claim is bound, values from PV are set. PVCs are technically mutable in the API server + // and we don't want to handle those changes at this time. - if volume.Spec.ClaimRef == nil { - glog.V(5).Infof("Rebuilding bind on pv.Spec.ClaimRef\n") - claimRef, err := api.GetReference(claim) - if err != nil { - return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) - } - volume.Spec.ClaimRef = claimRef - _, err = binderClient.UpdatePersistentVolume(volume) - if err != nil { - return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err) - } - } + default: + return fmt.Errorf("Unknown state for PVC: %#v", claim) - // all "actuals" are transferred from PV to PVC so the user knows what - // type of volume they actually got for their claim. - // Volumes cannot have zero AccessModes, so checking that a claim has access modes - // is sufficient to tell us if these values have already been set. - if len(claim.Status.AccessModes) == 0 { - claim.Status.Phase = api.ClaimBound - claim.Status.AccessModes = volume.Spec.AccessModes - claim.Status.Capacity = volume.Spec.Capacity - _, err := binderClient.UpdatePersistentVolumeClaimStatus(claim) - if err != nil { - return fmt.Errorf("Unexpected error saving claim status: %+v", err) - } - } } - if currentPhase != nextPhase { - claim.Status.Phase = nextPhase - binderClient.UpdatePersistentVolumeClaimStatus(claim) - } + glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name) return nil } diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go index 9b613c45c47..87612bc0437 100644 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go +++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller_test.go @@ -51,6 +51,97 @@ func TestRunStop(t *testing.T) { } } +func TestClaimRace(t *testing.T) { + c1 := &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: "c1", + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"), + }, + }, + }, + Status: api.PersistentVolumeClaimStatus{ + Phase: api.ClaimPending, + }, + } + c1.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "") + + c2 := &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + Name: "c2", + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"), + }, + }, + }, + Status: api.PersistentVolumeClaimStatus{ + Phase: api.ClaimPending, + }, + } + c2.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "") + + v := &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + }, + Spec: api.PersistentVolumeSpec{ + AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + HostPath: &api.HostPathVolumeSource{ + Path: "/tmp/data01", + }, + }, + }, + Status: api.PersistentVolumeStatus{ + Phase: api.VolumePending, + }, + } + + volumeIndex := NewPersistentVolumeOrderedIndex() + mockClient := &mockBinderClient{} + + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + + // adds the volume to the index, making the volume available + syncVolume(volumeIndex, mockClient, v) + if mockClient.volume.Status.Phase != api.VolumeAvailable { + t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) + } + if _, exists, _ := volumeIndex.Get(v); !exists { + t.Errorf("Expected to find volume in index but it did not exist") + } + + // an initial sync for a claim matches the volume + err := syncClaim(volumeIndex, mockClient, c1) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if c1.Status.Phase != api.ClaimBound { + t.Errorf("Expected phase %s but got %s", api.ClaimBound, c1.Status.Phase) + } + + // before the volume gets updated w/ claimRef, a 2nd claim can attempt to bind and find the same volume + err = syncClaim(volumeIndex, mockClient, c2) + if err != nil { + t.Errorf("unexpected error for unmatched claim: %v", err) + } + if c2.Status.Phase != api.ClaimPending { + t.Errorf("Expected phase %s but got %s", api.ClaimPending, c2.Status.Phase) + } +} + func TestExampleObjects(t *testing.T) { scenarios := map[string]struct { expected interface{} @@ -188,6 +279,11 @@ func TestBindingWithExamples(t *testing.T) { } pv.ObjectMeta.SelfLink = testapi.Default.SelfLink("pv", "") + // the default value of the PV is Pending. if processed at least once, its status in etcd is Available. + // There was a bug where only Pending volumes were being indexed and made ready for claims. + // Test that !Pending gets correctly added + pv.Status.Phase = api.VolumeAvailable + claim, error := client.PersistentVolumeClaims("ns").Get("any") if error != nil { t.Errorf("Unexpected error getting PVC from client: %v", err) @@ -211,145 +307,65 @@ func TestBindingWithExamples(t *testing.T) { // adds the volume to the index, making the volume available syncVolume(volumeIndex, mockClient, pv) - if pv.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) + if mockClient.volume.Status.Phase != api.VolumeAvailable { + t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) } - // an initial sync for a claim will bind it to an unbound volume, triggers state change + // an initial sync for a claim will bind it to an unbound volume syncClaim(volumeIndex, mockClient, claim) - // state change causes another syncClaim to update statuses - syncClaim(volumeIndex, mockClient, claim) - // claim updated volume's status, causing an update and syncVolume call - syncVolume(volumeIndex, mockClient, pv) - if pv.Spec.ClaimRef == nil { - t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef: %+v\n", pv) + // bind expected on pv.Spec but status update hasn't happened yet + if mockClient.volume.Spec.ClaimRef == nil { + t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef\n") } - - if pv.Status.Phase != api.VolumeBound { - t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) + if mockClient.volume.Status.Phase != api.VolumeAvailable { + t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) } - - if claim.Status.Phase != api.ClaimBound { + if mockClient.claim.Spec.VolumeName != pv.Name { + t.Errorf("Expected claim.Spec.VolumeName %s but got %s", mockClient.claim.Spec.VolumeName, pv.Name) + } + if mockClient.claim.Status.Phase != api.ClaimBound { t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) } - if len(claim.Status.AccessModes) != len(pv.Spec.AccessModes) { - t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) - } - if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] { - t.Errorf("Expected access mode %s but got %s", claim.Status.AccessModes[0], pv.Spec.AccessModes[0]) + + // state changes in pvc triggers sync that sets pv attributes to pvc.Status + syncClaim(volumeIndex, mockClient, claim) + if len(mockClient.claim.Status.AccessModes) == 0 { + t.Errorf("Expected %d access modes but got 0", len(pv.Spec.AccessModes)) } - // pretend the user deleted their claim + // persisting the bind to pv.Spec.ClaimRef triggers a sync + syncVolume(volumeIndex, mockClient, mockClient.volume) + if mockClient.volume.Status.Phase != api.VolumeBound { + t.Errorf("Expected phase %s but got %s", api.VolumeBound, mockClient.volume.Status.Phase) + } + + // pretend the user deleted their claim. periodic resync picks it up. mockClient.claim = nil - syncVolume(volumeIndex, mockClient, pv) + syncVolume(volumeIndex, mockClient, mockClient.volume) - if pv.Status.Phase != api.VolumeReleased { - t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase) + if mockClient.volume.Status.Phase != api.VolumeReleased { + t.Errorf("Expected phase %s but got %s", api.VolumeReleased, mockClient.volume.Status.Phase) } - if pv.Spec.ClaimRef == nil { - t.Errorf("Expected non-nil ClaimRef: %+v", pv.Spec) - } - - mockClient.volume = pv // released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing - err = recycler.reclaimVolume(pv) + err = recycler.reclaimVolume(mockClient.volume) if err != nil { t.Errorf("Unexpected error reclaiming volume: %+v", err) } - if pv.Status.Phase != api.VolumePending { - t.Errorf("Expected phase %s but got %s", api.VolumePending, pv.Status.Phase) + if mockClient.volume.Status.Phase != api.VolumePending { + t.Errorf("Expected phase %s but got %s", api.VolumePending, mockClient.volume.Status.Phase) } // after the recycling changes the phase to Pending, the binder picks up again // to remove any vestiges of binding and make the volume Available again - syncVolume(volumeIndex, mockClient, pv) + syncVolume(volumeIndex, mockClient, mockClient.volume) - if pv.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, pv.Status.Phase) + if mockClient.volume.Status.Phase != api.VolumeAvailable { + t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase) } - if pv.Spec.ClaimRef != nil { - t.Errorf("Expected nil ClaimRef: %+v", pv.Spec) - } -} - -func TestMissingFromIndex(t *testing.T) { - o := testclient.NewObjects(api.Scheme, api.Scheme) - if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, api.Scheme); err != nil { - t.Fatal(err) - } - if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, api.Scheme); err != nil { - t.Fatal(err) - } - - client := &testclient.Fake{} - client.AddReactor("*", "*", testclient.ObjectReaction(o, api.RESTMapper)) - - pv, err := client.PersistentVolumes().Get("any") - if err != nil { - t.Errorf("Unexpected error getting PV from client: %v", err) - } - pv.ObjectMeta.SelfLink = testapi.Default.SelfLink("pv", "") - - claim, error := client.PersistentVolumeClaims("ns").Get("any") - if error != nil { - t.Errorf("Unexpected error getting PVC from client: %v", err) - } - claim.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "") - - volumeIndex := NewPersistentVolumeOrderedIndex() - mockClient := &mockBinderClient{ - volume: pv, - claim: claim, - } - - // the default value of the PV is Pending. - // if has previously been processed by the binder, it's status in etcd would be Available. - // Only Pending volumes were being indexed and made ready for claims. - pv.Status.Phase = api.VolumeAvailable - - // adds the volume to the index, making the volume available - syncVolume(volumeIndex, mockClient, pv) - if pv.Status.Phase != api.VolumeAvailable { - t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) - } - - // an initial sync for a claim will bind it to an unbound volume, triggers state change - err = syncClaim(volumeIndex, mockClient, claim) - if err != nil { - t.Fatalf("Expected Clam to be bound, instead got an error: %+v\n", err) - } - - // state change causes another syncClaim to update statuses - syncClaim(volumeIndex, mockClient, claim) - // claim updated volume's status, causing an update and syncVolume call - syncVolume(volumeIndex, mockClient, pv) - - if pv.Spec.ClaimRef == nil { - t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef: %+v\n", pv) - } - - if pv.Status.Phase != api.VolumeBound { - t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) - } - - if claim.Status.Phase != api.ClaimBound { - t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) - } - if len(claim.Status.AccessModes) != len(pv.Spec.AccessModes) { - t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase) - } - if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] { - t.Errorf("Expected access mode %s but got %s", claim.Status.AccessModes[0], pv.Spec.AccessModes[0]) - } - - // pretend the user deleted their claim - mockClient.claim = nil - syncVolume(volumeIndex, mockClient, pv) - - if pv.Status.Phase != api.VolumeReleased { - t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase) + if mockClient.volume.Spec.ClaimRef != nil { + t.Errorf("Expected nil ClaimRef: %+v", mockClient.volume.Spec.ClaimRef) } } @@ -363,7 +379,8 @@ func (c *mockBinderClient) GetPersistentVolume(name string) (*api.PersistentVolu } func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - return volume, nil + c.volume = volume + return c.volume, nil } func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error { @@ -372,7 +389,8 @@ func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) } func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { - return volume, nil + c.volume = volume + return c.volume, nil } func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) { @@ -384,11 +402,13 @@ func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*ap } func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - return claim, nil + c.claim = claim + return c.claim, nil } func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { - return claim, nil + c.claim = claim + return c.claim, nil } func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { diff --git a/pkg/controller/persistentvolume/persistentvolume_index_test.go b/pkg/controller/persistentvolume/persistentvolume_index_test.go index 154d47e3f42..7bb6c5387bd 100644 --- a/pkg/controller/persistentvolume/persistentvolume_index_test.go +++ b/pkg/controller/persistentvolume/persistentvolume_index_test.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/testapi" ) func TestMatchVolume(t *testing.T) { @@ -104,17 +105,17 @@ func TestMatchVolume(t *testing.T) { } for name, scenario := range scenarios { - volume, err := volList.FindBestMatchForClaim(scenario.claim) + volume, err := volList.findBestMatchForClaim(scenario.claim) if err != nil { t.Errorf("Unexpected error matching volume by claim: %v", err) } - if scenario.expectedMatch != "" && volume == nil { + if len(scenario.expectedMatch) != 0 && volume == nil { t.Errorf("Expected match but received nil volume for scenario: %s", name) } - if scenario.expectedMatch != "" && volume != nil && string(volume.UID) != scenario.expectedMatch { + if len(scenario.expectedMatch) != 0 && volume != nil && string(volume.UID) != scenario.expectedMatch { t.Errorf("Expected %s but got volume %s in scenario %s", scenario.expectedMatch, volume.UID, name) } - if scenario.expectedMatch == "" && volume != nil { + if len(scenario.expectedMatch) == 0 && volume != nil { t.Errorf("Unexpected match for scenario: %s", name) } } @@ -175,7 +176,7 @@ func TestMatchingWithBoundVolumes(t *testing.T) { }, } - volume, err := volumeIndex.FindBestMatchForClaim(claim) + volume, err := volumeIndex.findBestMatchForClaim(claim) if err != nil { t.Fatalf("Unexpected error matching volume by claim: %v", err) } @@ -296,27 +297,27 @@ func TestFindingVolumeWithDifferentAccessModes(t *testing.T) { index.Add(ebs) index.Add(nfs) - volume, _ := index.FindBestMatchForClaim(claim) + volume, _ := index.findBestMatchForClaim(claim) if volume.Name != ebs.Name { t.Errorf("Expected %s but got volume %s instead", ebs.Name, volume.Name) } claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteOnce, api.ReadOnlyMany} - volume, _ = index.FindBestMatchForClaim(claim) + volume, _ = index.findBestMatchForClaim(claim) if volume.Name != gce.Name { t.Errorf("Expected %s but got volume %s instead", gce.Name, volume.Name) } // order of the requested modes should not matter claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteMany, api.ReadWriteOnce, api.ReadOnlyMany} - volume, _ = index.FindBestMatchForClaim(claim) + volume, _ = index.findBestMatchForClaim(claim) if volume.Name != nfs.Name { t.Errorf("Expected %s but got volume %s instead", nfs.Name, volume.Name) } // fewer modes requested should still match claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteMany} - volume, _ = index.FindBestMatchForClaim(claim) + volume, _ = index.findBestMatchForClaim(claim) if volume.Name != nfs.Name { t.Errorf("Expected %s but got volume %s instead", nfs.Name, volume.Name) } @@ -324,7 +325,7 @@ func TestFindingVolumeWithDifferentAccessModes(t *testing.T) { // pretend the exact match is bound. should get the next level up of modes. ebs.Spec.ClaimRef = &api.ObjectReference{} claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteOnce} - volume, _ = index.FindBestMatchForClaim(claim) + volume, _ = index.findBestMatchForClaim(claim) if volume.Name != gce.Name { t.Errorf("Expected %s but got volume %s instead", gce.Name, volume.Name) } @@ -332,7 +333,7 @@ func TestFindingVolumeWithDifferentAccessModes(t *testing.T) { // continue up the levels of modes. gce.Spec.ClaimRef = &api.ObjectReference{} claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteOnce} - volume, _ = index.FindBestMatchForClaim(claim) + volume, _ = index.findBestMatchForClaim(claim) if volume.Name != nfs.Name { t.Errorf("Expected %s but got volume %s instead", nfs.Name, volume.Name) } @@ -340,7 +341,7 @@ func TestFindingVolumeWithDifferentAccessModes(t *testing.T) { // partial mode request gce.Spec.ClaimRef = nil claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadOnlyMany} - volume, _ = index.FindBestMatchForClaim(claim) + volume, _ = index.findBestMatchForClaim(claim) if volume.Name != gce.Name { t.Errorf("Expected %s but got volume %s instead", gce.Name, volume.Name) } @@ -485,53 +486,40 @@ func createTestVolumes() []*api.PersistentVolume { } } +func testVolume(name, size string) *api.PersistentVolume { + return &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{ + Name: name, + Annotations: map[string]string{}, + }, + Spec: api.PersistentVolumeSpec{ + Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse(size)}, + PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{}}, + AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, + }, + } +} + func TestFindingPreboundVolumes(t *testing.T) { - pv1 := &api.PersistentVolume{ - ObjectMeta: api.ObjectMeta{ - Name: "pv1", - Annotations: map[string]string{}, - }, - Spec: api.PersistentVolumeSpec{ - Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("1Gi")}, - PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{}}, - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - }, - } - - pv5 := &api.PersistentVolume{ - ObjectMeta: api.ObjectMeta{ - Name: "pv5", - Annotations: map[string]string{}, - }, - Spec: api.PersistentVolumeSpec{ - Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi")}, - PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{}}, - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - }, - } - - pv8 := &api.PersistentVolume{ - ObjectMeta: api.ObjectMeta{ - Name: "pv8", - Annotations: map[string]string{}, - }, - Spec: api.PersistentVolumeSpec{ - Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi")}, - PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{}}, - AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, - }, - } - claim := &api.PersistentVolumeClaim{ ObjectMeta: api.ObjectMeta{ Name: "claim01", Namespace: "myns", + SelfLink: testapi.Default.SelfLink("pvc", ""), }, Spec: api.PersistentVolumeClaimSpec{ AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, Resources: api.ResourceRequirements{Requests: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("1Gi")}}, }, } + claimRef, err := api.GetReference(claim) + if err != nil { + t.Errorf("error getting claimRef: %v", err) + } + + pv1 := testVolume("pv1", "1Gi") + pv5 := testVolume("pv5", "5Gi") + pv8 := testVolume("pv8", "8Gi") index := NewPersistentVolumeOrderedIndex() index.Add(pv1) @@ -539,22 +527,22 @@ func TestFindingPreboundVolumes(t *testing.T) { index.Add(pv8) // expected exact match on size - volume, _ := index.FindBestMatchForClaim(claim) + volume, _ := index.findBestMatchForClaim(claim) if volume.Name != pv1.Name { t.Errorf("Expected %s but got volume %s instead", pv1.Name, volume.Name) } // pretend the exact match is pre-bound. should get the next size up. - pv1.Annotations[createdForKey] = "some/other/claim" - volume, _ = index.FindBestMatchForClaim(claim) + pv1.Spec.ClaimRef = &api.ObjectReference{Name: "foo", Namespace: "bar"} + volume, _ = index.findBestMatchForClaim(claim) if volume.Name != pv5.Name { t.Errorf("Expected %s but got volume %s instead", pv5.Name, volume.Name) } // pretend the exact match is available but the largest volume is pre-bound to the claim. - delete(pv1.Annotations, createdForKey) - pv8.Annotations[createdForKey] = "myns/claim01" - volume, _ = index.FindBestMatchForClaim(claim) + pv1.Spec.ClaimRef = nil + pv8.Spec.ClaimRef = claimRef + volume, _ = index.findBestMatchForClaim(claim) if volume.Name != pv8.Name { t.Errorf("Expected %s but got volume %s instead", pv8.Name, volume.Name) } diff --git a/pkg/controller/persistentvolume/types.go b/pkg/controller/persistentvolume/types.go index 6f38c36b86d..c022f97a3c5 100644 --- a/pkg/controller/persistentvolume/types.go +++ b/pkg/controller/persistentvolume/types.go @@ -79,8 +79,8 @@ func (pvIndex *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.Persi // matchPredicate is a function that indicates that a persistent volume matches another type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool -// Find returns the nearest PV from the ordered list or nil if a match is not found -func (pvIndex *persistentVolumeOrderedIndex) Find(searchPV *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) { +// find returns the nearest PV from the ordered list or nil if a match is not found +func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) { // the 'searchPV' argument is a synthetic PV with capacity and accessmodes set according to the user's PersistentVolumeClaim. // the synthetic pv arg is, therefore, a request for a storage resource. // @@ -94,6 +94,16 @@ func (pvIndex *persistentVolumeOrderedIndex) Find(searchPV *api.PersistentVolume // potential matches (the GCEPD example above). allPossibleModes := pvIndex.allPossibleMatchingAccessModes(searchPV.Spec.AccessModes) + // the searchPV should contain an annotation that allows pre-binding to a claim. + // we can use the same annotation value (pvc's namespace/name) and check against + // existing volumes to find an exact match. It is possible that a bind is made (ClaimRef persisted to PV) + // but the fail to update claim.Spec.VolumeName fails. This check allows the claim to find the volume + // that's already bound to the claim. + preboundClaim := "" + if createdFor, ok := searchPV.Annotations[createdForKey]; ok { + preboundClaim = createdFor + } + for _, modes := range allPossibleModes { volumes, err := pvIndex.ListByAccessModes(modes) if err != nil { @@ -105,23 +115,17 @@ func (pvIndex *persistentVolumeOrderedIndex) Find(searchPV *api.PersistentVolume // return the exact pre-binding match, if found unboundVolumes := []*api.PersistentVolume{} for _, volume := range volumes { - // check for current binding - if volume.Spec.ClaimRef != nil { + if volume.Spec.ClaimRef == nil { + // volume isn't currently bound or pre-bound. + unboundVolumes = append(unboundVolumes, volume) continue } - // check for pre-bind where the volume is intended for one specific claim - if createdFor, ok := volume.Annotations[createdForKey]; ok { - if createdFor != searchPV.Annotations[createdForKey] { - // the volume is pre-bound and does not match the search criteria. - continue - } - // exact annotation match! No search required. + boundClaim := fmt.Sprintf("%s/%s", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) + if boundClaim == preboundClaim { + // exact match! No search required. return volume, nil } - - // volume isn't currently bound or pre-bound. - unboundVolumes = append(unboundVolumes, volume) } i := sort.Search(len(unboundVolumes), func(i int) bool { return matchPredicate(searchPV, unboundVolumes[i]) }) @@ -147,11 +151,11 @@ func (pvIndex *persistentVolumeOrderedIndex) findByAccessModesAndStorageCapacity }, }, } - return pvIndex.Find(pv, matchStorageCapacity) + return pvIndex.find(pv, matchStorageCapacity) } -// FindBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage -func (pvIndex *persistentVolumeOrderedIndex) FindBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) { +// findBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage +func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) { return pvIndex.findByAccessModesAndStorageCapacity(fmt.Sprintf("%s/%s", claim.Namespace, claim.Name), claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)]) } diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go index e064505545f..a9f1912a033 100644 --- a/test/integration/persistent_volumes_test.go +++ b/test/integration/persistent_volumes_test.go @@ -19,6 +19,8 @@ limitations under the License. package integration import ( + "fmt" + "math/rand" "testing" "time" @@ -28,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume" + "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/volume" @@ -48,11 +51,11 @@ func TestPersistentVolumeRecycler(t *testing.T) { recyclerClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()}) testClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()}) - binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 1*time.Second) + binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Minute) binder.Run() defer binder.Stop() - recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 1*time.Second, []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", volume.NewFakeVolumeHost("/tmp/fake", nil, nil)}}) + recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Minute, []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", volume.NewFakeVolumeHost("/tmp/fake", nil, nil)}}) recycler.Run() defer recycler.Stop() @@ -122,6 +125,50 @@ func TestPersistentVolumeRecycler(t *testing.T) { break } } + + // test the race between claims and volumes. ensure only a volume only binds to a single claim. + deleteAllEtcdKeys() + counter := 0 + maxClaims := 100 + claims := []*api.PersistentVolumeClaim{} + for counter <= maxClaims { + counter += 1 + clone, _ := conversion.NewCloner().DeepCopy(pvc) + newPvc, _ := clone.(*api.PersistentVolumeClaim) + newPvc.ObjectMeta = api.ObjectMeta{Name: fmt.Sprintf("fake-pvc-%d", counter)} + claim, err := testClient.PersistentVolumeClaims(api.NamespaceDefault).Create(newPvc) + if err != nil { + t.Fatal("Error creating newPvc: %v", err) + } + claims = append(claims, claim) + } + + // putting a bind manually on a pv should only match the claim it is bound to + rand.Seed(time.Now().Unix()) + claim := claims[rand.Intn(maxClaims-1)] + claimRef, err := api.GetReference(claim) + if err != nil { + t.Fatalf("Unexpected error getting claimRef: %v", err) + } + pv.Spec.ClaimRef = claimRef + + pv, err = testClient.PersistentVolumes().Create(pv) + if err != nil { + t.Fatalf("Unexpected error creating pv: %v", err) + } + + waitForPersistentVolumePhase(w, api.VolumeBound) + + pv, err = testClient.PersistentVolumes().Get(pv.Name) + if err != nil { + t.Fatalf("Unexpected error getting pv: %v", err) + } + if pv.Spec.ClaimRef == nil { + t.Fatalf("Unexpected nil claimRef") + } + if pv.Spec.ClaimRef.Namespace != claimRef.Namespace || pv.Spec.ClaimRef.Name != claimRef.Name { + t.Fatalf("Bind mismatch! Expected %s/%s but got %s/%s", claimRef.Namespace, claimRef.Name, pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name) + } } func waitForPersistentVolumePhase(w watch.Interface, phase api.PersistentVolumePhase) {