job controller: refactor the job controller so a FakeClock can be injected for unit tests
parent 05a0ce83a6
commit 208c3868cf
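
The diff below threads a clock.WithTicker through the job controller in place of direct time.Now() and metav1.Now() calls: NewController now delegates to a new newControllerWithClock constructor that receives the clock, production code passes &clock.RealClock{}, and the tests pass a clocktesting.FakeClock. A minimal, self-contained sketch of that injection pattern follows; the worker type and expired helper are illustrative stand-ins, not the controller's real API.

package main

import (
	"fmt"
	"time"

	"k8s.io/utils/clock"
	clocktesting "k8s.io/utils/clock/testing"
)

// worker stands in for a controller that owns a clock instead of calling
// time.Now() directly.
type worker struct {
	clock clock.WithTicker
}

// expired reports whether the deadline has passed according to the injected
// clock, mirroring how pastActiveDeadline now uses jm.clock.
func (w *worker) expired(start time.Time, deadline time.Duration) bool {
	return w.clock.Since(start) >= deadline
}

func main() {
	// Production code would hand in the real clock.
	_ = &worker{clock: &clock.RealClock{}}

	// A test can inject a fake clock and advance it deterministically.
	fake := clocktesting.NewFakeClock(time.Now())
	w := &worker{clock: fake}
	start := fake.Now()

	fmt.Println(w.expired(start, time.Second)) // false
	fake.Step(2 * time.Second)                 // advance fake time
	fmt.Println(w.expired(start, time.Second)) // true
}

With the clock owned by the struct, a test controls time by stepping the fake clock rather than sleeping on the wall clock.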
@@ -52,6 +52,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/utils/clock"
 	"k8s.io/utils/integer"
 	"k8s.io/utils/pointer"
 )
@@ -123,11 +124,17 @@ type Controller struct {
 	recorder record.EventRecorder
 
 	podUpdateBatchPeriod time.Duration
+
+	clock clock.WithTicker
 }
 
 // NewController creates a new Job controller that keeps the relevant pods
 // in sync with their corresponding Job objects.
 func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
+	return newControllerWithClock(podInformer, jobInformer, kubeClient, &clock.RealClock{})
+}
+
+func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) *Controller {
 	eventBroadcaster := record.NewBroadcaster()
 
 	jm := &Controller{
@@ -138,10 +145,11 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
 		},
 		expectations:          controller.NewControllerExpectations(),
 		finalizerExpectations: newUIDTrackingExpectations(),
-		queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
-		orphanQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
+		queue:                 workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
+		orphanQueue:           workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job_orphan_pod"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
 		broadcaster:           eventBroadcaster,
 		recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
+		clock:                 clock,
 	}
 	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
 		jm.podUpdateBatchPeriod = podUpdateBatchPeriod
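
Both work queues are now layered over delaying queues that share the injected clock, so items scheduled with AddAfter become due according to the fake clock in tests. A rough, self-contained sketch of that construction using the same client-go workqueue helpers; the queue name "example" and the default controller rate limiter are placeholders rather than the controller's actual arguments.

package main

import (
	"time"

	"k8s.io/client-go/util/workqueue"
	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	// A fake clock that the test controls (illustrative, not the controller's field).
	fakeClock := clocktesting.NewFakeClock(time.Now())

	// Mirror the diff: a rate-limiting queue layered over a delaying queue
	// that uses the injected clock, so AddAfter delays follow the fake clock.
	q := workqueue.NewRateLimitingQueueWithDelayingInterface(
		workqueue.NewDelayingQueueWithCustomClock(fakeClock, "example"),
		workqueue.DefaultControllerRateLimiter(),
	)
	defer q.ShutDown()

	// The item becomes available only after the fake clock advances past the delay.
	q.AddAfter("some-key", time.Second)
	fakeClock.Step(2 * time.Second)
}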
@@ -436,9 +444,7 @@ func (jm *Controller) updateJob(old, cur interface{}) {
 		}
 		oldADS := oldJob.Spec.ActiveDeadlineSeconds
 		if oldADS == nil || *oldADS != *curADS {
-			now := metav1.Now()
-			start := curJob.Status.StartTime.Time
-			passed := now.Time.Sub(start)
+			passed := jm.clock.Since(curJob.Status.StartTime.Time)
 			total := time.Duration(*curADS) * time.Second
 			// AddAfter will handle total < passed
 			jm.queue.AddAfter(key, total-passed)
@@ -572,9 +578,9 @@ func (jm Controller) processNextOrphanPod(ctx context.Context) bool {
 
 // syncOrphanPod removes the tracking finalizer from an orphan pod if found.
 func (jm Controller) syncOrphanPod(ctx context.Context, key string) error {
-	startTime := time.Now()
+	startTime := jm.clock.Now()
 	defer func() {
-		klog.V(4).Infof("Finished syncing orphan pod %q (%v)", key, time.Since(startTime))
+		klog.V(4).Infof("Finished syncing orphan pod %q (%v)", key, jm.clock.Since(startTime))
 	}()
 
 	ns, name, err := cache.SplitMetaNamespaceKey(key)
@@ -664,9 +670,9 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job, withFinal
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
 func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rErr error) {
-	startTime := time.Now()
+	startTime := jm.clock.Now()
 	defer func() {
-		klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
+		klog.V(4).Infof("Finished syncing job %q (%v)", key, jm.clock.Since(startTime))
 	}()
 
 	ns, name, err := cache.SplitMetaNamespaceKey(key)
@@ -708,7 +714,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 			result = "error"
 		}
 
-		metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(time.Since(startTime).Seconds())
+		metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(jm.clock.Since(startTime).Seconds())
 		metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
 	}()
 
@@ -743,7 +749,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 
 	// Job first start. Set StartTime only if the job is not in the suspended state.
 	if job.Status.StartTime == nil && !jobSuspended(&job) {
-		now := metav1.Now()
+		now := metav1.NewTime(jm.clock.Now())
 		job.Status.StartTime = &now
 	}
 
@@ -759,14 +765,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 
 	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
 		if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
-			finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition)
+			finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
 		} else if failJobMessage := getFailJobMessage(&job, pods, uncounted.Failed()); failJobMessage != nil {
 			if uncounted != nil {
 				// Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
-				finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage)
+				finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
 			} else {
 				// Prepare the Failed job condition for the legacy path without finalizers (don't use the interim FailureTarget condition).
-				finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage)
+				finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
 			}
 		}
 	}
@@ -774,11 +780,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
 		// check if the number of pod restart exceeds backoff (for restart OnFailure only)
 		// OR if the number of failed jobs increased since the last syncJob
-		finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit")
-	} else if pastActiveDeadline(&job) {
-		finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline")
+		finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
+	} else if jm.pastActiveDeadline(&job) {
+		finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
 	} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
-		syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(job.Status.StartTime.Time)
+		syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
 		klog.V(2).InfoS("Job has activeDeadlineSeconds configuration. Will sync this job again", "job", key, "nextSyncIn", syncDuration)
 		jm.queue.AddAfter(key, syncDuration)
 	}
@@ -827,7 +833,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 		complete = succeeded >= *job.Spec.Completions && active == 0
 	}
 	if complete {
-		finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "")
+		finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
 	} else if manageJobCalled {
 		// Update the conditions / emit events only if manageJob was called in
 		// this syncJob. Otherwise wait for the right syncJob call to make
@@ -835,7 +841,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 	if job.Spec.Suspend != nil && *job.Spec.Suspend {
 		// Job can be in the suspended state only if it is NOT completed.
 		var isUpdated bool
-		job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended")
+		job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", jm.clock.Now())
 		if isUpdated {
 			suspendCondChanged = true
 			jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
@@ -843,7 +849,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 	} else {
 		// Job not suspended.
 		var isUpdated bool
-		job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed")
+		job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed", jm.clock.Now())
 		if isUpdated {
 			suspendCondChanged = true
 			jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
@@ -853,7 +859,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 			// consistent with resuming a Job created in the suspended state.
 			// (ActiveDeadlineSeconds is interpreted as the number of seconds a
 			// Job is continuously active.)
-			now := metav1.Now()
+			now := metav1.NewTime(jm.clock.Now())
 			job.Status.StartTime = &now
 		}
 	}
@@ -1109,7 +1115,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 
 			// Prepare the final Failed condition to update the job status with after the finalizers are removed.
 			// It is also used in the enactJobFinished function for reporting.
-			finishedCond = newFailedConditionForFailureTarget(finishedCond)
+			finishedCond = newFailedConditionForFailureTarget(finishedCond, jm.clock.Now())
 		}
 	}
 	var err error
@@ -1261,7 +1267,7 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo
 			return false
 		}
 	}
-	job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message)
+	job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message, jm.clock.Now())
 	if finishedCond.Type == batch.JobComplete {
 		job.Status.CompletionTime = &finishedCond.LastTransitionTime
 	}
@@ -1296,8 +1302,8 @@ func filterInUncountedUIDs(uncounted []types.UID, include sets.String) []types.U
 
 // newFailedConditionForFailureTarget creates a job Failed condition based on
 // the interim FailureTarget condition.
-func newFailedConditionForFailureTarget(condition *batch.JobCondition) *batch.JobCondition {
-	return newCondition(batch.JobFailed, v1.ConditionTrue, condition.Reason, condition.Message)
+func newFailedConditionForFailureTarget(condition *batch.JobCondition, now time.Time) *batch.JobCondition {
+	return newCondition(batch.JobFailed, v1.ConditionTrue, condition.Reason, condition.Message, now)
 }
 
 // pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit
@@ -1329,23 +1335,21 @@ func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
 // pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if
 // it is exceeded. If the job is currently suspended, the function will always
 // return false.
-func pastActiveDeadline(job *batch.Job) bool {
+func (jm *Controller) pastActiveDeadline(job *batch.Job) bool {
 	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil || jobSuspended(job) {
 		return false
 	}
-	now := metav1.Now()
-	start := job.Status.StartTime.Time
-	duration := now.Time.Sub(start)
+	duration := jm.clock.Since(job.Status.StartTime.Time)
 	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
 	return duration >= allowedDuration
 }
 
-func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatus, reason, message string) *batch.JobCondition {
+func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) *batch.JobCondition {
 	return &batch.JobCondition{
 		Type:               conditionType,
 		Status:             status,
-		LastProbeTime:      metav1.Now(),
-		LastTransitionTime: metav1.Now(),
+		LastProbeTime:      metav1.NewTime(now),
+		LastTransitionTime: metav1.NewTime(now),
 		Reason:             reason,
 		Message:            message,
 	}
 }
@@ -1720,17 +1724,17 @@ func errorFromChannel(errCh <-chan error) error {
 // (because going from nothing to false is meaningless); it can, however,
 // update the status condition to false. The function returns a bool to let the
 // caller know if the list was changed (either appended or updated).
-func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string) ([]batch.JobCondition, bool) {
+func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string, now time.Time) ([]batch.JobCondition, bool) {
 	if condition := findConditionByType(list, cType); condition != nil {
 		if condition.Status != status || condition.Reason != reason || condition.Message != message {
-			*condition = *newCondition(cType, status, reason, message)
+			*condition = *newCondition(cType, status, reason, message, now)
 			return list, true
 		}
 		return list, false
 	}
 	// A condition with that type doesn't exist in the list.
 	if status != v1.ConditionFalse {
-		return append(list, *newCondition(cType, status, reason, message)), true
+		return append(list, *newCondition(cType, status, reason, message, now)), true
 	}
 	return list, false
 }
@@ -54,9 +54,12 @@ import (
 	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/controller/testutil"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/utils/clock"
+	clocktesting "k8s.io/utils/clock/testing"
 	"k8s.io/utils/pointer"
 )
 
+var realClock = &clock.RealClock{}
 var alwaysReady = func() bool { return true }
 
 func newJobWithName(name string, parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job {
@@ -110,10 +113,13 @@ func newJob(parallelism, completions, backoffLimit int32, completionMode batch.C
 }
 
 func newControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) {
-	sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
-	jm := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient)
-	jm.podControl = &controller.FakePodControl{}
+	return newControllerFromClientWithClock(kubeClient, resyncPeriod, realClock)
+}
 
+func newControllerFromClientWithClock(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) {
+	sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
+	jm := newControllerWithClock(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock)
+	jm.podControl = &controller.FakePodControl{}
 	return jm, sharedInformers
 }
 
@@ -750,7 +756,7 @@ func TestControllerSyncJob(t *testing.T) {
 				manager.expectations.ExpectCreations(key, int(tc.fakeExpectationAtCreation))
 			}
 			if tc.wasSuspended {
-				job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended"))
+				job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now()))
 			}
 			if wFinalizers {
 				job.Annotations = map[string]string{
@@ -1144,8 +1150,8 @@ func TestGetStatus(t *testing.T) {
 }
 
 func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
-	succeededCond := newCondition(batch.JobComplete, v1.ConditionTrue, "", "")
-	failedCond := newCondition(batch.JobFailed, v1.ConditionTrue, "", "")
+	succeededCond := newCondition(batch.JobComplete, v1.ConditionTrue, "", "", realClock.Now())
+	failedCond := newCondition(batch.JobFailed, v1.ConditionTrue, "", "", realClock.Now())
 	indexedCompletion := batch.IndexedCompletion
 	mockErr := errors.New("mock error")
 	cases := map[string]struct {
@@ -1825,10 +1831,10 @@ func hasTrueCondition(job *batch.Job) *batch.JobConditionType {
 
 func TestSyncPastDeadlineJobFinished(t *testing.T) {
 	clientset := fake.NewSimpleClientset()
-	manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
+	fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second))
+	manager, sharedInformerFactory := newControllerFromClientWithClock(clientset, controller.NoResyncPeriodFunc, fakeClock)
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	sharedInformerFactory.Start(ctx.Done())
@@ -1855,10 +1861,9 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
 			job := newJobWithName(tc.jobName, 1, 1, 6, batch.NonIndexedCompletion)
-			activeDeadlineSeconds := int64(1)
-			job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
+			job.Spec.ActiveDeadlineSeconds = pointer.Int64(1)
 			if tc.setStartTime {
-				start := metav1.Unix(metav1.Now().Time.Unix()-1, 0)
+				start := metav1.NewTime(fakeClock.Now().Add(-time.Second))
 				job.Status.StartTime = &start
 			}
 
@@ -1870,7 +1875,25 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
 			if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil {
 				t.Fatalf("Failed to insert job in index: %v", err)
 			}
 
 			var j *batch.Job
+			err = wait.PollImmediate(200*time.Microsecond, 3*time.Second, func() (done bool, err error) {
+				j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
+				if err != nil {
+					return false, err
+				}
+				return j.Status.StartTime != nil, nil
+			})
+			if err != nil {
+				t.Errorf("Job failed to ensure that start time was set: %v", err)
+			}
+			for _, c := range j.Status.Conditions {
+				if c.Reason == "DeadlineExceeded" {
+					t.Errorf("Job contains DeadlineExceeded condition earlier than expected")
+					break
+				}
+			}
+			manager.clock.Sleep(time.Second)
 			err = wait.Poll(200*time.Millisecond, 3*time.Second, func() (done bool, err error) {
 				j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
 				if err != nil {
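
This rewritten test leans on the fact that Sleep on a clocktesting.FakeClock does not block: it only advances the fake time, so manager.clock.Sleep(time.Second) pushes the job past its one-second ActiveDeadlineSeconds deterministically. A tiny sketch of that behavior, with illustrative values:

package main

import (
	"fmt"
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	fake := clocktesting.NewFakeClock(time.Now().Truncate(time.Second))
	start := fake.Now()

	// On a FakeClock, Sleep moves the fake time forward without blocking,
	// which is why the test can cross the deadline without waiting.
	fake.Sleep(time.Second)

	fmt.Println(fake.Since(start)) // 1s, regardless of wall-clock time
}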
@@ -1902,11 +1925,10 @@ func TestSingleJobFailedCondition(t *testing.T) {
 	}
 
 	job := newJob(1, 1, 6, batch.NonIndexedCompletion)
-	activeDeadlineSeconds := int64(10)
-	job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
+	job.Spec.ActiveDeadlineSeconds = pointer.Int64(10)
 	start := metav1.Unix(metav1.Now().Time.Unix()-15, 0)
 	job.Status.StartTime = &start
-	job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionFalse, "DeadlineExceeded", "Job was active longer than specified deadline"))
+	job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionFalse, "DeadlineExceeded", "Job was active longer than specified deadline", realClock.Now()))
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 	forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
 	if err != nil {
@@ -1940,7 +1962,7 @@ func TestSyncJobComplete(t *testing.T) {
 	manager.jobStoreSynced = alwaysReady
 
 	job := newJob(1, 1, 6, batch.NonIndexedCompletion)
-	job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", ""))
+	job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", "", realClock.Now()))
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 	forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
 	if err != nil {
@@ -4224,7 +4246,7 @@ func TestEnsureJobConditions(t *testing.T) {
 			wantType:     batch.JobSuspended,
 			wantStatus:   v1.ConditionTrue,
 			wantReason:   "foo",
-			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "")},
+			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
 			expectUpdate: true,
 		},
 		{
@@ -4238,44 +4260,44 @@ func TestEnsureJobConditions(t *testing.T) {
 		},
 		{
 			name:         "update true condition reason",
-			haveList:     []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "")},
+			haveList:     []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
 			wantType:     batch.JobSuspended,
 			wantStatus:   v1.ConditionTrue,
 			wantReason:   "bar",
-			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "bar", "")},
+			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "bar", "", realClock.Now())},
 			expectUpdate: true,
 		},
 		{
 			name:         "update true condition status",
-			haveList:     []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "")},
+			haveList:     []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
 			wantType:     batch.JobSuspended,
 			wantStatus:   v1.ConditionFalse,
 			wantReason:   "foo",
-			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "")},
+			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "", realClock.Now())},
 			expectUpdate: true,
 		},
 		{
 			name:         "update false condition status",
-			haveList:     []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "")},
+			haveList:     []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "", realClock.Now())},
 			wantType:     batch.JobSuspended,
 			wantStatus:   v1.ConditionTrue,
 			wantReason:   "foo",
-			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "")},
+			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
 			expectUpdate: true,
 		},
 		{
 			name:         "condition already exists",
-			haveList:     []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "")},
+			haveList:     []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
 			wantType:     batch.JobSuspended,
 			wantStatus:   v1.ConditionTrue,
 			wantReason:   "foo",
-			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "")},
+			expectList:   []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
 			expectUpdate: false,
 		},
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			gotList, isUpdated := ensureJobConditionStatus(tc.haveList, tc.wantType, tc.wantStatus, tc.wantReason, "")
+			gotList, isUpdated := ensureJobConditionStatus(tc.haveList, tc.wantType, tc.wantStatus, tc.wantReason, "", realClock.Now())
 			if isUpdated != tc.expectUpdate {
 				t.Errorf("Got isUpdated=%v, want %v", isUpdated, tc.expectUpdate)
 			}