Add Job e2e for tracking failure count per index (#130390)

* Add Job e2e for tracking failure count per index

* Review remarks
Michał Woźniak
2025-02-25 16:10:37 +01:00
committed by GitHub
parent 49f419e84e
commit d66928b842
2 changed files with 165 additions and 0 deletions


@@ -660,6 +660,57 @@ done`}
gomega.Expect(job.Status.Failed).Should(gomega.Equal(int32(1)))
})
/*
Testname: Track the failure count per index in Pod annotation when backoffLimitPerIndex is used
Description: Create an indexed job and ensure that the Pods are
re-created with the failure-count Pod annotation set to the number
of failures observed so far for their index.
*/
ginkgo.It("should record the failure-count in the Pod annotation when using backoffLimitPerIndex", func(ctx context.Context) {
jobName := "e2e-backofflimitperindex-" + utilrand.String(5)
label := map[string]string{batchv1.JobNameLabel: jobName}
labelSelector := labels.SelectorFromSet(label).String()
parallelism := int32(2)
completions := int32(2)
backoffLimit := int32(6) // default value
job := e2ejob.NewTestJob("fail", jobName, v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job.Spec.BackoffLimit = nil
job.Spec.BackoffLimitPerIndex = ptr.To[int32](1)
job.Spec.CompletionMode = ptr.To(batchv1.IndexedCompletion)
tracker := NewIndexedPodAnnotationTracker(jobName, f.Namespace.Name, labelSelector, batchv1.JobCompletionIndexAnnotation, batchv1.JobIndexFailureCountAnnotation)
trackerCancel := tracker.Start(ctx, f.ClientSet)
ginkgo.DeferCleanup(trackerCancel)
ginkgo.By("Creating an indexed job with backoffLimit per index and failing pods")
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
ginkgo.By("Awaiting for the job to fail as there are failed indexes")
err = e2ejob.WaitForJobFailed(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
ginkgo.By("Verify the failure-count annotation on Pods")
// Since the Job is already failed all the relevant Pod events are
// already being distributed. Still, there might be a little bit of lag
// between the events being receiced by the Job controller and the test
// code so we need to wait a little bit.
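// With backoffLimitPerIndex=1 and two indexes, each index is expected to be
// tried twice: the first Pod of an index carries failure-count "0" and its
// retry carries "1".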
gomega.Eventually(ctx, tracker.cloneTrackedAnnotations).
WithTimeout(15 * time.Second).
WithPolling(500 * time.Millisecond).
Should(gomega.Equal(map[int][]string{0: {"0", "1"}, 1: {"0", "1"}}))
ginkgo.By("Verifying the Job status fields")
job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to retrieve latest job object")
gomega.Expect(job.Status.FailedIndexes).Should(gomega.HaveValue(gomega.Equal("0,1")))
gomega.Expect(job.Status.CompletedIndexes).Should(gomega.Equal(""))
gomega.Expect(job.Status.Failed).Should(gomega.Equal(int32(4)))
gomega.Expect(job.Status.Succeeded).Should(gomega.Equal(int32(0)))
})
/*
Testcase: Mark indexes as failed when the FailIndex action is matched in podFailurePolicy
Description: Create an indexed job with backoffLimitPerIndex, and podFailurePolicy

test/e2e/apps/util.go (new file, 114 lines)

@@ -0,0 +1,114 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apps
import (
"context"
"maps"
"strconv"
"sync"
"github.com/onsi/ginkgo/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/framework"
)
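// IndexedPodAnnotationTracker watches the Pods that belong to an indexed Job
// and records, per completion index, every value observed for a chosen Pod
// annotation (here, the per-index failure-count annotation).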
type IndexedPodAnnotationTracker struct {
sync.Mutex
ownerName string
ownerNs string
labelSelector string
podIndexAnnotation string
podTrackedAnnotation string
trackedAnnotations map[int][]string
}
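// NewIndexedPodAnnotationTracker returns a tracker for Pods matching
// labelSelector in ownerNs; nothing is watched until Start is called.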
func NewIndexedPodAnnotationTracker(ownerName, ownerNs, labelSelector, podIndexAnnotation, podTrackedAnnotation string) *IndexedPodAnnotationTracker {
return &IndexedPodAnnotationTracker{
ownerName: ownerName,
ownerNs: ownerNs,
labelSelector: labelSelector,
podIndexAnnotation: podIndexAnnotation,
podTrackedAnnotation: podTrackedAnnotation,
trackedAnnotations: make(map[int][]string),
}
}
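// Start runs a Pod informer restricted to the tracker's namespace and label
// selector. For every added Pod it appends the tracked annotation value under
// the Pod's completion index, and it fails the test if the tracked annotation
// is mutated in place on an existing Pod. The returned CancelFunc stops the
// informer.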
func (t *IndexedPodAnnotationTracker) Start(ctx context.Context, c clientset.Interface) context.CancelFunc {
trackerCtx, trackerCancel := context.WithCancel(ctx)
_, podTracker := cache.NewInformerWithOptions(cache.InformerOptions{
ListerWatcher: &cache.ListWatch{
ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = t.labelSelector
obj, err := c.CoreV1().Pods(t.ownerNs).List(ctx, options)
return runtime.Object(obj), err
},
WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = t.labelSelector
return c.CoreV1().Pods(t.ownerNs).Watch(ctx, options)
},
},
ObjectType: &v1.Pod{},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
defer ginkgo.GinkgoRecover()
if pod, ok := obj.(*v1.Pod); ok {
framework.Logf("Observed event for Pod %q with index=%v, annotation value=%v",
klog.KObj(pod), pod.Annotations[t.podIndexAnnotation], pod.Annotations[t.podTrackedAnnotation])
podIndex, err := strconv.Atoi(pod.Annotations[t.podIndexAnnotation])
if err != nil {
framework.Failf("failed to parse pod index for Pod %q: %v", klog.KObj(pod), err.Error())
} else {
t.Lock()
defer t.Unlock()
t.trackedAnnotations[podIndex] = append(t.trackedAnnotations[podIndex], pod.Annotations[t.podTrackedAnnotation])
}
}
},
UpdateFunc: func(old, new interface{}) {
defer ginkgo.GinkgoRecover()
oldPod, oldOk := old.(*v1.Pod)
newPod, newOk := new.(*v1.Pod)
if !oldOk || !newOk {
return
}
if oldPod.Annotations[t.podTrackedAnnotation] != newPod.Annotations[t.podTrackedAnnotation] {
framework.Failf("Unexepected mutation of the annotation %q for Pod %q, old=%q, new=%q",
t.podTrackedAnnotation,
klog.KObj(newPod),
oldPod.Annotations[t.podTrackedAnnotation],
newPod.Annotations[t.podTrackedAnnotation],
)
}
},
},
})
go podTracker.RunWithContext(trackerCtx)
return trackerCancel
}
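// cloneTrackedAnnotations returns a shallow copy of the per-index annotation
// values recorded so far; it is safe to poll from gomega.Eventually.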
func (t *IndexedPodAnnotationTracker) cloneTrackedAnnotations() map[int][]string {
t.Lock()
defer t.Unlock()
return maps.Clone(t.trackedAnnotations)
}
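
A minimal usage sketch of the tracker, mirroring the new test above. The names ctx, f (the e2e framework object) and jobName are assumed to come from the enclosing Ginkgo test, the imports (time, labels, batchv1, ginkgo, gomega) are the ones that test already uses, and the expected map is the one asserted there.

// Track the failure-count annotation of the Job's Pods, keyed by completion index.
selector := labels.SelectorFromSet(map[string]string{batchv1.JobNameLabel: jobName}).String()
tracker := NewIndexedPodAnnotationTracker(jobName, f.Namespace.Name, selector,
	batchv1.JobCompletionIndexAnnotation, batchv1.JobIndexFailureCountAnnotation)
trackerCancel := tracker.Start(ctx, f.ClientSet)
ginkgo.DeferCleanup(trackerCancel)

// ... create the Job and wait for it to finish or fail ...

// Assert on a snapshot of the observed values: index -> failure-count history.
gomega.Eventually(ctx, tracker.cloneTrackedAnnotations).
	WithTimeout(15 * time.Second).
	WithPolling(500 * time.Millisecond).
	Should(gomega.Equal(map[int][]string{0: {"0", "1"}, 1: {"0", "1"}}))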