diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go
index 56b3e3395b5..432fa16078b 100644
--- a/cmd/kube-controller-manager/app/core.go
+++ b/cmd/kube-controller-manager/app/core.go
@@ -563,7 +563,9 @@ func startPVProtectionController(ctx context.Context, controllerContext Controll
 }
 
 func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
+	ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "ttlafterfinished"))
 	go ttlafterfinished.New(
+		ctx,
 		controllerContext.InformerFactory.Batch().V1().Jobs(),
 		controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
 	).Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs))
diff --git a/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go b/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go
index b3127ba1c49..adb32a1d017 100644
--- a/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go
+++ b/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go
@@ -21,10 +21,8 @@ import (
 	"fmt"
 	"time"
 
-	"k8s.io/klog/v2"
-
 	batch "k8s.io/api/batch/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -36,6 +34,7 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
 	"k8s.io/kubectl/pkg/scheme"
 	"k8s.io/kubernetes/pkg/controller"
 	jobutil "k8s.io/kubernetes/pkg/controller/job"
@@ -70,7 +69,7 @@ type Controller struct {
 }
 
 // New creates an instance of Controller
-func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
+func New(ctx context.Context, jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartStructuredLogging(0)
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
@@ -83,9 +82,14 @@ func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Co
 		queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
 	}
 
+	logger := klog.FromContext(ctx)
 	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    tc.addJob,
-		UpdateFunc: tc.updateJob,
+		AddFunc: func(obj interface{}) {
+			tc.addJob(logger, obj)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			tc.updateJob(logger, oldObj, newObj)
+		},
 	})
 
 	tc.jLister = jobInformer.Lister()
@@ -101,8 +105,9 @@ func (tc *Controller) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer tc.queue.ShutDown()
 
