From 3bce24527966cd839bb13c81d5124e60533a69a5 Mon Sep 17 00:00:00 2001
From: Gunju Kim
Date: Wed, 1 Sep 2021 23:33:14 +0900
Subject: [PATCH] Ensure there is one running static pod with the same full name

---
 pkg/kubelet/pod_workers.go      | 152 +++++++++++++++++++-----
 pkg/kubelet/pod_workers_test.go | 198 ++++++++++++++++++++++++++++++++
 2 files changed, 320 insertions(+), 30 deletions(-)

diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index 48a9542aeac..12e75e0b87f 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -246,6 +246,8 @@ type podSyncStatus struct {
     cancelFn context.CancelFunc
     // working is true if a pod worker is currently in a sync method.
     working bool
+    // fullname of the pod
+    fullname string
 
     // syncedAt is the time at which the pod worker first observed this pod.
     syncedAt time.Time
@@ -375,9 +377,10 @@ type podWorkers struct {
     // Tracks by UID the termination status of a pod - syncing, terminating,
     // terminated, and evicted.
     podSyncStatuses map[types.UID]*podSyncStatus
-    // Tracks when a static pod is being killed and is removed when the
-    // static pod transitions to the killed state.
-    terminatingStaticPodFullnames map[string]struct{}
+    // Tracks all uids for started static pods by full name
+    startedStaticPodsByFullname map[string]types.UID
+    // Tracks all uids for static pods that are waiting to start by full name
+    waitingToStartStaticPodsByFullname map[string][]types.UID
 
     workQueue queue.WorkQueue
 
@@ -412,18 +415,19 @@ func newPodWorkers(
     podCache kubecontainer.Cache,
 ) PodWorkers {
     return &podWorkers{
-        podSyncStatuses:               map[types.UID]*podSyncStatus{},
-        podUpdates:                    map[types.UID]chan podWork{},
-        lastUndeliveredWorkUpdate:     map[types.UID]podWork{},
-        terminatingStaticPodFullnames: map[string]struct{}{},
-        syncPodFn:                     syncPodFn,
-        syncTerminatingPodFn:          syncTerminatingPodFn,
-        syncTerminatedPodFn:           syncTerminatedPodFn,
-        recorder:                      recorder,
-        workQueue:                     workQueue,
-        resyncInterval:                resyncInterval,
-        backOffPeriod:                 backOffPeriod,
-        podCache:                      podCache,
+        podSyncStatuses:                    map[types.UID]*podSyncStatus{},
+        podUpdates:                         map[types.UID]chan podWork{},
+        lastUndeliveredWorkUpdate:          map[types.UID]podWork{},
+        startedStaticPodsByFullname:        map[string]types.UID{},
+        waitingToStartStaticPodsByFullname: map[string][]types.UID{},
+        syncPodFn:                          syncPodFn,
+        syncTerminatingPodFn:               syncTerminatingPodFn,
+        syncTerminatedPodFn:                syncTerminatedPodFn,
+        recorder:                           recorder,
+        workQueue:                          workQueue,
+        resyncInterval:                     resyncInterval,
+        backOffPeriod:                      backOffPeriod,
+        podCache:                           podCache,
     }
 }
 
@@ -497,8 +501,19 @@ func (p *podWorkers) ShouldPodContentBeRemoved(uid types.UID) bool {
 func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) bool {
     p.podLock.Lock()
     defer p.podLock.Unlock()
-    _, ok := p.terminatingStaticPodFullnames[podFullName]
-    return ok
+    uid, started := p.startedStaticPodsByFullname[podFullName]
+    if !started {
+        return false
+    }
+    status, exists := p.podSyncStatuses[uid]
+    if !exists {
+        return false
+    }
+    if !status.IsTerminationRequested() || status.IsTerminated() {
+        return false
+    }
+
+    return true
 }
 
 func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool {
@@ -551,6 +566,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
         klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
         status = &podSyncStatus{
             syncedAt: now,
+            fullname: kubecontainer.GetPodFullName(pod),
         }
         // if this pod is being synced for the first time, we need to make sure it is an active pod
         if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
@@ -663,11 +679,6 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
         // will never be zero.
         options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod
 
-        // if a static pod comes through, start tracking it explicitly (cleared by the pod worker loop)
-        if kubetypes.IsStaticPod(pod) {
-            p.terminatingStaticPodFullnames[kubecontainer.GetPodFullName(pod)] = struct{}{}
-        }
-
     default:
         workType = SyncPodWork
 
@@ -697,6 +708,12 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
         podUpdates = make(chan podWork, 1)
         p.podUpdates[uid] = podUpdates
 
+        // ensure that static pods start in the order they are received by UpdatePod
+        if kubetypes.IsStaticPod(pod) {
+            p.waitingToStartStaticPodsByFullname[status.fullname] =
+                append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
+        }
+
         // Creating a new pod worker either means this is a new pod, or that the
         // kubelet just restarted. In either case the kubelet is willing to believe
         // the status of the pod for the first pod worker sync. See corresponding
@@ -766,10 +783,76 @@ func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *
     return gracePeriod, status.gracePeriod != 0 && status.gracePeriod != gracePeriod
 }
 
+// allowPodStart tries to start the pod and returns true if allowed, otherwise
+// it requeues the pod and returns false.
+func (p *podWorkers) allowPodStart(pod *v1.Pod) bool {
+    if !kubetypes.IsStaticPod(pod) {
+        // TBD: Do we want to allow non-static pods with the same full name?
+        // Note that it may disable the force deletion of pods.
+        return true
+    }
+    p.podLock.Lock()
+    defer p.podLock.Unlock()
+    status, ok := p.podSyncStatuses[pod.UID]
+    if !ok {
+        // status is nil here, so it must not be dereferenced; just requeue
+        klog.ErrorS(nil, "Failed to get a valid podSyncStatus", "pod", klog.KObj(pod), "podUID", pod.UID)
+        p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
+        return false
+    }
+    if !p.allowStaticPodStart(status.fullname, pod.UID) {
+        p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
+        status.working = false
+        return false
+    }
+    return true
+}
+
+// allowStaticPodStart tries to start the static pod and returns true if
+// 1. there are no other started static pods with the same fullname
+// 2. the uid matches that of the first valid static pod waiting to start
+func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool {
+    startedUID, started := p.startedStaticPodsByFullname[fullname]
+    if started {
+        return startedUID == uid
+    }
+
+    waitingPods := p.waitingToStartStaticPodsByFullname[fullname]
+    for i, waitingUID := range waitingPods {
+        // has pod already terminated or been deleted?
+        if _, ok := p.podSyncStatuses[waitingUID]; !ok {
+            continue
+        }
+        // another pod is next in line
+        if waitingUID != uid {
+            p.waitingToStartStaticPodsByFullname[fullname] = waitingPods[i:]
+            return false
+        }
+        // we are up next, remove ourselves
+        waitingPods = waitingPods[i+1:]
+        break
+    }
+    if len(waitingPods) != 0 {
+        p.waitingToStartStaticPodsByFullname[fullname] = waitingPods
+    } else {
+        delete(p.waitingToStartStaticPodsByFullname, fullname)
+    }
+    p.startedStaticPodsByFullname[fullname] = uid
+    return true
+}
+
 func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
     var lastSyncTime time.Time
+    var podStarted bool
     for update := range podUpdates {
         pod := update.Options.Pod
+        if !podStarted {
+            if !p.allowPodStart(pod) {
+                klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(pod), "podUID", pod.UID)
+                continue
+            }
+            podStarted = true
+        }
         klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
         err := func() error {
@@ -899,9 +982,6 @@ func (p *podWorkers) completeTerminating(pod *v1.Pod) {
     klog.V(4).InfoS("Pod terminated all containers successfully", "pod", klog.KObj(pod), "podUID", pod.UID)
 
-    // if a static pod is being tracked, forget it
-    delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
-
     if status, ok := p.podSyncStatuses[pod.UID]; ok {
         if status.terminatingAt.IsZero() {
             klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -933,9 +1013,6 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
     klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID)
 
-    // if a static pod is being tracked, forget it
-    delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
-
     if status, ok := p.podSyncStatuses[pod.UID]; ok {
         if status.terminatingAt.IsZero() {
             klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -943,6 +1020,10 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
         status.terminatedAt = time.Now()
         status.finished = true
         status.working = false
+
+        if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
+            delete(p.startedStaticPodsByFullname, status.fullname)
+        }
     }
 
     ch, ok := p.podUpdates[pod.UID]
@@ -951,7 +1032,6 @@
     }
     delete(p.podUpdates, pod.UID)
     delete(p.lastUndeliveredWorkUpdate, pod.UID)
-    delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
 }
 
 // completeTerminated is invoked after syncTerminatedPod completes successfully and means we
@@ -968,7 +1048,6 @@
     }
     delete(p.podUpdates, pod.UID)
     delete(p.lastUndeliveredWorkUpdate, pod.UID)
-    delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
 
     if status, ok := p.podSyncStatuses[pod.UID]; ok {
         if status.terminatingAt.IsZero() {
@@ -979,6 +1058,10 @@
         }
         status.finished = true
         status.working = false
+
+        if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
+            delete(p.startedStaticPodsByFullname, status.fullname)
+        }
     }
 }
 
