diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c3769d88aed..6281aebdf43 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -504,7 +504,8 @@ func NewMainKubelet( hostname, klet.podManager, klet.kubeClient, - klet.volumePluginMgr) + klet.volumePluginMgr, + klet.containerRuntime) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) if err != nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 552df6990b3..10e49c34b97 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -299,7 +299,8 @@ func newTestKubeletWithImageList( kubelet.hostname, kubelet.podManager, fakeKubeClient, - kubelet.volumePluginMgr) + kubelet.volumePluginMgr, + fakeRuntime) if err != nil { t.Fatalf("failed to initialize volume manager: %v", err) } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 14b8e0c5064..c88ed13d614 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -97,7 +97,8 @@ func TestRunOnce(t *testing.T) { kb.hostname, kb.podManager, kb.kubeClient, - kb.volumePluginMgr) + kb.volumePluginMgr, + fakeRuntime) kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR) // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency diff --git a/pkg/kubelet/volume/populator/desired_state_of_world_populator.go b/pkg/kubelet/volume/populator/desired_state_of_world_populator.go index bbbd4992628..eecd8022060 100644 --- a/pkg/kubelet/volume/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volume/populator/desired_state_of_world_populator.go @@ -29,7 +29,9 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/pod" + "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/volume/cache" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/wait" @@ -64,24 +66,31 @@ type DesiredStateOfWorldPopulator interface { func NewDesiredStateOfWorldPopulator( kubeClient internalclientset.Interface, loopSleepDuration time.Duration, + getPodStatusRetryDuration time.Duration, podManager pod.Manager, - desiredStateOfWorld cache.DesiredStateOfWorld) DesiredStateOfWorldPopulator { + desiredStateOfWorld cache.DesiredStateOfWorld, + kubeContainerRuntime kubecontainer.Runtime) DesiredStateOfWorldPopulator { return &desiredStateOfWorldPopulator{ - kubeClient: kubeClient, - loopSleepDuration: loopSleepDuration, - podManager: podManager, - desiredStateOfWorld: desiredStateOfWorld, + kubeClient: kubeClient, + loopSleepDuration: loopSleepDuration, + getPodStatusRetryDuration: getPodStatusRetryDuration, + podManager: podManager, + desiredStateOfWorld: desiredStateOfWorld, pods: processedPods{ processedPods: make(map[volumetypes.UniquePodName]bool)}, + kubeContainerRuntime: kubeContainerRuntime, } } type desiredStateOfWorldPopulator struct { - kubeClient internalclientset.Interface - loopSleepDuration time.Duration - podManager pod.Manager - desiredStateOfWorld cache.DesiredStateOfWorld - pods processedPods + kubeClient internalclientset.Interface + loopSleepDuration time.Duration + getPodStatusRetryDuration time.Duration + podManager pod.Manager + desiredStateOfWorld cache.DesiredStateOfWorld + pods processedPods + kubeContainerRuntime kubecontainer.Runtime + timeOfLastGetPodStatus time.Time } type processedPods struct { @@ -102,6 +111,20 @@ func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() { return func() { dswp.findAndAddNewPods() + // findAndRemoveDeletedPods() calls out to the container runtime to + // determine if the containers for a given pod are terminated. This is + // an expensive operation, therefore we limit the rate that + // findAndRemoveDeletedPods() is called independently of the main + // populator loop. + if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration { + glog.V(5).Infof( + "Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).", + dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration), + dswp.getPodStatusRetryDuration) + + return + } + dswp.findAndRemoveDeletedPods() } } @@ -117,19 +140,60 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() { // Iterate through all pods in desired state of world, and remove if they no // longer exist func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() { + var runningPods []*kubecontainer.Pod + + runningPodsFetched := false for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() { if _, podExists := - dswp.podManager.GetPodByUID(volumeToMount.Pod.UID); !podExists { - glog.V(10).Infof( - "Removing volume %q (volSpec=%q) for pod %q from desired state.", - volumeToMount.VolumeName, - volumeToMount.VolumeSpec.Name(), - volumeToMount.PodName) - - dswp.desiredStateOfWorld.DeletePodFromVolume( - volumeToMount.PodName, volumeToMount.VolumeName) - dswp.deleteProcessedPod(volumeToMount.PodName) + dswp.podManager.GetPodByUID(volumeToMount.Pod.UID); podExists { + continue } + + // Once a pod has been deleted from kubelet pod manager, do not delete + // it immediately from volume manager. Instead, check the kubelet + // containerRuntime to verify that all containers in the pod have been + // terminated. + if !runningPodsFetched { + var getPodsErr error + runningPods, getPodsErr = dswp.kubeContainerRuntime.GetPods(false) + if getPodsErr != nil { + glog.Errorf( + "kubeContainerRuntime.findAndRemoveDeletedPods returned error %v.", + getPodsErr) + continue + } + + runningPodsFetched = true + dswp.timeOfLastGetPodStatus = time.Now() + } + + runningContainers := false + for _, runningPod := range runningPods { + if runningPod.ID == volumeToMount.Pod.UID { + if len(runningPod.Containers) > 0 { + runningContainers = true + } + + break + } + } + + if runningContainers { + glog.V(5).Infof( + "Pod %q has been removed from pod manager. However, it still has one or more containers in the non-exited state. Therefore it will not be removed from volume manager.", + format.Pod(volumeToMount.Pod)) + continue + } + + glog.V(5).Infof( + "Removing volume %q (volSpec=%q) for pod %q from desired state.", + volumeToMount.VolumeName, + volumeToMount.VolumeSpec.Name(), + format.Pod(volumeToMount.Pod)) + + dswp.desiredStateOfWorld.DeletePodFromVolume( + volumeToMount.PodName, volumeToMount.VolumeName) + dswp.deleteProcessedPod(volumeToMount.PodName) } } @@ -151,10 +215,9 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *api.Pod) { dswp.createVolumeSpec(podVolume, pod.Namespace) if err != nil { glog.Errorf( - "Error processing volume %q for pod %q/%q: %v", + "Error processing volume %q for pod %q: %v", podVolume.Name, - pod.Namespace, - pod.Name, + format.Pod(pod), err) continue } diff --git a/pkg/kubelet/volume/volume_manager.go b/pkg/kubelet/volume/volume_manager.go index 3cd1d8bd263..4976ccb9e2a 100644 --- a/pkg/kubelet/volume/volume_manager.go +++ b/pkg/kubelet/volume/volume_manager.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/kubelet/container" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/volume/cache" @@ -48,6 +49,12 @@ const ( // DesiredStateOfWorldPopulator loop waits between successive executions desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 100 * time.Millisecond + // desiredStateOfWorldPopulatorGetPodStatusRetryDuration is the amount of + // time the DesiredStateOfWorldPopulator loop waits between successive pod + // cleanup calls (to prevent calling containerruntime.GetPodStatus too + // frequently). + desiredStateOfWorldPopulatorGetPodStatusRetryDuration time.Duration = 2 * time.Second + // podAttachAndMountTimeout is the maximum amount of time the // WaitForAttachAndMount call will wait for all volumes in the specified pod // to be attached and mounted. Even though cloud operations can take several @@ -134,7 +141,8 @@ func NewVolumeManager( hostName string, podManager pod.Manager, kubeClient internalclientset.Interface, - volumePluginMgr *volume.VolumePluginMgr) (VolumeManager, error) { + volumePluginMgr *volume.VolumePluginMgr, + kubeContainerRuntime kubecontainer.Runtime) (VolumeManager, error) { vm := &volumeManager{ kubeClient: kubeClient, volumePluginMgr: volumePluginMgr, @@ -157,8 +165,10 @@ func NewVolumeManager( vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator( kubeClient, desiredStateOfWorldPopulatorLoopSleepPeriod, + desiredStateOfWorldPopulatorGetPodStatusRetryDuration, podManager, - vm.desiredStateOfWorld) + vm.desiredStateOfWorld, + kubeContainerRuntime) return vm, nil }