diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
index 5031be77c04..3cdcdc6dd5d 100644
--- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
+++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
@@ -260,6 +260,8 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
 		return
 	}
 
+	allVolumesAdded := true
+
 	// Process volume spec for each volume defined in pod
 	for _, podVolume := range pod.Spec.Volumes {
 		volumeSpec, volumeGidValue, err :=
@@ -270,6 +272,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
 				podVolume.Name,
 				format.Pod(pod),
 				err)
+			allVolumesAdded = false
 			continue
 		}
 
@@ -283,6 +286,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
 				volumeSpec.Name(),
 				uniquePodName,
 				err)
+			allVolumesAdded = false
 		}
 
 		glog.V(10).Infof(
@@ -292,7 +296,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
 			uniquePodName)
 	}
 
-	dswp.markPodProcessed(uniquePodName)
+	// some of the volume additions may have failed, should not mark this pod as fully processed
+	if allVolumesAdded {
+		dswp.markPodProcessed(uniquePodName)
+	}
+
 }
 
 // podPreviouslyProcessed returns true if the volumes for this pod have already
@@ -327,6 +335,7 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
 
 // createVolumeSpec creates and returns a mutatable volume.Spec object for the
 // specified volume. It dereference any PVC to get PV objects, if needed.
+// Returns an error if unable to obtain the volume at this time.
 func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
 	podVolume v1.Volume, podNamespace string) (*volume.Spec, string, error) {
 	if pvcSource :=
@@ -409,6 +418,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
 	}
 
 	if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
+
 		return "", "", fmt.Errorf(
 			"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
 			namespace,
diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go
index 773f6eb4d52..590b580cda8 100644
--- a/pkg/kubelet/volumemanager/volume_manager_test.go
+++ b/pkg/kubelet/volumemanager/volume_manager_test.go
@@ -92,6 +92,44 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
 	}
 }
 
+func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
+	tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
+	if err != nil {
+		t.Fatalf("can't make a temp dir: %v", err)
+	}
+	defer os.RemoveAll(tmpDir)
+	podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager(), configmap.NewFakeManager())
+
+	node, pod, pv, claim := createObjects()
+	claim.Status = v1.PersistentVolumeClaimStatus{
+		Phase: v1.ClaimPending,
+	}
+
+	kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
+
+	manager := newTestVolumeManager(tmpDir, podManager, kubeClient)
+
+	stopCh := runVolumeManager(manager)
+	defer close(stopCh)
+
+	podManager.SetPods([]*v1.Pod{pod})
+
+	// Fake node status update
+	go simulateVolumeInUseUpdate(
+		v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
+		stopCh,
+		manager)
+
+	// delayed claim binding
+	go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name)
+
+	err = manager.WaitForAttachAndMount(pod)
+	if err != nil {
+		t.Errorf("Expected success: %v", err)
+	}
+
+}
+
 func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
 	tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
 	if err != nil {
@@ -279,6 +317,20 @@ func simulateVolumeInUseUpdate(volumeName v1.UniqueVolumeName, stopCh <-chan str
 	}
 }
 
+func delayClaimBecomesBound(
+	kubeClient clientset.Interface,
+	namespace, claimName string,
+) {
+	time.Sleep(500 * time.Millisecond)
+	volumeClaim, _ :=
+		kubeClient.Core().PersistentVolumeClaims(namespace).Get(claimName, metav1.GetOptions{})
+	volumeClaim.Status = v1.PersistentVolumeClaimStatus{
+		Phase: v1.ClaimBound,
+	}
+	kubeClient.Core().PersistentVolumeClaims(namespace).Update(volumeClaim)
+	return
+}
+
 func runVolumeManager(manager VolumeManager) chan struct{} {
 	stopCh := make(chan struct{})
 	//readyCh := make(chan bool, 1)