@@ -1078,6 +1161,11 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
         return
     }
 
+    if startedUID, started := p.startedStaticPodsByFullname[status.fullname]; started && startedUID != uid {
+        klog.V(4).InfoS("Pod cannot start yet but is no longer known to the kubelet, finish it", "podUID", uid)
+        status.finished = true
+    }
+
     if !status.finished {
         klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
         return
@@ -1091,6 +1179,10 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
     delete(p.podSyncStatuses, uid)
     delete(p.podUpdates, uid)
     delete(p.lastUndeliveredWorkUpdate, uid)
+
+    if p.startedStaticPodsByFullname[status.fullname] == uid {
+        delete(p.startedStaticPodsByFullname, status.fullname)
+    }
 }
 
 // killPodNow returns a KillPodFunc that can be used to kill a pod.
diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go
index 91b36999593..4028c06c292 100644
--- a/pkg/kubelet/pod_workers_test.go
+++ b/pkg/kubelet/pod_workers_test.go
@@ -152,6 +152,18 @@ func newPodWithPhase(uid, name string, phase v1.PodPhase) *v1.Pod {
     }
 }
 
+func newStaticPod(uid, name string) *v1.Pod {
+    return &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            UID:  types.UID(uid),
+            Name: name,
+            Annotations: map[string]string{
+                kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource,
+            },
+        },
+    }
+}
+
 // syncPodRecord is a record of a sync pod call
 type syncPodRecord struct {
     name string
@@ -682,3 +694,189 @@ func TestKillPodNowFunc(t *testing.T) {
         t.Errorf("Pod terminated %v, but expected %v", syncPodRecords[1].terminated, true)
     }
 }
