diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 3fcc69c7cc0..e5805dbcf78 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -775,16 +775,23 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { } // if this pod is being synced for the first time, we need to make sure it is an active pod if options.Pod != nil && (options.Pod.Status.Phase == v1.PodFailed || options.Pod.Status.Phase == v1.PodSucceeded) { - // check to see if the pod is not running and the pod is terminal. - // If this succeeds then record in the podWorker that it is terminated. + // Check to see if the pod is not running and the pod is terminal; if this succeeds then record in the podWorker that it is terminated. + // This is needed because after a kubelet restart, we need to ensure terminal pods will NOT be considered active in Pod Admission. See http://issues.k8s.io/105523 + // However, `filterOutInactivePods`, considers pods that are actively terminating as active. As a result, `IsPodKnownTerminated()` needs to return true and thus `terminatedAt` needs to be set. if statusCache, err := p.podCache.Get(uid); err == nil { if isPodStatusCacheTerminal(statusCache) { + // At this point we know: + // (1) The pod is terminal based on the config source. + // (2) The pod is terminal based on the runtime cache. + // This implies that this pod had already completed `SyncTerminatingPod` sometime in the past. The pod is likely being synced for the first time due to a kubelet restart. + // These pods need to complete SyncTerminatedPod to ensure that all resources are cleaned and that the status manager makes the final status updates for the pod. + // As a result, set finished: false, to ensure a Terminated event will be sent and `SyncTerminatedPod` will run. status = &podSyncStatus{ terminatedAt: now, terminatingAt: now, syncedAt: now, startedTerminating: true, - finished: true, + finished: false, fullname: kubecontainer.BuildPodFullName(name, ns), } } diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 3fcd2465c14..8fca3c255a8 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -754,23 +754,41 @@ func TestUpdatePod(t *testing.T) { expectKnownTerminated: true, }, { - name: "a pod that is terminal and has never started is finished immediately if the runtime has a cached terminal state", + name: "a pod that is terminal and has never started advances to finished if the runtime has a cached terminal state", update: UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded), }, runtimeStatus: &kubecontainer.PodStatus{ /* we know about this pod */ }, - expect: &podSyncStatus{ + expectBeforeWorker: &podSyncStatus{ + fullname: "done-pod_ns", + syncedAt: time.Unix(1, 0), + terminatingAt: time.Unix(1, 0), + terminatedAt: time.Unix(1, 0), + pendingUpdate: &UpdatePodOptions{ + UpdateType: kubetypes.SyncPodCreate, + Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded), + }, + finished: false, // Should be marked as not finished initially (to ensure `SyncTerminatedPod` will run) and status will progress to terminated. + startedTerminating: true, + working: true, + }, + expect: hasContext(&podSyncStatus{ fullname: "done-pod_ns", syncedAt: time.Unix(1, 0), terminatingAt: time.Unix(1, 0), terminatedAt: time.Unix(1, 0), + startedAt: time.Unix(3, 0), startedTerminating: true, finished: true, + activeUpdate: &UpdatePodOptions{ + UpdateType: kubetypes.SyncPodSync, + Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded), + }, // if we have never seen the pod before, a restart makes no sense restartRequested: false, - }, + }), expectKnownTerminated: true, }, { diff --git a/test/e2e_node/restart_test.go b/test/e2e_node/restart_test.go index 96d90252f97..9157e3e69e3 100644 --- a/test/e2e_node/restart_test.go +++ b/test/e2e_node/restart_test.go @@ -28,7 +28,11 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" testutils "k8s.io/kubernetes/test/utils" @@ -37,6 +41,7 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/util/uuid" ) type podCondition func(pod *v1.Pod) (bool, error) @@ -265,5 +270,117 @@ var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive]", func() { } } }) + // Regression test for https://issues.k8s.io/116925 + ginkgo.It("should delete pods which are marked as terminal and have a deletion timestamp set after restart", func(ctx context.Context) { + podName := "terminal-restart-pod" + string(uuid.NewUUID()) + gracePeriod := int64(30) + podSpec := e2epod.MustMixinRestrictedPodSecurity(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + }, + Spec: v1.PodSpec{ + TerminationGracePeriodSeconds: &gracePeriod, + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Name: podName, + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{"sh", "-c"}, + Args: []string{` + sleep 9999999 & + PID=$! + + _term () { + kill $PID + echo "Caught SIGTERM!" + } + + trap _term SIGTERM + wait $PID + trap - TERM + + # Wait for the long running sleep to exit + wait $PID + + exit 0 + `, + }, + }, + }, + }, + }) + ginkgo.By(fmt.Sprintf("Creating a pod (%v/%v) with restart policy: %v", f.Namespace.Name, podName, podSpec.Spec.RestartPolicy)) + pod := e2epod.NewPodClient(f).Create(ctx, podSpec) + + ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be running", f.Namespace.Name, pod.Name)) + err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name) + framework.ExpectNoError(err, "Failed to await for the pod to be running: (%v/%v)", f.Namespace.Name, pod.Name) + + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(ctx, options) + }, + } + + podsList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err, "Failed to list pods in namespace: %s", f.Namespace.Name) + + ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name)) + time.Sleep(time.Second) + err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}) + framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name) + + ctxUntil, cancel := context.WithTimeout(ctx, f.Timeouts.PodStart) + defer cancel() + + ginkgo.By(fmt.Sprintf("Started watch for pod (%v/%v) to enter succeeded phase", pod.Namespace, pod.Name)) + _, err = watchtools.Until(ctxUntil, podsList.ResourceVersion, w, func(event watch.Event) (bool, error) { + if pod, ok := event.Object.(*v1.Pod); ok { + found := pod.ObjectMeta.Name == podName && + pod.ObjectMeta.Namespace == f.Namespace.Name && + pod.Status.Phase == v1.PodSucceeded + if !found { + ginkgo.By(fmt.Sprintf("Observed Pod (%s/%s) in phase %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.Phase)) + return false, nil + } + ginkgo.By(fmt.Sprintf("Found Pod (%s/%s) in phase %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.Phase)) + return found, nil + } + ginkgo.By(fmt.Sprintf("Observed event: %+v", event.Object)) + return false, nil + }) + ginkgo.By("Ended watch for pod entering succeeded phase") + framework.ExpectNoError(err, "failed to see event that pod (%s/%s) enter succeeded phase: %v", pod.Namespace, pod.Name, err) + + // As soon as the pod enters succeeded phase (detected by the watch above); kill the kubelet. + // This is a bit racy, but the goal is to stop the kubelet before the kubelet is able to delete the pod from the API-sever in order to repro https://issues.k8s.io/116925 + ginkgo.By("Stopping the kubelet") + startKubelet := stopKubelet() + // wait until the kubelet health check will fail + gomega.Eventually(ctx, func() bool { + return kubeletHealthCheck(kubeletHealthCheckURL) + }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeFalse()) + + ginkgo.By("Starting the kubelet") + startKubelet() + + // wait until the kubelet health check will succeed + gomega.Eventually(ctx, func() bool { + return kubeletHealthCheck(kubeletHealthCheckURL) + }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrue()) + + // Wait for the Kubelet to be ready. + gomega.Eventually(ctx, func(ctx context.Context) bool { + nodes, err := e2enode.TotalReady(ctx, f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, f.Timeouts.Poll).Should(gomega.BeTrue()) + + ginkgo.By(fmt.Sprintf("After the kubelet is restarted, verify the pod (%s/%s) is deleted by kubelet", pod.Namespace, pod.Name)) + gomega.Eventually(ctx, func(ctx context.Context) error { + return checkMirrorPodDisappear(ctx, f.ClientSet, pod.Name, pod.Namespace) + }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.BeNil()) + }) }) + })