Fixed persistent volume claim controllers processing an old claim.
Fixes #19860 (it may be easier to look at the issue to see the exact sequence needed to reproduce the bug and understand the fix).

When PersistentVolumeProvisionerController.reconcileClaim() is called with the same claim in short succession (e.g. the claim is created by a user and at the same time a periodic check of all claims is scheduled), the second reconcileClaim() call gets an old copy of the claim as its parameter. The method should always reload the claim to get a fresh copy with all annotations, possibly added by the previous reconcileClaim() call. The same applies to PersistentVolumeClaimBinder.syncClaim().

Also update all the tests to store claims in the "fake" API server before calling syncClaim and reconcileClaim.
This commit is contained in:
parent 71ae2736c0
commit 1edf34a4e5
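The heart of the change is a "reload before acting" step at the top of both sync functions, shown in the hunks below. The following is a minimal, self-contained Go sketch of that pattern; the claim struct, claimGetter interface, fakeClient, and the "provisioning" annotation key are simplified stand-ins invented for illustration, not the controllers' real types.

package main

import "fmt"

// claim is a simplified stand-in for api.PersistentVolumeClaim.
type claim struct {
	namespace, name string
	annotations     map[string]string
}

// claimGetter is a hypothetical slice of the API client the controllers use
// to fetch the current version of a claim.
type claimGetter interface {
	getClaim(namespace, name string) (*claim, error)
}

// fakeClient returns whatever is currently "stored", mimicking the API server.
type fakeClient struct {
	stored map[string]*claim
}

func (f *fakeClient) getClaim(namespace, name string) (*claim, error) {
	c, ok := f.stored[namespace+"/"+name]
	if !ok {
		return nil, fmt.Errorf("claim %s/%s not found", namespace, name)
	}
	return c, nil
}

// syncClaim may be handed a stale copy of the claim (e.g. an event queued
// before an earlier sync added annotations), so it reloads the current
// version first and only then decides what to do.
func syncClaim(client claimGetter, stale *claim) error {
	fresh, err := client.getClaim(stale.namespace, stale.name)
	if err != nil {
		return fmt.Errorf("cannot reload claim %s/%s: %v", stale.namespace, stale.name, err)
	}
	if fresh.annotations["provisioning"] == "completed" {
		// An earlier sync already handled this claim; the stale copy would
		// not show the annotation and the work would be repeated.
		return nil
	}
	// ... provision/bind using the fresh copy ...
	fresh.annotations["provisioning"] = "completed"
	return nil
}

func main() {
	c := &claim{namespace: "default", name: "pvc-1", annotations: map[string]string{}}
	client := &fakeClient{stored: map[string]*claim{"default/pvc-1": c}}

	staleCopy := &claim{namespace: "default", name: "pvc-1", annotations: map[string]string{}}
	fmt.Println(syncClaim(client, c))         // first sync: provisions and annotates
	fmt.Println(syncClaim(client, staleCopy)) // stale copy: the reload makes it a no-op
}

Reloading inside the sync function means every invocation operates on the latest stored annotations, so a stale event queued behind an earlier sync becomes a harmless no-op instead of provisioning a second volume or overwriting newer state.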
@@ -329,7 +329,15 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
 }
 
 func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
-	glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
+	glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s] for binding", claim.Name)
+
+	// The claim may have been modified by parallel call to syncClaim, load
+	// the current version.
+	newClaim, err := binderClient.GetPersistentVolumeClaim(claim.Namespace, claim.Name)
+	if err != nil {
+		return fmt.Errorf("Cannot reload claim %s/%s: %v", claim.Namespace, claim.Name, err)
+	}
+	claim = newClaim
 
 	switch claim.Status.Phase {
 	case api.ClaimPending:
@@ -123,7 +123,6 @@ func TestClaimRace(t *testing.T) {
 
 	plugMgr := volume.VolumePluginMgr{}
 	plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost(tmpDir, nil, nil))
 
 	// adds the volume to the index, making the volume available
 	syncVolume(volumeIndex, mockClient, v)
 	if mockClient.volume.Status.Phase != api.VolumeAvailable {
@@ -133,6 +132,8 @@ func TestClaimRace(t *testing.T) {
 		t.Errorf("Expected to find volume in index but it did not exist")
 	}
 
+	// add the claim to fake API server
+	mockClient.UpdatePersistentVolumeClaim(c1)
 	// an initial sync for a claim matches the volume
 	err = syncClaim(volumeIndex, mockClient, c1)
 	if err != nil {
@@ -143,6 +144,8 @@ func TestClaimRace(t *testing.T) {
 	}
 
 	// before the volume gets updated w/ claimRef, a 2nd claim can attempt to bind and find the same volume
+	// add the 2nd claim to fake API server
+	mockClient.UpdatePersistentVolumeClaim(c2)
 	err = syncClaim(volumeIndex, mockClient, c2)
 	if err != nil {
 		t.Errorf("unexpected error for unmatched claim: %v", err)
@@ -408,6 +411,8 @@ func TestBindingWithExamples(t *testing.T) {
 		t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, mockClient.volume.Status.Phase)
 	}
 
+	// add the claim to fake API server
+	mockClient.UpdatePersistentVolumeClaim(claim)
 	// an initial sync for a claim will bind it to an unbound volume
 	syncClaim(volumeIndex, mockClient, claim)
 
@@ -477,6 +482,14 @@ func TestCasting(t *testing.T) {
 		Status:     api.PersistentVolumeClaimStatus{Phase: api.ClaimBound},
 	}
 
+	// Inject mockClient into the binder. This prevents weird errors on stderr
+	// as the binder wants to load PV/PVC from API server.
+	mockClient := &mockBinderClient{
+		volume: pv,
+		claim:  pvc,
+	}
+	binder.client = mockClient
+
 	// none of these should fail casting.
 	// the real test is not failing when passed DeletedFinalStateUnknown in the deleteHandler
 	binder.addVolume(pv)
@@ -153,6 +153,20 @@ func (controller *PersistentVolumeProvisionerController) handleUpdateClaim(oldOb
 }
 
 func (controller *PersistentVolumeProvisionerController) reconcileClaim(claim *api.PersistentVolumeClaim) error {
+	glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s] for dynamic provisioning", claim.Name)
+
+	// The claim may have been modified by parallel call to reconcileClaim, load
+	// the current version.
+	newClaim, err := controller.client.GetPersistentVolumeClaim(claim.Namespace, claim.Name)
+	if err != nil {
+		return fmt.Errorf("Cannot reload claim %s/%s: %v", claim.Namespace, claim.Name, err)
+	}
+	claim = newClaim
+	err = controller.claimStore.Update(claim)
+	if err != nil {
+		return fmt.Errorf("Cannot update claim %s/%s: %v", claim.Namespace, claim.Name, err)
+	}
+
 	if controller.provisioner == nil {
 		return fmt.Errorf("No provisioner configured for controller")
 	}
@@ -105,6 +105,9 @@ func TestReconcileClaim(t *testing.T) {
 
 	// watch would have added the claim to the store
 	controller.claimStore.Add(pvc)
+	// store it in fake API server
+	mockClient.UpdatePersistentVolumeClaim(pvc)
+
 	err := controller.reconcileClaim(pvc)
 	if err != nil {
 		t.Errorf("Unexpected error: %v", err)
@@ -116,6 +119,8 @@ func TestReconcileClaim(t *testing.T) {
 	}
 
 	pvc.Annotations[qosProvisioningKey] = "foo"
+	// store it in fake API server
+	mockClient.UpdatePersistentVolumeClaim(pvc)
 
 	err = controller.reconcileClaim(pvc)
 	if err != nil {
@@ -130,6 +135,40 @@ func TestReconcileClaim(t *testing.T) {
 	if mockClient.volume.Spec.ClaimRef.Name != pvc.Name {
 		t.Errorf("Expected PV to be bound to %s but got %s", mockClient.volume.Spec.ClaimRef.Name, pvc.Name)
 	}
+
+	// the PVC should have correct annotation
+	if mockClient.claim.Annotations[pvProvisioningRequiredAnnotationKey] != pvProvisioningCompletedAnnotationValue {
+		t.Errorf("Annotation %q not set", pvProvisioningRequiredAnnotationKey)
+	}
+
+	// Run the syncClaim 2nd time to simulate periodic sweep running in parallel
+	// to the previous syncClaim. There is a lock in handleUpdateVolume(), so
+	// they will be called sequentially, but the second call will have old
+	// version of the claim.
+	oldPVName := mockClient.volume.Name
+
+	// Make the "old" claim
+	pvc2 := makeTestClaim()
+	pvc2.Annotations[qosProvisioningKey] = "foo"
+	// Add a dummy annotation so we recognize the claim was updated (i.e.
+	// stored in mockClient)
+	pvc2.Annotations["test"] = "test"
+
+	err = controller.reconcileClaim(pvc2)
+	if err != nil {
+		t.Errorf("Unexpected error: %v", err)
+	}
+
+	// The 2nd PVC should be ignored, no new PV was created
+	if val, found := pvc2.Annotations[pvProvisioningRequiredAnnotationKey]; found {
+		t.Errorf("2nd PVC got unexpected annotation %q: %q", pvProvisioningRequiredAnnotationKey, val)
+	}
+	if mockClient.volume.Name != oldPVName {
+		t.Errorf("2nd PVC unexpectedly provisioned a new volume")
+	}
+	if _, found := mockClient.claim.Annotations["test"]; found {
+		t.Errorf("2nd PVC was unexpectedly updated")
+	}
 }
 
 func checkTagValue(t *testing.T, tags map[string]string, tag string, expectedValue string) {
@@ -46,9 +46,11 @@ func TestPersistentVolumeRecycler(t *testing.T) {
 	defer s.Close()
 
 	deleteAllEtcdKeys()
-	binderClient := clientset.NewForConfigOrDie(&client.Config{Host: s.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	recyclerClient := clientset.NewForConfigOrDie(&client.Config{Host: s.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	testClient := clientset.NewForConfigOrDie(&client.Config{Host: s.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
+	// Use higher QPS and Burst, there is a test for race condition below, which
+	// creates many claims and default values were too low.
+	binderClient := clientset.NewForConfigOrDie(&client.Config{Host: s.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000, Burst: 100000})
+	recyclerClient := clientset.NewForConfigOrDie(&client.Config{Host: s.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000, Burst: 100000})
+	testClient := clientset.NewForConfigOrDie(&client.Config{Host: s.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000, Burst: 100000})
 	host := volume.NewFakeVolumeHost("/tmp/fake", nil, nil)
 
 	plugins := []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}, volume.VolumeOptions{}}}