Merge pull request #117019 from bobbypage/gh_116925

kubelet: Mark new terminal pods as non-finished in pod worker
commit 53cccbe4f9 by Kubernetes Prow Robot, 2023-04-17 15:10:58 -07:00 (committed by GitHub)
3 changed files with 148 additions and 6 deletions
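
Before the file diffs: the fix turns on the pod worker's podSyncStatus bookkeeping, where terminatedAt records that the pod reached a terminal state and finished records that SyncTerminatedPod has already completed. The snippet below is a minimal, self-contained sketch of that relationship, assuming invented helper names (needsSyncTerminatedPod) rather than the real kubelet code.

// Simplified sketch (not the real kubelet types): how "terminal" relates to
// "finished" in the pod worker's bookkeeping. Field names mirror the diff
// below; needsSyncTerminatedPod is invented for illustration.
package main

import (
    "fmt"
    "time"
)

type podSyncStatus struct {
    terminatedAt       time.Time
    terminatingAt      time.Time
    syncedAt           time.Time
    startedTerminating bool
    finished           bool // true only once SyncTerminatedPod has completed
}

// needsSyncTerminatedPod: a pod that is already terminal (terminatedAt set)
// but not yet finished still owes a SyncTerminatedPod pass, which releases
// resources and lets the status manager write the final pod status.
func needsSyncTerminatedPod(s *podSyncStatus) bool {
    return !s.terminatedAt.IsZero() && !s.finished
}

func main() {
    now := time.Now()
    // A terminal pod rediscovered after a kubelet restart: before this PR it
    // was recorded with finished: true (so cleanup never ran); after it,
    // finished starts as false and the terminated sync still runs.
    s := &podSyncStatus{
        terminatedAt:       now,
        terminatingAt:      now,
        syncedAt:           now,
        startedTerminating: true,
        finished:           false,
    }
    fmt.Println(needsSyncTerminatedPod(s)) // true
}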


@@ -775,16 +775,23 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
}
// if this pod is being synced for the first time, we need to make sure it is an active pod
if options.Pod != nil && (options.Pod.Status.Phase == v1.PodFailed || options.Pod.Status.Phase == v1.PodSucceeded) {
// check to see if the pod is not running and the pod is terminal.
// If this succeeds then record in the podWorker that it is terminated.
// Check to see if the pod is not running and the pod is terminal; if this succeeds then record in the podWorker that it is terminated.
// This is needed because after a kubelet restart, we need to ensure terminal pods will NOT be considered active in Pod Admission. See http://issues.k8s.io/105523
// However, `filterOutInactivePods` considers pods that are actively terminating as active. As a result, `IsPodKnownTerminated()` needs to return true and thus `terminatedAt` needs to be set.
if statusCache, err := p.podCache.Get(uid); err == nil {
if isPodStatusCacheTerminal(statusCache) {
// At this point we know:
// (1) The pod is terminal based on the config source.
// (2) The pod is terminal based on the runtime cache.
// This implies that this pod had already completed `SyncTerminatingPod` sometime in the past. The pod is likely being synced for the first time due to a kubelet restart.
// These pods need to complete SyncTerminatedPod to ensure that all resources are cleaned and that the status manager makes the final status updates for the pod.
// As a result, set finished: false, to ensure a Terminated event will be sent and `SyncTerminatedPod` will run.
status = &podSyncStatus{
terminatedAt: now,
terminatingAt: now,
syncedAt: now,
startedTerminating: true,
finished: true,
finished: false,
fullname: kubecontainer.BuildPodFullName(name, ns),
}
}
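
The comment above leans on two behaviours of the pod worker: filterOutInactivePods still counts actively terminating pods as active, and only IsPodKnownTerminated() (which reduces to terminatedAt having been recorded) lets a pod drop out of that set. A condensed sketch of that interplay, using simplified stand-ins rather than the real podWorkers API:

// Sketch only: stand-ins for the pod-worker/admission interplay described in
// the comment above; not the actual kubelet implementation.
package main

import (
    "fmt"
    "time"
)

type podSyncStatus struct {
    terminatingAt time.Time
    terminatedAt  time.Time
}

// IsTerminated mirrors the idea that "known terminated" reduces to
// terminatedAt having been set.
func (s *podSyncStatus) IsTerminated() bool {
    return !s.terminatedAt.IsZero()
}

// filterOutInactivePods (simplified): a pod that is merely terminating still
// counts as active; only a pod known to be terminated is dropped, so it no
// longer counts against pod admission after a kubelet restart.
func filterOutInactivePods(statuses []*podSyncStatus) []*podSyncStatus {
    var active []*podSyncStatus
    for _, s := range statuses {
        if s.IsTerminated() {
            continue
        }
        active = append(active, s)
    }
    return active
}

func main() {
    now := time.Now()
    terminating := &podSyncStatus{terminatingAt: now}                   // still active
    terminated := &podSyncStatus{terminatingAt: now, terminatedAt: now} // dropped
    fmt.Println(len(filterOutInactivePods([]*podSyncStatus{terminating, terminated}))) // 1
}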


@@ -754,23 +754,41 @@ func TestUpdatePod(t *testing.T) {
expectKnownTerminated: true,
},
{
name: "a pod that is terminal and has never started is finished immediately if the runtime has a cached terminal state",
name: "a pod that is terminal and has never started advances to finished if the runtime has a cached terminal state",
update: UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
},
runtimeStatus: &kubecontainer.PodStatus{ /* we know about this pod */ },
expect: &podSyncStatus{
expectBeforeWorker: &podSyncStatus{
fullname: "done-pod_ns",
syncedAt: time.Unix(1, 0),
terminatingAt: time.Unix(1, 0),
terminatedAt: time.Unix(1, 0),
pendingUpdate: &UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
},
finished: false, // Should be marked as not finished initially (to ensure `SyncTerminatedPod` will run) and status will progress to terminated.
startedTerminating: true,
working: true,
},
expect: hasContext(&podSyncStatus{
fullname: "done-pod_ns",
syncedAt: time.Unix(1, 0),
terminatingAt: time.Unix(1, 0),
terminatedAt: time.Unix(1, 0),
startedAt: time.Unix(3, 0),
startedTerminating: true,
finished: true,
activeUpdate: &UpdatePodOptions{
UpdateType: kubetypes.SyncPodSync,
Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
},
// if we have never seen the pod before, a restart makes no sense
restartRequested: false,
},
}),
expectKnownTerminated: true,
},
{

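In the updated test case above, expectBeforeWorker asserts the status that UpdatePod records synchronously (update still pending, working: true, finished: false), while expect, wrapped in hasContext, asserts the status after the worker goroutine has drained and the terminated sync has completed (finished: true, the update promoted to activeUpdate). The skeleton below is a schematic of that two-phase assertion pattern, with invented helpers (enqueue, drain) rather than the real pod worker test harness:

// Schematic only: assert state before the worker drains, then again after.
package demo

import "testing"

type status struct {
    working  bool
    finished bool
}

// enqueue stands in for what UpdatePod does synchronously: the pod now has a
// worker, and cleanup has not happened yet.
func enqueue(s *status) { s.working, s.finished = true, false }

// drain stands in for the worker goroutine processing the pending update and
// completing the terminated sync.
func drain(s *status) { s.working, s.finished = false, true }

func TestBeforeAndAfterWorker(t *testing.T) {
    s := &status{}

    enqueue(s)
    // The expectBeforeWorker-style check: queued, not yet cleaned up.
    if !s.working || s.finished {
        t.Fatalf("before worker: got %+v", *s)
    }

    drain(s)
    // The expect-style check: cleanup has run and the pod is finished.
    if s.working || !s.finished {
        t.Fatalf("after worker: got %+v", *s)
    }
}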

@@ -28,7 +28,11 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
testutils "k8s.io/kubernetes/test/utils"
@@ -37,6 +41,7 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/uuid"
)
type podCondition func(pod *v1.Pod) (bool, error)
@@ -265,5 +270,117 @@ var _ = SIGDescribe("Restart [Serial] [Slow] [Disruptive]", func() {
}
}
})
// Regression test for https://issues.k8s.io/116925
ginkgo.It("should delete pods which are marked as terminal and have a deletion timestamp set after restart", func(ctx context.Context) {
podName := "terminal-restart-pod" + string(uuid.NewUUID())
gracePeriod := int64(30)
podSpec := e2epod.MustMixinRestrictedPodSecurity(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: &gracePeriod,
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: podName,
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"sh", "-c"},
Args: []string{`
sleep 9999999 &
PID=$!
_term () {
kill $PID
echo "Caught SIGTERM!"
}
trap _term SIGTERM
wait $PID
trap - TERM
# Wait for the long running sleep to exit
wait $PID
exit 0
`,
},
},
},
},
})
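// The container traps SIGTERM: it kills the background sleep, waits for it to
// exit, and then exits 0. The graceful delete below therefore leaves the pod in
// the Succeeded phase (RestartPolicyNever keeps it terminal) while its deletion
// timestamp is set, which is exactly the state this regression test needs.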
ginkgo.By(fmt.Sprintf("Creating a pod (%v/%v) with restart policy: %v", f.Namespace.Name, podName, podSpec.Spec.RestartPolicy))
pod := e2epod.NewPodClient(f).Create(ctx, podSpec)
ginkgo.By(fmt.Sprintf("Waiting for the pod (%v/%v) to be running", f.Namespace.Name, pod.Name))
err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name)
framework.ExpectNoError(err, "Failed to await for the pod to be running: (%v/%v)", f.Namespace.Name, pod.Name)
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(ctx, options)
},
}
podsList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err, "Failed to list pods in namespace: %s", f.Namespace.Name)
ginkgo.By(fmt.Sprintf("Deleting the pod (%v/%v) to set a deletion timestamp", pod.Namespace, pod.Name))
time.Sleep(time.Second)
err = e2epod.NewPodClient(f).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod})
framework.ExpectNoError(err, "Failed to delete the pod: %q", pod.Name)
ctxUntil, cancel := context.WithTimeout(ctx, f.Timeouts.PodStart)
defer cancel()
ginkgo.By(fmt.Sprintf("Started watch for pod (%v/%v) to enter succeeded phase", pod.Namespace, pod.Name))
_, err = watchtools.Until(ctxUntil, podsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
if pod, ok := event.Object.(*v1.Pod); ok {
found := pod.ObjectMeta.Name == podName &&
pod.ObjectMeta.Namespace == f.Namespace.Name &&
pod.Status.Phase == v1.PodSucceeded
if !found {
ginkgo.By(fmt.Sprintf("Observed Pod (%s/%s) in phase %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.Phase))
return false, nil
}
ginkgo.By(fmt.Sprintf("Found Pod (%s/%s) in phase %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.Phase))
return found, nil
}
ginkgo.By(fmt.Sprintf("Observed event: %+v", event.Object))
return false, nil
})
ginkgo.By("Ended watch for pod entering succeeded phase")
framework.ExpectNoError(err, "failed to see event that pod (%s/%s) enter succeeded phase: %v", pod.Namespace, pod.Name, err)
// As soon as the pod enters succeeded phase (detected by the watch above); kill the kubelet.
// This is a bit racy, but the goal is to stop the kubelet before it is able to delete the pod from the API server, in order to reproduce https://issues.k8s.io/116925
ginkgo.By("Stopping the kubelet")
startKubelet := stopKubelet()
// wait until the kubelet health check fails
gomega.Eventually(ctx, func() bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeFalse())
ginkgo.By("Starting the kubelet")
startKubelet()
// wait until the kubelet health check succeeds
gomega.Eventually(ctx, func() bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}, f.Timeouts.PodStart, f.Timeouts.Poll).Should(gomega.BeTrue())
// Wait for the Kubelet to be ready.
gomega.Eventually(ctx, func(ctx context.Context) bool {
nodes, err := e2enode.TotalReady(ctx, f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, f.Timeouts.Poll).Should(gomega.BeTrue())
ginkgo.By(fmt.Sprintf("After the kubelet is restarted, verify the pod (%s/%s) is deleted by kubelet", pod.Namespace, pod.Name))
gomega.Eventually(ctx, func(ctx context.Context) error {
return checkMirrorPodDisappear(ctx, f.ClientSet, pod.Name, pod.Namespace)
}, f.Timeouts.PodDelete, f.Timeouts.Poll).Should(gomega.BeNil())
})
})
})