Merge pull request #98676 from ahg-g/ahg-ttl

Add a JobDeletionDurationSeconds histogram metric to the TTL-after-finished controller, recording the time from when a finished Job became eligible for deletion (its TTL expired) until it was actually deleted.
Kubernetes Prow Robot, 2021-02-12 16:57:05 -08:00, committed by GitHub
commit 002fb9fc5e
5 changed files with 118 additions and 21 deletions

pkg/controller/ttlafterfinished/BUILD

@@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/job:go_default_library",
+ "//pkg/controller/ttlafterfinished/metrics:go_default_library",
"//staging/src/k8s.io/api/batch/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@@ -52,6 +53,7 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/controller/ttlafterfinished/config:all-srcs",
+ "//pkg/controller/ttlafterfinished/metrics:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],

pkg/controller/ttlafterfinished/metrics/BUILD (new file)

@@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["metrics.go"],
importpath = "k8s.io/kubernetes/pkg/controller/ttlafterfinished/metrics",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

pkg/controller/ttlafterfinished/metrics/metrics.go (new file)

@@ -0,0 +1,51 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
// TTLAfterFinishedSubsystem - subsystem name used for this controller.
const TTLAfterFinishedSubsystem = "ttl_after_finished_controller"
var (
// JobDeletionDurationSeconds tracks the time it took to delete the job since it
// became eligible for deletion.
JobDeletionDurationSeconds = metrics.NewHistogram(
&metrics.HistogramOpts{
Subsystem: TTLAfterFinishedSubsystem,
Name: "job_deletion_duration_seconds",
Help: "The time it took to delete the job since it became eligible for deletion",
StabilityLevel: metrics.ALPHA,
// Start with 100ms with the last bucket being [~27m, Inf).
Buckets: metrics.ExponentialBuckets(0.1, 2, 14),
},
)
)
var registerMetrics sync.Once
// Register registers TTL after finished controller metrics.
func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(JobDeletionDurationSeconds)
})
}
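
For orientation, here is a standalone sketch of an equivalent histogram written against plain client_golang rather than k8s.io/component-base/metrics (which is what the controller actually uses); the bucket layout and naming mirror the definition above, while the registry, HTTP endpoint, and sample observation are illustrative only. Scraping :8080/metrics from this program shows the family name the new metric is exported under, ttl_after_finished_controller_job_deletion_duration_seconds.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Illustrative stand-in for the metric defined above.
var jobDeletionDurationSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
	Subsystem: "ttl_after_finished_controller",
	Name:      "job_deletion_duration_seconds",
	Help:      "The time it took to delete the job since it became eligible for deletion",
	// Same layout as the PR: upper bounds 0.1s, 0.2s, ..., 0.1*2^13 = 819.2s, plus the implicit +Inf bucket.
	Buckets: prometheus.ExponentialBuckets(0.1, 2, 14),
})

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(jobDeletionDurationSeconds)
	jobDeletionDurationSeconds.Observe(3.2) // e.g. a Job deleted 3.2s after its TTL expired

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":8080", nil))
}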

pkg/controller/ttlafterfinished/ttlafterfinished_controller.go

@@ -41,6 +41,7 @@ import (
"k8s.io/kubectl/pkg/scheme"
"k8s.io/kubernetes/pkg/controller"
jobutil "k8s.io/kubernetes/pkg/controller/job"
+ "k8s.io/kubernetes/pkg/controller/ttlafterfinished/metrics"
)
// Controller watches for changes of Jobs API objects. Triggered by Job creation
@@ -79,6 +80,8 @@ func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("ttl_after_finished_controller", client.CoreV1().RESTClient().GetRateLimiter())
}
+ metrics.Register()
tc := &Controller{
client: client,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
@@ -205,9 +208,9 @@ func (tc *Controller) processJob(key string) error {
return err
}
- if expired, err := tc.processTTL(job); err != nil {
+ if expiredAt, err := tc.processTTL(job); err != nil {
return err
- } else if !expired {
+ } else if expiredAt == nil {
return nil
}
@@ -223,9 +226,10 @@ func (tc *Controller) processJob(key string) error {
return err
}
// Use the latest Job TTL to see if the TTL truly expires.
- if expired, err := tc.processTTL(fresh); err != nil {
+ expiredAt, err := tc.processTTL(fresh)
+ if err != nil {
return err
- } else if !expired {
+ } else if expiredAt == nil {
return nil
}
// Cascade deletes the Jobs if TTL truly expires.
@@ -235,30 +239,34 @@ func (tc *Controller) processJob(key string) error {
Preconditions: &metav1.Preconditions{UID: &fresh.UID},
}
klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
- return tc.client.BatchV1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options)
+ if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options); err != nil {
+ return err
+ }
+ metrics.JobDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds())
+ return nil
}
// processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire
// if the TTL will expire later.
- func (tc *Controller) processTTL(job *batch.Job) (expired bool, err error) {
+ func (tc *Controller) processTTL(job *batch.Job) (expiredAt *time.Time, err error) {
// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
if job.DeletionTimestamp != nil || !needsCleanup(job) {
- return false, nil
+ return nil, nil
}
now := tc.clock.Now()
- t, err := timeLeft(job, &now)
+ t, e, err := timeLeft(job, &now)
if err != nil {
- return false, err
+ return nil, err
}
// TTL has expired
if *t <= 0 {
- return true, nil
+ return e, nil
}
tc.enqueueAfter(job, *t)
- return false, nil
+ return nil, nil
}
// needsCleanup checks whether a Job has finished and has a TTL set.
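
The shape of the change above, reduced to a minimal standalone sketch: processTTL now reports when the TTL expired (a *time.Time) rather than only whether it expired, so the caller can both gate the delete and, once the delete succeeds, observe the deletion latency. The helper below is hypothetical and uses none of the controller's real types.

package main

import (
	"fmt"
	"time"
)

// expiredAt returns the expiration time if the TTL has already expired, or nil otherwise.
func expiredAt(finishAt time.Time, ttl time.Duration, now time.Time) *time.Time {
	expireAt := finishAt.Add(ttl)
	if now.Before(expireAt) {
		return nil // not expired yet; the real controller re-enqueues with the remaining delay
	}
	return &expireAt
}

func main() {
	finishAt := time.Now().Add(-90 * time.Second) // Job finished 90s ago
	if e := expiredAt(finishAt, 60*time.Second, time.Now()); e != nil {
		// In the controller, this observation happens only after the Job delete call returns without error.
		fmt.Printf("deletion latency: %.1fs\n", time.Since(*e).Seconds())
	}
}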
@@ -270,26 +278,26 @@ func getFinishAndExpireTime(j *batch.Job) (*time.Time, *time.Time, error) {
if !needsCleanup(j) {
return nil, nil, fmt.Errorf("job %s/%s should not be cleaned up", j.Namespace, j.Name)
}
- finishAt, err := jobFinishTime(j)
+ t, err := jobFinishTime(j)
if err != nil {
return nil, nil, err
}
- finishAtUTC := finishAt.UTC()
- expireAtUTC := finishAtUTC.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
- return &finishAtUTC, &expireAtUTC, nil
+ finishAt := t.Time
+ expireAt := finishAt.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
+ return &finishAt, &expireAt, nil
}
- func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, error) {
+ func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, *time.Time, error) {
finishAt, expireAt, err := getFinishAndExpireTime(j)
if err != nil {
- return nil, err
+ return nil, nil, err
}
- if finishAt.UTC().After(since.UTC()) {
+ if finishAt.After(*since) {
klog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name)
}
- remaining := expireAt.UTC().Sub(since.UTC())
+ remaining := expireAt.Sub(*since)
klog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())
- return &remaining, nil
+ return &remaining, expireAt, nil
}
// jobFinishTime takes an already finished Job and returns the time it finishes.
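
Independently of the new return value, the diff also drops the explicit .UTC() conversions around the comparisons; this is safe because time.Time equality, ordering, and subtraction act on the underlying instant regardless of location (the klog line keeps .UTC() purely for display). A quick demonstration of that property:

package main

import (
	"fmt"
	"time"
)

func main() {
	utc := time.Date(2021, 2, 12, 16, 0, 0, 0, time.UTC)
	pst := utc.In(time.FixedZone("PST", -8*60*60)) // same instant, different location

	fmt.Println(pst.Equal(utc)) // true
	fmt.Println(utc.Sub(pst))   // 0s
	fmt.Println(pst.After(utc)) // false
}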

pkg/controller/ttlafterfinished/ttlafterfinished_controller_test.go

@@ -87,6 +87,7 @@ func TestTimeLeft(t *testing.T) {
expectErr bool
expectErrStr string
expectedTimeLeft *time.Duration
+ expectedExpireAt time.Time
}{
{
name: "Error case: Job unfinished",
@@ -108,6 +109,7 @@
ttl: utilpointer.Int32Ptr(0),
since: &now.Time,
expectedTimeLeft: durationPointer(0),
+ expectedExpireAt: now.Time,
},
{
name: "Job completed now, 10s TTL",
@@ -115,6 +117,7 @@
ttl: utilpointer.Int32Ptr(10),
since: &now.Time,
expectedTimeLeft: durationPointer(10),
+ expectedExpireAt: now.Add(10 * time.Second),
},
{
name: "Job completed 10s ago, 15s TTL",
@@ -122,6 +125,7 @@
ttl: utilpointer.Int32Ptr(15),
since: &now.Time,
expectedTimeLeft: durationPointer(5),
+ expectedExpireAt: now.Add(5 * time.Second),
},
{
name: "Error case: Job failed now, no TTL",
@@ -136,6 +140,7 @@
ttl: utilpointer.Int32Ptr(0),
since: &now.Time,
expectedTimeLeft: durationPointer(0),
+ expectedExpireAt: now.Time,
},
{
name: "Job failed now, 10s TTL",
@@ -143,6 +148,7 @@
ttl: utilpointer.Int32Ptr(10),
since: &now.Time,
expectedTimeLeft: durationPointer(10),
+ expectedExpireAt: now.Add(10 * time.Second),
},
{
name: "Job failed 10s ago, 15s TTL",
@@ -150,12 +156,13 @@
ttl: utilpointer.Int32Ptr(15),
since: &now.Time,
expectedTimeLeft: durationPointer(5),
+ expectedExpireAt: now.Add(5 * time.Second),
},
}
for _, tc := range testCases {
job := newJob(tc.completionTime, tc.failedTime, tc.ttl)
- gotTimeLeft, gotErr := timeLeft(job, tc.since)
+ gotTimeLeft, gotExpireAt, gotErr := timeLeft(job, tc.since)
if tc.expectErr != (gotErr != nil) {
t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr)
}
@@ -169,6 +176,9 @@
if *gotTimeLeft != *tc.expectedTimeLeft {
t.Errorf("%s: expected time left %v, got %v", tc.name, tc.expectedTimeLeft, gotTimeLeft)
}
+ if *gotExpireAt != tc.expectedExpireAt {
+ t.Errorf("%s: expected expire at %v, got %v", tc.name, tc.expectedExpireAt, *gotExpireAt)
+ }
}
}
}
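
For completeness, the arithmetic the new expectedExpireAt expectations encode, pulled out into a self-contained test sketch (a fixed "now" and the "finished 10s ago, 15s TTL" case above; this test is not part of the PR):

package ttl_test

import (
	"testing"
	"time"
)

func TestExpireAtArithmetic(t *testing.T) {
	now := time.Date(2021, 2, 12, 0, 0, 0, 0, time.UTC)
	finishAt := now.Add(-10 * time.Second) // Job finished 10s before "now"
	ttl := 15 * time.Second

	gotExpireAt := finishAt.Add(ttl)         // how getFinishAndExpireTime derives expireAt
	wantExpireAt := now.Add(5 * time.Second) // matching expectedExpireAt: now.Add(5 * time.Second)

	if !gotExpireAt.Equal(wantExpireAt) {
		t.Errorf("expected expire at %v, got %v", wantExpireAt, gotExpireAt)
	}
}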