From 050b8ba60bd45eb30c750fed84a4d11d43888a2f Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Fri, 15 May 2015 17:01:56 -0700 Subject: [PATCH] Kubelet: move active deadline check to per pod worker Per-pod workers have sufficient knowledge to determine whether a pod has exceeded the active deadline, and they set the status at the end of each sync. Move the active deadline check to generatePodStatus so that per pod workers can update the pod status directly. This eliminates the possibility of a race condition where both SyncPods and the pod worker are updating the status, which could lead to temporary erratic pod status behavior (pod phase: failed -> running -> failed). --- pkg/kubelet/kubelet.go | 64 +++++++++++++++++------------------ pkg/kubelet/kubelet_test.go | 61 +++++++++++++++------------------ pkg/kubelet/status_manager.go | 3 ++ 3 files changed, 62 insertions(+), 66 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 28b44fb22b2..cb9b66e1f81 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1151,37 +1151,33 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco return nil } -// filterOutPodsPastActiveDeadline filters pods with an ActiveDeadlineSeconds value that has been exceeded. -// It records an event that the pod has been active longer than the allocated time, and updates the pod status as failed. -// By filtering the pod from the result set, the Kubelet will kill the pod's containers as part of normal SyncPods workflow. -func (kl *Kubelet) filterOutPodsPastActiveDeadline(allPods []*api.Pod) (pods []*api.Pod) { +// pastActiveDeadline returns true if the pod has been active for more than +// ActiveDeadlineSeconds. +func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { now := util.Now() - for _, pod := range allPods { - keepPod := true - if pod.Spec.ActiveDeadlineSeconds != nil { - podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) - if !ok { - podStatus = pod.Status - } - if !podStatus.StartTime.IsZero() { - startTime := podStatus.StartTime.Time - duration := now.Time.Sub(startTime) - allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second - if duration >= allowedDuration { - keepPod = false - } - } + if pod.Spec.ActiveDeadlineSeconds != nil { + podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) + if !ok { + podStatus = pod.Status } - if keepPod { - pods = append(pods, pod) - } else { - kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Message: "Pod was active on the node longer than specified deadline"}) + if !podStatus.StartTime.IsZero() { + startTime := podStatus.StartTime.Time + duration := now.Time.Sub(startTime) + allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second + if duration >= allowedDuration { + return true + } } } - return pods + return false +} + +//podIsTerminated returns true if status is in one of the terminated state. +func podIsTerminated(status *api.PodStatus) bool { + if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { + return true + } + return false } // Filter out pods in the terminated state ("Failed" or "Succeeded"). @@ -1197,8 +1193,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod { // restarted. status = pod.Status } - if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { - // Pod has reached the final state; ignore it. + if podIsTerminated(&status) { continue } pods = append(pods, pod) @@ -1487,8 +1482,6 @@ func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]metr // These two conditions could be alleviated by checkpointing kubelet. pods := kl.filterOutTerminatedPods(allPods) - pods = kl.filterOutPodsPastActiveDeadline(pods) - // Respect the pod creation order when resolving conflicts. sort.Sort(podsByCreationTime(pods)) @@ -1922,8 +1915,15 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) { podFullName := kubecontainer.GetPodFullName(pod) glog.V(3).Infof("Generating status for %q", podFullName) - spec := &pod.Spec + // TODO: Consider include the container information. + if kl.pastActiveDeadline(pod) { + kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline") + return api.PodStatus{ + Phase: api.PodFailed, + Message: "Pod was active on the node longer than specified deadline"}, nil + } + spec := &pod.Spec podStatus, err := kl.containerRuntime.GetPodStatus(pod) if err != nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 4f94dfe89ba..5769e9c12b7 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -4472,7 +4472,7 @@ func TestMakePortMappings(t *testing.T) { } } -func TestFilterOutPodsPastActiveDeadline(t *testing.T) { +func TestIsPodPastActiveDeadline(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet pods := newTestPods(5) @@ -4485,23 +4485,21 @@ func TestFilterOutPodsPastActiveDeadline(t *testing.T) { pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds pods[1].Status.StartTime = &startTime pods[1].Spec.ActiveDeadlineSeconds = ¬YetActiveDeadlineSeconds - expected := []*api.Pod{pods[1], pods[2], pods[3], pods[4]} + tests := []struct { + pod *api.Pod + expected bool + }{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}, {pods[4], false}} + kubelet.podManager.SetPods(pods) - actual := kubelet.filterOutPodsPastActiveDeadline(pods) - if !reflect.DeepEqual(expected, actual) { - expectedNames := "" - for _, pod := range expected { - expectedNames = expectedNames + pod.Name + " " + for i, tt := range tests { + actual := kubelet.pastActiveDeadline(tt.pod) + if actual != tt.expected { + t.Errorf("[%d] expected %#v, got %#v", i, tt.expected, actual) } - actualNames := "" - for _, pod := range actual { - actualNames = actualNames + pod.Name + " " - } - t.Errorf("expected %#v, got %#v", expectedNames, actualNames) } } -func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) { +func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet @@ -4559,27 +4557,22 @@ func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) { }, } + // Let the pod worker sets the status to fail after this sync. err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - - verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "stop", "inspect_container", "stop", "list"}) - - // A map iteration is used to delete containers, so must not depend on - // order here. - expectedToStop := map[string]bool{ - "1234": true, - "9876": true, + podFullName := kubecontainer.GetPodFullName(pods[0]) + status, found := kubelet.statusManager.GetPodStatus(podFullName) + if !found { + t.Errorf("expected to found status for pod %q", status) } - if len(fakeDocker.Stopped) != 2 || - !expectedToStop[fakeDocker.Stopped[0]] || - !expectedToStop[fakeDocker.Stopped[1]] { - t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) + if status.Phase != api.PodFailed { + t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase) } } -func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) { +func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet @@ -4642,12 +4635,12 @@ func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", - // Get pod status. - "inspect_container", "inspect_container", - // Check the pod infra container. - "inspect_container", - // Get pod status. - "list", "inspect_container", "inspect_container", "list"}) + podFullName := kubecontainer.GetPodFullName(pods[0]) + status, found := kubelet.statusManager.GetPodStatus(podFullName) + if !found { + t.Errorf("expected to found status for pod %q", status) + } + if status.Phase == api.PodFailed { + t.Fatalf("expected pod status to not be %q", status.Phase) + } } diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go index 92a3a449f1b..4df39d7c086 100644 --- a/pkg/kubelet/status_manager.go +++ b/pkg/kubelet/status_manager.go @@ -81,6 +81,9 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) { } // if the status has no start time, we need to set an initial time + // TODO(yujuhong): Consider setting StartTime when generating the pod + // status instead, which would allow statusManager to become a simple cache + // again. if status.StartTime.IsZero() { if pod.Status.StartTime.IsZero() { // the pod did not have a previously recorded value so set to now