update e2e and integration test for PodIndexLabel

Signed-off-by: Alay Patel <alayp@nvidia.com>
This commit is contained in:
Alay Patel 2024-10-15 17:00:30 -04:00
parent 9b56413d56
commit 0aa065ab7e
6 changed files with 126 additions and 34 deletions

View File

@ -41,6 +41,7 @@ import (
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@ -466,6 +467,43 @@ done`}
gomega.Expect(gotIndexes).To(gomega.Equal(wantIndexes), "expected completed indexes %s, but got %s", wantIndexes, gotIndexes)
})
/*
Release: v1.32
Testname: Ensure Pods of an Indexed Job get a unique index for PodIndexLabel key.
Description: Create an Indexed job. Job MUST complete successfully.
Ensure that created pods have completion index label.
*/
// TODO: once this test is stable, squash the functionality into pre-existing conformance test called "should create
// pods for an Indexed job with completion indexes and specified hostname" earlier in this file.
framework.It("should create pods with completion indexes for an Indexed Job", feature.PodIndexLabel, func(ctx context.Context) {
parallelism := int32(2)
completions := int32(4)
backoffLimit := int32(6) // default value
ginkgo.By("Creating Indexed job")
job := e2ejob.NewTestJob("succeed", "indexed-job", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job.Spec.CompletionMode = ptr.To(batchv1.IndexedCompletion)
job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create indexed job in namespace %s", f.Namespace.Name)
ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, nil, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
ginkgo.By("Ensuring all pods have the required index labels")
pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
succeededIndexes := sets.NewInt()
for _, pod := range pods.Items {
ix, err := strconv.Atoi(pod.Labels[batchv1.JobCompletionIndexAnnotation])
framework.ExpectNoError(err, "failed obtaining completion index in namespace: %s for pod: %s", pod.Namespace, pod.Name)
succeededIndexes.Insert(ix)
}
gotIndexes := succeededIndexes.List()
wantIndexes := []int{0, 1, 2, 3}
gomega.Expect(gotIndexes).To(gomega.Equal(wantIndexes), "expected completed indexes in namespace: %s for job: %s", job.Namespace, job.Name)
})
/*
Testcase: Ensure that job with successPolicy succeeded when all indexes succeeded
Description: Create an indexed job with successPolicy.

View File

@ -162,6 +162,9 @@ var _ = SIGDescribe("StatefulSet", func() {
ginkgo.By("Verifying statefulset set proper service name")
framework.ExpectNoError(e2estatefulset.CheckServiceName(ss, headlessSvcName))
ginkgo.By("checking the index label and value of all pods")
framework.ExpectNoError(e2estatefulset.CheckPodIndexLabel(ctx, c, ss))
cmd := "echo $(hostname) | dd of=/data/hostname conv=fsync"
ginkgo.By("Running " + cmd + " in all stateful pods")
framework.ExpectNoError(e2estatefulset.ExecInStatefulPods(ctx, c, ss, cmd))

View File

@ -346,6 +346,8 @@ var (
// TODO: document the feature (owning SIG, when to use this feature for a test)
StatefulSet = framework.WithFeature(framework.ValidFeatures.Add("StatefulSet"))
PodIndexLabel = framework.WithFeature(framework.ValidFeatures.Add("PodIndexLabel"))
// TODO: document the feature (owning SIG, when to use this feature for a test)
StatefulSetStartOrdinal = framework.WithFeature(framework.ValidFeatures.Add("StatefulSetStartOrdinal"))

View File

@ -20,6 +20,8 @@ import (
"context"
"fmt"
"path/filepath"
"reflect"
"strconv"
"strings"
"time"
@ -233,6 +235,26 @@ func CheckServiceName(ss *appsv1.StatefulSet, expectedServiceName string) error
return nil
}
// CheckPodIndexLabel asserts that the pods for ss have expected index label and values.
func CheckPodIndexLabel(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) error {
pods := GetPodList(ctx, c, ss)
labelIndices := sets.NewInt()
for _, pod := range pods.Items {
ix, err := strconv.Atoi(pod.Labels[appsv1.PodIndexLabel])
if err != nil {
return err
}
labelIndices.Insert(ix)
}
wantIndexes := []int{0, 1, 2}
gotIndexes := labelIndices.List()
if !reflect.DeepEqual(gotIndexes, wantIndexes) {
return fmt.Errorf("pod index labels are not as expected, got: %v, want: %v", gotIndexes, wantIndexes)
}
return nil
}
// ExecInStatefulPods executes cmd in all Pods in ss. If a error occurs it is returned and cmd is not execute in any subsequent Pods.
func ExecInStatefulPods(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, cmd string) error {
podList := GetPodList(ctx, c, ss)

View File

@ -4127,13 +4127,20 @@ func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clients
for _, pod := range pods.Items {
if metav1.IsControlledBy(&pod, jobObj) {
if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
ix, err := getCompletionIndex(&pod)
annotationIx, err := getCompletionIndex(pod.Annotations)
if err != nil {
t.Errorf("Failed getting completion index for pod %s: %v", pod.Name, err)
} else {
gotActive.Insert(ix)
t.Errorf("Failed getting completion index in annotations for pod %s: %v", pod.Name, err)
}
expectedName := fmt.Sprintf("%s-%d", jobObj.Name, ix)
labelIx, err := getCompletionIndex(pod.Labels)
if err != nil {
t.Errorf("Failed getting completion index in labels for pod %s: %v", pod.Name, err)
}
if annotationIx != labelIx {
t.Errorf("Mismatch in value of annotation index: %v and label index: %v", labelIx,
annotationIx)
}
gotActive.Insert(labelIx)
expectedName := fmt.Sprintf("%s-%d", jobObj.Name, labelIx)
if diff := cmp.Equal(expectedName, pod.Spec.Hostname); !diff {
t.Errorf("Got pod hostname %s, want %s", pod.Spec.Hostname, expectedName)
}
@ -4298,7 +4305,7 @@ func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, job
if p := pod.Status.Phase; !metav1.IsControlledBy(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
continue
}
if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
if pix, err := getCompletionIndex(pod.Annotations); err == nil && pix == ix {
pod.Status.Phase = phase
if phase == v1.PodFailed || phase == v1.PodSucceeded {
pod.Status.ContainerStatuses = []v1.ContainerStatus{
@ -4352,20 +4359,17 @@ func getJobPodsForIndex(ctx context.Context, clientSet clientset.Interface, jobO
if !filter(&pod) {
continue
}
if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
if pix, err := getCompletionIndex(pod.Annotations); err == nil && pix == ix {
result = append(result, &pod)
}
}
return result, nil
}
func getCompletionIndex(p *v1.Pod) (int, error) {
if p.Annotations == nil {
return 0, errors.New("no annotations found")
}
v, ok := p.Annotations[batchv1.JobCompletionIndexAnnotation]
func getCompletionIndex(lookupMap map[string]string) (int, error) {
v, ok := lookupMap[batchv1.JobCompletionIndexAnnotation]
if !ok {
return 0, fmt.Errorf("annotation %s not found", batchv1.JobCompletionIndexAnnotation)
return 0, fmt.Errorf("key %s not found in lookup Map", batchv1.JobCompletionIndexAnnotation)
}
return strconv.Atoi(v)
}

View File

@ -19,6 +19,7 @@ package statefulset
import (
"context"
"fmt"
"strconv"
"testing"
"time"
@ -655,24 +656,24 @@ func TestDeletingPodForRollingUpdatePartition(t *testing.T) {
func TestStatefulSetStartOrdinal(t *testing.T) {
tests := []struct {
ordinals *appsv1.StatefulSetOrdinals
name string
namespace string
replicas int
expectedPodNames []string
ordinals *appsv1.StatefulSetOrdinals
name string
namespace string
replicas int
expectedPodIndexes []int
}{
{
name: "default start ordinal, no ordinals set",
namespace: "no-ordinals",
replicas: 3,
expectedPodNames: []string{"sts-0", "sts-1", "sts-2"},
name: "default start ordinal, no ordinals set",
namespace: "no-ordinals",
replicas: 3,
expectedPodIndexes: []int{0, 1, 2},
},
{
name: "default start ordinal",
namespace: "no-start-ordinals",
ordinals: &appsv1.StatefulSetOrdinals{},
replicas: 3,
expectedPodNames: []string{"sts-0", "sts-1", "sts-2"},
name: "default start ordinal",
namespace: "no-start-ordinals",
ordinals: &appsv1.StatefulSetOrdinals{},
replicas: 3,
expectedPodIndexes: []int{0, 1, 2},
},
{
name: "start ordinal 4",
@ -680,8 +681,8 @@ func TestStatefulSetStartOrdinal(t *testing.T) {
ordinals: &appsv1.StatefulSetOrdinals{
Start: 4,
},
replicas: 4,
expectedPodNames: []string{"sts-4", "sts-5", "sts-6", "sts-7"},
replicas: 4,
expectedPodIndexes: []int{4, 5, 6, 7},
},
{
name: "start ordinal 5",
@ -689,8 +690,8 @@ func TestStatefulSetStartOrdinal(t *testing.T) {
ordinals: &appsv1.StatefulSetOrdinals{
Start: 2,
},
replicas: 7,
expectedPodNames: []string{"sts-2", "sts-3", "sts-4", "sts-5", "sts-6", "sts-7", "sts-8"},
replicas: 7,
expectedPodIndexes: []int{2, 3, 4, 5, 6, 7, 8},
},
}
@ -719,17 +720,39 @@ func TestStatefulSetStartOrdinal(t *testing.T) {
}
var podNames []string
var podLabelIndexes []int
for _, pod := range pods.Items {
podNames = append(podNames, pod.Name)
if idx, ok := pod.Labels[appsv1.PodIndexLabel]; !ok {
t.Errorf("Expected pod index label with key: %s", appsv1.PodIndexLabel)
} else {
idxInt, err := strconv.Atoi(idx)
if err != nil {
t.Errorf("Unable to convert pod index to int, unexpected pod index: %s", idx)
}
podLabelIndexes = append(podLabelIndexes, idxInt)
}
}
ignoreOrder := cmpopts.SortSlices(func(a, b string) bool {
return a < b
})
ignoreOrderForOrdinals := cmpopts.SortSlices(func(a, b int) bool {
return a < b
})
expectedNames := []string{}
for _, ord := range test.expectedPodIndexes {
expectedNames = append(expectedNames, fmt.Sprintf("sts-%d", ord))
}
// Validate all the expected pods were created.
if diff := cmp.Diff(test.expectedPodNames, podNames, ignoreOrder); diff != "" {
if diff := cmp.Diff(expectedNames, podNames, ignoreOrder); diff != "" {
t.Errorf("Unexpected pod names: (-want +got): %v", diff)
}
// Validate all the expected index labels were added.
if diff := cmp.Diff(test.expectedPodIndexes, podLabelIndexes, ignoreOrderForOrdinals); diff != "" {
t.Errorf("Unexpected pod indices: (-want +got): %v", diff)
}
// Scale down to 1 pod and verify it matches the first pod.
scaleSTS(t, c, sts, 1)
@ -739,8 +762,8 @@ func TestStatefulSetStartOrdinal(t *testing.T) {
if len(pods.Items) != 1 {
t.Errorf("len(pods) = %v, want %v", len(pods.Items), 1)
}
if pods.Items[0].Name != test.expectedPodNames[0] {
t.Errorf("Unexpected singleton pod name: got = %v, want %v", pods.Items[0].Name, test.expectedPodNames[0])
if pods.Items[0].Name != expectedNames[0] {
t.Errorf("Unexpected singleton pod name: got = %v, want %v", pods.Items[0].Name, expectedNames[0])
}
})
}