diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index a82e6c6d39a..ad67a9efbb1 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1430,6 +1430,28 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Con
 	return nil
 }
 
+// Filter out pods in the terminated state ("Failed" or "Succeeded").
+func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
+	var pods []*api.Pod
+	for _, pod := range allPods {
+		var status api.PodStatus
+		// Check the cached pod status which was set after the last sync.
+		status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
+		if !ok {
+			// If there is no cached status, use the status from the
+			// apiserver. This is useful if the kubelet has recently been
+			// restarted.
+			status = pod.Status
+		}
+		if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
+			// The pod has reached a final state; ignore it.
+			continue
+		}
+		pods = append(pods, pod)
+	}
+	return pods
+}
+
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
 func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType,
 	mirrorPods map[string]*api.Pod, start time.Time) error {
@@ -1444,16 +1466,20 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
 	}
 	kl.statusManager.RemoveOrphanedStatuses(podFullNames)
 
-	// Filter out the rejected pod. They don't have running containers.
+	// Reject pods that we cannot run.
 	kl.handleNotFittingPods(allPods)
-	var pods []*api.Pod
-	for _, pod := range allPods {
-		status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
-		if ok && status.Phase == api.PodFailed {
-			continue
-		}
-		pods = append(pods, pod)
-	}
+
+	// Pod phase progresses monotonically. Once a pod has reached a final state,
+	// it should never leave that state regardless of the restart policy. The
+	// statuses of such pods should not be changed; there is no need to sync them.
+	// TODO: the logic here does not handle two cases:
+	// 1. If the containers were removed immediately after they died, kubelet
+	//    may fail to generate correct statuses, let alone filter correctly.
+	// 2. If kubelet restarted before writing the terminated status for a pod
+	//    to the apiserver, it could still restart the terminated pod (even
+	//    though the pod was not considered terminated by the apiserver).
+	// These two conditions could be alleviated by checkpointing the kubelet.
+	pods := kl.filterOutTerminatedPods(allPods)
 
 	glog.V(4).Infof("Desired: %#v", pods)
 	var err error
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index bb8c934e7f1..45e4358487b 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -168,6 +168,18 @@ func verifyBoolean(t *testing.T, expected, value bool) {
 	}
 }
 
+func newTestPods(count int) []*api.Pod {
+	pods := make([]*api.Pod, count)
+	for i := 0; i < count; i++ {
+		pods[i] = &api.Pod{
+			ObjectMeta: api.ObjectMeta{
+				Name: fmt.Sprintf("pod%d", i),
+			},
+		}
+	}
+	return pods
+}
+
 func TestKubeletDirs(t *testing.T) {
 	testKubelet := newTestKubelet(t)
 	kubelet := testKubelet.kubelet
@@ -4246,3 +4258,20 @@ func TestGetRestartCount(t *testing.T) {
 	fakeDocker.ExitedContainerList = []docker.APIContainers{}
 	verifyRestartCount(&pod, 2)
 }
+
+func TestFilterOutTerminatedPods(t *testing.T) {
+	testKubelet := newTestKubelet(t)
+	kubelet := testKubelet.kubelet
+	pods := newTestPods(5)
+	pods[0].Status.Phase = api.PodFailed
+	pods[1].Status.Phase = api.PodSucceeded
+	pods[2].Status.Phase = api.PodRunning
+	pods[3].Status.Phase = api.PodPending
+
+	expected := []*api.Pod{pods[2], pods[3], pods[4]}
+	kubelet.podManager.SetPods(pods)
+	actual := kubelet.filterOutTerminatedPods(pods)
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("expected %#v, got %#v", expected, actual)
+	}
+}
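
For readers outside the kubelet codebase, the following is a minimal, standalone sketch of the decision the new filter makes: prefer the locally cached status from the last sync, fall back to the phase reported by the apiserver when no cached entry exists, and drop pods whose phase is terminal. The types PodPhase, Pod, and statusCache here are simplified stand-ins invented for illustration, not the kubelet's actual api types or statusManager interface.

package main

import "fmt"

// PodPhase is a simplified stand-in for api.PodPhase.
type PodPhase string

const (
	PodPending   PodPhase = "Pending"
	PodRunning   PodPhase = "Running"
	PodSucceeded PodPhase = "Succeeded"
	PodFailed    PodPhase = "Failed"
)

// Pod is a simplified stand-in for api.Pod; Phase holds the phase as
// last reported by the apiserver.
type Pod struct {
	Name  string
	Phase PodPhase
}

// statusCache plays the role of the kubelet's statusManager: it holds
// the phase computed after the last sync, keyed by pod name.
type statusCache map[string]PodPhase

// filterOutTerminated mirrors the logic of filterOutTerminatedPods:
// consult the local cache first, fall back to the apiserver-provided
// phase, and drop any pod in a terminal phase.
func filterOutTerminated(cache statusCache, allPods []Pod) []Pod {
	var pods []Pod
	for _, pod := range allPods {
		phase, ok := cache[pod.Name]
		if !ok {
			// No cached status (e.g. a freshly restarted kubelet);
			// trust the apiserver's view instead.
			phase = pod.Phase
		}
		if phase == PodFailed || phase == PodSucceeded {
			// Terminal phases are monotonic; never sync this pod again.
			continue
		}
		pods = append(pods, pod)
	}
	return pods
}

func main() {
	cache := statusCache{"a": PodFailed}
	all := []Pod{
		{Name: "a", Phase: PodRunning},   // cache says Failed: filtered out
		{Name: "b", Phase: PodSucceeded}, // no cache entry: apiserver phase wins
		{Name: "c", Phase: PodPending},   // kept
	}
	for _, p := range filterOutTerminated(cache, all) {
		fmt.Println(p.Name) // prints only "c"
	}
}

The cache-first lookup matches the rationale in the diff's comments: the status the kubelet computed after its last sync is fresher than the apiserver's copy, while the apiserver fallback covers the case where the kubelet was recently restarted and has no cached status yet.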