+
+func Test_allowPodStart(t *testing.T) {
+    testCases := []struct {
+        desc                               string
+        pod                                *v1.Pod
+        podSyncStatuses                    map[types.UID]*podSyncStatus
+        startedStaticPodsByFullname        map[string]types.UID
+        waitingToStartStaticPodsByFullname map[string][]types.UID
+        allowed                            bool
+    }{
+        {
+            // TBD: Do we want to allow non-static pods with the same full name?
+            // Note that it may disable the force deletion of pods.
+            desc: "non-static pod",
+            pod:  newPod("uid-0", "test"),
+            podSyncStatuses: map[types.UID]*podSyncStatus{
+                "uid-0": {
+                    fullname: "test_",
+                },
+                "uid-1": {
+                    fullname: "test_",
+                },
+            },
+            allowed: true,
+        },
+        {
+            // TBD: Do we want to allow a non-static pod with the same full name
+            // as the started static pod?
+            desc: "non-static pod when there is a started static pod with the same full name",
+            pod:  newPod("uid-0", "test"),
+            podSyncStatuses: map[types.UID]*podSyncStatus{
+                "uid-0": {
+                    fullname: "test_",
+                },
+                "uid-1": {
+                    fullname: "test_",
+                },
+            },
+            startedStaticPodsByFullname: map[string]types.UID{
+                "test_": types.UID("uid-1"),
+            },
+            allowed: true,
+        },
+        {
+            // TBD: Do we want to allow a static pod with the same full name as the
+            // started non-static pod?
+ desc: "static pod when there is a started non-static pod with the same full name", + pod: newPod("uid-0", "test"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "test_", + }, + "uid-1": { + fullname: "test_", + }, + }, + startedStaticPodsByFullname: map[string]types.UID{ + "test_": types.UID("uid-1"), + }, + allowed: true, + }, + { + desc: "static pod when there are no started static pods with the same full name", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + "uid-1": { + fullname: "bar_", + }, + }, + startedStaticPodsByFullname: map[string]types.UID{ + "bar_": types.UID("uid-1"), + }, + allowed: true, + }, + { + desc: "static pod when there is a started static pod with the same full name", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + "uid-1": { + fullname: "foo_", + }, + }, + startedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-1"), + }, + allowed: false, + }, + { + desc: "static pod if the static pod has already started", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + }, + startedStaticPodsByFullname: map[string]types.UID{ + "foo_": types.UID("uid-0"), + }, + allowed: true, + }, + { + desc: "static pod if the static pod is the first pod waiting to start", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-0"), + }, + }, + allowed: true, + }, + { + desc: "static pod if the static pod is not the first pod waiting to start", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + "uid-1": { + fullname: "foo_", + }, + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-1"), + types.UID("uid-0"), + }, + }, + allowed: false, + }, + { + desc: "static pod if the static pod is the first valid pod waiting to start / clean up until picking the first valid pod", + pod: newStaticPod("uid-0", "foo"), + podSyncStatuses: map[types.UID]*podSyncStatus{ + "uid-0": { + fullname: "foo_", + }, + "uid-1": { + fullname: "foo_", + }, + }, + waitingToStartStaticPodsByFullname: map[string][]types.UID{ + "foo_": { + types.UID("uid-2"), + types.UID("uid-2"), + types.UID("uid-3"), + types.UID("uid-0"), + types.UID("uid-1"), + }, + }, + allowed: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + podWorkers, _ := createPodWorkers() + if tc.podSyncStatuses != nil { + podWorkers.podSyncStatuses = tc.podSyncStatuses + } + if tc.startedStaticPodsByFullname != nil { + podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname + } + if tc.waitingToStartStaticPodsByFullname != nil { + podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname + } + if podWorkers.allowPodStart(tc.pod) != tc.allowed { + if tc.allowed { + t.Errorf("Pod should be allowed") + } else { + t.Errorf("Pod should not be allowed") + } + } + }) + } +}