From ed48dcd2d71d9b25a6aab83c07487b4038cc647e Mon Sep 17 00:00:00 2001
From: Clayton Coleman
Date: Wed, 29 Mar 2023 15:29:59 -0400
Subject: [PATCH] kubelet: Ensure pods that have not started track a pendingUpdate

A pod that cannot be started yet (due to static pod fullname exclusion
when UIDs are reused) must be accounted for in the pod worker, since it
is considered to have been admitted and will eventually start. Due to a
bug, we accidentally cleared pendingUpdate for pods that cannot start
yet, which meant we could not report the right metric to users in
kubelet_working_pods and, in theory, might fail to start the pod in the
future (although we have not yet observed that in tests that should
catch such an error).

Describe, implement, and test the invariant that when startPodSync
returns, every path sets either activeUpdate OR pendingUpdate on the
status, but never both, and both are nil only when the pod can never
start.

This bug was detected by a "programmer error" assertion we added on
metrics that were not being reported, suggesting that we should be more
aggressive about using log assertions and automating their detection in
tests.
---
 pkg/kubelet/kubelet_pods_test.go | 65 ++++++++++++++++++++++++++++++++
 pkg/kubelet/pod_workers.go       | 20 ++++++++--
 2 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go
index 6d8b622d21d..403c0eabf17 100644
--- a/pkg/kubelet/kubelet_pods_test.go
+++ b/pkg/kubelet/kubelet_pods_test.go
@@ -5082,6 +5082,71 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
 				}
 			},
 		},
+		{
+			name:    "pod that could not start still has a pending update and is tracked in metrics",
+			wantErr: false,
+			pods: []*v1.Pod{
+				staticPod(),
+			},
+			prepareWorker: func(t *testing.T, w *podWorkers, records map[types.UID][]syncPodRecord) {
+				// send a create of a static pod
+				pod := staticPod()
+				// block startup of the static pod due to full name collision
+				w.startedStaticPodsByFullname[kubecontainer.GetPodFullName(pod)] = types.UID("2")
+
+				w.UpdatePod(UpdatePodOptions{
+					UpdateType: kubetypes.SyncPodCreate,
+					StartTime:  time.Unix(1, 0).UTC(),
+					Pod:        pod,
+				})
+				drainAllWorkers(w)
+
+				if _, ok := records[pod.UID]; ok {
+					t.Fatalf("unexpected records: %#v", records)
+				}
+				// the pod worker knows about the pod, but it has never been synced
+			},
+			wantWorker: func(t *testing.T, w *podWorkers, records map[types.UID][]syncPodRecord) {
+				uid := types.UID("1")
+				if len(w.podSyncStatuses) != 1 {
+					t.Fatalf("unexpected sync statuses: %#v", w.podSyncStatuses)
+				}
+				s, ok := w.podSyncStatuses[uid]
+				if !ok || s.IsTerminationRequested() || s.IsTerminationStarted() || s.IsFinished() || s.IsWorking() || s.IsDeleted() || s.restartRequested || s.activeUpdate != nil || s.pendingUpdate == nil {
+					t.Errorf("unexpected requested pod termination: %#v", s)
+				}
+
+				// expect that no sync calls are made, since the pod doesn't ever start
+				if actual, expected := records[uid], []syncPodRecord(nil); !reflect.DeepEqual(expected, actual) {
+					t.Fatalf("unexpected pod sync records: %s", cmp.Diff(expected, actual, cmp.AllowUnexported(syncPodRecord{})))
+				}
+			},
+			expectMetrics: map[string]string{
+				metrics.DesiredPodCount.FQName(): `# HELP kubelet_desired_pods [ALPHA] The number of pods the kubelet is being instructed to run. static is true if the pod is not from the apiserver.
+				# TYPE kubelet_desired_pods gauge
+				kubelet_desired_pods{static=""} 0
+				kubelet_desired_pods{static="true"} 1
+				`,
+				metrics.WorkingPodCount.FQName(): `# HELP kubelet_working_pods [ALPHA] Number of pods the kubelet is actually running, broken down by lifecycle phase, whether the pod is desired, orphaned, or runtime only (also orphaned), and whether the pod is static. An orphaned pod has been removed from local configuration or force deleted in the API and consumes resources that are not otherwise visible.
+				# TYPE kubelet_working_pods gauge
+				kubelet_working_pods{config="desired",lifecycle="sync",static=""} 0
+				kubelet_working_pods{config="desired",lifecycle="sync",static="true"} 1
+				kubelet_working_pods{config="desired",lifecycle="terminated",static=""} 0
+				kubelet_working_pods{config="desired",lifecycle="terminated",static="true"} 0
+				kubelet_working_pods{config="desired",lifecycle="terminating",static=""} 0
+				kubelet_working_pods{config="desired",lifecycle="terminating",static="true"} 0
+				kubelet_working_pods{config="orphan",lifecycle="sync",static=""} 0
+				kubelet_working_pods{config="orphan",lifecycle="sync",static="true"} 0
+				kubelet_working_pods{config="orphan",lifecycle="terminated",static=""} 0
+				kubelet_working_pods{config="orphan",lifecycle="terminated",static="true"} 0
+				kubelet_working_pods{config="orphan",lifecycle="terminating",static=""} 0
+				kubelet_working_pods{config="orphan",lifecycle="terminating",static="true"} 0
+				kubelet_working_pods{config="runtime_only",lifecycle="sync",static="unknown"} 0
+				kubelet_working_pods{config="runtime_only",lifecycle="terminated",static="unknown"} 0
+				kubelet_working_pods{config="runtime_only",lifecycle="terminating",static="unknown"} 0
+				`,
+			},
+		},
 		{
 			name:    "pod that could not start and is not in config is force terminated without runtime during pod cleanup",
 			wantErr: false,
diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index 8d72e1ad55e..3fcc69c7cc0 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -1086,6 +1086,10 @@ func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) {
 // or can be started, and updates the cached pod state so that downstream components can observe what the
 // pod worker goroutine is currently attempting to do. If ok is false, there is no available event. If any
 // of the boolean values is false, ensure the appropriate cleanup happens before returning.
+//
+// This method should ensure that either status.pendingUpdate is cleared and merged into status.activeUpdate,
+// or, when a pod cannot be started, status.pendingUpdate remains the same. Pods that have not been started
+// should never have an activeUpdate, because that is exposed to downstream components on started pods.
 func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) {
 	p.podLock.Lock()
 	defer p.podLock.Unlock()
@@ -1159,6 +1163,8 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update
 		klog.V(4).InfoS("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
 		return ctx, update, canStart, canEverStart, true
 	case !canStart:
+		// this is the only path in which we don't start the pod, so we need to put the change back in pendingUpdate
+		status.pendingUpdate = &update.Options
 		status.working = false
 		klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID)
 		return ctx, update, canStart, canEverStart, true
@@ -1545,9 +1551,17 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorke
 			State:  status.WorkType(),
 			Orphan: orphan,
 		}
-		if status.activeUpdate != nil && status.activeUpdate.Pod != nil {
-			sync.HasConfig = true
-			sync.Static = kubetypes.IsStaticPod(status.activeUpdate.Pod)
+		switch {
+		case status.activeUpdate != nil:
+			if status.activeUpdate.Pod != nil {
+				sync.HasConfig = true
+				sync.Static = kubetypes.IsStaticPod(status.activeUpdate.Pod)
+			}
+		case status.pendingUpdate != nil:
+			if status.pendingUpdate.Pod != nil {
+				sync.HasConfig = true
+				sync.Static = kubetypes.IsStaticPod(status.pendingUpdate.Pod)
+			}
 		}
 		workers[uid] = sync
 	}
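
To make the invariant from the commit message concrete, here is a minimal, self-contained Go sketch of the pendingUpdate/activeUpdate contract that startPodSync is expected to maintain. The types and the checkInvariant helper are simplified stand-ins invented for illustration; they are not the kubelet's real podSyncStatus or UpdatePodOptions structs.

package main

import "fmt"

// updateOptions is a stand-in for the kubelet's UpdatePodOptions.
type updateOptions struct {
	podName string
}

// syncStatus is a stand-in for the kubelet's podSyncStatus.
type syncStatus struct {
	pendingUpdate *updateOptions // set while a pod is admitted but not yet started
	activeUpdate  *updateOptions // set once a pod has actually been started
	canEverStart  bool           // false when the pod can never start
}

// checkInvariant reports a violation of the contract: either pendingUpdate or
// activeUpdate is set, never both, and neither is set only when the pod can
// never start.
func checkInvariant(s *syncStatus) error {
	switch {
	case s.pendingUpdate != nil && s.activeUpdate != nil:
		return fmt.Errorf("both pendingUpdate and activeUpdate are set")
	case s.pendingUpdate == nil && s.activeUpdate == nil && s.canEverStart:
		return fmt.Errorf("startable pod has neither pendingUpdate nor activeUpdate")
	default:
		return nil
	}
}

func main() {
	// A static pod blocked by a full name collision keeps its pendingUpdate.
	blocked := &syncStatus{pendingUpdate: &updateOptions{podName: "static-pod"}, canEverStart: true}
	fmt.Println(checkInvariant(blocked)) // <nil>

	// The pre-patch bug cleared pendingUpdate on the "cannot start yet" path,
	// which an assertion like this would have flagged.
	buggy := &syncStatus{canEverStart: true}
	fmt.Println(checkInvariant(buggy)) // error
}

Encoding the contract as a checkable predicate like this is what the commit message means by "log assertions": a violation can be logged as a programmer error or failed in tests rather than silently skewing metrics.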
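
Similarly, the SyncKnownPods change reads as a small fallback rule for metric attribution: prefer activeUpdate, and fall back to pendingUpdate so an admitted-but-never-started pod still contributes HasConfig and its static label to kubelet_working_pods. The sketch below uses invented stand-in types, not the real PodWorkerSync or kubetypes.IsStaticPod.

package main

import "fmt"

// podConfig is a stand-in for the pod carried by an update.
type podConfig struct {
	name   string
	static bool
}

// workerStatus is a stand-in for podSyncStatus, holding at most one of the two updates.
type workerStatus struct {
	activeUpdate  *podConfig
	pendingUpdate *podConfig
}

// classify mirrors the switch added to SyncKnownPods: the pod used for the
// HasConfig/Static labels comes from whichever update is available.
func classify(s workerStatus) (hasConfig, static bool) {
	switch {
	case s.activeUpdate != nil:
		return true, s.activeUpdate.static
	case s.pendingUpdate != nil:
		return true, s.pendingUpdate.static
	}
	return false, false
}

func main() {
	// A static pod that could not start yet only has a pendingUpdate, but it is
	// still counted with static="true" instead of disappearing from the metric.
	hasConfig, static := classify(workerStatus{pendingUpdate: &podConfig{name: "etcd", static: true}})
	fmt.Println(hasConfig, static) // true true
}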