Graduate JobReadyPods to beta

Set podUpdateBatchPeriod to 1s

Change-Id: I8a10fd8f8559adad9df179b664b8c82851607855
Author: Aldo Culquicondor
Date: 2022-01-11 14:20:43 -05:00
parent 1ea07d482a
commit 8c00f510ef
11 changed files with 100 additions and 28 deletions


@@ -4297,7 +4297,7 @@
"type": "integer"
},
"ready": {
- "description": "The number of pods which have a Ready condition.\n\nThis field is alpha-level. The job controller populates the field when the feature gate JobReadyPods is enabled (disabled by default).",
+ "description": "The number of pods which have a Ready condition.\n\nThis field is beta-level. The job controller populates the field when the feature gate JobReadyPods is enabled (enabled by default).",
"format": "int32",
"type": "integer"
},


@@ -344,7 +344,7 @@
"type": "integer"
},
"ready": {
- "description": "The number of pods which have a Ready condition.\n\nThis field is alpha-level. The job controller populates the field when the feature gate JobReadyPods is enabled (disabled by default).",
+ "description": "The number of pods which have a Ready condition.\n\nThis field is beta-level. The job controller populates the field when the feature gate JobReadyPods is enabled (enabled by default).",
"format": "int32",
"type": "integer"
},


@@ -245,8 +245,8 @@ type JobStatus struct {
// The number of active pods which have a Ready condition.
//
- // This field is alpha-level. The job controller populates the field when
- // the feature gate JobReadyPods is enabled (disabled by default).
+ // This field is beta-level. The job controller populates the field when
+ // the feature gate JobReadyPods is enabled (enabled by default).
// +optional
Ready *int32


@@ -59,7 +59,7 @@ import (
// podUpdateBatchPeriod is the batch period to hold pod updates before syncing
// a Job. It is used if the feature gate JobReadyPods is enabled.
- const podUpdateBatchPeriod = 500 * time.Millisecond
+ const podUpdateBatchPeriod = time.Second
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
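
For context on what this period controls: the controller holds pod updates for podUpdateBatchPeriod before syncing a Job, so a burst of ready-condition changes triggers one sync instead of many. A minimal, hypothetical sketch of the batching idea using client-go's delaying workqueue (not the controller's actual wiring; the queue and the "default/my-job" key are made up for illustration):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// podUpdateBatchPeriod mirrors the constant this commit raises to 1s.
const podUpdateBatchPeriod = time.Second

func main() {
	// A delaying queue deduplicates identical keys, so updates that
	// arrive within the batch period collapse into one pending sync.
	queue := workqueue.NewDelayingQueue()
	defer queue.ShutDown()

	// Simulate three pod updates for the same Job in quick succession.
	for i := 0; i < 3; i++ {
		queue.AddAfter("default/my-job", podUpdateBatchPeriod)
	}

	key, _ := queue.Get() // blocks until the batch period elapses
	fmt.Printf("syncing %v; remaining queue length: %d\n", key, queue.Len())
	queue.Done(key)
}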


@@ -27,7 +27,7 @@ import (
"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1"
- "k8s.io/api/core/v1"
+ v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -2146,6 +2146,8 @@ func TestAddPod(t *testing.T) {
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
+ // Disable batching of pod updates.
+ jm.podUpdateBatchPeriod = 0
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1"
@@ -2191,6 +2193,8 @@ func TestAddPodOrphan(t *testing.T) {
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
+ // Disable batching of pod updates.
+ jm.podUpdateBatchPeriod = 0
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1"
@@ -2219,6 +2223,8 @@ func TestUpdatePod(t *testing.T) {
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
+ // Disable batching of pod updates.
+ jm.podUpdateBatchPeriod = 0
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1"
@@ -2268,6 +2274,8 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
+ // Disable batching of pod updates.
+ jm.podUpdateBatchPeriod = 0
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1"
@@ -2295,6 +2303,8 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
+ // Disable batching of pod updates.
+ jm.podUpdateBatchPeriod = 0
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1"
@@ -2321,6 +2331,8 @@ func TestUpdatePodRelease(t *testing.T) {
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
+ // Disable batching of pod updates.
+ jm.podUpdateBatchPeriod = 0
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1"
@@ -2347,6 +2359,8 @@ func TestDeletePod(t *testing.T) {
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
+ // Disable batching of pod updates.
+ jm.podUpdateBatchPeriod = 0
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1"
@@ -2392,6 +2406,8 @@ func TestDeletePodOrphan(t *testing.T) {
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
+ // Disable batching of pod updates.
+ jm.podUpdateBatchPeriod = 0
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
job1.Name = "job1"
@@ -2703,24 +2719,61 @@ func TestJobBackoff(t *testing.T) {
newPod.ResourceVersion = "2"
testCases := map[string]struct {
- // inputs
- requeues int
- phase v1.PodPhase
- // expectation
- backoff int
+ requeues int
+ phase v1.PodPhase
+ jobReadyPodsEnabled bool
+ wantBackoff time.Duration
}{
"1st failure": {0, v1.PodFailed, 0},
"2nd failure": {1, v1.PodFailed, 1},
"3rd failure": {2, v1.PodFailed, 2},
"1st success": {0, v1.PodSucceeded, 0},
"2nd success": {1, v1.PodSucceeded, 0},
"1st running": {0, v1.PodSucceeded, 0},
"2nd running": {1, v1.PodSucceeded, 0},
"1st failure": {
requeues: 0,
phase: v1.PodFailed,
wantBackoff: 0,
},
"2nd failure": {
requeues: 1,
phase: v1.PodFailed,
wantBackoff: DefaultJobBackOff,
},
"3rd failure": {
requeues: 2,
phase: v1.PodFailed,
wantBackoff: 2 * DefaultJobBackOff,
},
"1st success": {
requeues: 0,
phase: v1.PodSucceeded,
wantBackoff: 0,
},
"2nd success": {
requeues: 1,
phase: v1.PodSucceeded,
wantBackoff: 0,
},
"1st running": {
requeues: 0,
phase: v1.PodSucceeded,
wantBackoff: 0,
},
"2nd running": {
requeues: 1,
phase: v1.PodSucceeded,
wantBackoff: 0,
},
"1st failure with pod updates batching": {
requeues: 0,
phase: v1.PodFailed,
wantBackoff: podUpdateBatchPeriod,
},
"2nd failure with pod updates batching": {
requeues: 1,
phase: v1.PodFailed,
wantBackoff: DefaultJobBackOff,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
+ defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
@@ -2735,7 +2788,7 @@ func TestJobBackoff(t *testing.T) {
newPod.Status.Phase = tc.phase
manager.updatePod(oldPod, newPod)
- if queue.duration.Nanoseconds() != int64(tc.backoff)*DefaultJobBackOff.Nanoseconds() {
+ if queue.duration != tc.wantBackoff {
t.Errorf("unexpected backoff %v", queue.duration)
}
})
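
Why the expectation column changes type: the old table stored a backoff multiplier that the assertion scaled by DefaultJobBackOff, while the new one stores the absolute duration to wait, which lets cases express delays like podUpdateBatchPeriod that are not whole multiples of the base backoff. A small illustrative comparison (constants copied here so the sketch is self-contained):

package main

import (
	"fmt"
	"time"
)

const (
	DefaultJobBackOff    = 10 * time.Second // the job controller's base backoff
	podUpdateBatchPeriod = time.Second      // the new batch period
)

func main() {
	// Old style: an integer count of backoff steps, scaled at assert time.
	backoffSteps := 2
	old := time.Duration(backoffSteps) * DefaultJobBackOff

	// New style: the expected duration itself.
	want := 2 * DefaultJobBackOff
	fmt.Println(old == want) // true

	// The batching delay cannot be written as a whole number of steps.
	fmt.Println(podUpdateBatchPeriod < DefaultJobBackOff) // true
}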


@@ -235,6 +235,7 @@ const (
// owner: @alculquicondor
// alpha: v1.23
+ // beta: v1.24
//
// Track the number of pods with Ready condition in the Job status.
JobReadyPods featuregate.Feature = "JobReadyPods"
@@ -926,7 +927,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
TTLAfterFinished: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25
IndexedJob: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26
JobTrackingWithFinalizers: {Default: true, PreRelease: featuregate.Beta},
- JobReadyPods: {Default: false, PreRelease: featuregate.Alpha},
+ JobReadyPods: {Default: true, PreRelease: featuregate.Beta},
KubeletPodResources: {Default: true, PreRelease: featuregate.Beta},
LocalStorageCapacityIsolationFSQuotaMonitoring: {Default: false, PreRelease: featuregate.Alpha},
NonPreemptingPriority: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25
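
With the gate now on by default, any code path guarded by the standard feature-gate check changes behavior out of the box. A sketch of that check pattern (effectiveBatchPeriod is a hypothetical helper for illustration, not code from this commit):

package job

import (
	"time"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/kubernetes/pkg/features"
)

const podUpdateBatchPeriod = time.Second

// effectiveBatchPeriod shows the usual gate check: the 1s batch period
// only applies while JobReadyPods is enabled, which is now the default.
func effectiveBatchPeriod() time.Duration {
	if utilfeature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
		return podUpdateBatchPeriod
	}
	return 0
}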


@@ -13100,7 +13100,7 @@ func schema_k8sio_api_batch_v1_JobStatus(ref common.ReferenceCallback) common.OpenAPIDefinition {
},
"ready": {
SchemaProps: spec.SchemaProps{
- Description: "The number of pods which have a Ready condition.\n\nThis field is alpha-level. The job controller populates the field when the feature gate JobReadyPods is enabled (disabled by default).",
+ Description: "The number of pods which have a Ready condition.\n\nThis field is beta-level. The job controller populates the field when the feature gate JobReadyPods is enabled (enabled by default).",
Type: []string{"integer"},
Format: "int32",
},


@@ -339,8 +339,8 @@ message JobStatus {
// The number of pods which have a Ready condition.
//
- // This field is alpha-level. The job controller populates the field when
- // the feature gate JobReadyPods is enabled (disabled by default).
+ // This field is beta-level. The job controller populates the field when
+ // the feature gate JobReadyPods is enabled (enabled by default).
// +optional
optional int32 ready = 9;
}


@@ -267,8 +267,8 @@ type JobStatus struct {
// The number of pods which have a Ready condition.
//
- // This field is alpha-level. The job controller populates the field when
- // the feature gate JobReadyPods is enabled (disabled by default).
+ // This field is beta-level. The job controller populates the field when
+ // the feature gate JobReadyPods is enabled (enabled by default).
// +optional
Ready *int32 `json:"ready,omitempty" protobuf:"varint,9,opt,name=ready"`
}
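
Since Ready is a pointer, consumers must nil-check it: it stays nil when the gate is off or the server predates the field. A hedged client-go sketch of reading it (assumes a reachable cluster, a kubeconfig at the default path, and an existing Job named "my-job"; none of this is part of the commit):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load ~/.kube/config; in-cluster config would work the same way.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	job, err := client.BatchV1().Jobs("default").Get(context.TODO(), "my-job", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	if job.Status.Ready != nil {
		fmt.Printf("%d of %d active pods are ready\n", *job.Status.Ready, job.Status.Active)
	} else {
		fmt.Println("ready count not populated (JobReadyPods gate off?)")
	}
}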


@@ -137,7 +137,7 @@ var map_JobStatus = map[string]string{
"failed": "The number of pods which reached phase Failed.",
"completedIndexes": "CompletedIndexes holds the completed indexes when .spec.completionMode = \"Indexed\" in a text format. The indexes are represented as decimal integers separated by commas. The numbers are listed in increasing order. Three or more consecutive numbers are compressed and represented by the first and last element of the series, separated by a hyphen. For example, if the completed indexes are 1, 3, 4, 5 and 7, they are represented as \"1,3-5,7\".",
"uncountedTerminatedPods": "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nThis field is beta-level. The job controller only makes use of this field when the feature gate JobTrackingWithFinalizers is enabled (enabled by default). Old jobs might not be tracked using this field, in which case the field remains null.",
"ready": "The number of pods which have a Ready condition.\n\nThis field is alpha-level. The job controller populates the field when the feature gate JobReadyPods is enabled (disabled by default).",
"ready": "The number of pods which have a Ready condition.\n\nThis field is beta-level. The job controller populates the field when the feature gate JobReadyPods is enabled (enabled by default).",
}
func (JobStatus) SwaggerDoc() map[string]string {


@@ -28,7 +28,7 @@ import (
"github.com/google/go-cmp/cmp"
batchv1 "k8s.io/api/batch/v1"
- "k8s.io/api/core/v1"
+ v1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -81,6 +81,7 @@ func TestNonParallelJob(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
+ Ready: pointer.Int32(0),
}, wFinalizers)
// Restarting controller.
@@ -94,6 +95,7 @@ func TestNonParallelJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Failed: 1,
+ Ready: pointer.Int32(0),
}, wFinalizers)
// Restarting controller.
@@ -108,6 +110,7 @@ func TestNonParallelJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 1,
Succeeded: 1,
+ Ready: pointer.Int32(0),
}, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
@@ -240,6 +243,7 @@ func TestParallelJobParallelism(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 5,
+ Ready: pointer.Int32(0),
}, wFinalizers)
// Reduce parallelism by a number greater than backoffLimit.
@@ -250,6 +254,7 @@ func TestParallelJobParallelism(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
+ Ready: pointer.Int32(0),
}, wFinalizers)
// Increase parallelism again.
@@ -260,6 +265,7 @@ func TestParallelJobParallelism(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 4,
+ Ready: pointer.Int32(0),
}, wFinalizers)
// Succeed Job
@@ -269,6 +275,7 @@ func TestParallelJobParallelism(t *testing.T) {
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Succeeded: 4,
+ Ready: pointer.Int32(0),
}, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
@@ -403,6 +410,7 @@ func TestIndexedJob(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
+ Ready: pointer.Int32(0),
}, wFinalizers)
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
@@ -413,6 +421,7 @@ func TestIndexedJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Succeeded: 1,
+ Ready: pointer.Int32(0),
}, wFinalizers)
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
@@ -424,6 +433,7 @@ func TestIndexedJob(t *testing.T) {
Active: 3,
Failed: 1,
Succeeded: 1,
+ Ready: pointer.Int32(0),
}, wFinalizers)
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
@@ -435,6 +445,7 @@ func TestIndexedJob(t *testing.T) {
Active: 0,
Failed: 1,
Succeeded: 4,
+ Ready: pointer.Int32(0),
}, false)
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
validateJobSucceeded(ctx, t, clientSet, jobObj)
@@ -471,6 +482,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
+ Ready: pointer.Int32(0),
}, true)
// Step 2: Disable tracking with finalizers.
@@ -489,6 +501,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Failed: 1,
+ Ready: pointer.Int32(0),
}, false)
jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
@@ -516,6 +529,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
Active: 1,
Failed: 1,
Succeeded: 1,
+ Ready: pointer.Int32(0),
}, false)
}
@@ -551,6 +565,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
+ Ready: pointer.Int32(0),
}, true)
// Delete Job. The GC should delete the pods in cascade.
@@ -607,6 +622,7 @@ func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
+ Ready: pointer.Int32(0),
}, true)
// Step 2: Disable tracking with finalizers.
@@ -693,6 +709,7 @@ func TestSuspendJob(t *testing.T) {
validate := func(s string, active int, status v1.ConditionStatus, reason string) {
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: active,
+ Ready: pointer.Int32(0),
}, true)
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
@@ -737,6 +754,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: 0,
+ Ready: pointer.Int32(0),
}, true)
}
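
The recurring Ready: pointer.Int32(0) additions reflect the gate's new default: the controller now writes an explicit zero instead of leaving the field nil, so deep-equality assertions must expect a non-nil pointer. A tiny sketch of the distinction (illustrative only, using k8s.io/utils/pointer):

package main

import (
	"fmt"

	"k8s.io/utils/pointer"
)

func main() {
	var gateOff *int32         // field never populated: nil
	gateOn := pointer.Int32(0) // populated, with zero ready pods

	fmt.Println(gateOff == nil)                      // true
	fmt.Println(*gateOn == 0)                        // true
	fmt.Println(pointer.Int32Equal(gateOff, gateOn)) // false: nil != 0
}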