recheck pod volumes before marking pod as processed

This commit is contained in:
Nate Franzen 2017-04-12 11:49:08 -07:00
parent 3823270b9e
commit 6d59906884
2 changed files with 66 additions and 1 deletions

View File

@ -260,6 +260,8 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
return return
} }
allVolumesAdded := true
// Process volume spec for each volume defined in pod // Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes { for _, podVolume := range pod.Spec.Volumes {
volumeSpec, volumeGidValue, err := volumeSpec, volumeGidValue, err :=
@ -270,6 +272,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
podVolume.Name, podVolume.Name,
format.Pod(pod), format.Pod(pod),
err) err)
allVolumesAdded = false
continue continue
} }
@ -283,6 +286,7 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
volumeSpec.Name(), volumeSpec.Name(),
uniquePodName, uniquePodName,
err) err)
allVolumesAdded = false
} }
glog.V(10).Infof( glog.V(10).Infof(
@ -292,7 +296,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {
uniquePodName) uniquePodName)
} }
dswp.markPodProcessed(uniquePodName) // some of the volume additions may have failed, should not mark this pod as fully processed
if allVolumesAdded {
dswp.markPodProcessed(uniquePodName)
}
} }
// podPreviouslyProcessed returns true if the volumes for this pod have already // podPreviouslyProcessed returns true if the volumes for this pod have already
@ -327,6 +335,7 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
// createVolumeSpec creates and returns a mutatable volume.Spec object for the // createVolumeSpec creates and returns a mutatable volume.Spec object for the
// specified volume. It dereferences any PVC to get PV objects, if needed. // specified volume. It dereferences any PVC to get PV objects, if needed.
// Returns an error if unable to obtain the volume at this time.
func (dswp *desiredStateOfWorldPopulator) createVolumeSpec( func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
podVolume v1.Volume, podNamespace string) (*volume.Spec, string, error) { podVolume v1.Volume, podNamespace string) (*volume.Spec, string, error) {
if pvcSource := if pvcSource :=
@ -409,6 +418,7 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
} }
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" { if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
return "", "", fmt.Errorf( return "", "", fmt.Errorf(
"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)", "PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
namespace, namespace,

View File

@ -92,6 +92,47 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
} }
} }
// TestInitialPendingVolumesForPodAndGetVolumesInUse verifies that a pod whose
// PVC is initially in the Pending phase can still be mounted once the claim
// later becomes Bound: WaitForAttachAndMount must retry (the pod must not be
// marked fully processed while a volume spec could not be created).
func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
	tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
	if err != nil {
		t.Fatalf("can't make a temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient(), secret.NewFakeManager())

	node, pod, pv, claim := createObjects()
	// Force the claim into Pending so the first processPodVolumes pass fails
	// to resolve the volume spec.
	claim.Status = v1.PersistentVolumeClaimStatus{
		Phase: v1.ClaimPending,
	}

	kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

	manager, err := newTestVolumeManager(tmpDir, podManager, kubeClient)
	if err != nil {
		t.Fatalf("Failed to initialize volume manager: %v", err)
	}

	stopCh := runVolumeManager(manager)
	defer close(stopCh)

	podManager.SetPods([]*v1.Pod{pod})

	// Fake node status update so the attach/detach state converges.
	go simulateVolumeInUseUpdate(
		v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
		stopCh,
		manager)

	// delayed claim binding: flips the claim to Bound after a short delay,
	// letting the populator's retry succeed on a later pass.
	go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name)

	err = manager.WaitForAttachAndMount(pod)
	if err != nil {
		t.Errorf("Expected success: %v", err)
	}
}
func TestGetExtraSupplementalGroupsForPod(t *testing.T) { func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest") tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
if err != nil { if err != nil {
@ -279,6 +320,20 @@ func simulateVolumeInUseUpdate(volumeName v1.UniqueVolumeName, stopCh <-chan str
} }
} }
// delayClaimBecomesBound waits briefly and then transitions the named PVC's
// status phase to Bound, simulating asynchronous claim binding for tests that
// start with a Pending claim. It is intended to be run in its own goroutine.
func delayClaimBecomesBound(
	kubeClient clientset.Interface,
	namespace, claimName string,
) {
	// Let the volume manager observe the claim while it is still Pending.
	time.Sleep(500 * time.Millisecond)
	volumeClaim, err :=
		kubeClient.Core().PersistentVolumeClaims(namespace).Get(claimName, metav1.GetOptions{})
	if err != nil {
		// Original code discarded this error and would panic on a nil claim;
		// bail out instead and let the waiting test fail on timeout.
		return
	}
	volumeClaim.Status = v1.PersistentVolumeClaimStatus{
		Phase: v1.ClaimBound,
	}
	// Best-effort update: if it fails, the caller's WaitForAttachAndMount
	// will time out and report the failure.
	kubeClient.Core().PersistentVolumeClaims(namespace).Update(volumeClaim)
}
func runVolumeManager(manager VolumeManager) chan struct{} { func runVolumeManager(manager VolumeManager) chan struct{} {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
//readyCh := make(chan bool, 1) //readyCh := make(chan bool, 1)