From 0166d446b9c631f5515b4545ef833753e96fe143 Mon Sep 17 00:00:00 2001
From: Ryan Phillips
Date: Wed, 6 Oct 2021 20:06:08 -0500
Subject: [PATCH 1/3] kubelet: set terminated podWorker status for terminated pods

---
 pkg/kubelet/pod_workers.go      | 33 ++++++++++++++++++++++++++
 pkg/kubelet/pod_workers_test.go | 42 +++++++++++++++++++++++++++++++++
 2 files changed, 75 insertions(+)

diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index 090062b0d48..a866d0cfa6f 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/record"
+	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
 	"k8s.io/klog/v2"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/events"
@@ -501,6 +502,22 @@ func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string)
 	return ok
 }
 
+func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool {
+	runningContainers := 0
+	runningSandboxes := 0
+	for _, container := range status.ContainerStatuses {
+		if container.State == kubecontainer.ContainerStateRunning {
+			runningContainers++
+		}
+	}
+	for _, sb := range status.SandboxStatuses {
+		if sb.State == runtimeapi.PodSandboxState_SANDBOX_READY {
+			runningSandboxes++
+		}
+	}
+	return runningContainers == 0 && runningSandboxes == 0
+}
+
 // UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
 // terminating, or terminated, and will transition to terminating if deleted on the apiserver, it is
 // discovered to have a terminal phase (Succeeded or Failed), or if it is evicted by the kubelet.
@@ -536,6 +553,22 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
 		status = &podSyncStatus{
 			syncedAt: now,
 		}
+		// if this pod is being synced for the first time, we need to make sure it is an active pod
+		if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
+			// check to see if the pod is not running and the pod is terminal.
+			// If this succeeds then record in the podWorker that it is terminated.
+			if statusCache, err := p.podCache.Get(pod.UID); err == nil {
+				if isPodStatusCacheTerminal(statusCache) {
+					status = &podSyncStatus{
+						terminatedAt:       now,
+						terminatingAt:      now,
+						syncedAt:           now,
+						startedTerminating: true,
+						finished:           true,
+					}
+				}
+			}
+		}
 		p.podSyncStatuses[uid] = status
 	}
 
diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go
index 1664d148333..91b36999593 100644
--- a/pkg/kubelet/pod_workers_test.go
+++ b/pkg/kubelet/pod_workers_test.go
@@ -140,6 +140,18 @@ func newPod(uid, name string) *v1.Pod {
 	}
 }
 
+func newPodWithPhase(uid, name string, phase v1.PodPhase) *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			UID:  types.UID(uid),
+			Name: name,
+		},
+		Status: v1.PodStatus{
+			Phase: phase,
+		},
+	}
+}
+
 // syncPodRecord is a record of a sync pod call
 type syncPodRecord struct {
 	name string
@@ -273,6 +285,36 @@ func TestUpdatePod(t *testing.T) {
 	}
 }
 
+func TestUpdatePodWithTerminatedPod(t *testing.T) {
+	podWorkers, _ := createPodWorkers()
+	terminatedPod := newPodWithPhase("0000-0000-0000", "done-pod", v1.PodSucceeded)
+	runningPod := &kubecontainer.Pod{ID: "0000-0000-0001", Name: "done-pod"}
+	pod := newPod("0000-0000-0002", "running-pod")
+
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        terminatedPod,
+		UpdateType: kubetypes.SyncPodCreate,
+	})
+	podWorkers.UpdatePod(UpdatePodOptions{
+		Pod:        pod,
+		UpdateType: kubetypes.SyncPodCreate,
+	})
+	podWorkers.UpdatePod(UpdatePodOptions{
+		UpdateType: kubetypes.SyncPodKill,
+		RunningPod: runningPod,
+	})
+
+	if podWorkers.IsPodKnownTerminated(pod.UID) == true {
+		t.Errorf("podWorker state should not be terminated")
+	}
+	if podWorkers.IsPodKnownTerminated(terminatedPod.UID) == false {
+		t.Errorf("podWorker state should be terminated")
+	}
+	if podWorkers.IsPodKnownTerminated(runningPod.ID) == true {
+		t.Errorf("podWorker state should not be marked terminated for a running pod")
+	}
+}
+
 func TestUpdatePodForRuntimePod(t *testing.T) {
 	podWorkers, processed := createPodWorkers()
 

From c771698de32f5aab96933434b89527ff4cf20229 Mon Sep 17 00:00:00 2001
From: Elana Hashman
Date: Thu, 7 Oct 2021 12:31:36 -0700
Subject: [PATCH 2/3] Add e2e test to verify kubelet restart behaviour

Succeeded pods should not be counted as running on restart.
---
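Note (illustration only, not part of the diff below): waitForPods now accepts
pods that have already Succeeded in addition to Running/Ready pods, by
switching from testutils.PodRunningReady to testutils.PodRunningReadyOrSucceeded.
A standalone sketch of the predicate the test relies on follows; the helper
name podCountsTowardRunning is hypothetical and the library implementation may
differ in detail:

package example

import (
	v1 "k8s.io/api/core/v1"
)

// podCountsTowardRunning mirrors what the updated waitForPods accepts: a pod
// that already ran to completion (Succeeded) counts, as does a pod that is
// Running with a Ready=True condition; everything else is skipped.
func podCountsTowardRunning(pod *v1.Pod) bool {
	if pod.Status.Phase == v1.PodSucceeded {
		return true
	}
	if pod.Status.Phase != v1.PodRunning {
		return false
	}
	for _, cond := range pod.Status.Conditions {
		if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}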
 test/e2e_node/restart_test.go | 72 ++++++++++++++++++++++++++++++++++-
 1 file changed, 70 insertions(+), 2 deletions(-)

diff --git a/test/e2e_node/restart_test.go b/test/e2e_node/restart_test.go
index a628216bfde..ff17e3b4868 100644
--- a/test/e2e_node/restart_test.go
+++ b/test/e2e_node/restart_test.go
@@ -28,10 +28,12 @@ import (
 	"time"
 
 	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/test/e2e/framework"
 	testutils "k8s.io/kubernetes/test/utils"
 	imageutils "k8s.io/kubernetes/test/utils/image"
+	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 
 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
@@ -49,7 +51,7 @@ func waitForPods(f *framework.Framework, podCount int, timeout time.Duration) (r
 		runningPods = []*v1.Pod{}
 		for _, pod := range podList.Items {
-			if r, err := testutils.PodRunningReady(&pod); err != nil || !r {
+			if r, err := testutils.PodRunningReadyOrSucceeded(&pod); err != nil || !r {
 				continue
 			}
 			runningPods = append(runningPods, &pod)
@@ -62,7 +64,7 @@ func waitForPods(f *framework.Framework, podCount int, timeout time.Duration) (r
 	return runningPods
 }
 
-var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive] [NodeFeature:ContainerRuntimeRestart]", func() {
+var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive]", func() {
 	const (
 		// Saturate the node. It's not necessary that all these pods enter
 		// Running/Ready, because we don't know the number of cores in the
@@ -145,4 +147,70 @@ var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive]", func() {
 			})
 		})
 	})
+
+	ginkgo.Context("Kubelet", func() {
+		ginkgo.It("should correctly account for terminated pods after restart", func() {
+			node := getLocalNode(f)
+			cpus := node.Status.Allocatable[v1.ResourceCPU]
+			numCpus := int((&cpus).Value())
+			if numCpus < 1 {
+				e2eskipper.Skipf("insufficient CPU available for kubelet restart test")
+			}
+
+			// create as many restartNever pods as there are allocatable CPU
+			// nodes; if they are not correctly accounted for as terminated
+			// later, this will fill up all node capacity
+			podCountRestartNever := numCpus
+			ginkgo.By(fmt.Sprintf("creating %d RestartNever pods on node", podCountRestartNever))
+			restartNeverPods := newTestPods(podCountRestartNever, false, imageutils.GetE2EImage(imageutils.BusyBox), "restart-kubelet-test")
+			for _, pod := range restartNeverPods {
+				pod.Spec.RestartPolicy = "Never"
+				pod.Spec.Containers[0].Command = []string{"echo", "hi"}
+				pod.Spec.Containers[0].Resources.Limits = v1.ResourceList{
+					v1.ResourceCPU: resource.MustParse("1"),
+				}
+			}
+			createBatchPodWithRateControl(f, restartNeverPods, podCreationInterval)
+			defer deletePodsSync(f, restartNeverPods)
+
+			completedPods := waitForPods(f, podCountRestartNever, time.Minute)
+			if len(completedPods) < podCountRestartNever {
+				framework.Failf("Failed to run sufficient restartNever pods, got %d but expected %d", len(completedPods), podCountRestartNever)
+			}
+
+			podCountRestartAlways := (numCpus / 2) + 1
+			ginkgo.By(fmt.Sprintf("creating %d RestartAlways pods on node", podCountRestartAlways))
+			restartAlwaysPods := newTestPods(podCountRestartAlways, false, imageutils.GetPauseImageName(), "restart-kubelet-test")
+			for _, pod := range restartAlwaysPods {
+				pod.Spec.Containers[0].Resources.Limits = v1.ResourceList{
+					v1.ResourceCPU: resource.MustParse("1"),
+				}
+			}
+			createBatchPodWithRateControl(f, restartAlwaysPods, podCreationInterval)
+			defer deletePodsSync(f, restartAlwaysPods)
+
+			numAllPods := podCountRestartNever + podCountRestartAlways
+			allPods := waitForPods(f, numAllPods, startTimeout)
+			if len(allPods) < numAllPods {
+				framework.Failf("Failed to run sufficient restartAlways pods, got %d but expected %d", len(allPods), numAllPods)
+			}
+
+			ginkgo.By("killing and restarting kubelet")
+			// We want to kill the kubelet rather than a graceful restart
+			startKubelet := stopKubelet()
+			startKubelet()
+
+			// If this test works correctly, each of these pods will exit
+			// with no issue. But if accounting breaks, pods scheduled after
+			// restart may think these old pods are consuming CPU and we
+			// will get an OutOfCpu error.
+			ginkgo.By("verifying restartNever pods succeed and restartAlways pods stay running")
+			for start := time.Now(); time.Since(start) < startTimeout; time.Sleep(10 * time.Second) {
+				postRestartRunningPods := waitForPods(f, numAllPods, time.Minute)
+				if len(postRestartRunningPods) < numAllPods {
+					framework.Failf("less pods are running after node restart, got %d but expected %d", len(postRestartRunningPods), numAllPods)
+				}
+			}
+		})
+	})
 })

From 3982fcae64928a890455d70711a8604befb6688e Mon Sep 17 00:00:00 2001
From: Ryan Phillips
Date: Thu, 7 Oct 2021 20:13:43 -0500
Subject: [PATCH 3/3] go fmt

---
 test/e2e_node/restart_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/e2e_node/restart_test.go b/test/e2e_node/restart_test.go
index ff17e3b4868..6262a741908 100644
--- a/test/e2e_node/restart_test.go
+++ b/test/e2e_node/restart_test.go
@@ -31,9 +31,9 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/test/e2e/framework"
+	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	testutils "k8s.io/kubernetes/test/utils"
 	imageutils "k8s.io/kubernetes/test/utils/image"
-	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 
 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
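
Note (illustration only, not part of the patches): the e2e test above encodes
the CPU accounting that breaks when a restarted kubelet still counts Succeeded
pods as running. A minimal standalone sketch of that arithmetic, using the same
pod counts as the test; numCpus is an assumed example value and the printed
summary exists only for this sketch:

package main

import "fmt"

func main() {
	numCpus := 4                       // assumed allocatable CPUs on the node
	restartNever := numCpus            // each requests 1 CPU, runs "echo hi", then Succeeds
	restartAlways := (numCpus / 2) + 1 // each requests 1 CPU and keeps running

	// Correct accounting after a kubelet restart: Succeeded pods no longer
	// hold CPU, so only the RestartAlways pods count against allocatable.
	correct := restartAlways

	// Broken accounting: Succeeded pods are still treated as running, their
	// 1-CPU requests are added back in, the node looks oversubscribed, and
	// newly scheduled pods are rejected with OutOfCpu.
	broken := restartNever + restartAlways

	fmt.Printf("correct: %d of %d CPUs requested\n", correct, numCpus)
	fmt.Printf("broken:  %d of %d CPUs requested (OutOfCpu for new pods)\n", broken, numCpus)
}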