Add integration test for managedBy and cleanup of finalizers
parent 246e678acc
commit 194009fac9
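
For orientation, the change below hinges on one rule: the built-in Job controller skips synchronization of a Job whose spec.managedBy names another controller, and therefore also leaves the reserved batch.kubernetes.io/job-tracking finalizer on that Job's pods. A minimal sketch of that gating check, assuming the reserved built-in value is "kubernetes.io/job-controller" (illustrative only, not the controller's actual code):

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/utils/ptr"
)

// managedByBuiltInController reports whether the built-in Job controller is
// responsible for reconciling the given Job. The tests below rely on this
// being false for a custom managedBy value such as
// "example.com/custom-job-controller". Sketch only, not the controller code.
func managedByBuiltInController(job *batchv1.Job) bool {
	return job.Spec.ManagedBy == nil || *job.Spec.ManagedBy == "kubernetes.io/job-controller"
}

func main() {
	job := &batchv1.Job{}
	job.Spec.ManagedBy = ptr.To("example.com/custom-job-controller")
	fmt.Println(managedByBuiltInController(job)) // false: the built-in controller leaves the Job and its pods alone
}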
@@ -45,6 +45,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/retry"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	basemetrics "k8s.io/component-base/metrics"
@@ -52,6 +53,7 @@ import (
 	"k8s.io/klog/v2"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/controller"
 	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
 	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/features"
@@ -63,6 +65,12 @@ import (
 const waitInterval = time.Second
 const fastPodFailureBackoff = 100 * time.Millisecond
 
+// Time duration used to account for controller latency in tests in which it is
+// expected the Job controller does not make a change. In such cases we wait a
+// little bit (more than the typical time for a couple of controller syncs) and
+// verify there is no change.
+const sleepDurationForControllerLatency = 100 * time.Millisecond
+
 type metricLabelsWithValue struct {
 	Labels []string
 	Value  int
@@ -1294,11 +1302,7 @@ func TestManagedBy(t *testing.T) {
 			} else {
 				validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, test.wantJobByExternalControllerTotalMetric)
 
-				// Await for a little bit to verify the reconciliation does not
-				// happen. We wait 100ms for the sync itself, because we already
-				// checked the metric is incremented so the sync would start
-				// immediately if it was queued.
-				time.Sleep(100 * time.Millisecond)
+				time.Sleep(sleepDurationForControllerLatency)
 				jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
 				if err != nil {
 					t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
@@ -1360,9 +1364,7 @@ func TestManagedBy_Reenabling(t *testing.T) {
 		Value:  1,
 	})
 
-	// Await for a little bit to verify the reconciliation does not happen.
-	// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
-	time.Sleep(time.Second + 100*time.Millisecond)
+	time.Sleep(sleepDurationForControllerLatency)
 	jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
 	if err != nil {
 		t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
@@ -1395,22 +1397,17 @@ func TestManagedBy_Reenabling(t *testing.T) {
 	resetMetrics()
 	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
 
-	// Marking the pod as finished, but
+	// Marking the pod as finished, but it does not result in updating of the Job status.
 	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
 		t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
 	}
 
-	// Await for a little bit to verify the reconciliation does not happen.
-	// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
-	time.Sleep(time.Second + 100*time.Millisecond)
-
 	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
 		Labels: []string{customControllerName},
 		Value:  1,
 	})
 
-	// Verify the built-in controller does not reconcile the Job. It is up to
-	// the external controller to update the status.
+	time.Sleep(sleepDurationForControllerLatency)
 	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
 		Active: 1,
 		Ready:  ptr.To[int32](0),
@@ -1491,9 +1488,7 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
 		Value:  1,
 	})
 
-	// Await for a little bit to verify the reconciliation does not happen.
-	// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
-	time.Sleep(time.Second + 100*time.Millisecond)
+	time.Sleep(sleepDurationForControllerLatency)
 	jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
 	if err != nil {
 		t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
@@ -1503,6 +1498,127 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
 	}
 }
 
+// TestManagedBy_UsingReservedJobFinalizers documents the behavior of the Job
+// controller when there is a job with a custom value of the managedBy field, creating
+// pods with the batch.kubernetes.io/job-tracking finalizer. The built-in controller
+// should not remove the finalizer. Note that the use of the finalizer in jobs
+// managed by external controllers is discouraged, but may potentially happen
+// when one forks the controller and does not rename the finalizer.
+func TestManagedBy_UsingReservedJobFinalizers(t *testing.T) {
+	customControllerName := "example.com/custom-job-controller"
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()
+
+	closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reserved-finalizers")
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
+	defer cancel()
+	resetMetrics()
+
+	jobSpec := batchv1.Job{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "batch/v1",
+			Kind:       "Job",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "custom-job-test",
+			Namespace: ns.Name,
+		},
+		Spec: batchv1.JobSpec{
+			Completions: ptr.To[int32](1),
+			Parallelism: ptr.To[int32](1),
+			Template: v1.PodTemplateSpec{
+				Spec: v1.PodSpec{
+					Containers: []v1.Container{
+						{
+							Name:  "main-container",
+							Image: "foo",
+						},
+					},
+				},
+			},
+			ManagedBy: ptr.To(customControllerName),
+		},
+	}
+	// Create a job with custom managedBy
+	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &jobSpec)
+	if err != nil {
+		t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
+	}
+
+	podControl := controller.RealPodControl{
+		KubeClient: clientSet,
+		Recorder:   &record.FakeRecorder{},
+	}
+
+	// Create the pod manually simulating the behavior of the external controller
+	// indicated by the managedBy field. We create the pod with the built-in
+	// finalizer.
+	podTemplate := jobObj.Spec.Template.DeepCopy()
+	podTemplate.Finalizers = append(podTemplate.Finalizers, batchv1.JobTrackingFinalizer)
+	err = podControl.CreatePodsWithGenerateName(ctx, jobObj.Namespace, podTemplate, jobObj, metav1.NewControllerRef(jobObj, batchv1.SchemeGroupVersion.WithKind("Job")), "pod1")
+	if err != nil {
+		t.Fatalf("Error %v when creating a pod for job %q", err, klog.KObj(jobObj))
+	}
+
+	// Getting the list of pods for the Job to obtain the reference to the created pod.
+	jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
+	if err != nil {
+		t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
+	}
+	if len(jobPods) != 1 {
+		t.Fatalf("Unexpected number (%d) of pods for job: %v", len(jobPods), klog.KObj(jobObj))
+	}
+
+	// Marking the pod as finished (succeeded), before marking the parent job as complete.
+	podObj := jobPods[0]
+	podObj.Status.Phase = v1.PodSucceeded
+	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("Error %v when marking the %q pod as succeeded", err, klog.KObj(podObj))
+	}
+
+	// Mark the job as finished so that the built-in controller receives the
+	// UpdateJob event, in reaction to which it would remove the pod's finalizer,
+	// if not for the custom managedBy field.
+	jobObj.Status.Conditions = append(jobObj.Status.Conditions, batchv1.JobCondition{
+		Type:   batchv1.JobComplete,
+		Status: v1.ConditionTrue,
+	})
+	jobObj.Status.StartTime = ptr.To(metav1.Now())
+	jobObj.Status.CompletionTime = ptr.To(metav1.Now())
+
+	if jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).UpdateStatus(ctx, jobObj, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("Error %v when updating the job as finished %v", err, klog.KObj(jobObj))
+	}
+
+	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
+	}
+
+	// Update the pod so that the built-in controller receives the UpdatePod event,
+	// in reaction to which it would remove the pod's finalizer, if not for the
+	// custom value of the managedBy field on the job.
+	podObj.Status.Conditions = append(podObj.Status.Conditions, v1.PodCondition{
+		Type:   v1.PodConditionType("CustomCondition"),
+		Status: v1.ConditionTrue,
+	})
+	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatalf("Error %v when adding a condition to the pod status %v", err, klog.KObj(podObj))
+	}
+
+	time.Sleep(sleepDurationForControllerLatency)
+	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
+	}
+
+	if diff := cmp.Diff([]string{batchv1.JobTrackingFinalizer}, podObj.Finalizers); diff != "" {
+		t.Fatalf("Unexpected change in the set of finalizers for pod %q, because the owner job %q has custom managedBy, diff=%s", klog.KObj(podObj), klog.KObj(jobObj), diff)
+	}
+}
+
 func getIndexFailureCount(p *v1.Pod) (int, error) {
 	if p.Annotations == nil {
 		return 0, errors.New("no annotations found")