Kubelet sets pod.status.observedGeneration behind FG

This commit is contained in:
Natasha Sarkar 2025-02-25 22:11:40 +00:00
parent bb3ba9d073
commit 40e7d88f02
10 changed files with 162 additions and 2 deletions

View File

@ -23,6 +23,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)
// FindPort locates the container port for the given pod and portName. If the
@ -416,3 +418,13 @@ func IsRestartableInitContainer(initContainer *v1.Container) bool {
}
return *initContainer.RestartPolicy == v1.ContainerRestartPolicyAlways
}
// GetPodObservedGenerationIfEnabled returns the value kubelet should report in
// status.observedGeneration for this pod: the pod's metadata.generation when
// reporting is active, and 0 otherwise.
//
// Reporting is active when the PodObservedGenerationTracking feature gate is
// enabled OR when status.observedGeneration is already non-zero. The latter
// guards against an infinite loop of kubelet trying to clear the value after
// the FG is turned off while the API server preserves existing values when an
// incoming update tries to clear it.
func GetPodObservedGenerationIfEnabled(pod *v1.Pod) int64 {
	if utilfeature.DefaultFeatureGate.Enabled(features.PodObservedGenerationTracking) {
		return pod.Generation
	}
	if pod.Status.ObservedGeneration != 0 {
		return pod.Generation
	}
	return 0
}

View File

