From 40e7d88f022e5c17d442495c5935ea6084e5690d Mon Sep 17 00:00:00 2001 From: Natasha Sarkar Date: Tue, 25 Feb 2025 22:11:40 +0000 Subject: [PATCH] Kubelet sets pod.status.observedGeneration behind FG --- pkg/api/v1/pod/util.go | 12 +++ pkg/features/kube_features.go | 6 ++ pkg/features/versioned_kube_features.go | 7 ++ pkg/kubelet/config/config.go | 1 + pkg/kubelet/status/status_manager.go | 8 ++ test/e2e/feature/feature.go | 4 + test/e2e/framework/pod/wait.go | 17 ++- test/e2e/node/pod_admission.go | 1 + test/e2e/node/pods.go | 102 +++++++++++++++++- .../test_data/versioned_feature_list.yaml | 6 ++ 10 files changed, 162 insertions(+), 2 deletions(-) diff --git a/pkg/api/v1/pod/util.go b/pkg/api/v1/pod/util.go index c2fe5197146..b3490add6b8 100644 --- a/pkg/api/v1/pod/util.go +++ b/pkg/api/v1/pod/util.go @@ -23,6 +23,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" ) // FindPort locates the container port for the given pod and portName. If the @@ -416,3 +418,13 @@ func IsRestartableInitContainer(initContainer *v1.Container) bool { } return *initContainer.RestartPolicy == v1.ContainerRestartPolicyAlways } + +// GetPodObservedGenerationIfEnabled returns the value to emit as status.observedGeneration: pod.Generation if the PodObservedGenerationTracking feature is enabled OR if status.observedGeneration is already set, and 0 otherwise. +// This protects against an infinite loop of kubelet trying to clear the value after the FG is turned off, and +// the API server preserving existing values when an incoming update tries to clear it. 
+func GetPodObservedGenerationIfEnabled(pod *v1.Pod) int64 { + if pod.Status.ObservedGeneration != 0 || utilfeature.DefaultFeatureGate.Enabled(features.PodObservedGenerationTracking) { + return pod.Generation + } + return 0 +} diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index f9ae60a885e..3ddc81a7121 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -502,6 +502,12 @@ const ( // Allows zero value for sleep duration in SleepAction in container lifecycle hooks PodLifecycleSleepActionAllowZero featuregate.Feature = "PodLifecycleSleepActionAllowZero" + // owner: @natasha41575 + // kep: https://kep.k8s.io/5067 + // + // Enables the pod to report status.ObservedGeneration to reflect the generation of the last observed podspec. + PodObservedGenerationTracking featuregate.Feature = "PodObservedGenerationTracking" + // owner: @Huang-Wei // kep: https://kep.k8s.io/3521 // diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go index a9e0253417c..a67cb35c35f 100644 --- a/pkg/features/versioned_kube_features.go +++ b/pkg/features/versioned_kube_features.go @@ -586,13 +586,20 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Alpha}, {Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.Beta}, }, + PodReadyToStartContainersCondition: { {Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Alpha}, {Version: version.MustParse("1.29"), Default: true, PreRelease: featuregate.Beta}, }, + PodLifecycleSleepActionAllowZero: { {Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha}, }, + + PodObservedGenerationTracking: { + {Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha}, + }, + PodSchedulingReadiness: { {Version: version.MustParse("1.26"), Default: false, PreRelease: 
featuregate.Alpha}, {Version: version.MustParse("1.27"), Default: true, PreRelease: featuregate.Beta}, diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 34fde1a1038..10a42c82322 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -456,6 +456,7 @@ func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGr existing.Labels = ref.Labels existing.DeletionTimestamp = ref.DeletionTimestamp existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds + existing.Generation = ref.Generation existing.Status = ref.Status updateAnnotations(existing, ref) diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 323a2192ace..f727a4be958 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -235,6 +235,9 @@ func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) { // Make sure we're caching a deep copy. status = *status.DeepCopy() + // Set the observedGeneration for this pod status. + status.ObservedGeneration = podutil.GetPodObservedGenerationIfEnabled(pod) + // Force a status update if deletion timestamp is set. This is necessary // because if the pod is in the non-running state, the pod worker still // needs to be able to trigger an update and/or deletion. 
@@ -599,6 +602,11 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp status.StartTime = &now } + // prevent sending unnecessary patches + if oldStatus.ObservedGeneration > status.ObservedGeneration { + status.ObservedGeneration = oldStatus.ObservedGeneration + } + normalizeStatus(pod, &status) // Perform some more extensive logging of container termination state to assist in diff --git a/test/e2e/feature/feature.go b/test/e2e/feature/feature.go index 4352ea1dacf..39995703c07 100644 --- a/test/e2e/feature/feature.go +++ b/test/e2e/feature/feature.go @@ -357,6 +357,10 @@ var ( // (used for testing specific log stream ) PodLogsQuerySplitStreams = framework.WithFeature(framework.ValidFeatures.Add("PodLogsQuerySplitStreams")) + // Owner: sig-node + // Marks tests that require a cluster with PodObservedGenerationTracking + PodObservedGenerationTracking = framework.WithFeature(framework.ValidFeatures.Add("PodObservedGenerationTracking")) + // TODO: document the feature (owning SIG, when to use this feature for a test) PodPriority = framework.WithFeature(framework.ValidFeatures.Add("PodPriority")) diff --git a/test/e2e/framework/pod/wait.go b/test/e2e/framework/pod/wait.go index 189d0f38d32..221e5460718 100644 --- a/test/e2e/framework/pod/wait.go +++ b/test/e2e/framework/pod/wait.go @@ -285,7 +285,7 @@ func WaitForPodsRunningReady(ctx context.Context, c clientset.Interface, ns stri } -// WaitForPodCondition waits a pods to be matched to the given condition. +// WaitForPodCondition waits for a pod to be matched to the given condition. // The condition callback may use gomega.StopTrying to abort early. func WaitForPodCondition(ctx context.Context, c clientset.Interface, ns, podName, conditionDesc string, timeout time.Duration, condition podCondition) error { return framework.Gomega(). 
@@ -305,6 +305,21 @@ func WaitForPodCondition(ctx context.Context, c clientset.Interface, ns, podName })) } +// WaitForPodObservedGeneration waits for a pod to have the given observed generation. +func WaitForPodObservedGeneration(ctx context.Context, c clientset.Interface, ns, podName string, expectedGeneration int64, timeout time.Duration) error { + return framework.Gomega(). + Eventually(ctx, framework.RetryNotFound(framework.GetObject(c.CoreV1().Pods(ns).Get, podName, metav1.GetOptions{}))). + WithTimeout(timeout). + Should(framework.MakeMatcher(func(pod *v1.Pod) (func() string, error) { + if pod.Status.ObservedGeneration == expectedGeneration { + return nil, nil + } + return func() string { + return fmt.Sprintf("expected pod generation to be %d, got %d instead:\n", expectedGeneration, pod.Status.ObservedGeneration) + }, nil + })) +} + // Range determines how many items must exist and how many must match a certain // condition. Values <= 0 are ignored. // TODO (?): move to test/e2e/framework/range diff --git a/test/e2e/node/pod_admission.go b/test/e2e/node/pod_admission.go index 1891c5252db..f28fbc28371 100644 --- a/test/e2e/node/pod_admission.go +++ b/test/e2e/node/pod_admission.go @@ -99,6 +99,7 @@ var _ = SIGDescribe("PodRejectionStatus", func() { // This detects if there are any new fields in Status that were dropped by the pod rejection. // These new fields either should be kept by kubelet's admission or added explicitly in the list of fields that are having a different value or must be cleared. 
gomega.Expect(gotPod.Status).To(gstruct.MatchAllFields(gstruct.Fields{ + "ObservedGeneration": gstruct.Ignore(), "Phase": gstruct.Ignore(), "Conditions": gstruct.Ignore(), "Message": gstruct.Ignore(), diff --git a/test/e2e/node/pods.go b/test/e2e/node/pods.go index 19cbbaeda0e..cf9d8bc325d 100644 --- a/test/e2e/node/pods.go +++ b/test/e2e/node/pods.go @@ -39,9 +39,12 @@ import ( "k8s.io/apimachinery/pkg/watch" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/retry" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" imageutils "k8s.io/kubernetes/test/utils/image" admissionapi "k8s.io/pod-security-admission/api" @@ -414,6 +417,11 @@ var _ = SIGDescribe("Pods Extended", func() { }) }) }) +}) + +var _ = SIGDescribe("Pods Extended (pod generation)", feature.PodObservedGenerationTracking, framework.WithFeatureGate(features.PodObservedGenerationTracking), func() { + f := framework.NewDefaultFramework("pods") + f.NamespacePodSecurityLevel = admissionapi.LevelBaseline ginkgo.Describe("Pod Generation", func() { var podClient *e2epod.PodClient @@ -507,7 +515,7 @@ var _ = SIGDescribe("Pods Extended", func() { }, } - expectedPodGeneration := 1 + expectedPodGeneration := int64(1) for _, test := range tests { ginkgo.By(test.name) podClient.Update(ctx, podName, test.updateFn) @@ -517,6 +525,7 @@ var _ = SIGDescribe("Pods Extended", func() { expectedPodGeneration++ } gomega.Expect(pod.Generation).To(gomega.BeEquivalentTo(expectedPodGeneration)) + framework.ExpectNoError(e2epod.WaitForPodObservedGeneration(ctx, f.ClientSet, f.Namespace.Name, pod.Name, expectedPodGeneration, 20*time.Second)) } }) @@ -555,6 +564,97 @@ var _ = SIGDescribe("Pods Extended", func() { framework.ExpectNoError(err, "failed to 
query for pod") gomega.Expect(pod.Generation).To(gomega.BeEquivalentTo(2)) }) + + ginkgo.It("issue 500 podspec updates and verify generation and observedGeneration eventually converge", func(ctx context.Context) { + ginkgo.By("creating the pod") + name := "pod-generation-" + string(uuid.NewUUID()) + value := strconv.Itoa(time.Now().Nanosecond()) + pod := e2epod.NewAgnhostPod(f.Namespace.Name, name, nil, nil, nil) + pod.ObjectMeta.Labels = map[string]string{ + "time": value, + } + pod.Spec.ActiveDeadlineSeconds = utilpointer.Int64(5000) + + ginkgo.By("submitting the pod to kubernetes") + pod = podClient.CreateSync(ctx, pod) + ginkgo.DeferCleanup(func(ctx context.Context) error { + ginkgo.By("deleting the pod") + return podClient.Delete(ctx, pod.Name, metav1.DeleteOptions{}) + }) + + for i := 0; i < 499; i++ { + podClient.Update(ctx, pod.Name, func(pod *v1.Pod) { + *pod.Spec.ActiveDeadlineSeconds-- + }) + } + + // Verify pod observedGeneration converges to the expected generation. + expectedPodGeneration := int64(500) + framework.ExpectNoError(e2epod.WaitForPodObservedGeneration(ctx, f.ClientSet, f.Namespace.Name, pod.Name, expectedPodGeneration, 30*time.Second)) + + // Verify pod generation converges to the expected generation. + pod, err := podClient.Get(ctx, pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "failed to query for pod") + gomega.Expect(pod.Generation).To(gomega.BeEquivalentTo(expectedPodGeneration)) + }) + + ginkgo.It("pod rejected by kubelet should have updated generation and observedGeneration", func(ctx context.Context) { + node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet) + framework.ExpectNoError(err, "Failed to get a ready schedulable node") + + // Create a pod that requests more CPU than the node has. 
+ pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-out-of-cpu", + Namespace: f.Namespace.Name, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "pod-out-of-cpu", + Image: imageutils.GetPauseImageName(), + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000000000000"), // requests more CPU than any node has + }, + }, + }, + }, + }, + } + + ginkgo.By("submitting the pod to kubernetes") + pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}) + framework.ExpectNoError(err) + ginkgo.DeferCleanup(func(ctx context.Context) error { + ginkgo.By("deleting the pod") + return podClient.Delete(ctx, pod.Name, metav1.DeleteOptions{}) + }) + + // Force assign the Pod to a node in order to get rejection status. + binding := &v1.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + UID: pod.UID, + }, + Target: v1.ObjectReference{ + Kind: "Node", + Name: node.Name, + }, + } + framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{})) + + // Kubelet has rejected the pod. + err = e2epod.WaitForPodFailedReason(ctx, f.ClientSet, pod, "OutOfcpu", f.Timeouts.PodStart) + framework.ExpectNoError(err) + + // Fetch the rejected Pod and verify the generation and observedGeneration. 
+ gotPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + gomega.Expect(gotPod.Generation).To(gomega.BeEquivalentTo(1)) + gomega.Expect(gotPod.Status.ObservedGeneration).To(gomega.BeEquivalentTo(1)) + }) }) }) diff --git a/test/featuregates_linter/test_data/versioned_feature_list.yaml b/test/featuregates_linter/test_data/versioned_feature_list.yaml index 11cf75bd9d0..280f7d4742f 100644 --- a/test/featuregates_linter/test_data/versioned_feature_list.yaml +++ b/test/featuregates_linter/test_data/versioned_feature_list.yaml @@ -976,6 +976,12 @@ lockToDefault: false preRelease: Alpha version: "1.32" +- name: PodObservedGenerationTracking + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "1.33" - name: PodReadyToStartContainersCondition versionedSpecs: - default: false