diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index 9bbeb9c94dd..683acfb307b 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -56,8 +56,8 @@ import ( type DesiredStateOfWorldPopulator interface { Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) - // ReprocessPod removes the specified pod from the list of processedPods - // (if it exists) forcing it to be reprocessed. This is required to enable + // ReprocessPod sets value for the specified pod in processedPods + // to false, forcing it to be reprocessed. This is required to enable // remounting volumes on pod updates (volumes like Downward API volumes // depend on this behavior to ensure volume content is updated). ReprocessPod(podName volumetypes.UniquePodName) @@ -150,7 +150,7 @@ func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, func (dswp *desiredStateOfWorldPopulator) ReprocessPod( podName volumetypes.UniquePodName) { - dswp.deleteProcessedPod(podName) + dswp.markPodProcessingFailed(podName) } func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool { @@ -362,6 +362,12 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes( dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName) // Remove any stored errors for the pod, everything went well in this processPodVolumes dswp.desiredStateOfWorld.PopPodErrors(uniquePodName) + } else if dswp.podHasBeenSeenOnce(uniquePodName) { + // For the Pod which has been processed at least once, even though some volumes + // may not have been reprocessed successfully this round, we still mark it as processed to avoid + // processing it at a very high frequency. The pod will be reprocessed when volume manager calls + // ReprocessPod() which is triggered by SyncPod. + dswp.markPodProcessed(uniquePodName) } } @@ -434,14 +440,32 @@ func volumeRequiresFSResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolu } // podPreviouslyProcessed returns true if the volumes for this pod have already -// been processed by the populator +// been processed/reprocessed by the populator. Otherwise, the volumes for this pod need to +// be reprocessed. func (dswp *desiredStateOfWorldPopulator) podPreviouslyProcessed( podName volumetypes.UniquePodName) bool { dswp.pods.RLock() defer dswp.pods.RUnlock() - _, exists := dswp.pods.processedPods[podName] - return exists + return dswp.pods.processedPods[podName] +} + +// markPodProcessingFailed marks the specified pod from processedPods as false to indicate that it failed processing +func (dswp *desiredStateOfWorldPopulator) markPodProcessingFailed( + podName volumetypes.UniquePodName) { + dswp.pods.Lock() + dswp.pods.processedPods[podName] = false + dswp.pods.Unlock() +} + +// podHasBeenSeenOnce returns true if the pod has been seen by the popoulator +// at least once. +func (dswp *desiredStateOfWorldPopulator) podHasBeenSeenOnce( + podName volumetypes.UniquePodName) bool { + dswp.pods.RLock() + _, exist := dswp.pods.processedPods[podName] + dswp.pods.RUnlock() + return exist } // markPodProcessed records that the volumes for the specified pod have been diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index b6889372601..a8bde8a0c37 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -47,6 +47,111 @@ import ( "k8s.io/kubernetes/pkg/volume/util/types" ) +func pluginPVOmittingClient(dswp *desiredStateOfWorldPopulator) { + fakeClient := &fake.Clientset{} + fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { + return false, nil, nil + }) + fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) { + return false, nil, nil + }) + dswp.kubeClient = fakeClient +} + +func prepareDswpWithVolume(t *testing.T) (*desiredStateOfWorldPopulator, kubepod.Manager) { + // create dswp + mode := v1.PersistentVolumeFilesystem + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dswp-test-volume-name", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "file-bound"}, + VolumeMode: &mode, + }, + } + pvc := &v1.PersistentVolumeClaim{ + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "dswp-test-volume-name", + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + }, + } + dswp, fakePodManager, _ := createDswpWithVolume(t, pv, pvc) + return dswp, fakePodManager +} + +func TestFindAndAddNewPods_WithReprocessPodAndVolumeRetrievalError(t *testing.T) { + // create dswp + dswp, fakePodManager := prepareDswpWithVolume(t) + + // create pod + containers := []v1.Container{ + { + VolumeMounts: []v1.VolumeMount{ + { + Name: "dswp-test-volume-name", + MountPath: "/mnt", + }, + }, + }, + } + pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers) + + fakePodManager.AddPod(pod) + + podName := util.GetUniquePodName(pod) + + dswp.findAndAddNewPods() + + if !dswp.podPreviouslyProcessed(podName) { + t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName) + } + pluginPVOmittingClient(dswp) + + dswp.ReprocessPod(podName) + dswp.findAndAddNewPods() + + if !dswp.podPreviouslyProcessed(podName) { + t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName) + } + fakePodManager.DeletePod(pod) +} + +func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) { + // create dswp + dswp, fakePodManager := prepareDswpWithVolume(t) + + pluginPVOmittingClient(dswp) + + // create pod + containers := []v1.Container{ + { + VolumeMounts: []v1.VolumeMount{ + { + Name: "dswp-test-volume-name", + MountPath: "/mnt", + }, + }, + }, + } + pod := createPodWithVolume("dswp-test-pod", "dswp-test-volume-name", "file-bound", containers) + + fakePodManager.AddPod(pod) + + podName := util.GetUniquePodName(pod) + + dswp.findAndAddNewPods() + + if dswp.podPreviouslyProcessed(podName) { + t.Fatalf("The volumes for the specified pod: %s should not have been processed by the populator", podName) + } + if dswp.podHasBeenSeenOnce(podName) { + t.Fatalf("The volumes for the specified pod: %s should not have been processed by the populator", podName) + } +} + func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { // create dswp mode := v1.PersistentVolumeFilesystem