diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3dd0eeb1135..5a2dc5802a9 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1155,37 +1155,33 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco return nil } -// filterOutPodsPastActiveDeadline filters pods with an ActiveDeadlineSeconds value that has been exceeded. -// It records an event that the pod has been active longer than the allocated time, and updates the pod status as failed. -// By filtering the pod from the result set, the Kubelet will kill the pod's containers as part of normal SyncPods workflow. -func (kl *Kubelet) filterOutPodsPastActiveDeadline(allPods []*api.Pod) (pods []*api.Pod) { +// pastActiveDeadline returns true if the pod has been active for more than +// ActiveDeadlineSeconds. +func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { now := util.Now() - for _, pod := range allPods { - keepPod := true - if pod.Spec.ActiveDeadlineSeconds != nil { - podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) - if !ok { - podStatus = pod.Status - } - if !podStatus.StartTime.IsZero() { - startTime := podStatus.StartTime.Time - duration := now.Time.Sub(startTime) - allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second - if duration >= allowedDuration { - keepPod = false - } - } + if pod.Spec.ActiveDeadlineSeconds != nil { + podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) + if !ok { + podStatus = pod.Status } - if keepPod { - pods = append(pods, pod) - } else { - kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Message: "Pod was active on the node longer than specified deadline"}) + if !podStatus.StartTime.IsZero() { + startTime := podStatus.StartTime.Time + duration := now.Time.Sub(startTime) + allowedDuration := 
time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second + if duration >= allowedDuration { + return true + } } } - return pods + return false +} + +// podIsTerminated returns true if status is in one of the terminated states. +func podIsTerminated(status *api.PodStatus) bool { + if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { + return true + } + return false } // Filter out pods in the terminated state ("Failed" or "Succeeded"). @@ -1201,8 +1197,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod { // restarted. status = pod.Status } - if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { - // Pod has reached the final state; ignore it. + if podIsTerminated(&status) { continue } pods = append(pods, pod) @@ -1497,8 +1492,6 @@ func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]metr // These two conditions could be alleviated by checkpointing kubelet. pods := kl.filterOutTerminatedPods(allPods) - pods = kl.filterOutPodsPastActiveDeadline(pods) - // Respect the pod creation order when resolving conflicts. sort.Sort(podsByCreationTime(pods)) @@ -1597,9 +1590,12 @@ func (kl *Kubelet) validateContainerStatus(podStatus *api.PodStatus, containerNa // or all of them. func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow, previous bool, stdout, stderr io.Writer) error { // TODO(vmarmol): Refactor to not need the pod status and verification. - podStatus, err := kl.getPodStatus(podFullName) - if err != nil { - return fmt.Errorf("failed to get status for pod %q - %v", podFullName, err) + // Pod workers periodically write status to statusManager. If status is not + // cached there, something is wrong (or kubelet just restarted and hasn't + // caught up yet). Just assume the pod is not ready yet. 
+ podStatus, found := kl.statusManager.GetPodStatus(podFullName) + if !found { + return fmt.Errorf("failed to get status for pod %q", podFullName) } if err := kl.validatePodPhase(&podStatus); err != nil { // No log is available if pod is not in a "known" phase (e.g. Unknown). @@ -1925,30 +1921,21 @@ func getPodReadyCondition(spec *api.PodSpec, statuses []api.ContainerStatus) []a return ready } -// getPodStatus returns information of the containers in the pod from the -// container runtime. -func (kl *Kubelet) getPodStatus(podFullName string) (api.PodStatus, error) { - // Check to see if we have a cached version of the status. - cachedPodStatus, found := kl.statusManager.GetPodStatus(podFullName) - if found { - glog.V(3).Infof("Returning cached status for %q", podFullName) - return cachedPodStatus, nil - } - pod, found := kl.GetPodByFullName(podFullName) - if !found { - return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName) - } - return kl.generatePodStatus(pod) -} - // By passing the pod directly, this method avoids pod lookup, which requires // grabbing a lock. func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) { podFullName := kubecontainer.GetPodFullName(pod) glog.V(3).Infof("Generating status for %q", podFullName) - spec := &pod.Spec + // TODO: Consider including the container information. 
+ if kl.pastActiveDeadline(pod) { + kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline") + return api.PodStatus{ + Phase: api.PodFailed, + Message: "Pod was active on the node longer than specified deadline"}, nil + } + spec := &pod.Spec podStatus, err := kl.containerRuntime.GetPodStatus(pod) if err != nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 0a89d4053e9..8ff7e7404d3 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -3010,19 +3010,9 @@ func TestHandlePortConflicts(t *testing.T) { kl.handleNotFittingPods(pods) // Check pod status stored in the status map. - status, err := kl.getPodStatus(conflictedPodName) - if err != nil { - t.Fatalf("status of pod %q is not found in the status map: %#v", conflictedPodName, err) - } - if status.Phase != api.PodFailed { - t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) - } - - // Check if we can retrieve the pod status from GetPodStatus(). - kl.podManager.SetPods(pods) - status, err = kl.getPodStatus(conflictedPodName) - if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: %#v.", conflictedPodName, err) + status, found := kl.statusManager.GetPodStatus(conflictedPodName) + if !found { + t.Fatalf("status of pod %q is not found in the status map", conflictedPodName) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -3062,19 +3052,9 @@ func TestHandleNodeSelector(t *testing.T) { kl.handleNotFittingPods(pods) // Check pod status stored in the status map. - status, err := kl.getPodStatus(notfittingPodName) - if err != nil { - t.Fatalf("status of pod %q is not found in the status map: %#v", notfittingPodName, err) - } - if status.Phase != api.PodFailed { - t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) - } - - // Check if we can retrieve the pod status from GetPodStatus(). 
- kl.podManager.SetPods(pods) - status, err = kl.getPodStatus(notfittingPodName) - if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: %#v.", notfittingPodName, err) + status, found := kl.statusManager.GetPodStatus(notfittingPodName) + if !found { + t.Fatalf("status of pod %q is not found in the status map", notfittingPodName) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -3120,19 +3100,9 @@ func TestHandleMemExceeded(t *testing.T) { kl.handleNotFittingPods(pods) // Check pod status stored in the status map. - status, err := kl.getPodStatus(notfittingPodName) - if err != nil { - t.Fatalf("status of pod %q is not found in the status map: %#v", notfittingPodName, err) - } - if status.Phase != api.PodFailed { - t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) - } - - // Check if we can retrieve the pod status from GetPodStatus(). - kl.podManager.SetPods(pods) - status, err = kl.getPodStatus(notfittingPodName) - if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: %#v.", notfittingPodName, err) + status, found := kl.statusManager.GetPodStatus(notfittingPodName) + if !found { + t.Fatalf("status of pod %q is not found in the status map", notfittingPodName) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -3153,13 +3123,13 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { } // Run once to populate the status map. kl.handleNotFittingPods(pods) - if _, err := kl.getPodStatus(kubecontainer.BuildPodFullName("pod2", "")); err != nil { - t.Fatalf("expected to have status cached for %q: %v", "pod2", err) + if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); !found { + t.Fatalf("expected to have status cached for pod2") } // Sync with empty pods so that the entry in status map will be removed. 
kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if _, err := kl.getPodStatus(kubecontainer.BuildPodFullName("pod2", "")); err == nil { - t.Fatalf("expected to not have status cached for %q: %v", "pod2", err) + if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); found { + t.Fatalf("expected to not have status cached for pod2") } } @@ -4169,11 +4139,11 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { t.Errorf("%d: unexpected error: %v", i, err) } - // Check if we can retrieve the pod status from GetPodStatus(). + // Check if we can retrieve the pod status. podName := kubecontainer.GetPodFullName(pods[0]) - status, err := kubelet.getPodStatus(podName) - if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: %#v.", podName, err) + status, found := kubelet.statusManager.GetPodStatus(podName) + if !found { + t.Fatalf("unable to retrieve pod status for pod %q.", podName) } else { terminatedContainers := []string{} for _, cs := range status.ContainerStatuses { @@ -4244,9 +4214,9 @@ func TestGetPodCreationFailureReason(t *testing.T) { t.Errorf("unexpected error: %v", err) } - status, err := kubelet.getPodStatus(kubecontainer.GetPodFullName(pod)) - if err != nil { - t.Errorf("unexpected error %v", err) + status, found := kubelet.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) + if !found { + t.Fatalf("unexpected error %v", err) } if len(status.ContainerStatuses) < 1 { t.Errorf("expected 1 container status, got %d", len(status.ContainerStatuses)) @@ -4310,9 +4280,9 @@ func TestGetPodPullImageFailureReason(t *testing.T) { t.Errorf("unexpected error: %v", err) } - status, err := kubelet.getPodStatus(kubecontainer.GetPodFullName(pod)) - if err != nil { - t.Errorf("unexpected error %v", err) + status, found := kubelet.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) + if !found { + t.Errorf("expected status of pod %q to be found", 
kubecontainer.GetPodFullName(pod)) } if len(status.ContainerStatuses) < 1 { t.Errorf("expected 1 container status, got %d", len(status.ContainerStatuses)) @@ -4506,7 +4476,7 @@ func TestMakePortMappings(t *testing.T) { } } -func TestFilterOutPodsPastActiveDeadline(t *testing.T) { +func TestIsPodPastActiveDeadline(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet pods := newTestPods(5) @@ -4519,23 +4489,21 @@ func TestFilterOutPodsPastActiveDeadline(t *testing.T) { pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds pods[1].Status.StartTime = &startTime pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds - expected := []*api.Pod{pods[1], pods[2], pods[3], pods[4]} + tests := []struct { + pod *api.Pod + expected bool + }{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}, {pods[4], false}} + kubelet.podManager.SetPods(pods) - actual := kubelet.filterOutPodsPastActiveDeadline(pods) - if !reflect.DeepEqual(expected, actual) { - expectedNames := "" - for _, pod := range expected { - expectedNames = expectedNames + pod.Name + " " + for i, tt := range tests { + actual := kubelet.pastActiveDeadline(tt.pod) + if actual != tt.expected { + t.Errorf("[%d] expected %#v, got %#v", i, tt.expected, actual) } - actualNames := "" - for _, pod := range actual { - actualNames = actualNames + pod.Name + " " - } - t.Errorf("expected %#v, got %#v", expectedNames, actualNames) } } -func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) { +func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet @@ -4593,27 +4561,22 @@ func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) { }, } + // Let the pod worker set the status to failed after this sync. 
err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - - verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "stop", "inspect_container", "stop", "list"}) - - // A map iteration is used to delete containers, so must not depend on - // order here. - expectedToStop := map[string]bool{ - "1234": true, - "9876": true, + podFullName := kubecontainer.GetPodFullName(pods[0]) + status, found := kubelet.statusManager.GetPodStatus(podFullName) + if !found { + t.Errorf("expected to found status for pod %q", status) } - if len(fakeDocker.Stopped) != 2 || - !expectedToStop[fakeDocker.Stopped[0]] || - !expectedToStop[fakeDocker.Stopped[1]] { - t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) + if status.Phase != api.PodFailed { + t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase) } } -func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) { +func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet @@ -4676,14 +4639,14 @@ func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", - // Get pod status. - "inspect_container", "inspect_container", - // Check the pod infra container. - "inspect_container", - // Get pod status. 
- "list", "inspect_container", "inspect_container", "list"}) + podFullName := kubecontainer.GetPodFullName(pods[0]) + status, found := kubelet.statusManager.GetPodStatus(podFullName) + if !found { + t.Errorf("expected to found status for pod %q", status) + } + if status.Phase == api.PodFailed { + t.Fatalf("expected pod status to not be %q", status.Phase) + } } func TestDeletePodDirsForDeletedPods(t *testing.T) { diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go index 92a3a449f1b..4df39d7c086 100644 --- a/pkg/kubelet/status_manager.go +++ b/pkg/kubelet/status_manager.go @@ -81,6 +81,9 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) { } // if the status has no start time, we need to set an initial time + // TODO(yujuhong): Consider setting StartTime when generating the pod + // status instead, which would allow statusManager to become a simple cache + // again. if status.StartTime.IsZero() { if pod.Status.StartTime.IsZero() { // the pod did not have a previously recorded value so set to now