From 0166d446b9c631f5515b4545ef833753e96fe143 Mon Sep 17 00:00:00 2001 From: Ryan Phillips Date: Wed, 6 Oct 2021 20:06:08 -0500 Subject: [PATCH] kubelet: set terminated podWorker status for terminated pods --- pkg/kubelet/pod_workers.go | 33 ++++++++++++++++++++++++++ pkg/kubelet/pod_workers_test.go | 42 +++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 090062b0d48..a866d0cfa6f 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" "k8s.io/klog/v2" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" @@ -501,6 +502,22 @@ func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) return ok } +func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool { + runningContainers := 0 + runningSandboxes := 0 + for _, container := range status.ContainerStatuses { + if container.State == kubecontainer.ContainerStateRunning { + runningContainers++ + } + } + for _, sb := range status.SandboxStatuses { + if sb.State == runtimeapi.PodSandboxState_SANDBOX_READY { + runningSandboxes++ + } + } + return runningContainers == 0 && runningSandboxes == 0 +} + // UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable, // terminating, or terminated, and will transition to terminating if deleted on the apiserver, it is // discovered to have a terminal phase (Succeeded or Failed), or if it is evicted by the kubelet. @@ -536,6 +553,22 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { status = &podSyncStatus{ syncedAt: now, } + // if this pod is being synced for the first time, we need to make sure it is an active pod + if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) { + // check to see if the pod is not running and the pod is terminal. + // If this succeeds then record in the podWorker that it is terminated. + if statusCache, err := p.podCache.Get(pod.UID); err == nil { + if isPodStatusCacheTerminal(statusCache) { + status = &podSyncStatus{ + terminatedAt: now, + terminatingAt: now, + syncedAt: now, + startedTerminating: true, + finished: true, + } + } + } + } p.podSyncStatuses[uid] = status } diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 1664d148333..91b36999593 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -140,6 +140,18 @@ func newPod(uid, name string) *v1.Pod { } } +func newPodWithPhase(uid, name string, phase v1.PodPhase) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(uid), + Name: name, + }, + Status: v1.PodStatus{ + Phase: phase, + }, + } +} + // syncPodRecord is a record of a sync pod call type syncPodRecord struct { name string @@ -273,6 +285,36 @@ func TestUpdatePod(t *testing.T) { } } +func TestUpdatePodWithTerminatedPod(t *testing.T) { + podWorkers, _ := createPodWorkers() + terminatedPod := newPodWithPhase("0000-0000-0000", "done-pod", v1.PodSucceeded) + runningPod := &kubecontainer.Pod{ID: "0000-0000-0001", Name: "done-pod"} + pod := newPod("0000-0000-0002", "running-pod") + + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: terminatedPod, + UpdateType: kubetypes.SyncPodCreate, + }) + podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + UpdateType: kubetypes.SyncPodCreate, + }) + podWorkers.UpdatePod(UpdatePodOptions{ + UpdateType: kubetypes.SyncPodKill, + RunningPod: runningPod, + }) + + if podWorkers.IsPodKnownTerminated(pod.UID) == true { + t.Errorf("podWorker state should not be terminated") + } + if podWorkers.IsPodKnownTerminated(terminatedPod.UID) == false { + t.Errorf("podWorker state should be terminated") + } + if podWorkers.IsPodKnownTerminated(runningPod.ID) == true { + t.Errorf("podWorker state should not be marked terminated for a running pod") + } +} + func TestUpdatePodForRuntimePod(t *testing.T) { podWorkers, processed := createPodWorkers()