From 25668ccc11214fcbff2843c2079ee0965692aa4c Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Fri, 15 May 2015 15:30:28 -0700 Subject: [PATCH 1/2] Kubelet: remove the getPodstatus method Pod statuses are periodically writtien to the status manager, and status manager sets the start time of the pod. All non-status-modifying code should perform cache lookup and should not attempt to generate pod status on its own. --- pkg/kubelet/kubelet.go | 25 +++--------- pkg/kubelet/kubelet_test.go | 76 +++++++++++-------------------------- 2 files changed, 29 insertions(+), 72 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0fa54ed89cc..28b44fb22b2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1587,9 +1587,12 @@ func (kl *Kubelet) validateContainerStatus(podStatus *api.PodStatus, containerNa // or all of them. func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow, previous bool, stdout, stderr io.Writer) error { // TODO(vmarmol): Refactor to not need the pod status and verification. - podStatus, err := kl.getPodStatus(podFullName) - if err != nil { - return fmt.Errorf("failed to get status for pod %q - %v", podFullName, err) + // Pod workers periodically write status to statusManager. If status is not + // cached there, something is wrong (or kubelet just restarted and hasn't + // caught up yet). Just assume the pod is not ready yet. + podStatus, found := kl.statusManager.GetPodStatus(podFullName) + if !found { + return fmt.Errorf("failed to get status for pod %q", podFullName) } if err := kl.validatePodPhase(&podStatus); err != nil { // No log is available if pod is not in a "known" phase (e.g. Unknown). @@ -1913,22 +1916,6 @@ func getPodReadyCondition(spec *api.PodSpec, statuses []api.ContainerStatus) []a return ready } -// getPodStatus returns information of the containers in the pod from the -// container runtime. -func (kl *Kubelet) getPodStatus(podFullName string) (api.PodStatus, error) { - // Check to see if we have a cached version of the status. - cachedPodStatus, found := kl.statusManager.GetPodStatus(podFullName) - if found { - glog.V(3).Infof("Returning cached status for %q", podFullName) - return cachedPodStatus, nil - } - pod, found := kl.GetPodByFullName(podFullName) - if !found { - return api.PodStatus{}, fmt.Errorf("couldn't find pod %q", podFullName) - } - return kl.generatePodStatus(pod) -} - // By passing the pod directly, this method avoids pod lookup, which requires // grabbing a lock. func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 787dbc56821..4f94dfe89ba 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -3010,19 +3010,9 @@ func TestHandlePortConflicts(t *testing.T) { kl.handleNotFittingPods(pods) // Check pod status stored in the status map. - status, err := kl.getPodStatus(conflictedPodName) - if err != nil { - t.Fatalf("status of pod %q is not found in the status map: %#v", conflictedPodName, err) - } - if status.Phase != api.PodFailed { - t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) - } - - // Check if we can retrieve the pod status from GetPodStatus(). - kl.podManager.SetPods(pods) - status, err = kl.getPodStatus(conflictedPodName) - if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: %#v.", conflictedPodName, err) + status, found := kl.statusManager.GetPodStatus(conflictedPodName) + if !found { + t.Fatalf("status of pod %q is not found in the status map", conflictedPodName) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -3062,19 +3052,9 @@ func TestHandleNodeSelector(t *testing.T) { kl.handleNotFittingPods(pods) // Check pod status stored in the status map. - status, err := kl.getPodStatus(notfittingPodName) - if err != nil { - t.Fatalf("status of pod %q is not found in the status map: %#v", notfittingPodName, err) - } - if status.Phase != api.PodFailed { - t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) - } - - // Check if we can retrieve the pod status from GetPodStatus(). - kl.podManager.SetPods(pods) - status, err = kl.getPodStatus(notfittingPodName) - if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: %#v.", notfittingPodName, err) + status, found := kl.statusManager.GetPodStatus(notfittingPodName) + if !found { + t.Fatalf("status of pod %q is not found in the status map", notfittingPodName) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -3120,19 +3100,9 @@ func TestHandleMemExceeded(t *testing.T) { kl.handleNotFittingPods(pods) // Check pod status stored in the status map. - status, err := kl.getPodStatus(notfittingPodName) - if err != nil { - t.Fatalf("status of pod %q is not found in the status map: %#v", notfittingPodName, err) - } - if status.Phase != api.PodFailed { - t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) - } - - // Check if we can retrieve the pod status from GetPodStatus(). - kl.podManager.SetPods(pods) - status, err = kl.getPodStatus(notfittingPodName) - if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: %#v.", notfittingPodName, err) + status, found := kl.statusManager.GetPodStatus(notfittingPodName) + if !found { + t.Fatalf("status of pod %q is not found in the status map", notfittingPodName) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) @@ -3153,13 +3123,13 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { } // Run once to populate the status map. kl.handleNotFittingPods(pods) - if _, err := kl.getPodStatus(kubecontainer.BuildPodFullName("pod2", "")); err != nil { - t.Fatalf("expected to have status cached for %q: %v", "pod2", err) + if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); !found { + t.Fatalf("expected to have status cached for pod2") } // Sync with empty pods so that the entry in status map will be removed. kl.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) - if _, err := kl.getPodStatus(kubecontainer.BuildPodFullName("pod2", "")); err == nil { - t.Fatalf("expected to not have status cached for %q: %v", "pod2", err) + if _, found := kl.statusManager.GetPodStatus(kubecontainer.BuildPodFullName("pod2", "")); found { + t.Fatalf("expected to not have status cached for pod2") } } @@ -4165,11 +4135,11 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { t.Errorf("%d: unexpected error: %v", i, err) } - // Check if we can retrieve the pod status from GetPodStatus(). + // Check if we can retrieve the pod status. podName := kubecontainer.GetPodFullName(pods[0]) - status, err := kubelet.getPodStatus(podName) - if err != nil { - t.Fatalf("unable to retrieve pod status for pod %q: %#v.", podName, err) + status, found := kubelet.statusManager.GetPodStatus(podName) + if !found { + t.Fatalf("unable to retrieve pod status for pod %q.", podName) } else { terminatedContainers := []string{} for _, cs := range status.ContainerStatuses { @@ -4240,9 +4210,9 @@ func TestGetPodCreationFailureReason(t *testing.T) { t.Errorf("unexpected error: %v", err) } - status, err := kubelet.getPodStatus(kubecontainer.GetPodFullName(pod)) - if err != nil { - t.Errorf("unexpected error %v", err) + status, found := kubelet.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) + if !found { + t.Fatalf("unexpected error %v", err) } if len(status.ContainerStatuses) < 1 { t.Errorf("expected 1 container status, got %d", len(status.ContainerStatuses)) @@ -4306,9 +4276,9 @@ func TestGetPodPullImageFailureReason(t *testing.T) { t.Errorf("unexpected error: %v", err) } - status, err := kubelet.getPodStatus(kubecontainer.GetPodFullName(pod)) - if err != nil { - t.Errorf("unexpected error %v", err) + status, found := kubelet.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) + if !found { + t.Errorf("expected status of pod %q to be found", kubecontainer.GetPodFullName(pod)) } if len(status.ContainerStatuses) < 1 { t.Errorf("expected 1 container status, got %d", len(status.ContainerStatuses)) From 050b8ba60bd45eb30c750fed84a4d11d43888a2f Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Fri, 15 May 2015 17:01:56 -0700 Subject: [PATCH 2/2] Kubelet: move active deadline check to per pod worker Per-pod workers have sufficient knowledge to determine whether a pod has exceeded the active deadline, and they set the status at the end of each sync. Move the active deadline check to generatePodStatus so that per pod workers can update the pod status directly. This eliminates the possibility of a race condition where both SyncPods and the pod worker are updating the status, which could lead to temporary erratic pod status behavior (pod phase: failed -> running -> failed). --- pkg/kubelet/kubelet.go | 64 +++++++++++++++++------------------ pkg/kubelet/kubelet_test.go | 61 +++++++++++++++------------------ pkg/kubelet/status_manager.go | 3 ++ 3 files changed, 62 insertions(+), 66 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 28b44fb22b2..cb9b66e1f81 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1151,37 +1151,33 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco return nil } -// filterOutPodsPastActiveDeadline filters pods with an ActiveDeadlineSeconds value that has been exceeded. -// It records an event that the pod has been active longer than the allocated time, and updates the pod status as failed. -// By filtering the pod from the result set, the Kubelet will kill the pod's containers as part of normal SyncPods workflow. -func (kl *Kubelet) filterOutPodsPastActiveDeadline(allPods []*api.Pod) (pods []*api.Pod) { +// pastActiveDeadline returns true if the pod has been active for more than +// ActiveDeadlineSeconds. +func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { now := util.Now() - for _, pod := range allPods { - keepPod := true - if pod.Spec.ActiveDeadlineSeconds != nil { - podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) - if !ok { - podStatus = pod.Status - } - if !podStatus.StartTime.IsZero() { - startTime := podStatus.StartTime.Time - duration := now.Time.Sub(startTime) - allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second - if duration >= allowedDuration { - keepPod = false - } - } + if pod.Spec.ActiveDeadlineSeconds != nil { + podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod)) + if !ok { + podStatus = pod.Status } - if keepPod { - pods = append(pods, pod) - } else { - kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline") - kl.statusManager.SetPodStatus(pod, api.PodStatus{ - Phase: api.PodFailed, - Message: "Pod was active on the node longer than specified deadline"}) + if !podStatus.StartTime.IsZero() { + startTime := podStatus.StartTime.Time + duration := now.Time.Sub(startTime) + allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second + if duration >= allowedDuration { + return true + } } } - return pods + return false +} + +//podIsTerminated returns true if status is in one of the terminated state. +func podIsTerminated(status *api.PodStatus) bool { + if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { + return true + } + return false } // Filter out pods in the terminated state ("Failed" or "Succeeded"). @@ -1197,8 +1193,7 @@ func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod { // restarted. status = pod.Status } - if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded { - // Pod has reached the final state; ignore it. + if podIsTerminated(&status) { continue } pods = append(pods, pod) @@ -1487,8 +1482,6 @@ func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]metr // These two conditions could be alleviated by checkpointing kubelet. pods := kl.filterOutTerminatedPods(allPods) - pods = kl.filterOutPodsPastActiveDeadline(pods) - // Respect the pod creation order when resolving conflicts. sort.Sort(podsByCreationTime(pods)) @@ -1922,8 +1915,15 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) { podFullName := kubecontainer.GetPodFullName(pod) glog.V(3).Infof("Generating status for %q", podFullName) - spec := &pod.Spec + // TODO: Consider include the container information. + if kl.pastActiveDeadline(pod) { + kl.recorder.Eventf(pod, "deadline", "Pod was active on the node longer than specified deadline") + return api.PodStatus{ + Phase: api.PodFailed, + Message: "Pod was active on the node longer than specified deadline"}, nil + } + spec := &pod.Spec podStatus, err := kl.containerRuntime.GetPodStatus(pod) if err != nil { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 4f94dfe89ba..5769e9c12b7 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -4472,7 +4472,7 @@ func TestMakePortMappings(t *testing.T) { } } -func TestFilterOutPodsPastActiveDeadline(t *testing.T) { +func TestIsPodPastActiveDeadline(t *testing.T) { testKubelet := newTestKubelet(t) kubelet := testKubelet.kubelet pods := newTestPods(5) @@ -4485,23 +4485,21 @@ func TestFilterOutPodsPastActiveDeadline(t *testing.T) { pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds pods[1].Status.StartTime = &startTime pods[1].Spec.ActiveDeadlineSeconds = ¬YetActiveDeadlineSeconds - expected := []*api.Pod{pods[1], pods[2], pods[3], pods[4]} + tests := []struct { + pod *api.Pod + expected bool + }{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}, {pods[4], false}} + kubelet.podManager.SetPods(pods) - actual := kubelet.filterOutPodsPastActiveDeadline(pods) - if !reflect.DeepEqual(expected, actual) { - expectedNames := "" - for _, pod := range expected { - expectedNames = expectedNames + pod.Name + " " + for i, tt := range tests { + actual := kubelet.pastActiveDeadline(tt.pod) + if actual != tt.expected { + t.Errorf("[%d] expected %#v, got %#v", i, tt.expected, actual) } - actualNames := "" - for _, pod := range actual { - actualNames = actualNames + pod.Name + " " - } - t.Errorf("expected %#v, got %#v", expectedNames, actualNames) } } -func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) { +func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet @@ -4559,27 +4557,22 @@ func TestSyncPodsDeletesPodsThatRunTooLong(t *testing.T) { }, } + // Let the pod worker sets the status to fail after this sync. err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - - verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "stop", "inspect_container", "stop", "list"}) - - // A map iteration is used to delete containers, so must not depend on - // order here. - expectedToStop := map[string]bool{ - "1234": true, - "9876": true, + podFullName := kubecontainer.GetPodFullName(pods[0]) + status, found := kubelet.statusManager.GetPodStatus(podFullName) + if !found { + t.Errorf("expected to found status for pod %q", status) } - if len(fakeDocker.Stopped) != 2 || - !expectedToStop[fakeDocker.Stopped[0]] || - !expectedToStop[fakeDocker.Stopped[1]] { - t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) + if status.Phase != api.PodFailed { + t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase) } } -func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) { +func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet @@ -4642,12 +4635,12 @@ func TestSyncPodsDoesNotDeletePodsThatRunTooLong(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", - // Get pod status. - "inspect_container", "inspect_container", - // Check the pod infra container. - "inspect_container", - // Get pod status. - "list", "inspect_container", "inspect_container", "list"}) + podFullName := kubecontainer.GetPodFullName(pods[0]) + status, found := kubelet.statusManager.GetPodStatus(podFullName) + if !found { + t.Errorf("expected to found status for pod %q", status) + } + if status.Phase == api.PodFailed { + t.Fatalf("expected pod status to not be %q", status.Phase) + } } diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go index 92a3a449f1b..4df39d7c086 100644 --- a/pkg/kubelet/status_manager.go +++ b/pkg/kubelet/status_manager.go @@ -81,6 +81,9 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) { } // if the status has no start time, we need to set an initial time + // TODO(yujuhong): Consider setting StartTime when generating the pod + // status instead, which would allow statusManager to become a simple cache + // again. if status.StartTime.IsZero() { if pod.Status.StartTime.IsZero() { // the pod did not have a previously recorded value so set to now