-	klog.Infof("Starting TTL after finished controller")
-	defer klog.Infof("Shutting down TTL after finished controller")
+	logger := klog.FromContext(ctx)
+	logger.Info("Starting TTL after finished controller")
+	defer logger.Info("Shutting down TTL after finished controller")
 
 	if !cache.WaitForNamedCacheSync("TTL after finished", ctx.Done(), tc.jListerSynced) {
 		return
@@ -115,26 +120,27 @@ func (tc *Controller) Run(ctx context.Context, workers int) {
 	<-ctx.Done()
 }
 
-func (tc *Controller) addJob(obj interface{}) {
+func (tc *Controller) addJob(logger klog.Logger, obj interface{}) {
 	job := obj.(*batch.Job)
-	klog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)
+	logger.V(4).Info("Adding job", "job", klog.KObj(job))
 
 	if job.DeletionTimestamp == nil && needsCleanup(job) {
-		tc.enqueue(job)
+		tc.enqueue(logger, job)
 	}
+
 }
 
-func (tc *Controller) updateJob(old, cur interface{}) {
+func (tc *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
 	job := cur.(*batch.Job)
-	klog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)
+	logger.V(4).Info("Updating job", "job", klog.KObj(job))
 
 	if job.DeletionTimestamp == nil && needsCleanup(job) {
-		tc.enqueue(job)
+		tc.enqueue(logger, job)
 	}
 }
 
-func (tc *Controller) enqueue(job *batch.Job) {
-	klog.V(4).Infof("Add job %s/%s to cleanup", job.Namespace, job.Name)
+func (tc *Controller) enqueue(logger klog.Logger, job *batch.Job) {
+	logger.V(4).Info("Add job to cleanup", "job", klog.KObj(job))
 	key, err := controller.KeyFunc(job)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
@@ -193,9 +199,12 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
 		return err
 	}
 
-	klog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
 	// Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
 	job, err := tc.jLister.Jobs(namespace).Get(name)
+
+	logger := klog.FromContext(ctx)
+	logger.V(4).Info("Checking if Job is ready for cleanup", "job", klog.KRef(namespace, name))
+
 	if errors.IsNotFound(err) {
 		return nil
 	}
@@ -203,7 +212,7 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
 		return err
 	}
 
-	if expiredAt, err := tc.processTTL(job); err != nil {
+	if expiredAt, err := tc.processTTL(logger, job); err != nil {
 		return err
 	} else if expiredAt == nil {
 		return nil
@@ -221,7 +230,7 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
 		return err
 	}
 	// Use the latest Job TTL to see if the TTL truly expires.
-	expiredAt, err := tc.processTTL(fresh)
+	expiredAt, err := tc.processTTL(logger, fresh)
 	if err != nil {
 		return err
 	} else if expiredAt == nil {
@@ -233,7 +242,7 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
 		PropagationPolicy: &policy,
 		Preconditions:     &metav1.Preconditions{UID: &fresh.UID},
 	}
-	klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
+	logger.V(4).Info("Cleaning up Job", "job", klog.KObj(fresh))
 	if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(ctx, fresh.Name, options); err != nil {
 		return err
 	}
@@ -243,14 +252,15 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
 
 // processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire
 // if the TTL will expire later.
-func (tc *Controller) processTTL(job *batch.Job) (expiredAt *time.Time, err error) {
+func (tc *Controller) processTTL(logger klog.Logger, job *batch.Job) (expiredAt *time.Time, err error) {
+
 	// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
 	if job.DeletionTimestamp != nil || !needsCleanup(job) {
 		return nil, nil
 	}
 
 	now := tc.clock.Now()
-	t, e, err := timeLeft(job, &now)
+	t, e, err := timeLeft(logger, job, &now)
 	if err != nil {
 		return nil, err
 	}
@@ -282,16 +292,17 @@ func getFinishAndExpireTime(j *batch.Job) (*time.Time, *time.Time, error) {
 	return &finishAt, &expireAt, nil
 }
 
-func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, *time.Time, error) {
+func timeLeft(logger klog.Logger, j *batch.Job, since *time.Time) (*time.Duration, *time.Time, error) {
 	finishAt, expireAt, err := getFinishAndExpireTime(j)
 	if err != nil {
 		return nil, nil, err
 	}
+
 	if finishAt.After(*since) {
-		klog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name)
+		logger.Info("Warning: Found Job finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", "job", klog.KObj(j))
 	}
 	remaining := expireAt.Sub(*since)
-	klog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())
+	logger.V(4).Info("Found Job finished", "job", klog.KObj(j), "finishTime", finishAt.UTC(), "remainingTTL", remaining, "startTime", since.UTC(), "deadlineTTL", expireAt.UTC())
 	return &remaining, expireAt, nil
 }
 
diff --git a/pkg/controller/ttlafterfinished/ttlafterfinished_controller_test.go b/pkg/controller/ttlafterfinished/ttlafterfinished_controller_test.go
index d28357e81ef..0bb3e887eb3 100644
--- a/pkg/controller/ttlafterfinished/ttlafterfinished_controller_test.go
+++ b/pkg/controller/ttlafterfinished/ttlafterfinished_controller_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package ttlafterfinished
 
 import (
+	"k8s.io/klog/v2"
 	"strings"
 	"testing"
 	"time"
@@ -24,6 +25,7 @@ import (
 	batch "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/utils/pointer"
 )
 
@@ -161,8 +163,11 @@ func TestTimeLeft(t *testing.T) {
 	}
 
 	for _, tc := range testCases {
+
 		job := newJob(tc.completionTime, tc.failedTime, tc.ttl)
-		gotTimeLeft, gotExpireAt, gotErr := timeLeft(job, tc.since)
+		_, ctx := ktesting.NewTestContext(t)
+		logger := klog.FromContext(ctx)
+		gotTimeLeft, gotExpireAt, gotErr := timeLeft(logger, job, tc.since)
 		if tc.expectErr != (gotErr != nil) {
 			t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr)
 		}