From 194009fac9d7b8b14718798e2010389edd111354 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Tue, 5 Mar 2024 18:31:14 +0100
Subject: [PATCH] Add integration test for managedBy and cleanup of finalizers

---
 test/integration/job/job_test.go | 152 +++++++++++++++++++++++++++----
 1 file changed, 134 insertions(+), 18 deletions(-)

diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index e61a9e2b640..96c3d7169e9 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -45,6 +45,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/retry"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	basemetrics "k8s.io/component-base/metrics"
@@ -52,6 +53,7 @@ import (
 	"k8s.io/klog/v2"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/controller"
 	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
 	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/features"
@@ -63,6 +65,12 @@ import (
 const waitInterval = time.Second
 const fastPodFailureBackoff = 100 * time.Millisecond
 
+// Time duration used to account for controller latency in tests in which the
+// Job controller is expected not to make a change. In those cases we wait a
+// little bit (more than the typical time for a couple of controller syncs) and
+// verify there is no change.
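+//
+// The intended usage, matching the tests below, is to sleep and then re-fetch
+// the object to assert that nothing changed (a sketch; jobClient and jobObj
+// are the locals the tests in this file already use):
+//
+//	time.Sleep(sleepDurationForControllerLatency)
+//	jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
+//	// ...assert the job status is unchanged...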
+const sleepDurationForControllerLatency = 100 * time.Millisecond
+
 type metricLabelsWithValue struct {
 	Labels []string
 	Value  int
@@ -1294,11 +1302,7 @@ func TestManagedBy(t *testing.T) {
 			} else {
 				validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, test.wantJobByExternalControllerTotalMetric)
 
-				// Await for a little bit to verify the reconciliation does not
-				// happen. We wait 100ms for the sync itself, because we already
-				// checked the metric is incremented so the sync would start
-				// immediately if it was queued.
-				time.Sleep(100 * time.Millisecond)
+				time.Sleep(sleepDurationForControllerLatency)
 				jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
 				if err != nil {
 					t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
@@ -1360,9 +1364,7 @@ func TestManagedBy_Reenabling(t *testing.T) {
 		Value: 1,
 	})
 
-	// Await for a little bit to verify the reconciliation does not happen.
-	// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
-	time.Sleep(time.Second + 100*time.Millisecond)
+	time.Sleep(sleepDurationForControllerLatency)
 	jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
 	if err != nil {
 		t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
@@ -1395,22 +1397,17 @@ func TestManagedBy_Reenabling(t *testing.T) {
 	resetMetrics()
 	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
 
-	// Marking the pod as finished, but
+	// Marking the pod as finished; this should not result in an update of the Job status.
 	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
 		t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
 	}
-	// Await for a little bit to verify the reconciliation does not happen.
-	// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
-	time.Sleep(time.Second + 100*time.Millisecond)
-
 	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
 		Labels: []string{customControllerName},
 		Value: 1,
 	})
 
-	// Verify the built-in controller does not reconcile the Job. It is up to
-	// the external controller to update the status.
+	time.Sleep(sleepDurationForControllerLatency)
 	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
 		Active: 1,
 		Ready: ptr.To[int32](0),
@@ -1491,9 +1488,7 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
 		Value: 1,
 	})
 
-	// Await for a little bit to verify the reconciliation does not happen.
-	// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
-	time.Sleep(time.Second + 100*time.Millisecond)
+	time.Sleep(sleepDurationForControllerLatency)
 	jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
 	if err != nil {
 		t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
@@ -1503,6 +1498,127 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
 	}
 }
 
+// TestManagedBy_UsingReservedJobFinalizers documents the behavior of the Job
+// controller when a job has a custom value of the managedBy field and its pods
+// are created with the batch.kubernetes.io/job-tracking finalizer. The built-in
+// controller should not remove the finalizer. Note that the use of this
+// finalizer in jobs managed by external controllers is discouraged, but may
+// happen when someone forks the controller and does not rename the finalizer.
+func TestManagedBy_UsingReservedJobFinalizers(t *testing.T) {
+	customControllerName := "example.com/custom-job-controller"
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()
+
+	closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reserved-finalizers")
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
+	defer cancel()
+	resetMetrics()
+
+	jobSpec := batchv1.Job{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "batch/v1",
+			Kind: "Job",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "custom-job-test",
+			Namespace: ns.Name,
+		},
+		Spec: batchv1.JobSpec{
+			Completions: ptr.To[int32](1),
+			Parallelism: ptr.To[int32](1),
+			Template: v1.PodTemplateSpec{
+				Spec: v1.PodSpec{
+					Containers: []v1.Container{
+						{
+							Name: "main-container",
+							Image: "foo",
+						},
+					},
+				},
+			},
+			ManagedBy: ptr.To(customControllerName),
+		},
+	}
+	// Create a job with a custom value of the managedBy field.
+	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &jobSpec)
+	if err != nil {
+		t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
+	}
+
+	podControl := controller.RealPodControl{
+		KubeClient: clientSet,
+		Recorder: &record.FakeRecorder{},
+	}
+
+	// Create the pod manually, simulating the behavior of the external
+	// controller indicated by the managedBy field. The pod is created with the
+	// built-in finalizer.
+	podTemplate := jobObj.Spec.Template.DeepCopy()
+	podTemplate.Finalizers = append(podTemplate.Finalizers, batchv1.JobTrackingFinalizer)
+	err = podControl.CreatePodsWithGenerateName(ctx, jobObj.Namespace, podTemplate, jobObj, metav1.NewControllerRef(jobObj, batchv1.SchemeGroupVersion.WithKind("Job")), "pod1")
+	if err != nil {
+		t.Fatalf("Error %v when creating a pod for job %q", err, klog.KObj(jobObj))
+	}
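+
+	// From this point on, the test itself plays the role of the external
+	// controller indicated by managedBy: it updates the pod and job statuses
+	// directly, and then verifies that the built-in controller leaves the
+	// reserved finalizer on the pod in place.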
+	// Get the list of pods for the Job to obtain a reference to the created pod.
+	jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
+	if err != nil {
+		t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
+	}
+	if len(jobPods) != 1 {
+		t.Fatalf("Unexpected number (%d) of pods for job: %v", len(jobPods), klog.KObj(jobObj))
+	}
+
+	// Mark the pod as finished (succeeded) before marking the parent job as complete.
+	podObj := jobPods[0]
+	podObj.Status.Phase = v1.PodSucceeded
+	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("Error %v when marking the %q pod as succeeded", err, klog.KObj(podObj))
+	}
+
+	// Mark the job as finished so that the built-in controller receives the
+	// UpdateJob event, in reaction to which it would remove the pod's
+	// finalizer, if not for the custom managedBy field.
+	jobObj.Status.Conditions = append(jobObj.Status.Conditions, batchv1.JobCondition{
+		Type: batchv1.JobComplete,
+		Status: v1.ConditionTrue,
+	})
+	jobObj.Status.StartTime = ptr.To(metav1.Now())
+	jobObj.Status.CompletionTime = ptr.To(metav1.Now())
+
+	if jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).UpdateStatus(ctx, jobObj, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("Error %v when updating the job as finished %v", err, klog.KObj(jobObj))
+	}
+
+	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
+	}
+
+	// Update the pod so that the built-in controller receives the UpdatePod
+	// event, in reaction to which it would remove the pod's finalizer, if not
+	// for the custom value of the managedBy field on the job.
+	podObj.Status.Conditions = append(podObj.Status.Conditions, v1.PodCondition{
+		Type: v1.PodConditionType("CustomCondition"),
+		Status: v1.ConditionTrue,
+	})
+	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("Error %v when adding a condition to the pod status %v", err, klog.KObj(podObj))
+	}
+
+	time.Sleep(sleepDurationForControllerLatency)
+	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
+	}
+
+	if diff := cmp.Diff([]string{batchv1.JobTrackingFinalizer}, podObj.Finalizers); diff != "" {
+		t.Fatalf("Unexpected change in the set of finalizers for pod %q, because the owner job %q has custom managedBy, diff=%s", klog.KObj(podObj), klog.KObj(jobObj), diff)
+	}
+}
+
 func getIndexFailureCount(p *v1.Pod) (int, error) {
 	if p.Annotations == nil {
 		return 0, errors.New("no annotations found")
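
A note on the behavior pinned down by TestManagedBy_UsingReservedJobFinalizers: since the built-in controller leaves the reserved batch.kubernetes.io/job-tracking finalizer alone on jobs it does not manage, an external controller that reuses the reserved finalizer must remove it itself once it has accounted for the pod. The sketch below illustrates that cleanup; the helper name and flow are illustrative assumptions, not code from this patch or the Kubernetes tree.

package externaljobcontroller

import (
	"context"
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// removeJobTrackingFinalizer strips the reserved batch.kubernetes.io/job-tracking
// finalizer from a pod. An external controller reusing the reserved finalizer
// must do something like this once the pod is accounted for, because the
// built-in Job controller will not remove it, as the test above verifies.
func removeJobTrackingFinalizer(ctx context.Context, clientSet kubernetes.Interface, pod *v1.Pod) error {
	filtered := make([]string, 0, len(pod.Finalizers))
	for _, f := range pod.Finalizers {
		if f != batchv1.JobTrackingFinalizer {
			filtered = append(filtered, f)
		}
	}
	if len(filtered) == len(pod.Finalizers) {
		// Nothing to do; the finalizer is already gone.
		return nil
	}
	pod.Finalizers = filtered
	if _, err := clientSet.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("removing finalizer from pod %s/%s: %w", pod.Namespace, pod.Name, err)
	}
	return nil
}

In a production controller a JSON merge patch scoped to metadata.finalizers would avoid conflicts on a stale pod object; a plain Update keeps the sketch short.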