mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Add integration test for Indexed Job
This commit is contained in:
parent
e22e9b4f83
commit
18e35a86ed
@ -18,19 +18,28 @@ package job
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
batchv1 "k8s.io/api/batch/v1"
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
eventsv1 "k8s.io/api/events/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
"k8s.io/kubernetes/pkg/controller/job"
|
"k8s.io/kubernetes/pkg/controller/job"
|
||||||
|
"k8s.io/kubernetes/pkg/features"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
"k8s.io/utils/pointer"
|
"k8s.io/utils/pointer"
|
||||||
)
|
)
|
||||||
@ -181,6 +190,85 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIndexedJob(t *testing.T) {
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
|
||||||
|
|
||||||
|
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
|
||||||
|
defer closeFn()
|
||||||
|
ctx, cancel := startJobController(restConfig, clientSet)
|
||||||
|
defer func() {
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||||
|
Spec: batchv1.JobSpec{
|
||||||
|
Parallelism: pointer.Int32Ptr(3),
|
||||||
|
Completions: pointer.Int32Ptr(4),
|
||||||
|
CompletionMode: batchv1.IndexedCompletion,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create Job: %v", err)
|
||||||
|
}
|
||||||
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
|
Active: 3,
|
||||||
|
})
|
||||||
|
validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
|
||||||
|
|
||||||
|
// One Pod succeeds.
|
||||||
|
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||||
|
t.Fatal("Failed trying to succeed pod with index 1")
|
||||||
|
}
|
||||||
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
|
Active: 3,
|
||||||
|
Succeeded: 1,
|
||||||
|
})
|
||||||
|
validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
|
||||||
|
|
||||||
|
// Disable feature gate and restart controller.
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)()
|
||||||
|
cancel()
|
||||||
|
ctx, cancel = startJobController(restConfig, clientSet)
|
||||||
|
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer events.Stop()
|
||||||
|
|
||||||
|
// One Pod fails, but no recreations happen because feature is disabled.
|
||||||
|
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||||
|
t.Fatal("Failed trying to succeed pod with index 2")
|
||||||
|
}
|
||||||
|
if err := waitForEvent(events, jobObj.UID, "IndexedJobDisabled"); err != nil {
|
||||||
|
t.Errorf("Waiting for an event for IndexedJobDisabled: %v", err)
|
||||||
|
}
|
||||||
|
validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 3), "1")
|
||||||
|
|
||||||
|
// Re-enable feature gate and restart controller. Failed Pod should be recreated now.
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
|
||||||
|
cancel()
|
||||||
|
ctx, cancel = startJobController(restConfig, clientSet)
|
||||||
|
|
||||||
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
|
Active: 3,
|
||||||
|
Failed: 1,
|
||||||
|
Succeeded: 1,
|
||||||
|
})
|
||||||
|
validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
|
||||||
|
|
||||||
|
// Remaining Pods succeed.
|
||||||
|
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||||
|
t.Fatal("Failed trying to succeed remaining pods")
|
||||||
|
}
|
||||||
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
|
Active: 0,
|
||||||
|
Failed: 1,
|
||||||
|
Succeeded: 4,
|
||||||
|
})
|
||||||
|
validateJobPodsIndexes(ctx, t, clientSet, jobObj, nil, "0-3")
|
||||||
|
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||||
|
}
|
||||||
|
|
||||||
type podsByStatus struct {
|
type podsByStatus struct {
|
||||||
Active int
|
Active int
|
||||||
Failed int
|
Failed int
|
||||||
@ -223,6 +311,62 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validateJobPodsIndexes validates indexes of active and completed Pods of an
|
||||||
|
// Indexed Job. Call after validateJobPodsStatus
|
||||||
|
func validateJobPodsIndexes(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Int, gotCompleted string) {
|
||||||
|
t.Helper()
|
||||||
|
updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to get updated Job: %v", err)
|
||||||
|
}
|
||||||
|
if updatedJob.Status.CompletedIndexes != gotCompleted {
|
||||||
|
t.Errorf("Got completed indexes %q, want %q", updatedJob.Status.CompletedIndexes, gotCompleted)
|
||||||
|
}
|
||||||
|
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to list Job Pods: %v", err)
|
||||||
|
}
|
||||||
|
gotActive := sets.NewInt()
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
if isPodOwnedByJob(&pod, jobObj) {
|
||||||
|
if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
|
||||||
|
if ix, err := getCompletionIndex(&pod); err != nil {
|
||||||
|
t.Errorf("Failed getting completion index for pod %s: %v", pod.Name, err)
|
||||||
|
} else {
|
||||||
|
gotActive.Insert(ix)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if wantActive == nil {
|
||||||
|
wantActive = sets.NewInt()
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(wantActive.List(), gotActive.List()); diff != "" {
|
||||||
|
t.Errorf("Unexpected active indexes (-want,+got):\n%s", diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
|
||||||
|
return wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||||
|
for {
|
||||||
|
var ev watch.Event
|
||||||
|
select {
|
||||||
|
case ev = <-events.ResultChan():
|
||||||
|
default:
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
e, ok := ev.Object.(*eventsv1.Event)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ctrl := "job-controller"
|
||||||
|
if (e.ReportingController == ctrl || e.DeprecatedSource.Component == ctrl) && e.Reason == reason && e.Regarding.UID == uid {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
|
func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||||
@ -265,6 +409,38 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
|
||||||
|
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("listing Job Pods: %w", err)
|
||||||
|
}
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
if p := pod.Status.Phase; !isPodOwnedByJob(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
|
||||||
|
pod.Status.Phase = phase
|
||||||
|
_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("updating pod %s status: %w", pod.Name, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors.New("no pod matching index found")
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCompletionIndex(p *v1.Pod) (int, error) {
|
||||||
|
if p.Annotations == nil {
|
||||||
|
return 0, errors.New("no annotations found")
|
||||||
|
}
|
||||||
|
v, ok := p.Annotations[batchv1.JobCompletionIndexAnnotationAlpha]
|
||||||
|
if !ok {
|
||||||
|
return 0, fmt.Errorf("annotation %s not found", batchv1.JobCompletionIndexAnnotationAlpha)
|
||||||
|
}
|
||||||
|
return strconv.Atoi(v)
|
||||||
|
}
|
||||||
|
|
||||||
func isPodOwnedByJob(p *v1.Pod, j *batchv1.Job) bool {
|
func isPodOwnedByJob(p *v1.Pod, j *batchv1.Job) bool {
|
||||||
for _, owner := range p.ObjectMeta.OwnerReferences {
|
for _, owner := range p.ObjectMeta.OwnerReferences {
|
||||||
if owner.Kind == "Job" && owner.UID == j.UID {
|
if owner.Kind == "Job" && owner.UID == j.UID {
|
||||||
|
Loading…
Reference in New Issue
Block a user