mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #115332 from obaranov1/ttlafterfinished-logging-migration
Migrate /pkg/controller/ttlafterfinished to structured and contextual logging
This commit is contained in:
commit
637bd66165
@ -565,7 +565,9 @@ func startPVProtectionController(ctx context.Context, controllerContext Controll
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
|
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "ttlafterfinished"))
|
||||||
go ttlafterfinished.New(
|
go ttlafterfinished.New(
|
||||||
|
ctx,
|
||||||
controllerContext.InformerFactory.Batch().V1().Jobs(),
|
controllerContext.InformerFactory.Batch().V1().Jobs(),
|
||||||
controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
|
controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
|
||||||
).Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs))
|
).Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs))
|
||||||
|
@ -21,10 +21,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
|
|
||||||
batch "k8s.io/api/batch/v1"
|
batch "k8s.io/api/batch/v1"
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
@ -36,6 +34,7 @@ import (
|
|||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/kubectl/pkg/scheme"
|
"k8s.io/kubectl/pkg/scheme"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
jobutil "k8s.io/kubernetes/pkg/controller/job"
|
jobutil "k8s.io/kubernetes/pkg/controller/job"
|
||||||
@ -70,7 +69,7 @@ type Controller struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New creates an instance of Controller
|
// New creates an instance of Controller
|
||||||
func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
|
func New(ctx context.Context, jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
eventBroadcaster.StartStructuredLogging(0)
|
eventBroadcaster.StartStructuredLogging(0)
|
||||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
|
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
|
||||||
@ -83,9 +82,14 @@ func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Co
|
|||||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: tc.addJob,
|
AddFunc: func(obj interface{}) {
|
||||||
UpdateFunc: tc.updateJob,
|
tc.addJob(logger, obj)
|
||||||
|
},
|
||||||
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
|
tc.updateJob(logger, oldObj, newObj)
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
tc.jLister = jobInformer.Lister()
|
tc.jLister = jobInformer.Lister()
|
||||||
@ -101,8 +105,9 @@ func (tc *Controller) Run(ctx context.Context, workers int) {
|
|||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
defer tc.queue.ShutDown()
|
defer tc.queue.ShutDown()
|
||||||
|
|
||||||
klog.Infof("Starting TTL after finished controller")
|
logger := klog.FromContext(ctx)
|
||||||
defer klog.Infof("Shutting down TTL after finished controller")
|
logger.Info("Starting TTL after finished controller")
|
||||||
|
defer logger.Info("Shutting down TTL after finished controller")
|
||||||
|
|
||||||
if !cache.WaitForNamedCacheSync("TTL after finished", ctx.Done(), tc.jListerSynced) {
|
if !cache.WaitForNamedCacheSync("TTL after finished", ctx.Done(), tc.jListerSynced) {
|
||||||
return
|
return
|
||||||
@ -115,26 +120,27 @@ func (tc *Controller) Run(ctx context.Context, workers int) {
|
|||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *Controller) addJob(obj interface{}) {
|
func (tc *Controller) addJob(logger klog.Logger, obj interface{}) {
|
||||||
job := obj.(*batch.Job)
|
job := obj.(*batch.Job)
|
||||||
klog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)
|
logger.V(4).Info("Adding job", "job", klog.KObj(job))
|
||||||
|
|
||||||
if job.DeletionTimestamp == nil && needsCleanup(job) {
|
if job.DeletionTimestamp == nil && needsCleanup(job) {
|
||||||
tc.enqueue(job)
|
tc.enqueue(logger, job)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *Controller) updateJob(old, cur interface{}) {
|
func (tc *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
|
||||||
job := cur.(*batch.Job)
|
job := cur.(*batch.Job)
|
||||||
klog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)
|
logger.V(4).Info("Updating job", "job", klog.KObj(job))
|
||||||
|
|
||||||
if job.DeletionTimestamp == nil && needsCleanup(job) {
|
if job.DeletionTimestamp == nil && needsCleanup(job) {
|
||||||
tc.enqueue(job)
|
tc.enqueue(logger, job)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *Controller) enqueue(job *batch.Job) {
|
func (tc *Controller) enqueue(logger klog.Logger, job *batch.Job) {
|
||||||
klog.V(4).Infof("Add job %s/%s to cleanup", job.Namespace, job.Name)
|
logger.V(4).Info("Add job to cleanup", "job", klog.KObj(job))
|
||||||
key, err := controller.KeyFunc(job)
|
key, err := controller.KeyFunc(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
|
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
|
||||||
@ -193,9 +199,12 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
|
|
||||||
// Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
|
// Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
|
||||||
job, err := tc.jLister.Jobs(namespace).Get(name)
|
job, err := tc.jLister.Jobs(namespace).Get(name)
|
||||||
|
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
logger.V(4).Info("Checking if Job is ready for cleanup", "job", klog.KRef(namespace, name))
|
||||||
|
|
||||||
if errors.IsNotFound(err) {
|
if errors.IsNotFound(err) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -203,7 +212,7 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if expiredAt, err := tc.processTTL(job); err != nil {
|
if expiredAt, err := tc.processTTL(logger, job); err != nil {
|
||||||
return err
|
return err
|
||||||
} else if expiredAt == nil {
|
} else if expiredAt == nil {
|
||||||
return nil
|
return nil
|
||||||
@ -221,7 +230,7 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Use the latest Job TTL to see if the TTL truly expires.
|
// Use the latest Job TTL to see if the TTL truly expires.
|
||||||
expiredAt, err := tc.processTTL(fresh)
|
expiredAt, err := tc.processTTL(logger, fresh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
} else if expiredAt == nil {
|
} else if expiredAt == nil {
|
||||||
@ -233,7 +242,7 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
|
|||||||
PropagationPolicy: &policy,
|
PropagationPolicy: &policy,
|
||||||
Preconditions: &metav1.Preconditions{UID: &fresh.UID},
|
Preconditions: &metav1.Preconditions{UID: &fresh.UID},
|
||||||
}
|
}
|
||||||
klog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
|
logger.V(4).Info("Cleaning up Job", "job", klog.KObj(fresh))
|
||||||
if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(ctx, fresh.Name, options); err != nil {
|
if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(ctx, fresh.Name, options); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -243,14 +252,15 @@ func (tc *Controller) processJob(ctx context.Context, key string) error {
|
|||||||
|
|
||||||
// processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire
|
// processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire
|
||||||
// if the TTL will expire later.
|
// if the TTL will expire later.
|
||||||
func (tc *Controller) processTTL(job *batch.Job) (expiredAt *time.Time, err error) {
|
func (tc *Controller) processTTL(logger klog.Logger, job *batch.Job) (expiredAt *time.Time, err error) {
|
||||||
|
|
||||||
// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
|
// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
|
||||||
if job.DeletionTimestamp != nil || !needsCleanup(job) {
|
if job.DeletionTimestamp != nil || !needsCleanup(job) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
now := tc.clock.Now()
|
now := tc.clock.Now()
|
||||||
t, e, err := timeLeft(job, &now)
|
t, e, err := timeLeft(logger, job, &now)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -282,16 +292,17 @@ func getFinishAndExpireTime(j *batch.Job) (*time.Time, *time.Time, error) {
|
|||||||
return &finishAt, &expireAt, nil
|
return &finishAt, &expireAt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, *time.Time, error) {
|
func timeLeft(logger klog.Logger, j *batch.Job, since *time.Time) (*time.Duration, *time.Time, error) {
|
||||||
finishAt, expireAt, err := getFinishAndExpireTime(j)
|
finishAt, expireAt, err := getFinishAndExpireTime(j)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if finishAt.After(*since) {
|
if finishAt.After(*since) {
|
||||||
klog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name)
|
logger.Info("Warning: Found Job finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", "job", klog.KObj(j))
|
||||||
}
|
}
|
||||||
remaining := expireAt.Sub(*since)
|
remaining := expireAt.Sub(*since)
|
||||||
klog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())
|
logger.V(4).Info("Found Job finished", "job", klog.KObj(j), "finishTime", finishAt.UTC(), "remainingTTL", remaining, "startTime", since.UTC(), "deadlineTTL", expireAt.UTC())
|
||||||
return &remaining, expireAt, nil
|
return &remaining, expireAt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package ttlafterfinished
|
package ttlafterfinished
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"k8s.io/klog/v2"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -24,6 +25,7 @@ import (
|
|||||||
batch "k8s.io/api/batch/v1"
|
batch "k8s.io/api/batch/v1"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/klog/v2/ktesting"
|
||||||
"k8s.io/utils/pointer"
|
"k8s.io/utils/pointer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -161,8 +163,11 @@ func TestTimeLeft(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
|
|
||||||
job := newJob(tc.completionTime, tc.failedTime, tc.ttl)
|
job := newJob(tc.completionTime, tc.failedTime, tc.ttl)
|
||||||
gotTimeLeft, gotExpireAt, gotErr := timeLeft(job, tc.since)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
gotTimeLeft, gotExpireAt, gotErr := timeLeft(logger, job, tc.since)
|
||||||
if tc.expectErr != (gotErr != nil) {
|
if tc.expectErr != (gotErr != nil) {
|
||||||
t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr)
|
t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user