Merge pull request #16432 from markturansky/recycler_race

Auto commit by PR queue bot
k8s-merge-robot 2015-11-25 11:54:06 -08:00
commit 7f2f7aa091
5 changed files with 341 additions and 280 deletions


@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
@ -159,44 +160,53 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
currentPhase := volume.Status.Phase
nextPhase := currentPhase
_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
}
switch currentPhase {
// pending volumes are available only after indexing in order to be matched to claims.
case api.VolumePending:
// 3 possible states:
// 1. ClaimRef != nil and Claim exists: Prebound to claim. Make volume available for binding (it will match PVC).
// 2. ClaimRef != nil and Claim !exists: Recently recycled. Remove bind. Make volume available for new claim.
// 3. ClaimRef == nil: Neither recycled nor prebound. Make volume available for binding.
nextPhase = api.VolumeAvailable
if volume.Spec.ClaimRef != nil {
// Pending volumes that have a ClaimRef were recently recycled. The Recycler set the phase to VolumePending
// to start the volume again at the beginning of this lifecycle.
// ClaimRef is the last bind between persistent volume and claim.
// The claim has already been deleted by the user at this point
oldClaimRef := volume.Spec.ClaimRef
volume.Spec.ClaimRef = nil
_, err = binderClient.UpdatePersistentVolume(volume)
if err != nil {
// rollback on error, keep the ClaimRef until we can successfully update the volume
volume.Spec.ClaimRef = oldClaimRef
return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if errors.IsNotFound(err) {
// Pending volumes that have a ClaimRef where the claim is missing were recently recycled.
// The Recycler set the phase to VolumePending to start the volume at the beginning of this lifecycle.
// removing ClaimRef unbinds the volume
clone, err := conversion.NewCloner().DeepCopy(volume)
if err != nil {
return fmt.Errorf("Error cloning pv: %v", err)
}
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
volumeClone.Spec.ClaimRef = nil
if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
} else {
volume = updatedVolume
volumeIndex.Update(volume)
}
} else if err != nil {
return fmt.Errorf("Error getting PersistentVolumeClaim[%s/%s]: %v", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, err)
}
}
_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
}
glog.V(5).Infof("PersistentVolume[%s] is now available\n", volume.Name)
nextPhase = api.VolumeAvailable
glog.V(5).Infof("PersistentVolume[%s] is available\n", volume.Name)
// available volumes await a claim
case api.VolumeAvailable:
// TODO: remove api.VolumePending phase altogether
_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
}
if volume.Spec.ClaimRef != nil {
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if err == nil {
@ -265,79 +275,71 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
// claims can be in one of the following states:
//
// ClaimPending -- default value -- not bound to a volume. A volume that matches the claim may not exist.
// ClaimBound -- bound to a volume. claim.Spec.VolumeName != ""
currentPhase := claim.Status.Phase
nextPhase := currentPhase
switch currentPhase {
// pending claims await a matching volume
switch claim.Status.Phase {
case api.ClaimPending:
volume, err := volumeIndex.FindBestMatchForClaim(claim)
volume, err := volumeIndex.findBestMatchForClaim(claim)
if err != nil {
return err
}
if volume == nil {
return fmt.Errorf("A volume match does not exist for persistent claim: %s", claim.Name)
glog.V(5).Infof("A volume match does not exist for persistent claim: %s", claim.Name)
return nil
}
// make a binding reference to the claim.
// triggers update of the claim in this controller, which builds claim status
claim.Spec.VolumeName = volume.Name
// TODO: make this similar to Pod's binding both with BindingREST subresource and GuaranteedUpdate helper in etcd.go
claim, err = binderClient.UpdatePersistentVolumeClaim(claim)
if err == nil {
nextPhase = api.ClaimBound
glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name)
// create a reference to the claim and assign it to the volume being bound.
// the volume is a pointer into the shared index, so setting ClaimRef here closes a race where
// another claim could match this volume before the ClaimRef was persisted in the next case statement
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
// make a binding reference to the claim and ensure to update the local index to prevent dupe bindings
clone, err := conversion.NewCloner().DeepCopy(volume)
if err != nil {
return fmt.Errorf("Error cloning pv: %v", err)
}
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
volumeClone.Spec.ClaimRef = claimRef
if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
} else {
// Rollback by unsetting the ClaimRef on the volume pointer.
// the volume in the index will be unbound again and ready to be matched.
claim.Spec.VolumeName = ""
// Rollback by restoring original phase to claim pointer
nextPhase = api.ClaimPending
return fmt.Errorf("Error updating volume: %+v\n", err)
volume = updatedVolume
volumeIndex.Update(updatedVolume)
}
// the bind is persisted on the volume above and will always match the claim in a search.
// the claim would remain Pending if the update failed, so processing this state is idempotent;
// the bind only needs to be persisted once.
if claim.Spec.VolumeName != volume.Name {
claim.Spec.VolumeName = volume.Name
claim, err = binderClient.UpdatePersistentVolumeClaim(claim)
if err != nil {
return fmt.Errorf("Error updating claim with VolumeName %s: %+v\n", volume.Name, err)
}
}
claim.Status.Phase = api.ClaimBound
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err = binderClient.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
return fmt.Errorf("Unexpected error saving claim status: %+v", err)
}
case api.ClaimBound:
volume, err := binderClient.GetPersistentVolume(claim.Spec.VolumeName)
if err != nil {
return fmt.Errorf("Unexpected error getting persistent volume: %v\n", err)
}
// no-op. Claim is bound, values from PV are set. PVCs are technically mutable in the API server
// and we don't want to handle those changes at this time.
if volume.Spec.ClaimRef == nil {
glog.V(5).Infof("Rebuilding bind on pv.Spec.ClaimRef\n")
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
volume.Spec.ClaimRef = claimRef
_, err = binderClient.UpdatePersistentVolume(volume)
if err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
}
}
default:
return fmt.Errorf("Unknown state for PVC: %#v", claim)
// all "actuals" are transferred from PV to PVC so the user knows what
// type of volume they actually got for their claim.
// Volumes cannot have zero AccessModes, so checking that a claim has access modes
// is sufficient to tell us if these values have already been set.
if len(claim.Status.AccessModes) == 0 {
claim.Status.Phase = api.ClaimBound
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err := binderClient.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
return fmt.Errorf("Unexpected error saving claim status: %+v", err)
}
}
}
if currentPhase != nextPhase {
claim.Status.Phase = nextPhase
binderClient.UpdatePersistentVolumeClaimStatus(claim)
}
glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name)
return nil
}
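Both hunks above repeat the same clone-before-mutate pattern: objects held by the shared index may be read concurrently, so the controller deep-copies the volume, mutates only the copy, persists it, and only then refreshes the index with the server's response. A minimal, stdlib-only sketch of that pattern follows (toy types standing in for api.PersistentVolume and the binder client; this is an illustration, not part of the diff):

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type ObjectReference struct{ Namespace, Name string }

type Volume struct {
	Name     string
	ClaimRef *ObjectReference
}

// deepCopy stands in for conversion.NewCloner().DeepCopy: the clone shares
// no pointers with the original, so mutating it cannot race with readers.
func deepCopy(v *Volume) (*Volume, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	out := &Volume{}
	if err := gob.NewDecoder(&buf).Decode(out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	shared := &Volume{Name: "pv1", ClaimRef: &ObjectReference{"ns", "claim"}}
	clone, err := deepCopy(shared)
	if err != nil {
		panic(err)
	}
	clone.ClaimRef = nil // unbind on the clone only; persist, then update the index
	fmt.Printf("shared still bound: %v, clone unbound: %v\n",
		shared.ClaimRef != nil, clone.ClaimRef == nil)
}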


@ -51,6 +51,97 @@ func TestRunStop(t *testing.T) {
}
}
func TestClaimRace(t *testing.T) {
c1 := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "c1",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
Status: api.PersistentVolumeClaimStatus{
Phase: api.ClaimPending,
},
}
c1.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "")
c2 := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "c2",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
Status: api.PersistentVolumeClaimStatus{
Phase: api.ClaimPending,
},
}
c2.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "")
v := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data01",
},
},
},
Status: api.PersistentVolumeStatus{
Phase: api.VolumePending,
},
}
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
// adds the volume to the index, making the volume available
syncVolume(volumeIndex, mockClient, v)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
if _, exists, _ := volumeIndex.Get(v); !exists {
t.Errorf("Expected to find volume in index but it did not exist")
}
// an initial sync for a claim matches the volume
err := syncClaim(volumeIndex, mockClient, c1)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if c1.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, c1.Status.Phase)
}
// before the volume gets updated w/ claimRef, a 2nd claim can attempt to bind and find the same volume
err = syncClaim(volumeIndex, mockClient, c2)
if err != nil {
t.Errorf("unexpected error for unmatched claim: %v", err)
}
if c2.Status.Phase != api.ClaimPending {
t.Errorf("Expected phase %s but got %s", api.ClaimPending, c2.Status.Phase)
}
}
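The reason c2 stays Pending: c1's sync wrote the ClaimRef onto the indexed volume (via volumeIndex.Update) before returning, and the matcher only hands a bound volume back to the exact claim named in its ClaimRef. A toy illustration of that guard (stdlib only, hypothetical names; not the controller's actual code):

package main

import "fmt"

type volume struct {
	name    string
	boundTo string // "namespace/name" of the bound claim, "" when unbound
}

// findFor mirrors the guard in find(): a volume that already carries a
// ClaimRef only matches the exact claim it is bound to; every other
// claim keeps searching (and may find nothing).
func findFor(vols []*volume, claimKey string) *volume {
	for _, v := range vols {
		if v.boundTo == "" || v.boundTo == claimKey {
			return v
		}
	}
	return nil
}

func main() {
	index := []*volume{{name: "foo", boundTo: "default/c1"}}
	fmt.Println(findFor(index, "default/c1") != nil) // true: c1 can re-find its volume
	fmt.Println(findFor(index, "default/c2") != nil) // false: c2 stays Pending
}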
func TestExampleObjects(t *testing.T) {
scenarios := map[string]struct {
expected interface{}
@ -188,6 +279,11 @@ func TestBindingWithExamples(t *testing.T) {
}
pv.ObjectMeta.SelfLink = testapi.Default.SelfLink("pv", "")
// the default value of the PV is Pending. If processed at least once, its status in etcd is Available.
// There was a bug where only Pending volumes were being indexed and made ready for claims.
// Test that !Pending volumes are correctly added to the index.
pv.Status.Phase = api.VolumeAvailable
claim, err := client.PersistentVolumeClaims("ns").Get("any")
if err != nil {
t.Errorf("Unexpected error getting PVC from client: %v", err)
@ -211,145 +307,65 @@ func TestBindingWithExamples(t *testing.T) {
// adds the volume to the index, making the volume available
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
// an initial sync for a claim will bind it to an unbound volume, triggers state change
// an initial sync for a claim will bind it to an unbound volume
syncClaim(volumeIndex, mockClient, claim)
// state change causes another syncClaim to update statuses
syncClaim(volumeIndex, mockClient, claim)
// claim updated volume's status, causing an update and syncVolume call
syncVolume(volumeIndex, mockClient, pv)
if pv.Spec.ClaimRef == nil {
t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef: %+v\n", pv)
// bind expected on pv.Spec but status update hasn't happened yet
if mockClient.volume.Spec.ClaimRef == nil {
t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef\n")
}
if pv.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
if claim.Status.Phase != api.ClaimBound {
if mockClient.claim.Spec.VolumeName != pv.Name {
t.Errorf("Expected claim.Spec.VolumeName %s but got %s", mockClient.claim.Spec.VolumeName, pv.Name)
}
if mockClient.claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if len(claim.Status.AccessModes) != len(pv.Spec.AccessModes) {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] {
t.Errorf("Expected access mode %s but got %s", claim.Status.AccessModes[0], pv.Spec.AccessModes[0])
// a state change in the pvc triggers a sync that copies the pv's attributes into pvc.Status
syncClaim(volumeIndex, mockClient, claim)
if len(mockClient.claim.Status.AccessModes) == 0 {
t.Errorf("Expected %d access modes but got 0", len(pv.Spec.AccessModes))
}
// pretend the user deleted their claim
// persisting the bind to pv.Spec.ClaimRef triggers a sync
syncVolume(volumeIndex, mockClient, mockClient.volume)
if mockClient.volume.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, mockClient.volume.Status.Phase)
}
// pretend the user deleted their claim. periodic resync picks it up.
mockClient.claim = nil
syncVolume(volumeIndex, mockClient, pv)
syncVolume(volumeIndex, mockClient, mockClient.volume)
if pv.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
if mockClient.volume.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, mockClient.volume.Status.Phase)
}
if pv.Spec.ClaimRef == nil {
t.Errorf("Expected non-nil ClaimRef: %+v", pv.Spec)
}
mockClient.volume = pv
// released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing
err = recycler.reclaimVolume(pv)
err = recycler.reclaimVolume(mockClient.volume)
if err != nil {
t.Errorf("Unexpected error reclaiming volume: %+v", err)
}
if pv.Status.Phase != api.VolumePending {
t.Errorf("Expected phase %s but got %s", api.VolumePending, pv.Status.Phase)
if mockClient.volume.Status.Phase != api.VolumePending {
t.Errorf("Expected phase %s but got %s", api.VolumePending, mockClient.volume.Status.Phase)
}
// after the recycling changes the phase to Pending, the binder picks up again
// to remove any vestiges of binding and make the volume Available again
syncVolume(volumeIndex, mockClient, pv)
syncVolume(volumeIndex, mockClient, mockClient.volume)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, pv.Status.Phase)
if mockClient.volume.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
}
if pv.Spec.ClaimRef != nil {
t.Errorf("Expected nil ClaimRef: %+v", pv.Spec)
}
}
func TestMissingFromIndex(t *testing.T) {
o := testclient.NewObjects(api.Scheme, api.Scheme)
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, api.Scheme); err != nil {
t.Fatal(err)
}
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, api.Scheme); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{}
client.AddReactor("*", "*", testclient.ObjectReaction(o, api.RESTMapper))
pv, err := client.PersistentVolumes().Get("any")
if err != nil {
t.Errorf("Unexpected error getting PV from client: %v", err)
}
pv.ObjectMeta.SelfLink = testapi.Default.SelfLink("pv", "")
claim, err := client.PersistentVolumeClaims("ns").Get("any")
if err != nil {
t.Errorf("Unexpected error getting PVC from client: %v", err)
}
claim.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", "")
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{
volume: pv,
claim: claim,
}
// the default value of the PV is Pending.
// if it has previously been processed by the binder, its status in etcd would be Available.
// Only Pending volumes were being indexed and made ready for claims.
pv.Status.Phase = api.VolumeAvailable
// adds the volume to the index, making the volume available
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
// an initial sync for a claim will bind it to an unbound volume, triggers state change
err = syncClaim(volumeIndex, mockClient, claim)
if err != nil {
t.Fatalf("Expected Clam to be bound, instead got an error: %+v\n", err)
}
// state change causes another syncClaim to update statuses
syncClaim(volumeIndex, mockClient, claim)
// claim updated volume's status, causing an update and syncVolume call
syncVolume(volumeIndex, mockClient, pv)
if pv.Spec.ClaimRef == nil {
t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef: %+v\n", pv)
}
if pv.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
if claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if len(claim.Status.AccessModes) != len(pv.Spec.AccessModes) {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] {
t.Errorf("Expected access mode %s but got %s", claim.Status.AccessModes[0], pv.Spec.AccessModes[0])
}
// pretend the user deleted their claim
mockClient.claim = nil
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
if mockClient.volume.Spec.ClaimRef != nil {
t.Errorf("Expected nil ClaimRef: %+v", mockClient.volume.Spec.ClaimRef)
}
}
@ -363,7 +379,8 @@ func (c *mockBinderClient) GetPersistentVolume(name string) (*api.PersistentVolu
}
func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
c.volume = volume
return c.volume, nil
}
func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
@ -372,7 +389,8 @@ func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume)
}
func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
c.volume = volume
return c.volume, nil
}
func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
@ -384,11 +402,13 @@ func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*ap
}
func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
c.claim = claim
return c.claim, nil
}
func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
c.claim = claim
return c.claim, nil
}
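With these changes the mock is stateful: each Update* call records the object it was handed and returns the stored value, so a later syncVolume/syncClaim observes the "persisted" state just as it would against the real API server, and the tests assert on mockClient.volume / mockClient.claim rather than on local copies. A generic sketch of the same fake-client idea (stdlib only; names hypothetical):

package main

import "fmt"

type Claim struct {
	Name  string
	Phase string
}

// fakeClient remembers the last object "persisted" through it so a test
// can assert on what the controller actually wrote.
type fakeClient struct {
	claim *Claim
}

func (c *fakeClient) UpdateClaim(claim *Claim) (*Claim, error) {
	c.claim = claim
	return c.claim, nil
}

func main() {
	fc := &fakeClient{}
	if _, err := fc.UpdateClaim(&Claim{Name: "c1", Phase: "Bound"}); err != nil {
		panic(err)
	}
	fmt.Println(fc.claim.Phase) // Bound -- visible to every subsequent sync call
}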
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {


@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
)
func TestMatchVolume(t *testing.T) {
@ -104,17 +105,17 @@ func TestMatchVolume(t *testing.T) {
}
for name, scenario := range scenarios {
volume, err := volList.FindBestMatchForClaim(scenario.claim)
volume, err := volList.findBestMatchForClaim(scenario.claim)
if err != nil {
t.Errorf("Unexpected error matching volume by claim: %v", err)
}
if scenario.expectedMatch != "" && volume == nil {
if len(scenario.expectedMatch) != 0 && volume == nil {
t.Errorf("Expected match but received nil volume for scenario: %s", name)
}
if scenario.expectedMatch != "" && volume != nil && string(volume.UID) != scenario.expectedMatch {
if len(scenario.expectedMatch) != 0 && volume != nil && string(volume.UID) != scenario.expectedMatch {
t.Errorf("Expected %s but got volume %s in scenario %s", scenario.expectedMatch, volume.UID, name)
}
if scenario.expectedMatch == "" && volume != nil {
if len(scenario.expectedMatch) == 0 && volume != nil {
t.Errorf("Unexpected match for scenario: %s", name)
}
}
@ -175,7 +176,7 @@ func TestMatchingWithBoundVolumes(t *testing.T) {
},
}
volume, err := volumeIndex.FindBestMatchForClaim(claim)
volume, err := volumeIndex.findBestMatchForClaim(claim)
if err != nil {
t.Fatalf("Unexpected error matching volume by claim: %v", err)
}
@ -296,27 +297,27 @@ func TestFindingVolumeWithDifferentAccessModes(t *testing.T) {
index.Add(ebs)
index.Add(nfs)
volume, _ := index.FindBestMatchForClaim(claim)
volume, _ := index.findBestMatchForClaim(claim)
if volume.Name != ebs.Name {
t.Errorf("Expected %s but got volume %s instead", ebs.Name, volume.Name)
}
claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteOnce, api.ReadOnlyMany}
volume, _ = index.FindBestMatchForClaim(claim)
volume, _ = index.findBestMatchForClaim(claim)
if volume.Name != gce.Name {
t.Errorf("Expected %s but got volume %s instead", gce.Name, volume.Name)
}
// order of the requested modes should not matter
claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteMany, api.ReadWriteOnce, api.ReadOnlyMany}
volume, _ = index.FindBestMatchForClaim(claim)
volume, _ = index.findBestMatchForClaim(claim)
if volume.Name != nfs.Name {
t.Errorf("Expected %s but got volume %s instead", nfs.Name, volume.Name)
}
// fewer modes requested should still match
claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteMany}
volume, _ = index.FindBestMatchForClaim(claim)
volume, _ = index.findBestMatchForClaim(claim)
if volume.Name != nfs.Name {
t.Errorf("Expected %s but got volume %s instead", nfs.Name, volume.Name)
}
@ -324,7 +325,7 @@ func TestFindingVolumeWithDifferentAccessModes(t *testing.T) {
// pretend the exact match is bound. should get the next level up of modes.
ebs.Spec.ClaimRef = &api.ObjectReference{}
claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteOnce}
volume, _ = index.FindBestMatchForClaim(claim)
volume, _ = index.findBestMatchForClaim(claim)
if volume.Name != gce.Name {
t.Errorf("Expected %s but got volume %s instead", gce.Name, volume.Name)
}
@ -332,7 +333,7 @@ func TestFindingVolumeWithDifferentAccessModes(t *testing.T) {
// continue up the levels of modes.
gce.Spec.ClaimRef = &api.ObjectReference{}
claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadWriteOnce}
volume, _ = index.FindBestMatchForClaim(claim)
volume, _ = index.findBestMatchForClaim(claim)
if volume.Name != nfs.Name {
t.Errorf("Expected %s but got volume %s instead", nfs.Name, volume.Name)
}
@ -340,7 +341,7 @@ func TestFindingVolumeWithDifferentAccessModes(t *testing.T) {
// partial mode request
gce.Spec.ClaimRef = nil
claim.Spec.AccessModes = []api.PersistentVolumeAccessMode{api.ReadOnlyMany}
volume, _ = index.FindBestMatchForClaim(claim)
volume, _ = index.findBestMatchForClaim(claim)
if volume.Name != gce.Name {
t.Errorf("Expected %s but got volume %s instead", gce.Name, volume.Name)
}
@ -485,53 +486,40 @@ func createTestVolumes() []*api.PersistentVolume {
}
}
func testVolume(name, size string) *api.PersistentVolume {
return &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Name: name,
Annotations: map[string]string{},
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse(size)},
PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{}},
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
},
}
}
func TestFindingPreboundVolumes(t *testing.T) {
pv1 := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Name: "pv1",
Annotations: map[string]string{},
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("1Gi")},
PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{}},
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
},
}
pv5 := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Name: "pv5",
Annotations: map[string]string{},
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi")},
PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{}},
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
},
}
pv8 := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Name: "pv8",
Annotations: map[string]string{},
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi")},
PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{}},
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
},
}
claim := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
SelfLink: testapi.Default.SelfLink("pvc", ""),
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{Requests: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("1Gi")}},
},
}
claimRef, err := api.GetReference(claim)
if err != nil {
t.Errorf("error getting claimRef: %v", err)
}
pv1 := testVolume("pv1", "1Gi")
pv5 := testVolume("pv5", "5Gi")
pv8 := testVolume("pv8", "8Gi")
index := NewPersistentVolumeOrderedIndex()
index.Add(pv1)
@ -539,22 +527,22 @@ func TestFindingPreboundVolumes(t *testing.T) {
index.Add(pv8)
// expected exact match on size
volume, _ := index.FindBestMatchForClaim(claim)
volume, _ := index.findBestMatchForClaim(claim)
if volume.Name != pv1.Name {
t.Errorf("Expected %s but got volume %s instead", pv1.Name, volume.Name)
}
// pretend the exact match is pre-bound. should get the next size up.
pv1.Annotations[createdForKey] = "some/other/claim"
volume, _ = index.FindBestMatchForClaim(claim)
pv1.Spec.ClaimRef = &api.ObjectReference{Name: "foo", Namespace: "bar"}
volume, _ = index.findBestMatchForClaim(claim)
if volume.Name != pv5.Name {
t.Errorf("Expected %s but got volume %s instead", pv5.Name, volume.Name)
}
// pretend the exact match is available but the largest volume is pre-bound to the claim.
delete(pv1.Annotations, createdForKey)
pv8.Annotations[createdForKey] = "myns/claim01"
volume, _ = index.FindBestMatchForClaim(claim)
pv1.Spec.ClaimRef = nil
pv8.Spec.ClaimRef = claimRef
volume, _ = index.findBestMatchForClaim(claim)
if volume.Name != pv8.Name {
t.Errorf("Expected %s but got volume %s instead", pv8.Name, volume.Name)
}


@ -79,8 +79,8 @@ func (pvIndex *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.Persi
// matchPredicate is a function that indicates that a persistent volume matches another
type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool
// Find returns the nearest PV from the ordered list or nil if a match is not found
func (pvIndex *persistentVolumeOrderedIndex) Find(searchPV *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) {
// find returns the nearest PV from the ordered list or nil if a match is not found
func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) {
// the 'searchPV' argument is a synthetic PV with capacity and access modes set according to the user's PersistentVolumeClaim.
// the synthetic pv arg is, therefore, a request for a storage resource.
//
@ -94,6 +94,16 @@ func (pvIndex *persistentVolumeOrderedIndex) Find(searchPV *api.PersistentVolume
// potential matches (the GCEPD example above).
allPossibleModes := pvIndex.allPossibleMatchingAccessModes(searchPV.Spec.AccessModes)
// the searchPV may carry an annotation that allows pre-binding to a claim.
// we can use the same annotation value (the pvc's namespace/name) and check it against
// existing volumes to find an exact match. It is possible that a bind is made (ClaimRef persisted to the PV)
// but the follow-up update of claim.Spec.VolumeName fails. This check allows the claim to find the volume
// that is already bound to it.
preboundClaim := ""
if createdFor, ok := searchPV.Annotations[createdForKey]; ok {
preboundClaim = createdFor
}
for _, modes := range allPossibleModes {
volumes, err := pvIndex.ListByAccessModes(modes)
if err != nil {
@ -105,23 +115,17 @@ func (pvIndex *persistentVolumeOrderedIndex) Find(searchPV *api.PersistentVolume
// return the exact pre-binding match, if found
unboundVolumes := []*api.PersistentVolume{}
for _, volume := range volumes {
// check for current binding
if volume.Spec.ClaimRef != nil {
if volume.Spec.ClaimRef == nil {
// volume isn't currently bound or pre-bound.
unboundVolumes = append(unboundVolumes, volume)
continue
}
// check for pre-bind where the volume is intended for one specific claim
if createdFor, ok := volume.Annotations[createdForKey]; ok {
if createdFor != searchPV.Annotations[createdForKey] {
// the volume is pre-bound and does not match the search criteria.
continue
}
// exact annotation match! No search required.
boundClaim := fmt.Sprintf("%s/%s", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if boundClaim == preboundClaim {
// exact match! No search required.
return volume, nil
}
// volume isn't currently bound or pre-bound.
unboundVolumes = append(unboundVolumes, volume)
}
i := sort.Search(len(unboundVolumes), func(i int) bool { return matchPredicate(searchPV, unboundVolumes[i]) })
@ -147,11 +151,11 @@ func (pvIndex *persistentVolumeOrderedIndex) findByAccessModesAndStorageCapacity
},
},
}
return pvIndex.Find(pv, matchStorageCapacity)
return pvIndex.find(pv, matchStorageCapacity)
}
// FindBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvIndex *persistentVolumeOrderedIndex) FindBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) {
// findBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) {
return pvIndex.findByAccessModesAndStorageCapacity(fmt.Sprintf("%s/%s", claim.Namespace, claim.Name), claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)])
}
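find depends on each access-mode bucket being sorted by capacity: sort.Search returns the first index at which the predicate holds, so with matchStorageCapacity it yields the smallest unbound volume that can satisfy the request. A stdlib illustration of that smallest-sufficient-match behavior:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// capacities sorted ascending, as the ordered index guarantees
	capacitiesGi := []int{1, 5, 8}
	requestGi := 3

	// sort.Search finds the first index where the predicate is true:
	// the smallest volume large enough for the request.
	i := sort.Search(len(capacitiesGi), func(i int) bool {
		return capacitiesGi[i] >= requestGi
	})
	if i < len(capacitiesGi) {
		fmt.Printf("smallest sufficient volume: %dGi\n", capacitiesGi[i]) // 5Gi
	}
}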


@ -19,6 +19,8 @@ limitations under the License.
package integration
import (
"fmt"
"math/rand"
"testing"
"time"
@ -28,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/volume"
@ -48,11 +51,11 @@ func TestPersistentVolumeRecycler(t *testing.T) {
recyclerClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
testClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 1*time.Second)
binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Minute)
binder.Run()
defer binder.Stop()
recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 1*time.Second, []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", volume.NewFakeVolumeHost("/tmp/fake", nil, nil)}})
recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Minute, []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", volume.NewFakeVolumeHost("/tmp/fake", nil, nil)}})
recycler.Run()
defer recycler.Stop()
@ -122,6 +125,50 @@ func TestPersistentVolumeRecycler(t *testing.T) {
break
}
}
// test the race between claims and volumes. ensure a volume binds to only a single claim.
deleteAllEtcdKeys()
counter := 0
maxClaims := 100
claims := []*api.PersistentVolumeClaim{}
for counter <= maxClaims {
counter += 1
clone, _ := conversion.NewCloner().DeepCopy(pvc)
newPvc, _ := clone.(*api.PersistentVolumeClaim)
newPvc.ObjectMeta = api.ObjectMeta{Name: fmt.Sprintf("fake-pvc-%d", counter)}
claim, err := testClient.PersistentVolumeClaims(api.NamespaceDefault).Create(newPvc)
if err != nil {
t.Fatal("Error creating newPvc: %v", err)
}
claims = append(claims, claim)
}
// putting a bind manually on a pv should only match the claim it is bound to
rand.Seed(time.Now().Unix())
claim := claims[rand.Intn(maxClaims-1)]
claimRef, err := api.GetReference(claim)
if err != nil {
t.Fatalf("Unexpected error getting claimRef: %v", err)
}
pv.Spec.ClaimRef = claimRef
pv, err = testClient.PersistentVolumes().Create(pv)
if err != nil {
t.Fatalf("Unexpected error creating pv: %v", err)
}
waitForPersistentVolumePhase(w, api.VolumeBound)
pv, err = testClient.PersistentVolumes().Get(pv.Name)
if err != nil {
t.Fatalf("Unexpected error getting pv: %v", err)
}
if pv.Spec.ClaimRef == nil {
t.Fatalf("Unexpected nil claimRef")
}
if pv.Spec.ClaimRef.Namespace != claimRef.Namespace || pv.Spec.ClaimRef.Name != claimRef.Name {
t.Fatalf("Bind mismatch! Expected %s/%s but got %s/%s", claimRef.Namespace, claimRef.Name, pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
}
}
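waitForPersistentVolumePhase (below) blocks on the watch until a volume reports the desired phase. A stripped-down sketch of that style of event loop, with plain channels standing in for watch.Interface (assumptions for illustration, not the test's actual helper):

package main

import "fmt"

type event struct{ phase string }

// waitForPhase consumes events until one carries the phase we want,
// mirroring the watch loop the integration test relies on.
func waitForPhase(ch <-chan event, phase string) {
	for ev := range ch {
		if ev.phase == phase {
			return
		}
	}
}

func main() {
	ch := make(chan event, 3)
	ch <- event{"Pending"}
	ch <- event{"Available"}
	ch <- event{"Bound"}
	waitForPhase(ch, "Bound")
	fmt.Println("volume reached Bound")
}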
func waitForPersistentVolumePhase(w watch.Interface, phase api.PersistentVolumePhase) {