From 181e2173cf831ac8ed7f5597fa6b6303f2954b7d Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Mon, 1 Feb 2021 19:33:14 -0500 Subject: [PATCH] Added time to deletion metric to ttl-after-finidhed controller --- pkg/controller/ttlafterfinished/BUILD | 2 + pkg/controller/ttlafterfinished/metrics/BUILD | 26 ++++++++++ .../ttlafterfinished/metrics/metrics.go | 51 +++++++++++++++++++ .../ttlafterfinished_controller.go | 48 +++++++++-------- .../ttlafterfinished_controller_test.go | 12 ++++- 5 files changed, 118 insertions(+), 21 deletions(-) create mode 100644 pkg/controller/ttlafterfinished/metrics/BUILD create mode 100644 pkg/controller/ttlafterfinished/metrics/metrics.go diff --git a/pkg/controller/ttlafterfinished/BUILD b/pkg/controller/ttlafterfinished/BUILD index d75d878cf81..a4623abdf79 100644 --- a/pkg/controller/ttlafterfinished/BUILD +++ b/pkg/controller/ttlafterfinished/BUILD @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/controller:go_default_library", "//pkg/controller/job:go_default_library", + "//pkg/controller/ttlafterfinished/metrics:go_default_library", "//staging/src/k8s.io/api/batch/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", @@ -52,6 +53,7 @@ filegroup( srcs = [ ":package-srcs", "//pkg/controller/ttlafterfinished/config:all-srcs", + "//pkg/controller/ttlafterfinished/metrics:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/controller/ttlafterfinished/metrics/BUILD b/pkg/controller/ttlafterfinished/metrics/BUILD new file mode 100644 index 00000000000..6f070032c6f --- /dev/null +++ b/pkg/controller/ttlafterfinished/metrics/BUILD @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["metrics.go"], + importpath = "k8s.io/kubernetes/pkg/controller/ttlafterfinished/metrics", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/component-base/metrics:go_default_library", + "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/controller/ttlafterfinished/metrics/metrics.go b/pkg/controller/ttlafterfinished/metrics/metrics.go new file mode 100644 index 00000000000..a13dec53387 --- /dev/null +++ b/pkg/controller/ttlafterfinished/metrics/metrics.go @@ -0,0 +1,51 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +// TTLAfterFinishedSubsystem - subsystem name used for this controller. +const TTLAfterFinishedSubsystem = "ttl_after_finished_controller" + +var ( + // JobDeletionDurationSeconds tracks the time it took to delete the job since it + // became eligible for deletion. + JobDeletionDurationSeconds = metrics.NewHistogram( + &metrics.HistogramOpts{ + Subsystem: TTLAfterFinishedSubsystem, + Name: "job_deletion_duration_seconds", + Help: "The time it took to delete the job since it became eligible for deletion", + StabilityLevel: metrics.ALPHA, + // Start with 100ms with the last bucket being [~27m, Inf). + Buckets: metrics.ExponentialBuckets(0.1, 2, 14), + }, + ) +) + +var registerMetrics sync.Once + +// Register registers TTL after finished controller metrics. +func Register() { + registerMetrics.Do(func() { + legacyregistry.MustRegister(JobDeletionDurationSeconds) + }) +} diff --git a/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go b/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go index 656884cbf01..39e5091cda4 100644 --- a/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go +++ b/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubectl/pkg/scheme" "k8s.io/kubernetes/pkg/controller" jobutil "k8s.io/kubernetes/pkg/controller/job" + "k8s.io/kubernetes/pkg/controller/ttlafterfinished/metrics" ) // Controller watches for changes of Jobs API objects. Triggered by Job creation @@ -79,6 +80,8 @@ func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Co ratelimiter.RegisterMetricAndTrackRateLimiterUsage("ttl_after_finished_controller", client.CoreV1().RESTClient().GetRateLimiter()) } + metrics.Register() + tc := &Controller{ client: client, recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}), @@ -205,9 +208,9 @@ func (tc *Controller) processJob(key string) error { return err } - if expired, err := tc.processTTL(job); err != nil { + if expiredAt, err := tc.processTTL(job); err != nil { return err - } else if !expired { + } else if expiredAt == nil { return nil } @@ -223,9 +226,10 @@ func (tc *Controller) processJob(key string) error { return err } // Use the latest Job TTL to see if the TTL truly expires. - if expired, err := tc.processTTL(fresh); err != nil { + expiredAt, err := tc.processTTL(fresh) + if err != nil { return err - } else if !expired { + } else if expiredAt == nil { return nil } // Cascade deletes the Jobs if TTL truly expires. @@ -235,30 +239,34 @@ func (tc *Controller) processJob(key string) error { Preconditions: &metav1.Preconditions{UID: &fresh.UID}, } klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name) - return tc.client.BatchV1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options) + if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(context.TODO(), fresh.Name, options); err != nil { + return err + } + metrics.JobDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds()) + return nil } // processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire // if the TTL will expire later. -func (tc *Controller) processTTL(job *batch.Job) (expired bool, err error) { +func (tc *Controller) processTTL(job *batch.Job) (expiredAt *time.Time, err error) { // We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up. if job.DeletionTimestamp != nil || !needsCleanup(job) { - return false, nil + return nil, nil } now := tc.clock.Now() - t, err := timeLeft(job, &now) + t, e, err := timeLeft(job, &now) if err != nil { - return false, err + return nil, err } // TTL has expired if *t <= 0 { - return true, nil + return e, nil } tc.enqueueAfter(job, *t) - return false, nil + return nil, nil } // needsCleanup checks whether a Job has finished and has a TTL set. @@ -270,26 +278,26 @@ func getFinishAndExpireTime(j *batch.Job) (*time.Time, *time.Time, error) { if !needsCleanup(j) { return nil, nil, fmt.Errorf("job %s/%s should not be cleaned up", j.Namespace, j.Name) } - finishAt, err := jobFinishTime(j) + t, err := jobFinishTime(j) if err != nil { return nil, nil, err } - finishAtUTC := finishAt.UTC() - expireAtUTC := finishAtUTC.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second) - return &finishAtUTC, &expireAtUTC, nil + finishAt := t.Time + expireAt := finishAt.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second) + return &finishAt, &expireAt, nil } -func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, error) { +func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, *time.Time, error) { finishAt, expireAt, err := getFinishAndExpireTime(j) if err != nil { - return nil, err + return nil, nil, err } - if finishAt.UTC().After(since.UTC()) { + if finishAt.After(*since) { klog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name) } - remaining := expireAt.UTC().Sub(since.UTC()) + remaining := expireAt.Sub(*since) klog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC()) - return &remaining, nil + return &remaining, expireAt, nil } // jobFinishTime takes an already finished Job and returns the time it finishes. diff --git a/pkg/controller/ttlafterfinished/ttlafterfinished_controller_test.go b/pkg/controller/ttlafterfinished/ttlafterfinished_controller_test.go index e4847322dfa..924a3988f40 100644 --- a/pkg/controller/ttlafterfinished/ttlafterfinished_controller_test.go +++ b/pkg/controller/ttlafterfinished/ttlafterfinished_controller_test.go @@ -87,6 +87,7 @@ func TestTimeLeft(t *testing.T) { expectErr bool expectErrStr string expectedTimeLeft *time.Duration + expectedExpireAt time.Time }{ { name: "Error case: Job unfinished", @@ -108,6 +109,7 @@ func TestTimeLeft(t *testing.T) { ttl: utilpointer.Int32Ptr(0), since: &now.Time, expectedTimeLeft: durationPointer(0), + expectedExpireAt: now.Time, }, { name: "Job completed now, 10s TTL", @@ -115,6 +117,7 @@ func TestTimeLeft(t *testing.T) { ttl: utilpointer.Int32Ptr(10), since: &now.Time, expectedTimeLeft: durationPointer(10), + expectedExpireAt: now.Add(10 * time.Second), }, { name: "Job completed 10s ago, 15s TTL", @@ -122,6 +125,7 @@ func TestTimeLeft(t *testing.T) { ttl: utilpointer.Int32Ptr(15), since: &now.Time, expectedTimeLeft: durationPointer(5), + expectedExpireAt: now.Add(5 * time.Second), }, { name: "Error case: Job failed now, no TTL", @@ -136,6 +140,7 @@ func TestTimeLeft(t *testing.T) { ttl: utilpointer.Int32Ptr(0), since: &now.Time, expectedTimeLeft: durationPointer(0), + expectedExpireAt: now.Time, }, { name: "Job failed now, 10s TTL", @@ -143,6 +148,7 @@ func TestTimeLeft(t *testing.T) { ttl: utilpointer.Int32Ptr(10), since: &now.Time, expectedTimeLeft: durationPointer(10), + expectedExpireAt: now.Add(10 * time.Second), }, { name: "Job failed 10s ago, 15s TTL", @@ -150,12 +156,13 @@ func TestTimeLeft(t *testing.T) { ttl: utilpointer.Int32Ptr(15), since: &now.Time, expectedTimeLeft: durationPointer(5), + expectedExpireAt: now.Add(5 * time.Second), }, } for _, tc := range testCases { job := newJob(tc.completionTime, tc.failedTime, tc.ttl) - gotTimeLeft, gotErr := timeLeft(job, tc.since) + gotTimeLeft, gotExpireAt, gotErr := timeLeft(job, tc.since) if tc.expectErr != (gotErr != nil) { t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr) } @@ -169,6 +176,9 @@ func TestTimeLeft(t *testing.T) { if *gotTimeLeft != *tc.expectedTimeLeft { t.Errorf("%s: expected time left %v, got %v", tc.name, tc.expectedTimeLeft, gotTimeLeft) } + if *gotExpireAt != tc.expectedExpireAt { + t.Errorf("%s: expected expire at %v, got %v", tc.name, tc.expectedExpireAt, *gotExpireAt) + } } } }