From 0f2c94d842bee097b2046bc0f86f1b565124ebd2 Mon Sep 17 00:00:00 2001 From: Gunju Kim Date: Tue, 3 Aug 2021 22:20:02 +0900 Subject: [PATCH 1/2] NodeConformance: Respect grace period when updating static pod --- test/e2e_node/mirror_pod_grace_period_test.go | 74 +++++++++++++++---- 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/test/e2e_node/mirror_pod_grace_period_test.go b/test/e2e_node/mirror_pod_grace_period_test.go index 9e8a1c63205..26710a1c1e1 100644 --- a/test/e2e_node/mirror_pod_grace_period_test.go +++ b/test/e2e_node/mirror_pod_grace_period_test.go @@ -26,8 +26,11 @@ import ( "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" + clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" + imageutils "k8s.io/kubernetes/test/utils/image" ) var _ = SIGDescribe("MirrorPodWithGracePeriod", func() { @@ -53,10 +56,9 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() { ginkgo.It("mirror pod termination should satisfy grace period when static pod is deleted [NodeConformance]", func() { ginkgo.By("get mirror pod uid") - _, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{}) + pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{}) framework.ExpectNoError(err) - - start := time.Now() + uid := pod.UID ginkgo.By("delete the static pod") file := staticPodPath(podPath, staticPodName, ns) @@ -64,25 +66,51 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() { err = os.Remove(file) framework.ExpectNoError(err) - for { - if time.Now().Sub(start).Seconds() > 19 { - break - } - pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{}) - framework.ExpectNoError(err) - if pod.Status.Phase != v1.PodRunning { - framework.Failf("expected the mirror pod %q to be running, got %q", mirrorPodName, pod.Status.Phase) - } - // have some pause in between the API server queries to avoid throttling - time.Sleep(time.Duration(200) * time.Millisecond) - } + ginkgo.By("wait for the mirror pod to be running for grace period") + gomega.Consistently(func() error { + return checkMirrorPodRunningWithUID(f.ClientSet, mirrorPodName, ns, uid) + }, 19*time.Second, 200*time.Millisecond).Should(gomega.BeNil()) + }) + + ginkgo.It("mirror pod termination should satisfy grace period when static pod is updated [NodeConformance]", func() { + ginkgo.By("get mirror pod uid") + pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{}) + framework.ExpectNoError(err) + uid := pod.UID + + ginkgo.By("update the static pod container image") + image := imageutils.GetPauseImageName() + err = createStaticPod(podPath, staticPodName, ns, image, v1.RestartPolicyAlways) + framework.ExpectNoError(err) + + ginkgo.By("wait for the mirror pod to be running for grace period") + gomega.Consistently(func() error { + return checkMirrorPodRunningWithUID(f.ClientSet, mirrorPodName, ns, uid) + }, 19*time.Second, 200*time.Millisecond).Should(gomega.BeNil()) + + ginkgo.By("wait for the mirror pod to be updated") + gomega.Eventually(func() error { + return checkMirrorPodRecreatedAndRunning(f.ClientSet, mirrorPodName, ns, uid) + }, 2*time.Minute, time.Second*4).Should(gomega.BeNil()) + + ginkgo.By("check the mirror pod container image is updated") + pod, err = f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, 
metav1.GetOptions{}) + framework.ExpectNoError(err) + framework.ExpectEqual(len(pod.Spec.Containers), 1) + framework.ExpectEqual(pod.Spec.Containers[0].Image, image) }) ginkgo.AfterEach(func() { + ginkgo.By("delete the static pod") + err := deleteStaticPod(podPath, staticPodName, ns) + if !os.IsNotExist(err) { + framework.ExpectNoError(err) + } + ginkgo.By("wait for the mirror pod to disappear") gomega.Eventually(func() error { return checkMirrorPodDisappear(f.ClientSet, mirrorPodName, ns) - }, time.Second*19, time.Second).Should(gomega.BeNil()) + }, 2*time.Minute, time.Second*4).Should(gomega.BeNil()) }) }) }) @@ -124,3 +152,17 @@ spec: framework.Logf("has written %v", file) return err } + +func checkMirrorPodRunningWithUID(cl clientset.Interface, name, namespace string, oUID types.UID) error { + pod, err := cl.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("expected the mirror pod %q to appear: %v", name, err) + } + if pod.UID != oUID { + return fmt.Errorf("expected the uid of mirror pod %q to be same, got %q", name, pod.UID) + } + if pod.Status.Phase != v1.PodRunning { + return fmt.Errorf("expected the mirror pod %q to be running, got %q", name, pod.Status.Phase) + } + return nil +} From 3bce24527966cd839bb13c81d5124e60533a69a5 Mon Sep 17 00:00:00 2001 From: Gunju Kim Date: Wed, 1 Sep 2021 23:33:14 +0900 Subject: [PATCH 2/2] Ensure there is one running static pod with the same full name --- pkg/kubelet/pod_workers.go | 152 +++++++++++++++++++----- pkg/kubelet/pod_workers_test.go | 198 ++++++++++++++++++++++++++++++++ 2 files changed, 320 insertions(+), 30 deletions(-) diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 48a9542aeac..12e75e0b87f 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -246,6 +246,8 @@ type podSyncStatus struct { cancelFn context.CancelFunc // working is true if a pod worker is currently in a sync method. working bool + // fullname of the pod + fullname string // syncedAt is the time at which the pod worker first observed this pod. syncedAt time.Time @@ -375,9 +377,10 @@ type podWorkers struct { // Tracks by UID the termination status of a pod - syncing, terminating, // terminated, and evicted. podSyncStatuses map[types.UID]*podSyncStatus - // Tracks when a static pod is being killed and is removed when the - // static pod transitions to the killed state. 
- terminatingStaticPodFullnames map[string]struct{} + // Tracks all uids for started static pods by full name + startedStaticPodsByFullname map[string]types.UID + // Tracks all uids for static pods that are waiting to start by full name + waitingToStartStaticPodsByFullname map[string][]types.UID workQueue queue.WorkQueue @@ -412,18 +415,19 @@ func newPodWorkers( podCache kubecontainer.Cache, ) PodWorkers { return &podWorkers{ - podSyncStatuses: map[types.UID]*podSyncStatus{}, - podUpdates: map[types.UID]chan podWork{}, - lastUndeliveredWorkUpdate: map[types.UID]podWork{}, - terminatingStaticPodFullnames: map[string]struct{}{}, - syncPodFn: syncPodFn, - syncTerminatingPodFn: syncTerminatingPodFn, - syncTerminatedPodFn: syncTerminatedPodFn, - recorder: recorder, - workQueue: workQueue, - resyncInterval: resyncInterval, - backOffPeriod: backOffPeriod, - podCache: podCache, + podSyncStatuses: map[types.UID]*podSyncStatus{}, + podUpdates: map[types.UID]chan podWork{}, + lastUndeliveredWorkUpdate: map[types.UID]podWork{}, + startedStaticPodsByFullname: map[string]types.UID{}, + waitingToStartStaticPodsByFullname: map[string][]types.UID{}, + syncPodFn: syncPodFn, + syncTerminatingPodFn: syncTerminatingPodFn, + syncTerminatedPodFn: syncTerminatedPodFn, + recorder: recorder, + workQueue: workQueue, + resyncInterval: resyncInterval, + backOffPeriod: backOffPeriod, + podCache: podCache, } } @@ -497,8 +501,19 @@ func (p *podWorkers) ShouldPodContentBeRemoved(uid types.UID) bool { func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) bool { p.podLock.Lock() defer p.podLock.Unlock() - _, ok := p.terminatingStaticPodFullnames[podFullName] - return ok + uid, started := p.startedStaticPodsByFullname[podFullName] + if !started { + return false + } + status, exists := p.podSyncStatuses[uid] + if !exists { + return false + } + if !status.IsTerminationRequested() || status.IsTerminated() { + return false + } + + return true } func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool { @@ -551,6 +566,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID) status = &podSyncStatus{ syncedAt: now, + fullname: kubecontainer.GetPodFullName(pod), } // if this pod is being synced for the first time, we need to make sure it is an active pod if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) { @@ -663,11 +679,6 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { // will never be zero. options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod - // if a static pod comes through, start tracking it explicitly (cleared by the pod worker loop) - if kubetypes.IsStaticPod(pod) { - p.terminatingStaticPodFullnames[kubecontainer.GetPodFullName(pod)] = struct{}{} - } - default: workType = SyncPodWork @@ -697,6 +708,12 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) { podUpdates = make(chan podWork, 1) p.podUpdates[uid] = podUpdates + // ensure that static pods start in the order they are received by UpdatePod + if kubetypes.IsStaticPod(pod) { + p.waitingToStartStaticPodsByFullname[status.fullname] = + append(p.waitingToStartStaticPodsByFullname[status.fullname], uid) + } + // Creating a new pod worker either means this is a new pod, or that the // kubelet just restarted. In either case the kubelet is willing to believe // the status of the pod for the first pod worker sync. 
See corresponding @@ -766,10 +783,76 @@ func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options * return gracePeriod, status.gracePeriod != 0 && status.gracePeriod != gracePeriod } +// allowPodStart tries to start the pod and returns true if allowed, otherwise +// it requeues the pod and returns false. +func (p *podWorkers) allowPodStart(pod *v1.Pod) bool { + if !kubetypes.IsStaticPod(pod) { + // TBD: Do we want to allow non-static pods with the same full name? + // Note that it may disable the force deletion of pods. + return true + } + p.podLock.Lock() + defer p.podLock.Unlock() + status, ok := p.podSyncStatuses[pod.UID] + if !ok { + klog.ErrorS(nil, "Failed to get a valid podSyncStatuses", "pod", klog.KObj(pod), "podUID", pod.UID) + p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor)) + status.working = false + return false + } + if !p.allowStaticPodStart(status.fullname, pod.UID) { + p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor)) + status.working = false + return false + } + return true +} + +// allowStaticPodStart tries to start the static pod and returns true if +// 1. there are no other started static pods with the same fullname +// 2. the uid matches that of the first valid static pod waiting to start +func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool { + startedUID, started := p.startedStaticPodsByFullname[fullname] + if started { + return startedUID == uid + } + + waitingPods := p.waitingToStartStaticPodsByFullname[fullname] + for i, waitingUID := range waitingPods { + // has pod already terminated or been deleted? + if _, ok := p.podSyncStatuses[waitingUID]; !ok { + continue + } + // another pod is next in line + if waitingUID != uid { + p.waitingToStartStaticPodsByFullname[fullname] = waitingPods[i:] + return false + } + // we are up next, remove ourselves + waitingPods = waitingPods[i+1:] + break + } + if len(waitingPods) != 0 { + p.waitingToStartStaticPodsByFullname[fullname] = waitingPods + } else { + delete(p.waitingToStartStaticPodsByFullname, fullname) + } + p.startedStaticPodsByFullname[fullname] = uid + return true +} + func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) { var lastSyncTime time.Time + var podStarted bool for update := range podUpdates { pod := update.Options.Pod + if !podStarted { + if !p.allowPodStart(pod) { + klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(pod), "podUID", pod.UID) + continue + } + podStarted = true + } klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType) err := func() error { @@ -899,9 +982,6 @@ func (p *podWorkers) completeTerminating(pod *v1.Pod) { klog.V(4).InfoS("Pod terminated all containers successfully", "pod", klog.KObj(pod), "podUID", pod.UID) - // if a static pod is being tracked, forget it - delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod)) - if status, ok := p.podSyncStatuses[pod.UID]; ok { if status.terminatingAt.IsZero() { klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) @@ -933,9 +1013,6 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) { klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID) - // if a static pod is being tracked, forget it - delete(p.terminatingStaticPodFullnames, 
kubecontainer.GetPodFullName(pod)) - if status, ok := p.podSyncStatuses[pod.UID]; ok { if status.terminatingAt.IsZero() { klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) @@ -943,6 +1020,10 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) { status.terminatedAt = time.Now() status.finished = true status.working = false + + if p.startedStaticPodsByFullname[status.fullname] == pod.UID { + delete(p.startedStaticPodsByFullname, status.fullname) + } } ch, ok := p.podUpdates[pod.UID] @@ -951,7 +1032,6 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) { } delete(p.podUpdates, pod.UID) delete(p.lastUndeliveredWorkUpdate, pod.UID) - delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod)) } // completeTerminated is invoked after syncTerminatedPod completes successfully and means we @@ -968,7 +1048,6 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) { } delete(p.podUpdates, pod.UID) delete(p.lastUndeliveredWorkUpdate, pod.UID) - delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod)) if status, ok := p.podSyncStatuses[pod.UID]; ok { if status.terminatingAt.IsZero() { @@ -979,6 +1058,10 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) { } status.finished = true status.working = false + + if p.startedStaticPodsByFullname[status.fullname] == pod.UID { + delete(p.startedStaticPodsByFullname, status.fullname) + } } } @@ -1078,6 +1161,11 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) { return } + if startedUID, started := p.startedStaticPodsByFullname[status.fullname]; started && startedUID != uid { + klog.V(4).InfoS("Pod cannot start yet but is no longer known to the kubelet, finish it", "podUID", uid) + status.finished = true + } + if !status.finished { klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid) return @@ -1091,6 +1179,10 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) { delete(p.podSyncStatuses, uid) delete(p.podUpdates, uid) delete(p.lastUndeliveredWorkUpdate, uid) + + if p.startedStaticPodsByFullname[status.fullname] == uid { + delete(p.startedStaticPodsByFullname, status.fullname) + } } // killPodNow returns a KillPodFunc that can be used to kill a pod. diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 91b36999593..4028c06c292 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -152,6 +152,18 @@ func newPodWithPhase(uid, name string, phase v1.PodPhase) *v1.Pod { } } +func newStaticPod(uid, name string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(uid), + Name: name, + Annotations: map[string]string{ + kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource, + }, + }, + } +} + // syncPodRecord is a record of a sync pod call type syncPodRecord struct { name string @@ -682,3 +694,189 @@ func TestKillPodNowFunc(t *testing.T) { t.Errorf("Pod terminated %v, but expected %v", syncPodRecords[1].terminated, true) } } + +func Test_allowPodStart(t *testing.T) { + testCases := []struct { + desc string + pod *v1.Pod + podSyncStatuses map[types.UID]*podSyncStatus + startedStaticPodsByFullname map[string]types.UID + waitingToStartStaticPodsByFullname map[string][]types.UID + allowed bool + }{ + { + // TBD: Do we want to allow non-static pods with the same full name? + // Note that it may disable the force deletion of pods. 
+ desc: "non-static pod", + pod: newPod("uid-0", "test"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "test_", + }, + "uid-1": { + fullname: "test_", + }, + }, + allowed: true, + }, + { + // TBD: Do we want to allow a non-static pod with the same full name + // as the started static pod? + desc: "non-static pod when there is a started static pod with the same full name", + pod: newPod("uid-0", "test"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "test_", + }, + "uid-1": { + fullname: "test_", + }, + }, + startedStaticPodsByFullname: map[string]types.UID{ + "test_": types.UID("uid-1"), + }, + allowed: true, + }, + { + // TBD: Do we want to allow a static pod with the same full name as the + // started non-static pod? + desc: "static pod when there is a started non-static pod with the same full name", + pod: newPod("uid-0", "test"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "test_", + }, + "uid-1": { + fullname: "test_", + }, + }, + startedStaticPodsByFullname: map[string]types.UID{ + "test_": types.UID("uid-1"), + }, + allowed: true, + }, + { + desc: "static pod when there are no started static pods with the same full name", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + "uid-1": { + fullname: "bar_", + }, + }, + startedStaticPodsByFullname: map[string]types.UID{ + "bar_": types.UID("uid-1"), + }, + allowed: true, + }, + { + desc: "static pod when there is a started static pod with the same full name", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + "uid-1": { + fullname: "foo_", + }, + }, + startedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-1"), + }, + allowed: false, + }, + { + desc: "static pod if the static pod has already started", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + }, + startedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-0"), + }, + allowed: true, + }, + { + desc: "static pod if the static pod is the first pod waiting to start", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-0"), + }, + }, + allowed: true, + }, + { + desc: "static pod if the static pod is not the first pod waiting to start", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + "uid-1": { + fullname: "foo_", + }, + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-1"), + types.UID("uid-0"), + }, + }, + allowed: false, + }, + { + desc: "static pod if the static pod is the first valid pod waiting to start / clean up until picking the first valid pod", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + "uid-1": { + fullname: "foo_", + }, + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-2"), + types.UID("uid-2"), + types.UID("uid-3"), + types.UID("uid-0"), + types.UID("uid-1"), + }, + }, + allowed: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + podWorkers, _ := createPodWorkers() + if 
tc.podSyncStatuses != nil { + podWorkers.podSyncStatuses = tc.podSyncStatuses + } + if tc.startedStaticPodsByFullname != nil { + podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname + } + if tc.waitingToStartStaticPodsByFullname != nil { + podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname + } + if podWorkers.allowPodStart(tc.pod) != tc.allowed { + if tc.allowed { + t.Errorf("Pod should be allowed") + } else { + t.Errorf("Pod should not be allowed") + } + } + }) + } +}
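The ordering rule added to pkg/kubelet/pod_workers.go above can be summarized outside the kubelet as a small self-contained Go sketch. It models only the bookkeeping of startedStaticPodsByFullname and waitingToStartStaticPodsByFullname; the type and method names used here (startGate, admit, allowStart, finish) are invented for illustration, and the real allowStaticPodStart additionally skips waiting UIDs whose pod sync status has already been removed. The point it demonstrates: at most one static pod per full name is allowed to start, later pods with the same full name wait in arrival order, and the full name is released only once the started pod has finished terminating.

// Illustrative sketch only, not kubelet code: a minimal model of the
// per-fullname start gating introduced in this patch.
package main

import "fmt"

type UID string

// startGate tracks, per static-pod full name, which UID has been allowed
// to start and which UIDs are still waiting, in arrival order.
type startGate struct {
	started map[string]UID
	waiting map[string][]UID
}

func newStartGate() *startGate {
	return &startGate{
		started: map[string]UID{},
		waiting: map[string][]UID{},
	}
}

// admit records a pod as waiting to start for its full name, in the order
// the pods were observed.
func (g *startGate) admit(fullname string, uid UID) {
	g.waiting[fullname] = append(g.waiting[fullname], uid)
}

// allowStart mirrors the shape of allowStaticPodStart: a pod may start only
// if it is already the started pod for its full name, or it is the first pod
// still waiting in line; otherwise the caller is expected to retry later.
func (g *startGate) allowStart(fullname string, uid UID) bool {
	if startedUID, ok := g.started[fullname]; ok {
		return startedUID == uid
	}
	queue := g.waiting[fullname]
	if len(queue) == 0 || queue[0] != uid {
		return false
	}
	g.waiting[fullname] = queue[1:]
	if len(g.waiting[fullname]) == 0 {
		delete(g.waiting, fullname)
	}
	g.started[fullname] = uid
	return true
}

// finish releases the full name once the started pod has fully terminated,
// letting the next waiting pod start on a later attempt.
func (g *startGate) finish(fullname string, uid UID) {
	if g.started[fullname] == uid {
		delete(g.started, fullname)
	}
}

func main() {
	g := newStartGate()
	g.admit("foo_kube-system", "uid-0") // original static pod
	g.admit("foo_kube-system", "uid-1") // updated static pod, same full name

	fmt.Println(g.allowStart("foo_kube-system", "uid-0")) // true: first in line
	fmt.Println(g.allowStart("foo_kube-system", "uid-1")) // false: uid-0 still running
	g.finish("foo_kube-system", "uid-0")                  // uid-0 finished terminating
	fmt.Println(g.allowStart("foo_kube-system", "uid-1")) // true: next in line
}

Run against the sequence exercised by the new e2e test (a static pod is updated or recreated under the same name), the sketch prints true, false, true: the replacement pod is held back until the original pod's termination completes, which is what allows the mirror pod to observe its full grace period.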