From 0020631985ebed3ae81e6aa307b0d65760798af3 Mon Sep 17 00:00:00 2001 From: David Porter Date: Wed, 29 Mar 2023 20:48:29 -0700 Subject: [PATCH 1/2] test: Add node e2e to restart kubelet while pod is terminating Add a regression test for https://issues.k8s.io/116925. The test exercises the following: 1) Start a restart never pod which will exit with `v1.PodSucceeded` phase. 2) Start a graceful deletion of the pod (set a deletion timestamp) 3) Restart the kubelet as soon as the kubelet reports the pod is terminal (but before the pod is deleted). 4) Verify that after kubelet restart, the pod is deleted. As of v1.27, there is a delay between the pod being marked terminal phaes, and the status manager deleting the pod. If the kubelet is restarted in the middle, after starting up again, the kubelet needs to ensure the pod will be deleted on the API server. Signed-off-by: David Porter --- test/e2e_node/restart_test.go | 117 ++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/test/e2e_node/restart_test.go b/test/e2e_node/restart_test.go index 96d90252f97..9157e3e69e3 100644 --- a/test/e2e_node/restart_test.go +++ b/test/e2e_node/restart_test.go @@ -28,7 +28,11 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" testutils "k8s.io/kubernetes/test/utils" @@ -37,6 +41,7 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/util/uuid" ) type podCondition func(pod *v1.Pod) (bool, error) @@ -265,5 +270,117 @@ var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive]", func() { } } }) + // Regression test for https://issues.k8s.io/116925 + ginkgo.It("should delete pods which are marked as terminal and have a deletion timestamp set after restart", func(ctx context.Context) { + podName := "terminal-restart-pod" + string(uuid.NewUUID()) + gracePeriod := int64(30) + podSpec := e2epod.MustMixinRestrictedPodSecurity(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + }, + Spec: v1.PodSpec{ + TerminationGracePeriodSeconds: &gracePeriod, + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Name: podName, + Image: imageutils.GetE2EImage(imageutils.BusyBox), + Command: []string{"sh", "-c"}, + Args: []string{` + sleep 9999999 & + PID=$! + + _term () { + kill $PID + echo "Caught SIGTERM!" + } + + trap _term SIGTERM + wait $PID + trap - TERM + + # Wait for the long running sleep to exit + wait $PID + + exit 0 + `, + }, + }, + }, + }, + }) + ginkgo.By(fmt.Sprintf("Creating a pod (%v/%v) with restart policy: %v", f.Namespace.Name, podName, podSpec.Spec.RestartPolicy)) + pod := e2epod.NewPodClient(f).Create(ctx, podSpec) + + ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be running", f.Namespace.Name, pod.Name)) + err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name) + framework.ExpectNoError(err, "Failed to await for the pod to be running: (%v/%v)", f.Namespace.Name, pod.Name) + + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(ctx, options) + }, + } + + podsList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err, "Failed to list pods in namespace: %s", f.Namespace.Name) + + ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name)) + time.Sleep(time.Second) + err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}) + framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name) + + ctxUntil, cancel := context.WithTimeout(ctx, f.Timeouts.PodStart) + defer cancel() + + ginkgo.By(fmt.Sprintf("Started watch for pod (%v/%v) to enter succeeded phase", pod.Namespace, pod.Name)) + _, err = watchtools.Until(ctxUntil, podsList.ResourceVersion, w, func(event watch.Event) (bool, error) { + if pod, ok := event.Object.(*v1.Pod); ok { + found := pod.ObjectMeta.Name == podName && + pod.ObjectMeta.Namespace == f.Namespace.Name && + pod.Status.Phase == v1.PodSucceeded + if !found { + ginkgo.By(fmt.Sprintf("Observed Pod (%s/%s) in phase %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.Phase)) + return false, nil + } + ginkgo.By(fmt.Sprintf("Found Pod (%s/%s) in phase %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.Phase)) + return found, nil + } + ginkgo.By(fmt.Sprintf("Observed event: %+v", event.Object)) + return false, nil + }) + ginkgo.By("Ended watch for pod entering succeeded phase") + framework.ExpectNoError(err, "failed to see event that pod (%s/%s) enter succeeded phase: %v", pod.Namespace, pod.Name, err) + + // As soon as the pod enters succeeded phase (detected by the watch above); kill the kubelet. + // This is a bit racy, but the goal is to stop the kubelet before the kubelet is able to delete the pod from the API-sever in order to repro https://issues.k8s.io/116925 + ginkgo.By("Stopping the kubelet") + startKubelet := stopKubelet() + // wait until the kubelet health check will fail + gomega.Eventually(ctx, func() bool { + return kubeletHealthCheck(kubeletHealthCheckURL) + }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeFalse()) + + ginkgo.By("Starting the kubelet") + startKubelet() + + // wait until the kubelet health check will succeed + gomega.Eventually(ctx, func() bool { + return kubeletHealthCheck(kubeletHealthCheckURL) + }, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrue()) + + // Wait for the Kubelet to be ready. + gomega.Eventually(ctx, func(ctx context.Context) bool { + nodes, err := e2enode.TotalReady(ctx, f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, f.Timeouts.Poll).Should(gomega.BeTrue()) + + ginkgo.By(fmt.Sprintf("After the kubelet is restarted, verify the pod (%s/%s) is deleted by kubelet", pod.Namespace, pod.Name)) + gomega.Eventually(ctx, func(ctx context.Context) error { + return checkMirrorPodDisappear(ctx, f.ClientSet, pod.Name, pod.Namespace) + }, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.BeNil()) + }) }) + }) From d04d7ffa6ec16474e849abe02ab55f49536884b7 Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 31 Mar 2023 01:53:11 -0700 Subject: [PATCH 2/2] kubelet: Mark new terminal pods as non-finished in pod worker The pod worker may recieve a new pod which is marked as terminal in the runtime cache. This can occur if a pod is marked as terminal and the kubelet is restarted. The kubelet needs to drive these pods through the termination state machine. If upon restart, the kubelet receives a pod which is terminal based on runtime cache, it indicates that pod finished `SyncTerminatingPod`, but it did not complete `SyncTerminatedPod`. The pod worker needs ensure that `SyncTerminatedPod` will run on these pods. To accomplish this, set `finished=False`, on the pod sync status, to drive the pod through the rest of the state machine. This will ensure that status manager and other kubelet subcomponents (e.g. volume manager), will be aware of this pod and properly cleanup all of the resources of the pod after the kubelet is restarted. While making change, also update the comments to provide a bit more background around why the kubelet needs to read the runtime pod cache for newly synced terminal pods. Signed-off-by: David Porter --- pkg/kubelet/pod_workers.go | 13 ++++++++++--- pkg/kubelet/pod_workers_test.go | 24 +++++++++++++++++++++--- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 8d72e1ad55e..a176b2fb984 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -775,16 +775,23 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { } // if this pod is being synced for the first time, we need to make sure it is an active pod if options.Pod != nil && (options.Pod.Status.Phase == v1.PodFailed || options.Pod.Status.Phase == v1.PodSucceeded) { - // check to see if the pod is not running and the pod is terminal. - // If this succeeds then record in the podWorker that it is terminated. + // Check to see if the pod is not running and the pod is terminal; if this succeeds then record in the podWorker that it is terminated. + // This is needed because after a kubelet restart, we need to ensure terminal pods will NOT be considered active in Pod Admission. See http://issues.k8s.io/105523 + // However, `filterOutInactivePods`, considers pods that are actively terminating as active. As a result, `IsPodKnownTerminated()` needs to return true and thus `terminatedAt` needs to be set. if statusCache, err := p.podCache.Get(uid); err == nil { if isPodStatusCacheTerminal(statusCache) { + // At this point we know: + // (1) The pod is terminal based on the config source. + // (2) The pod is terminal based on the runtime cache. + // This implies that this pod had already completed `SyncTerminatingPod` sometime in the past. The pod is likely being synced for the first time due to a kubelet restart. + // These pods need to complete SyncTerminatedPod to ensure that all resources are cleaned and that the status manager makes the final status updates for the pod. + // As a result, set finished: false, to ensure a Terminated event will be sent and `SyncTerminatedPod` will run. status = &podSyncStatus{ terminatedAt: now, terminatingAt: now, syncedAt: now, startedTerminating: true, - finished: true, + finished: false, fullname: kubecontainer.BuildPodFullName(name, ns), } } diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 3fcd2465c14..8fca3c255a8 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -754,23 +754,41 @@ func TestUpdatePod(t *testing.T) { expectKnownTerminated: true, }, { - name: "a pod that is terminal and has never started is finished immediately if the runtime has a cached terminal state", + name: "a pod that is terminal and has never started advances to finished if the runtime has a cached terminal state", update: UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded), }, runtimeStatus: &kubecontainer.PodStatus{ /* we know about this pod */ }, - expect: &podSyncStatus{ + expectBeforeWorker: &podSyncStatus{ + fullname: "done-pod_ns", + syncedAt: time.Unix(1, 0), + terminatingAt: time.Unix(1, 0), + terminatedAt: time.Unix(1, 0), + pendingUpdate: &UpdatePodOptions{ + UpdateType: kubetypes.SyncPodCreate, + Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded), + }, + finished: false, // Should be marked as not finished initially (to ensure `SyncTerminatedPod` will run) and status will progress to terminated. + startedTerminating: true, + working: true, + }, + expect: hasContext(&podSyncStatus{ fullname: "done-pod_ns", syncedAt: time.Unix(1, 0), terminatingAt: time.Unix(1, 0), terminatedAt: time.Unix(1, 0), + startedAt: time.Unix(3, 0), startedTerminating: true, finished: true, + activeUpdate: &UpdatePodOptions{ + UpdateType: kubetypes.SyncPodSync, + Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded), + }, // if we have never seen the pod before, a restart makes no sense restartRequested: false, - }, + }), expectKnownTerminated: true, }, {