@ -502,6 +502,12 @@ const (
// Allows zero value for sleep duration in SleepAction in container lifecycle hooks
PodLifecycleSleepActionAllowZero featuregate.Feature = "PodLifecycleSleepActionAllowZero"
// owner: @natasha41575
// kep: https://kep.k8s.io/5067
//
// Enables the pod to report status.ObservedGeneration to reflect the generation of the last observed podspec.
PodObservedGenerationTracking featuregate.Feature = "PodObservedGenerationTracking"
// owner: @Huang-Wei
// kep: https://kep.k8s.io/3521
//

View File

@ -586,13 +586,20 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.Beta},
},
PodReadyToStartContainersCondition: {
{Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.29"), Default: true, PreRelease: featuregate.Beta},
},
PodLifecycleSleepActionAllowZero: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
},
PodObservedGenerationTracking: {
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha},
},
PodSchedulingReadiness: {
{Version: version.MustParse("1.26"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.27"), Default: true, PreRelease: featuregate.Beta},

View File

@ -456,6 +456,7 @@ func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGr
existing.Labels = ref.Labels
existing.DeletionTimestamp = ref.DeletionTimestamp
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
existing.Generation = ref.Generation
existing.Status = ref.Status
updateAnnotations(existing, ref)

View File

@ -235,6 +235,9 @@ func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
// Make sure we're caching a deep copy.
status = *status.DeepCopy()
// Set the observedGeneration for this pod status.
status.ObservedGeneration = podutil.GetPodObservedGenerationIfEnabled(pod)
// Force a status update if deletion timestamp is set. This is necessary
// because if the pod is in the non-running state, the pod worker still
// needs to be able to trigger an update and/or deletion.
@ -599,6 +602,11 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
status.StartTime = &now
}
// prevent sending unnecessary patches
if oldStatus.ObservedGeneration > status.ObservedGeneration {
status.ObservedGeneration = oldStatus.ObservedGeneration
}
normalizeStatus(pod, &status)
// Perform some more extensive logging of container termination state to assist in

View File

@ -357,6 +357,10 @@ var (
// (used for testing specific log stream <https://kep.k8s.io/3288>)
PodLogsQuerySplitStreams = framework.WithFeature(framework.ValidFeatures.Add("PodLogsQuerySplitStreams"))
// Owner: sig-node
// Marks tests that require a cluster with PodObservedGenerationTracking
PodObservedGenerationTracking = framework.WithFeature(framework.ValidFeatures.Add("PodObservedGenerationTracking"))
// TODO: document the feature (owning SIG, when to use this feature for a test)
PodPriority = framework.WithFeature(framework.ValidFeatures.Add("PodPriority"))

View File

@ -285,7 +285,7 @@ func WaitForPodsRunningReady(ctx context.Context, c clientset.Interface, ns stri
}
// WaitForPodCondition waits a pods to be matched to the given condition.
// WaitForPodCondition waits for a pod to be matched to the given condition.
// The condition callback may use gomega.StopTrying to abort early.
func WaitForPodCondition(ctx context.Context, c clientset.Interface, ns, podName, conditionDesc string, timeout time.Duration, condition podCondition) error {
return framework.Gomega().
@ -305,6 +305,21 @@ func WaitForPodCondition(ctx context.Context, c clientset.Interface, ns, podName
}))
}
// WaitForPodObservedGeneration polls the pod until status.observedGeneration
// equals expectedGeneration, or the timeout expires. NotFound errors are
// retried, so the pod may be created concurrently with this wait.
//
// Returns an error describing the last observed value if the pod never
// reaches the expected observed generation within the timeout.
func WaitForPodObservedGeneration(ctx context.Context, c clientset.Interface, ns, podName string, expectedGeneration int64, timeout time.Duration) error {
	return framework.Gomega().
		Eventually(ctx, framework.RetryNotFound(framework.GetObject(c.CoreV1().Pods(ns).Get, podName, metav1.GetOptions{}))).
		WithTimeout(timeout).
		Should(framework.MakeMatcher(func(pod *v1.Pod) (func() string, error) {
			if pod.Status.ObservedGeneration == expectedGeneration {
				return nil, nil
			}
			return func() string {
				// Name the field precisely so failures aren't mistaken for a
				// metadata.generation mismatch.
				return fmt.Sprintf("expected pod status.observedGeneration to be %d, got %d instead:\n", expectedGeneration, pod.Status.ObservedGeneration)
			}, nil
		}))
}
// Range determines how many items must exist and how many must match a certain
// condition. Values <= 0 are ignored.
// TODO (?): move to test/e2e/framework/range

View File

@ -99,6 +99,7 @@ var _ = SIGDescribe("PodRejectionStatus", func() {
// This detects if there are any new fields in Status that were dropped by the pod rejection.
// These new fields either should be kept by kubelet's admission or added explicitly in the list of fields that are having a different value or must be cleared.
gomega.Expect(gotPod.Status).To(gstruct.MatchAllFields(gstruct.Fields{
"ObservedGeneration": gstruct.Ignore(),
"Phase": gstruct.Ignore(),
"Conditions": gstruct.Ignore(),
"Message": gstruct.Ignore(),

View File

@ -39,9 +39,12 @@ import (
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
@ -414,6 +417,11 @@ var _ = SIGDescribe("Pods Extended", func() {
})
})
})
})
var _ = SIGDescribe("Pods Extended (pod generation)", feature.PodObservedGenerationTracking, framework.WithFeatureGate(features.PodObservedGenerationTracking), func() {
f := framework.NewDefaultFramework("pods")
f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
ginkgo.Describe("Pod Generation", func() {
var podClient *e2epod.PodClient
@ -507,7 +515,7 @@ var _ = SIGDescribe("Pods Extended", func() {
},
}
expectedPodGeneration := 1
expectedPodGeneration := int64(1)
for _, test := range tests {
ginkgo.By(test.name)
podClient.Update(ctx, podName, test.updateFn)
@ -517,6 +525,7 @@ var _ = SIGDescribe("Pods Extended", func() {
expectedPodGeneration++
}
gomega.Expect(pod.Generation).To(gomega.BeEquivalentTo(expectedPodGeneration))
framework.ExpectNoError(e2epod.WaitForPodObservedGeneration(ctx, f.ClientSet, f.Namespace.Name, pod.Name, expectedPodGeneration, 20*time.Second))
}
})
@ -555,6 +564,97 @@ var _ = SIGDescribe("Pods Extended", func() {
framework.ExpectNoError(err, "failed to query for pod")
gomega.Expect(pod.Generation).To(gomega.BeEquivalentTo(2))
})
// Stress-style convergence test: one create plus 499 spec updates should leave
// both metadata.generation and status.observedGeneration at exactly 500.
ginkgo.It("issue 500 podspec updates and verify generation and observedGeneration eventually converge", func(ctx context.Context) {
ginkgo.By("creating the pod")
name := "pod-generation-" + string(uuid.NewUUID())
value := strconv.Itoa(time.Now().Nanosecond())
pod := e2epod.NewAgnhostPod(f.Namespace.Name, name, nil, nil, nil)
pod.ObjectMeta.Labels = map[string]string{
"time": value,
}
// Start high enough that 499 decrements below keep the deadline positive.
pod.Spec.ActiveDeadlineSeconds = utilpointer.Int64(5000)
ginkgo.By("submitting the pod to kubernetes")
pod = podClient.CreateSync(ctx, pod)
ginkgo.DeferCleanup(func(ctx context.Context) error {
ginkgo.By("deleting the pod")
return podClient.Delete(ctx, pod.Name, metav1.DeleteOptions{})
})
// Each successful spec update bumps metadata.generation by one; decrementing
// ActiveDeadlineSeconds is a minimal mutation that is always a valid update.
for i := 0; i < 499; i++ {
podClient.Update(ctx, pod.Name, func(pod *v1.Pod) {
*pod.Spec.ActiveDeadlineSeconds--
})
}
// Verify pod observedGeneration converges to the expected generation.
expectedPodGeneration := int64(500)
framework.ExpectNoError(e2epod.WaitForPodObservedGeneration(ctx, f.ClientSet, f.Namespace.Name, pod.Name, expectedPodGeneration, 30*time.Second))
// Verify pod generation converges to the expected generation.
pod, err := podClient.Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "failed to query for pod")
gomega.Expect(pod.Generation).To(gomega.BeEquivalentTo(expectedPodGeneration))
})
// Verifies that a pod rejected at kubelet admission (OutOfcpu) still reports
// generation 1 and status.observedGeneration 1 — i.e. rejection does not drop
// the observedGeneration field from the status.
ginkgo.It("pod rejected by kubelet should have updated generation and observedGeneration", func(ctx context.Context) {
node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
framework.ExpectNoError(err, "Failed to get a ready schedulable node")
// Create a pod that requests more CPU than the node has.
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-out-of-cpu",
Namespace: f.Namespace.Name,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "pod-out-of-cpu",
Image: imageutils.GetPauseImageName(),
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000000000000"), // requests more CPU than any node has
},
},
},
},
},
}
ginkgo.By("submitting the pod to kubernetes")
pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.DeferCleanup(func(ctx context.Context) error {
ginkgo.By("deleting the pod")
return podClient.Delete(ctx, pod.Name, metav1.DeleteOptions{})
})
// Force assign the Pod to a node in order to get rejection status.
// Binding directly bypasses the scheduler, which would otherwise never place
// the pod on a node that cannot satisfy the CPU request.
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
UID: pod.UID,
},
Target: v1.ObjectReference{
Kind: "Node",
Name: node.Name,
},
}
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}))
// Kubelet has rejected the pod.
err = e2epod.WaitForPodFailedReason(ctx, f.ClientSet, pod, "OutOfcpu", f.Timeouts.PodStart)
framework.ExpectNoError(err)
// Fetch the rejected Pod and verify the generation and observedGeneration.
gotPod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
gomega.Expect(gotPod.Generation).To(gomega.BeEquivalentTo(1))
gomega.Expect(gotPod.Status.ObservedGeneration).To(gomega.BeEquivalentTo(1))
})
})
})

View File

@ -976,6 +976,12 @@
lockToDefault: false
preRelease: Alpha
version: "1.32"
- name: PodObservedGenerationTracking
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.33"
- name: PodReadyToStartContainersCondition
versionedSpecs:
- default: false