Add benchmark for large indexed job

Change-Id: I556f0cce5842699c98654cfb5a66e7c8d63b2e2e
Author: Aldo Culquicondor 2022-10-31 14:52:48 -04:00
parent 4a3bac5eae
commit 5e03865f65
3 changed files with 94 additions and 13 deletions


@@ -1230,7 +1230,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKey
 		}
 		if !apierrors.IsNotFound(err) {
 			errCh <- err
-			utilruntime.HandleError(err)
+			utilruntime.HandleError(fmt.Errorf("removing tracking finalizer: %w", err))
 			return
 		}
 	}
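Note on the hunk above: wrapping with %w adds context to the handled error while keeping the original error visible to errors.Is/errors.As. A minimal standalone sketch of the pattern (the sentinel and function names below are illustrative, not part of this change):

package main

import (
	"errors"
	"fmt"
)

// errGone stands in for the API error the controller might hit.
var errGone = errors.New("pod is gone")

// removeFinalizer returns the error wrapped with context via %w.
func removeFinalizer() error {
	return fmt.Errorf("removing tracking finalizer: %w", errGone)
}

func main() {
	err := removeFinalizer()
	fmt.Println(err)                     // removing tracking finalizer: pod is gone
	fmt.Println(errors.Is(err, errGone)) // true: %w preserves the wrapped error
}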


@@ -45,7 +45,7 @@ const (
 )
 
 // CreateNamespaceOrDie creates a namespace.
-func CreateNamespaceOrDie(c clientset.Interface, baseName string, t *testing.T) *v1.Namespace {
+func CreateNamespaceOrDie(c clientset.Interface, baseName string, t testing.TB) *v1.Namespace {
 	ns := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: baseName}}
 	result, err := c.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{})
 	if err != nil {
@@ -55,7 +55,7 @@ func CreateNamespaceOrDie(c clientset.Interface, baseName string, t *testing.T)
 }
 
 // DeleteNamespaceOrDie deletes a namespace.
-func DeleteNamespaceOrDie(c clientset.Interface, ns *v1.Namespace, t *testing.T) {
+func DeleteNamespaceOrDie(c clientset.Interface, ns *v1.Namespace, t testing.TB) {
 	err := c.CoreV1().Namespaces().Delete(context.TODO(), ns.Name, metav1.DeleteOptions{})
 	if err != nil {
 		t.Fatalf("Failed to delete namespace: %v", err)
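Note on the two hunks above: testing.TB is the interface implemented by both *testing.T and *testing.B, so widening these helpers from *testing.T lets the new benchmark share them with existing tests. A minimal sketch of the idea (helper and test names are illustrative):

package example

import "testing"

// newName works from tests and benchmarks alike, because testing.TB
// is the method set common to *testing.T and *testing.B.
func newName(tb testing.TB) string {
	tb.Helper()
	return "ns-" + tb.Name()
}

func TestNewName(t *testing.T)      { _ = newName(t) }
func BenchmarkNewName(b *testing.B) { _ = newName(b) }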


@@ -437,7 +437,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testing.T
 	failedPod := jobPods[failedIndex]
 	updatedPod := failedPod.DeepCopy()
 	updatedPod.Status = podStatusMatchingOnExitCodesTerminateRule
-	err, _ = updatePodStatuses(ctx, cs, []v1.Pod{*updatedPod})
+	_, err = updatePodStatuses(ctx, cs, []v1.Pod{*updatedPod})
 	if err != nil {
 		t.Fatalf("Failed to update pod statuses %q for pods of job %q", err, klog.KObj(jobObj))
 	}
@@ -1084,6 +1084,85 @@ func TestIndexedJob(t *testing.T) {
 	}
 }
 
+// BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job.
+// We expect that large jobs are more commonly used as Indexed. And they are
+// also faster to track, as they need fewer API calls.
+func BenchmarkLargeIndexedJob(b *testing.B) {
+	defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
+	closeFn, restConfig, clientSet, ns := setup(b, "indexed")
+	restConfig.QPS = 100
+	restConfig.Burst = 100
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	defer cancel()
+	backoff := wait.Backoff{
+		Duration: time.Second,
+		Factor:   1.5,
+		Steps:    30,
+		Cap:      5 * time.Minute,
+	}
+	mode := batchv1.IndexedCompletion
+	for _, nPods := range []int32{1000, 10_000} {
+		b.Run(fmt.Sprintf("nPods=%d", nPods), func(b *testing.B) {
+			b.ResetTimer()
+			for n := 0; n < b.N; n++ {
+				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: fmt.Sprintf("npods-%d-%d", nPods, n),
+					},
+					Spec: batchv1.JobSpec{
+						Parallelism:    pointer.Int32Ptr(nPods),
+						Completions:    pointer.Int32Ptr(nPods),
+						CompletionMode: &mode,
+					},
+				})
+				if err != nil {
+					b.Fatalf("Failed to create Job: %v", err)
+				}
+				remaining := int(nPods)
+				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
+					if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
+						remaining -= succ
+						b.Logf("Transient failure succeeding pods: %v", err)
+						return false, nil
+					}
+					return true, nil
+				}); err != nil {
+					b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
+				}
+				validateJobSucceeded(ctx, b, clientSet, jobObj)
+
+				// Cleanup Pods and Job.
+				b.StopTimer()
+				// Clean up pods in pages, because DeleteCollection might timeout.
+				// #90743
+				for {
+					pods, err := clientSet.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{Limit: 1})
+					if err != nil {
+						b.Fatalf("Failed to list Pods for cleanup: %v", err)
+					}
+					if len(pods.Items) == 0 {
+						break
+					}
+					err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
+						metav1.DeleteOptions{},
+						metav1.ListOptions{
+							Limit: 1000,
+						})
+					if err != nil {
+						b.Fatalf("Failed to cleanup Pods: %v", err)
+					}
+				}
+				err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
+				if err != nil {
+					b.Fatalf("Failed to cleanup Job: %v", err)
+				}
+				b.StartTimer()
+			}
+		})
+	}
+}
+
 // TestDisableJobTrackingWithFinalizers ensures that when the
 // JobTrackingWithFinalizers feature is disabled, tracking finalizers are
 // removed from all pods, but Job continues to be tracked.
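Note on BenchmarkLargeIndexedJob above: cleanup is excluded from the measurement by bracketing it with b.StopTimer and b.StartTimer, and each job size gets its own sub-benchmark via b.Run. A minimal self-contained sketch of that structure (the sleeps are stand-ins for the real work and cleanup):

package example

import (
	"fmt"
	"testing"
	"time"
)

func BenchmarkPattern(b *testing.B) {
	for _, size := range []int{1000, 10_000} {
		// One sub-benchmark per job size, reported separately.
		b.Run(fmt.Sprintf("nPods=%d", size), func(b *testing.B) {
			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				time.Sleep(time.Millisecond) // stand-in for running the Job (measured)

				b.StopTimer()                // cleanup below is not part of the reported time
				time.Sleep(time.Millisecond) // stand-in for deleting Pods and the Job
				b.StartTimer()
			}
		})
	}
}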
@@ -1723,12 +1802,12 @@ func validateJobFailed(ctx context.Context, t *testing.T, clientSet clientset.Interface
 	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
 }
 
-func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
+func validateJobSucceeded(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job) {
 	t.Helper()
 	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobComplete)
 }
 
-func validateJobCondition(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) {
+func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) {
 	t.Helper()
 	if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
 		j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
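Note: validateJobCondition polls the Job until the requested condition appears. wait.PollImmediate runs the condition function once right away, then at every waitInterval until wait.ForeverTestTimeout elapses. A minimal sketch of that polling contract (the readiness check here is a stand-in):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	start := time.Now()
	// Check immediately, then every 100ms, for up to 2s, until the condition reports done.
	err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
		return time.Since(start) > 300*time.Millisecond, nil // stand-in readiness check
	})
	fmt.Println("done:", err) // prints "done: <nil>" once the condition returned true
}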
@@ -1780,13 +1859,14 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj
 			updates = append(updates, pod)
 		}
 	}
-	if len(updates) != cnt {
-		return fmt.Errorf("couldn't set phase on %d Job Pods", cnt), 0
+	successful, err := updatePodStatuses(ctx, clientSet, updates)
+	if successful != cnt {
+		return fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful), successful
 	}
-	return updatePodStatuses(ctx, clientSet, updates)
+	return err, successful
 }
 
-func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (error, int) {
+func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (int, error) {
 	wg := sync.WaitGroup{}
 	wg.Add(len(updates))
 	errCh := make(chan error, len(updates))
@@ -1808,10 +1888,10 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates
 	select {
 	case err := <-errCh:
-		return fmt.Errorf("updating Pod status: %w", err), int(updated)
+		return int(updated), fmt.Errorf("updating Pod status: %w", err)
 	default:
 	}
-	return nil, int(updated)
+	return int(updated), nil
 }
 
 func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
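Note on the two hunks above: flipping updatePodStatuses to (int, error) follows the Go convention that the error is the last return value, which also reads naturally at call sites such as the earlier `_, err = updatePodStatuses(...)` change. A minimal sketch of the convention (names are illustrative):

package main

import (
	"errors"
	"fmt"
)

// updateAll reports how many items were updated, with the error last,
// matching the usual Go multi-return convention.
func updateAll(items []string) (int, error) {
	if len(items) == 0 {
		return 0, errors.New("nothing to update")
	}
	return len(items), nil
}

func main() {
	n, err := updateAll([]string{"a", "b"})
	fmt.Println(n, err) // 2 <nil>
}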
@@ -1861,13 +1941,14 @@ func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, ns
 	return clientSet.BatchV1().Jobs(ns).Create(ctx, jobObj, metav1.CreateOptions{})
 }
 
-func setup(t *testing.T, nsBaseName string) (framework.TearDownFunc, *restclient.Config, clientset.Interface, *v1.Namespace) {
+func setup(t testing.TB, nsBaseName string) (framework.TearDownFunc, *restclient.Config, clientset.Interface, *v1.Namespace) {
 	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
 	config := restclient.CopyConfig(server.ClientConfig)
 	config.QPS = 200
 	config.Burst = 200
+	config.Timeout = 0
 	clientSet, err := clientset.NewForConfig(config)
 	if err != nil {
 		t.Fatalf("Error creating clientset: %v", err)
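Note: QPS and Burst configure client-go's client-side rate limiter; the benchmark raises them on the controller's rest.Config so that API throttling does not dominate the measured time, and a zero Timeout disables the per-request deadline. A minimal sketch of building such a config (the server URL is a placeholder, not from this change):

package main

import (
	"fmt"

	restclient "k8s.io/client-go/rest"
)

func main() {
	config := &restclient.Config{Host: "https://127.0.0.1:6443"} // placeholder server
	config.QPS = 200   // steady-state client-side requests per second
	config.Burst = 200 // short-term burst allowance above QPS
	config.Timeout = 0 // zero means no per-request timeout
	fmt.Println(config.QPS, config.Burst, config.Timeout)
}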