graduate SuspendJob to beta

Also adds an "action" label to two existing Job metrics, JobSyncDurationSeconds and JobSyncNum.

Signed-off-by: Adhityaa Chandrasekar <adtac@google.com>
This commit is contained in:
Adhityaa Chandrasekar
2021-04-28 15:38:46 +00:00
parent eee6e54ecf
commit ba708e5fc9
10 changed files with 85 additions and 66 deletions

View File

@@ -488,6 +488,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
if isIndexedJob(&job) {
completionMode = string(batch.IndexedCompletion)
}
action := metrics.JobSyncActionReconciling
defer func() {
result := "success"
@@ -495,8 +496,8 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
result = "error"
}
metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result).Observe(time.Since(startTime).Seconds())
metrics.JobSyncNum.WithLabelValues(completionMode, result).Inc()
metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(time.Since(startTime).Seconds())
metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
}()
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
@@ -568,7 +569,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc()
} else {
if jobNeedsSync && job.DeletionTimestamp == nil {
active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods)
active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods)
manageJobCalled = true
}
completions := succeeded
@@ -762,13 +763,13 @@ func jobSuspended(job *batch.Job) bool {
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, allPods []*v1.Pod) (int32, error) {
func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, allPods []*v1.Pod) (int32, string, error) {
active := int32(len(activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return 0, nil
return 0, metrics.JobSyncActionTracking, nil
}
if jobSuspended(job) {
@@ -777,7 +778,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)
active -= removed
return active, err
return active, metrics.JobSyncActionPodsDeleted, err
}
wantActive := int32(0)
@@ -812,21 +813,15 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism)
removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)
active -= removed
if err != nil {
return active, err
}
// While it is possible for a Job to require both pod creations and
// deletions at the same time (e.g. indexed Jobs with repeated indexes), we
// restrict ourselves to either just pod deletion or pod creation in any
// given sync cycle. Of these two, pod deletion takes precedence.
return active, metrics.JobSyncActionPodsDeleted, err
}
if active < wantActive {
diff := wantActive - active
if diff < 0 {
utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
diff = 0
}
if diff == 0 {
return active, nil
}
if diff > int32(maxPodCreateDeletePerSync) {
diff = int32(maxPodCreateDeletePerSync)
}
@@ -909,12 +904,10 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
}
diff -= batchSize
}
if err := errorFromChannel(errCh); err != nil {
return active, err
}
return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
}
return active, nil
return active, metrics.JobSyncActionTracking, nil
}
// activePodsForRemoval returns Pods that should be removed because there

View File

@@ -555,14 +555,13 @@ func TestControllerSyncJob(t *testing.T) {
{"2", v1.PodRunning},
{"2", v1.PodPending},
},
jobKeyForget: true,
expectedCreations: 2,
expectedDeletions: 2,
expectedActive: 4,
expectedSucceeded: 1,
expectedCompletedIdxs: "0",
expectedCreatedIndexes: sets.NewInt(3, 4),
indexedJobEnabled: true,
jobKeyForget: true,
expectedCreations: 0,
expectedDeletions: 2,
expectedActive: 2,
expectedSucceeded: 1,
expectedCompletedIdxs: "0",
indexedJobEnabled: true,
},
"indexed job with indexes outside of range": {
parallelism: 2,
@@ -576,15 +575,14 @@ func TestControllerSyncJob(t *testing.T) {
{"7", v1.PodPending},
{"8", v1.PodFailed},
},
jobKeyForget: true,
expectedCreations: 2,
expectedSucceeded: 1,
expectedDeletions: 2,
expectedCompletedIdxs: "0",
expectedCreatedIndexes: sets.NewInt(1, 2),
expectedActive: 2,
expectedFailed: 0,
indexedJobEnabled: true,
jobKeyForget: true,
expectedCreations: 0, // only one of creations and deletions can happen in a sync
expectedSucceeded: 1,
expectedDeletions: 2,
expectedCompletedIdxs: "0",
expectedActive: 0,
expectedFailed: 0,
indexedJobEnabled: true,
},
"indexed job feature disabled": {
parallelism: 2,

View File

@@ -27,8 +27,11 @@ import (
const JobControllerSubsystem = "job_controller"
var (
// JobSyncDurationSeconds tracks the latency of job syncs as
// completion_mode = Indexed / NonIndexed and result = success / error.
// JobSyncDurationSeconds tracks the latency of Job syncs. Possible label
// values:
// completion_mode: Indexed, NonIndexed
// result: success, error
// action: reconciling, tracking, pods_created, pods_deleted
JobSyncDurationSeconds = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: JobControllerSubsystem,
@@ -37,10 +40,12 @@ var (
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
},
[]string{"completion_mode", "result"},
[]string{"completion_mode", "result", "action"},
)
// JobSyncNum tracks the number of job syncs as
// completion_mode = Indexed / NonIndexed and result = success / error.
// JobSyncNum tracks the number of Job syncs. Possible label values:
// completion_mode: Indexed, NonIndexed
// result: success, error
// action: reconciling, tracking, pods_created, pods_deleted
JobSyncNum = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
@@ -48,10 +53,12 @@ var (
Help: "The number of job syncs",
StabilityLevel: metrics.ALPHA,
},
[]string{"completion_mode", "result"},
[]string{"completion_mode", "result", "action"},
)
// JobFinishedNum tracks the number of jobs that finish as
// completion_mode = Indexed / NonIndexed and result = failed / succeeded.
// JobFinishedNum tracks the number of Jobs that finish. Possible label
// values:
// completion_mode: Indexed, NonIndexed
// result: failed, succeeded
JobFinishedNum = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
@@ -63,6 +70,26 @@ var (
)
)
// Possible values for the "action" label of the JobSyncDurationSeconds and
// JobSyncNum metrics. Each value names what a single Job sync ended up doing.
const (
// JobSyncActionReconciling is the action reported when the Job's pod
// creation/deletion expectations are unsatisfied and the controller is
// waiting for issued Pod creation/deletions to complete.
JobSyncActionReconciling = "reconciling"
// JobSyncActionTracking is the action reported when the Job's pod
// creation/deletion expectations are satisfied and the number of active Pods
// matches expectations (i.e. no pod creation/deletions issued in this sync).
// This is expected to be the action in most of the syncs.
JobSyncActionTracking = "tracking"
// JobSyncActionPodsCreated is the action reported when the controller
// creates Pods. This can happen when the number of active Pods is less than
// the wanted Job parallelism.
JobSyncActionPodsCreated = "pods_created"
// JobSyncActionPodsDeleted is the action reported when the controller
// deletes Pods. This can happen if a Job is suspended or if the number of
// active Pods is more than parallelism.
JobSyncActionPodsDeleted = "pods_deleted"
)
var registerMetrics sync.Once
// Register registers Job controller metrics.