Merge pull request #116910 from fatsheep9146/job-controller-contextual-logging

Migrated pkg/controller/job to contextual logging
Kubernetes Prow Robot 2023-06-14 08:00:18 -07:00 committed by GitHub
commit 47e79b8156
10 changed files with 185 additions and 125 deletions
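
For readers new to the pattern: this PR applies Kubernetes' contextual logging migration (KEP-3077), replacing calls to the global klog API (klog.InfoS, klog.V(...).Infof) with a klog.Logger that is either derived from a context.Context or passed in explicitly. A minimal sketch of the before/after shape, with made-up function names:

    package main

    import (
        "context"

        "k8s.io/klog/v2"
    )

    // Before: logs go through the global klog state; callers cannot attach values.
    func syncSomethingOld(key string) {
        klog.V(4).InfoS("Syncing", "key", key)
    }

    // After: the logger rides along in ctx, so callers can attach names and
    // key/value pairs that appear on every log line emitted downstream.
    func syncSomething(ctx context.Context, key string) {
        logger := klog.FromContext(ctx)
        logger.V(4).Info("Syncing", "key", key)
    }

    func main() {
        ctx := klog.NewContext(context.Background(), klog.Background())
        syncSomething(ctx, "default/foobar")
        syncSomethingOld("default/foobar")
    }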

View File

@ -30,6 +30,7 @@ import (
func startJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
go job.NewController(
ctx,
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.ClientBuilder.ClientOrDie("job-controller"),
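
The ctx threaded in here is what carries the logger into the controller. A minimal sketch of how a caller can attach one before the call, assuming the klog/v2 helpers NewContext and LoggerWithName (the "job" name is illustrative):

    logger := klog.LoggerWithName(klog.Background(), "job") // illustrative logger name
    ctx = klog.NewContext(ctx, logger)                      // attach it to the context
    go job.NewController(ctx, podInformer, jobInformer, client).Run(ctx, workers)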

View File

@ -46,7 +46,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.*
-contextual k8s.io/kubernetes/pkg/controller/endpointslice/.*
-contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.*
-contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.*
-contextual k8s.io/kubernetes/pkg/controller/job/.*
-contextual k8s.io/kubernetes/pkg/controller/nodeipam/.*
-contextual k8s.io/kubernetes/pkg/controller/podgc/.*
-contextual k8s.io/kubernetes/pkg/controller/replicaset/.*
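
These entries are from hack/logcheck.conf, which configures the logcheck linter: a leading `-` disables the named check for files matching the pattern, so deleting the `-contextual ... pkg/controller/job/.*` line switches the contextual check on for this package. Roughly, the linter would then flag direct global klog calls there and accept the contextual form:

    klog.InfoS("Starting job controller")  // flagged once the exclusion is gone

    logger := klog.FromContext(ctx)
    logger.Info("Starting job controller") // accepted: contextual logging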

View File

@ -51,8 +51,8 @@ type orderedIntervals []interval
// The old list is based solely on .status.completedIndexes, but returns an
// empty list if this Job is not tracked with finalizers. The new list includes
// the indexes that succeeded since the last sync.
func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
prevIntervals := succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions))
func calculateSucceededIndexes(logger klog.Logger, job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
prevIntervals := succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
newSucceeded := sets.New[int]()
for _, p := range pods {
ix := getCompletionIndex(p.Annotations)
@ -148,7 +148,7 @@ func (oi orderedIntervals) has(ix int) bool {
return oi[hi].First <= ix
}
func succeededIndexesFromString(completedIndexes string, completions int) orderedIntervals {
func succeededIndexesFromString(logger klog.Logger, completedIndexes string, completions int) orderedIntervals {
if completedIndexes == "" {
return nil
}
@ -160,7 +160,7 @@ func succeededIndexesFromString(completedIndexes string, completions int) ordere
var err error
inter.First, err = strconv.Atoi(limitsStr[0])
if err != nil {
klog.InfoS("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err)
logger.Info("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err)
continue
}
if inter.First >= completions {
@ -169,7 +169,7 @@ func succeededIndexesFromString(completedIndexes string, completions int) ordere
if len(limitsStr) > 1 {
inter.Last, err = strconv.Atoi(limitsStr[1])
if err != nil {
klog.InfoS("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err)
logger.Info("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err)
continue
}
if inter.Last >= completions {
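
Pure helpers like calculateSucceededIndexes and succeededIndexesFromString have no context to draw a logger from, so the PR passes klog.Logger explicitly. A small usage sketch inside a test, with an illustrative index string and completions count:

    logger, _ := ktesting.NewTestContext(t)
    intervals := succeededIndexesFromString(logger, "1-3,5", 10)
    fmt.Println(intervals.total()) // 4: indexes 1, 2, 3 and 5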

View File

@ -22,12 +22,14 @@ import (
"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/pointer"
)
const noIndex = "-"
func TestCalculateSucceededIndexes(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
cases := map[string]struct {
prevSucceeded string
pods []indexPhase
@ -206,7 +208,7 @@ func TestCalculateSucceededIndexes(t *testing.T) {
for _, p := range pods {
p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
}
gotStatusIntervals, gotIntervals := calculateSucceededIndexes(job, pods)
gotStatusIntervals, gotIntervals := calculateSucceededIndexes(logger, job, pods)
if diff := cmp.Diff(tc.wantStatusIntervals, gotStatusIntervals); diff != "" {
t.Errorf("Unexpected completed indexes from status (-want,+got):\n%s", diff)
}

View File

@ -132,12 +132,13 @@ type Controller struct {
// NewController creates a new Job controller that keeps the relevant pods
// in sync with their corresponding Job objects.
func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
return newControllerWithClock(podInformer, jobInformer, kubeClient, &clock.RealClock{})
func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
return newControllerWithClock(ctx, podInformer, jobInformer, kubeClient, &clock.RealClock{})
}
func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) *Controller {
func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) *Controller {
eventBroadcaster := record.NewBroadcaster()
logger := klog.FromContext(ctx)
jm := &Controller{
kubeClient: kubeClient,
@ -160,19 +161,27 @@ func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer b
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
jm.enqueueController(obj, true)
jm.enqueueController(logger, obj, true)
},
UpdateFunc: func(oldObj, newObj interface{}) {
jm.updateJob(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
jm.deleteJob(logger, obj)
},
UpdateFunc: jm.updateJob,
DeleteFunc: jm.deleteJob,
})
jm.jobLister = jobInformer.Lister()
jm.jobStoreSynced = jobInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.addPod,
UpdateFunc: jm.updatePod,
AddFunc: func(obj interface{}) {
jm.addPod(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
jm.updatePod(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
jm.deletePod(obj, true)
jm.deletePod(logger, obj, true)
},
})
jm.podStore = podInformer.Lister()
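
Worth noting as a design choice: client-go's cache.ResourceEventHandlerFuncs fix the callback signatures, so no ctx is available per event. The handlers therefore become closures over the logger captured at construction time, which is why NewController now takes a ctx at all. The same wrapping works for any informer-driven controller, sketched generically (c, addThing and friends are placeholders):

    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { c.addThing(logger, obj) }, // logger captured at setup
        UpdateFunc: func(old, cur interface{}) { c.updateThing(logger, old, cur) },
        DeleteFunc: func(obj interface{}) { c.deleteThing(logger, obj) },
    })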
@ -190,6 +199,7 @@ func newControllerWithClock(podInformer coreinformers.PodInformer, jobInformer b
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
logger := klog.FromContext(ctx)
// Start events processing pipeline.
jm.broadcaster.StartStructuredLogging(0)
@ -199,8 +209,8 @@ func (jm *Controller) Run(ctx context.Context, workers int) {
defer jm.queue.ShutDown()
defer jm.orphanQueue.ShutDown()
klog.Infof("Starting job controller")
defer klog.Infof("Shutting down job controller")
logger.Info("Starting job controller")
defer logger.Info("Shutting down job controller")
if !cache.WaitForNamedCacheSync("job", ctx.Done(), jm.podStoreSynced, jm.jobStoreSynced) {
return
@ -255,13 +265,13 @@ func (jm *Controller) resolveControllerRef(namespace string, controllerRef *meta
}
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (jm *Controller) addPod(obj interface{}) {
func (jm *Controller) addPod(logger klog.Logger, obj interface{}) {
pod := obj.(*v1.Pod)
recordFinishedPodWithTrackingFinalizer(nil, pod)
if pod.DeletionTimestamp != nil {
// on a restart of the controller, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
jm.deletePod(pod, false)
jm.deletePod(logger, pod, false)
return
}
@ -276,7 +286,7 @@ func (jm *Controller) addPod(obj interface{}) {
return
}
jm.expectations.CreationObserved(jobKey)
jm.enqueueControllerPodUpdate(job, true)
jm.enqueueControllerPodUpdate(logger, job, true)
return
}
@ -290,14 +300,14 @@ func (jm *Controller) addPod(obj interface{}) {
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, job := range jm.getPodJobs(pod) {
jm.enqueueControllerPodUpdate(job, true)
jm.enqueueControllerPodUpdate(logger, job, true)
}
}
// When a pod is updated, figure out what job/s manage it and wake them up.
// If the labels of the pod have changed we need to awaken both the old
// and new job. old and cur must be *v1.Pod types.
func (jm *Controller) updatePod(old, cur interface{}) {
func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
recordFinishedPodWithTrackingFinalizer(oldPod, curPod)
@ -311,7 +321,7 @@ func (jm *Controller) updatePod(old, cur interface{}) {
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect a job to create more pods asap, not wait
// until the kubelet actually deletes the pod.
jm.deletePod(curPod, false)
jm.deletePod(logger, curPod, false)
return
}
@ -335,10 +345,10 @@ func (jm *Controller) updatePod(old, cur interface{}) {
if finalizerRemoved {
key, err := controller.KeyFunc(job)
if err == nil {
jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
}
}
jm.enqueueControllerPodUpdate(job, immediate)
jm.enqueueControllerPodUpdate(logger, job, immediate)
}
}
@ -351,10 +361,10 @@ func (jm *Controller) updatePod(old, cur interface{}) {
if finalizerRemoved {
key, err := controller.KeyFunc(job)
if err == nil {
jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
}
}
jm.enqueueControllerPodUpdate(job, immediate)
jm.enqueueControllerPodUpdate(logger, job, immediate)
return
}
@ -368,14 +378,14 @@ func (jm *Controller) updatePod(old, cur interface{}) {
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if labelChanged || controllerRefChanged {
for _, job := range jm.getPodJobs(curPod) {
jm.enqueueControllerPodUpdate(job, immediate)
jm.enqueueControllerPodUpdate(logger, job, immediate)
}
}
}
// When a pod is deleted, enqueue the job that manages the pod and update its expectations.
// obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item.
func (jm *Controller) deletePod(obj interface{}, final bool) {
func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool) {
pod, ok := obj.(*v1.Pod)
if final {
recordFinishedPodWithTrackingFinalizer(pod, nil)
@ -425,13 +435,13 @@ func (jm *Controller) deletePod(obj interface{}, final bool) {
// Consider the finalizer removed if this is the final delete. Otherwise,
// it's an update for the deletion timestamp, then check finalizer.
if final || !hasFinalizer {
jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
}
jm.enqueueControllerPodUpdate(job, true)
jm.enqueueControllerPodUpdate(logger, job, true)
}
func (jm *Controller) updateJob(old, cur interface{}) {
func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
oldJob := old.(*batch.Job)
curJob := cur.(*batch.Job)
@ -440,14 +450,16 @@ func (jm *Controller) updateJob(old, cur interface{}) {
if err != nil {
return
}
if curJob.Generation == oldJob.Generation {
// Delay the Job sync when there is no generation change, to batch Job status
// updates that are typically triggered by pod events.
jm.enqueueControllerPodUpdate(curJob, true)
jm.enqueueControllerPodUpdate(logger, curJob, true)
} else {
// Trigger immediate sync when spec is changed.
jm.enqueueController(curJob, true)
jm.enqueueController(logger, curJob, true)
}
// check if we need to add a new resync for ActiveDeadlineSeconds
if curJob.Status.StartTime != nil {
curADS := curJob.Spec.ActiveDeadlineSeconds
@ -460,15 +472,15 @@ func (jm *Controller) updateJob(old, cur interface{}) {
total := time.Duration(*curADS) * time.Second
// AddAfter will handle total < passed
jm.queue.AddAfter(key, total-passed)
klog.V(4).Infof("job %q ActiveDeadlineSeconds updated, will rsync after %d seconds", key, total-passed)
logger.V(4).Info("job's ActiveDeadlineSeconds updated, will rsync", "key", key, "interval", total-passed)
}
}
}
// deleteJob enqueues the job and all the pods associated with it that still
// have a finalizer.
func (jm *Controller) deleteJob(obj interface{}) {
jm.enqueueController(obj, true)
func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
jm.enqueueController(logger, obj, true)
jobObj, ok := obj.(*batch.Job)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@ -499,15 +511,15 @@ func (jm *Controller) deleteJob(obj interface{}) {
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
// immediate tells the controller to update the status right away, and should
// happen ONLY when there was a successful pod run.
func (jm *Controller) enqueueController(obj interface{}, immediate bool) {
jm.enqueueControllerDelayed(obj, immediate, 0)
func (jm *Controller) enqueueController(logger klog.Logger, obj interface{}, immediate bool) {
jm.enqueueControllerDelayed(logger, obj, immediate, 0)
}
func (jm *Controller) enqueueControllerPodUpdate(obj interface{}, immediate bool) {
jm.enqueueControllerDelayed(obj, immediate, jm.podUpdateBatchPeriod)
func (jm *Controller) enqueueControllerPodUpdate(logger klog.Logger, obj interface{}, immediate bool) {
jm.enqueueControllerDelayed(logger, obj, immediate, jm.podUpdateBatchPeriod)
}
func (jm *Controller) enqueueControllerDelayed(obj interface{}, immediate bool, delay time.Duration) {
func (jm *Controller) enqueueControllerDelayed(logger klog.Logger, obj interface{}, immediate bool, delay time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
@ -527,7 +539,7 @@ func (jm *Controller) enqueueControllerDelayed(obj interface{}, immediate bool,
// all controllers there will still be some replica instability. One way to handle this is
// by querying the store for all controllers that this rc overlaps, as well as all
// controllers that overlap this rc, and sorting them.
klog.Infof("enqueueing job %s", key)
logger.Info("enqueueing job", "key", key)
jm.queue.AddAfter(key, backoff)
}
@ -591,8 +603,9 @@ func (jm Controller) processNextOrphanPod(ctx context.Context) bool {
// syncOrphanPod removes the tracking finalizer from an orphan pod if found.
func (jm Controller) syncOrphanPod(ctx context.Context, key string) error {
startTime := jm.clock.Now()
logger := klog.FromContext(ctx)
defer func() {
klog.V(4).Infof("Finished syncing orphan pod %q (%v)", key, jm.clock.Since(startTime))
logger.V(4).Info("Finished syncing orphan pod", "pod", key, "elapsed", jm.clock.Since(startTime))
}()
ns, name, err := cache.SplitMetaNamespaceKey(key)
@ -603,7 +616,7 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error {
sharedPod, err := jm.podStore.Pods(ns).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("Orphan pod has been deleted: %v", key)
logger.V(4).Info("Orphan pod has been deleted", "pod", key)
return nil
}
return err
@ -679,8 +692,9 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Po
// concurrently with the same key.
func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
startTime := jm.clock.Now()
logger := klog.FromContext(ctx)
defer func() {
klog.V(4).Infof("Finished syncing job %q (%v)", key, jm.clock.Since(startTime))
logger.V(4).Info("Finished syncing job", "key", key, "elapsed", jm.clock.Since(startTime))
}()
ns, name, err := cache.SplitMetaNamespaceKey(key)
@ -693,9 +707,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("Job has been deleted: %v", key)
logger.V(4).Info("Job has been deleted", "key", key)
jm.expectations.DeleteExpectations(key)
jm.finalizerExpectations.deleteExpectations(key)
jm.finalizerExpectations.deleteExpectations(logger, key)
err := jm.backoffRecordStore.removeBackoffRecord(key)
if err != nil {
@ -798,14 +812,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
klog.V(2).InfoS("Job has activeDeadlineSeconds configuration. Will sync this job again", "job", key, "nextSyncIn", syncDuration)
logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
jm.queue.AddAfter(key, syncDuration)
}
}
var prevSucceededIndexes, succeededIndexes orderedIntervals
if isIndexedJob(&job) {
prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(&job, pods)
prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
succeeded = int32(succeededIndexes.total())
}
suspendCondChanged := false
@ -883,7 +897,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if apierrors.IsConflict(err) {
// we probably have a stale informer cache
// so don't return an error to avoid backoff
jm.enqueueController(&job, false)
jm.enqueueController(logger, &job, false)
return nil
}
return fmt.Errorf("tracking status: %w", err)
@ -927,12 +941,13 @@ func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods
func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
errCh := make(chan error, len(pods))
successfulDeletes := int32(len(pods))
logger := klog.FromContext(ctx)
failDelete := func(pod *v1.Pod, err error) {
// Decrement the expected number of deletes because the informer won't observe this deletion
jm.expectations.DeletionObserved(jobKey)
if !apierrors.IsNotFound(err) {
klog.V(2).Infof("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err)
logger.V(2).Info("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err)
atomic.AddInt32(&successfulDeletes, -1)
errCh <- err
utilruntime.HandleError(err)
@ -969,6 +984,7 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
// It does this up to a limited number of Pods so that the size of .status
// doesn't grow too much and this sync doesn't starve other Jobs.
func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.Set[string], finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error {
logger := klog.FromContext(ctx)
isIndexed := isIndexedJob(job)
var podsToRemoveFinalizer []*v1.Pod
uncountedStatus := job.Status.UncountedTerminatedPods
@ -1080,7 +1096,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
if jobFinished {
jm.recordJobFinished(job, finishedCond)
}
recordJobPodFinished(job, oldCounters)
recordJobPodFinished(logger, job, oldCounters)
}
return nil
}
@ -1097,6 +1113,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) {
logger := klog.FromContext(ctx)
var err error
if needsFlush {
if job, err = jm.updateStatusHandler(ctx, job); err != nil {
@ -1109,10 +1126,10 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
// this error might undercount the backoff.
// re-syncing from the current state might not help to recover
// the backoff information
klog.ErrorS(err, "Backoff update failed")
logger.Error(err, "Backoff update failed")
}
recordJobPodFinished(job, *oldCounters)
recordJobPodFinished(logger, job, *oldCounters)
// Shallow copy, as it will only be used to detect changes in the counters.
*oldCounters = job.Status
needsFlush = false
@ -1173,6 +1190,7 @@ func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinali
// of the i-th Pod was successfully removed (if the pod was deleted when this
// function was called, the finalizer is considered to have been removed successfully).
func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKey string, pods []*v1.Pod) ([]bool, error) {
logger := klog.FromContext(ctx)
errCh := make(chan error, len(pods))
succeeded := make([]bool, len(pods))
uids := make([]string, len(pods))
@ -1180,7 +1198,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe
uids[i] = string(p.UID)
}
if jobKey != "" {
err := jm.finalizerExpectations.expectFinalizersRemoved(jobKey, uids)
err := jm.finalizerExpectations.expectFinalizersRemoved(logger, jobKey, uids)
if err != nil {
return succeeded, fmt.Errorf("setting expected removed finalizers: %w", err)
}
@ -1196,7 +1214,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe
// In case of any failure, we don't expect a Pod update for the
// finalizer removed. Clear expectation now.
if jobKey != "" {
jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
}
if !apierrors.IsNotFound(err) {
errCh <- err
@ -1359,6 +1377,7 @@ func jobSuspended(job *batch.Job) bool {
// Respects back-off; does not create new pods if the back-off time has not passed
// Does NOT modify <activePods>.
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, backoff backoffRecord) (int32, string, error) {
logger := klog.FromContext(ctx)
active := int32(len(activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
@ -1368,7 +1387,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
}
if jobSuspended(job) {
klog.V(4).InfoS("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
podsToDelete := activePodsForRemoval(job, activePods, int(active))
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
@ -1408,7 +1427,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
}
if len(podsToDelete) > 0 {
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed
// While it is possible for a Job to require both pod creations and
@ -1421,7 +1440,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
if active < wantActive {
remainingTime := backoff.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff)
if remainingTime > 0 {
jm.enqueueControllerDelayed(job, true, remainingTime)
jm.enqueueControllerDelayed(logger, job, true, remainingTime)
return 0, metrics.JobSyncActionPodsCreated, nil
}
diff := wantActive - active
@ -1431,7 +1450,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
jm.expectations.ExpectCreations(jobKey, int(diff))
errCh := make(chan error, diff)
klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)
logger.V(4).Info("Too few pods running", "key", jobKey, "need", wantActive, "creating", diff)
wait := sync.WaitGroup{}
@ -1486,7 +1505,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
if err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won't observe this pod
klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
logger.V(2).Info("Failed creation, decrementing expectations", "job", klog.KObj(job))
jm.expectations.CreationObserved(jobKey)
atomic.AddInt32(&active, -1)
errCh <- err
@ -1497,7 +1516,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := diff - batchSize
if errorCount < len(errCh) && skippedPods > 0 {
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
logger.V(2).Info("Slow-start failure. Skipping creating pods, decrementing expectations", "skippedCount", skippedPods, "job", klog.KObj(job))
active -= skippedPods
for i := int32(0); i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
@ -1715,7 +1734,7 @@ func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType
return nil
}
func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.JobStatus) {
completionMode := completionModeStr(job)
var diff int
@ -1726,7 +1745,7 @@ func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
// now out of range (i.e. index >= spec.Completions).
if isIndexedJob(job) {
if job.Status.CompletedIndexes != oldCounters.CompletedIndexes {
diff = succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - succeededIndexesFromString(oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total()
diff = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - succeededIndexesFromString(logger, oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total()
}
} else {
diff = int(job.Status.Succeeded) - int(oldCounters.Succeeded)
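
After this file's changes, two logger sources coexist: the sync paths (syncJob, syncOrphanPod and the helpers they call) derive a logger from ctx with klog.FromContext, while the event handlers use the one captured in NewController. klog.FromContext also degrades gracefully when nothing was attached, a sketch assuming klog/v2 semantics:

    logger := klog.FromContext(context.Background()) // no logger in this ctx
    logger.Info("still works")                       // falls back to klog's global logger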

View File

@ -49,6 +49,8 @@ import (
"k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing"
metricstestutil "k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job/metrics"
@ -116,13 +118,13 @@ func newJob(parallelism, completions, backoffLimit int32, completionMode batch.C
return newJobWithName("foobar", parallelism, completions, backoffLimit, completionMode)
}
func newControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) {
return newControllerFromClientWithClock(kubeClient, resyncPeriod, realClock)
func newControllerFromClient(ctx context.Context, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) {
return newControllerFromClientWithClock(ctx, kubeClient, resyncPeriod, realClock)
}
func newControllerFromClientWithClock(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) {
func newControllerFromClientWithClock(ctx context.Context, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) {
sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
jm := newControllerWithClock(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock)
jm := newControllerWithClock(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock)
jm.podControl = &controller.FakePodControl{}
return jm, sharedInformers
}
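
The ctx used throughout these tests comes from ktesting.NewTestContext(t), which returns a per-test logger plus a context wrapping it; as far as I know, its output is routed through t.Log, so log lines land next to the test that produced them. A minimal sketch with a hypothetical test:

    func TestExample(t *testing.T) {
        logger, ctx := ktesting.NewTestContext(t)
        logger.Info("shows up via t.Log")
        _ = klog.FromContext(ctx) // returns the same test logger
    }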
@ -219,6 +221,7 @@ type jobInitialStatus struct {
}
func TestControllerSyncJob(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
jobConditionComplete := batch.JobComplete
jobConditionFailed := batch.JobFailed
jobConditionSuspended := batch.JobSuspended
@ -790,7 +793,7 @@ func TestControllerSyncJob(t *testing.T) {
fakeClock = clocktesting.NewFakeClock(time.Now())
}
manager, sharedInformerFactory := newControllerFromClientWithClock(clientSet, controller.NoResyncPeriodFunc, fakeClock)
manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientSet, controller.NoResyncPeriodFunc, fakeClock)
fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -1090,6 +1093,7 @@ func TestGetNewFinshedPods(t *testing.T) {
}
func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
succeededCond := newCondition(batch.JobComplete, v1.ConditionTrue, "", "", realClock.Now())
failedCond := newCondition(batch.JobFailed, v1.ConditionTrue, "", "", realClock.Now())
indexedCompletion := batch.IndexedCompletion
@ -1632,7 +1636,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
manager, _ := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
metrics.JobPodsFinished.Reset()
manager.podControl = &fakePodControl
@ -1648,7 +1652,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
var succeededIndexes orderedIntervals
if isIndexedJob(job) {
succeededIndexes = succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions))
succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
}
err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush, backoffRecord{})
if !errors.Is(err, tc.wantErr) {
@ -1684,6 +1688,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
// TestSyncJobPastDeadline verifies tracking of active deadline in a single syncJob call.
func TestSyncJobPastDeadline(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testCases := map[string]struct {
// job setup
parallelism int32
@ -1773,7 +1778,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
t.Run(name, func(t *testing.T) {
// job manager setup
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -1848,9 +1853,10 @@ func hasTrueCondition(job *batch.Job) *batch.JobConditionType {
// TestPastDeadlineJobFinished ensures that a Job is correctly tracked until
// reaching the active deadline, at which point it is marked as Failed.
func TestPastDeadlineJobFinished(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := fake.NewSimpleClientset()
fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second))
manager, sharedInformerFactory := newControllerFromClientWithClock(clientset, controller.NoResyncPeriodFunc, fakeClock)
manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
manager.expectations = FakeJobExpectations{
@ -1927,8 +1933,9 @@ func TestPastDeadlineJobFinished(t *testing.T) {
}
func TestSingleJobFailedCondition(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -1966,8 +1973,9 @@ func TestSingleJobFailedCondition(t *testing.T) {
}
func TestSyncJobComplete(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -1991,8 +1999,9 @@ func TestSyncJobComplete(t *testing.T) {
}
func TestSyncJobDeleted(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, _ := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, _ := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -2014,6 +2023,7 @@ func TestSyncJobDeleted(t *testing.T) {
}
func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
now := metav1.Now()
indexedCompletionMode := batch.IndexedCompletion
validObjectMeta := metav1.ObjectMeta{
@ -3038,7 +3048,7 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -3094,6 +3104,7 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
}
func TestSyncJobUpdateRequeue(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
defer func() { DefaultJobBackOff = 10 * time.Second }()
DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
@ -3114,7 +3125,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -3143,6 +3154,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
}
func TestUpdateJobRequeue(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
cases := map[string]struct {
oldJob *batch.Job
@ -3167,7 +3179,7 @@ func TestUpdateJobRequeue(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
@ -3176,7 +3188,7 @@ func TestUpdateJobRequeue(t *testing.T) {
if tc.updateFn != nil {
tc.updateFn(newJob)
}
manager.updateJob(tc.oldJob, newJob)
manager.updateJob(logger, tc.oldJob, newJob)
gotRequeuedImmediately := manager.queue.Len() > 0
if tc.wantRequeuedImmediately != gotRequeuedImmediately {
t.Fatalf("Want immediate requeue: %v, got immediate requeue: %v", tc.wantRequeuedImmediately, gotRequeuedImmediately)
@ -3186,8 +3198,9 @@ func TestUpdateJobRequeue(t *testing.T) {
}
func TestJobPodLookup(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
testCases := []struct {
@ -3268,6 +3281,7 @@ func TestJobPodLookup(t *testing.T) {
}
func TestGetPodsForJob(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
job := newJob(1, 1, 6, batch.NonIndexedCompletion)
job.Name = "test_job"
otherJob := newJob(1, 1, 6, batch.NonIndexedCompletion)
@ -3330,7 +3344,7 @@ func TestGetPodsForJob(t *testing.T) {
job.DeletionTimestamp = &metav1.Time{}
}
clientSet := fake.NewSimpleClientset(job, otherJob)
jm, informer := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
jm, informer := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
cachedJob := job.DeepCopy()
@ -3368,8 +3382,11 @@ func TestGetPodsForJob(t *testing.T) {
}
func TestAddPod(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
logger := klog.FromContext(ctx)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
// Disable batching of pod updates.
@ -3387,7 +3404,7 @@ func TestAddPod(t *testing.T) {
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
jm.addPod(pod1)
jm.addPod(logger, pod1)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3400,7 +3417,7 @@ func TestAddPod(t *testing.T) {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
jm.addPod(pod2)
jm.addPod(logger, pod2)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3415,8 +3432,9 @@ func TestAddPod(t *testing.T) {
}
func TestAddPodOrphan(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
// Disable batching of pod updates.
@ -3438,15 +3456,17 @@ func TestAddPodOrphan(t *testing.T) {
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
jm.addPod(pod1)
jm.addPod(logger, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdatePod(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
logger := klog.FromContext(ctx)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
// Disable batching of pod updates.
@ -3466,7 +3486,7 @@ func TestUpdatePod(t *testing.T) {
prev := *pod1
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
jm.updatePod(logger, &prev, pod1)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3481,7 +3501,7 @@ func TestUpdatePod(t *testing.T) {
prev = *pod2
bumpResourceVersion(pod2)
jm.updatePod(&prev, pod2)
jm.updatePod(logger, &prev, pod2)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3496,8 +3516,9 @@ func TestUpdatePod(t *testing.T) {
}
func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
// Disable batching of pod updates.
@ -3518,15 +3539,17 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
prev := *pod1
prev.Labels = map[string]string{"foo2": "bar2"}
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
jm.updatePod(logger, &prev, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdatePodChangeControllerRef(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
logger := klog.FromContext(ctx)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
// Disable batching of pod updates.
@ -3546,15 +3569,17 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
prev := *pod1
prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(job2, controllerKind)}
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
jm.updatePod(logger, &prev, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdatePodRelease(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
logger := klog.FromContext(ctx)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
// Disable batching of pod updates.
@ -3574,15 +3599,16 @@ func TestUpdatePodRelease(t *testing.T) {
prev := *pod1
pod1.OwnerReferences = nil
bumpResourceVersion(pod1)
jm.updatePod(&prev, pod1)
jm.updatePod(logger, &prev, pod1)
if got, want := jm.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestDeletePod(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
// Disable batching of pod updates.
@ -3600,7 +3626,7 @@ func TestDeletePod(t *testing.T) {
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
jm.deletePod(pod1, true)
jm.deletePod(logger, pod1, true)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3613,7 +3639,7 @@ func TestDeletePod(t *testing.T) {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
jm.deletePod(pod2, true)
jm.deletePod(logger, pod2, true)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3628,8 +3654,9 @@ func TestDeletePod(t *testing.T) {
}
func TestDeletePodOrphan(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
jm, informer := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
// Disable batching of pod updates.
@ -3650,7 +3677,7 @@ func TestDeletePodOrphan(t *testing.T) {
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
jm.deletePod(pod1, true)
jm.deletePod(logger, pod1, true)
if got, want := jm.queue.Len(), 0; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@ -3670,8 +3697,9 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
// TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods
// and checking expectations.
func TestSyncJobExpectations(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -3704,10 +3732,11 @@ func TestSyncJobExpectations(t *testing.T) {
}
func TestWatchJobs(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := fake.NewSimpleClientset()
fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
@ -3748,11 +3777,12 @@ func TestWatchJobs(t *testing.T) {
}
func TestWatchPods(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testJob := newJob(2, 2, 6, batch.NonIndexedCompletion)
clientset := fake.NewSimpleClientset(testJob)
fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
@ -3795,9 +3825,10 @@ func TestWatchPods(t *testing.T) {
}
func TestWatchOrphanPods(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
@ -3876,6 +3907,7 @@ type pods struct {
}
func TestJobBackoffReset(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testCases := map[string]struct {
// job setup
parallelism int32
@ -3905,7 +3937,7 @@ func TestJobBackoffReset(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
defer func() { DefaultJobBackOff = 10 * time.Second }()
DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -3966,6 +3998,8 @@ func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duratio
}
func TestJobBackoff(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
logger := klog.FromContext(ctx)
job := newJob(1, 1, 1, batch.NonIndexedCompletion)
oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
oldPod.ResourceVersion = "1"
@ -4038,7 +4072,7 @@ func TestJobBackoff(t *testing.T) {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -4053,7 +4087,7 @@ func TestJobBackoff(t *testing.T) {
if tc.oldPodPhase != "" {
oldPod.Status.Phase = tc.oldPodPhase
}
manager.updatePod(oldPod, newPod)
manager.updatePod(logger, oldPod, newPod)
if queue.duration != tc.wantBackoff {
t.Errorf("unexpected backoff %v, expected %v", queue.duration, tc.wantBackoff)
}
@ -4062,6 +4096,7 @@ func TestJobBackoff(t *testing.T) {
}
func TestJobBackoffForOnFailure(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
jobConditionComplete := batch.JobComplete
jobConditionFailed := batch.JobFailed
jobConditionSuspended := batch.JobSuspended
@ -4146,7 +4181,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
t.Run(name, func(t *testing.T) {
// job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -4193,6 +4228,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
}
func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
jobConditionFailed := batch.JobFailed
testCases := map[string]struct {
@ -4246,7 +4282,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
t.Run(name, func(t *testing.T) {
// job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -4378,9 +4414,10 @@ func TestEnsureJobConditions(t *testing.T) {
}
func TestFinalizersRemovedExpectations(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")}

View File

@ -74,8 +74,8 @@ func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set
// expectFinalizersRemoved records expectations for the given deletedKeys, against the
// given job key.
// This is thread-safe across different job keys.
func (u *uidTrackingExpectations) expectFinalizersRemoved(jobKey string, deletedKeys []string) error {
klog.V(4).InfoS("Expecting tracking finalizers removed", "job", jobKey, "podUIDs", deletedKeys)
func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jobKey string, deletedKeys []string) error {
logger.V(4).Info("Expecting tracking finalizers removed", "key", jobKey, "podUIDs", deletedKeys)
uids := u.getSet(jobKey)
if uids == nil {
@ -94,12 +94,12 @@ func (u *uidTrackingExpectations) expectFinalizersRemoved(jobKey string, deleted
}
// finalizerRemovalObserved records the given deleteKey as a deletion, for the given job.
func (u *uidTrackingExpectations) finalizerRemovalObserved(jobKey, deleteKey string) {
func (u *uidTrackingExpectations) finalizerRemovalObserved(logger klog.Logger, jobKey, deleteKey string) {
uids := u.getSet(jobKey)
if uids != nil {
uids.Lock()
if uids.set.Has(deleteKey) {
klog.V(4).InfoS("Observed tracking finalizer removed", "job", jobKey, "podUID", deleteKey)
logger.V(4).Info("Observed tracking finalizer removed", "key", jobKey, "podUID", deleteKey)
uids.set.Delete(deleteKey)
}
uids.Unlock()
@ -107,11 +107,11 @@ func (u *uidTrackingExpectations) finalizerRemovalObserved(jobKey, deleteKey str
}
// deleteExpectations deletes the UID set.
func (u *uidTrackingExpectations) deleteExpectations(jobKey string) {
func (u *uidTrackingExpectations) deleteExpectations(logger klog.Logger, jobKey string) {
set := u.getSet(jobKey)
if set != nil {
if err := u.store.Delete(set); err != nil {
klog.ErrorS(err, "Could not delete tracking annotation UID expectations", "job", jobKey)
logger.Error(err, "Could not delete tracking annotation UID expectations", "key", jobKey)
}
}
}
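
The uidTrackingExpectations helpers receive the logger as a parameter because they are reached from both logger sources: event handlers pass the construction-time logger, while syncJob passes the one derived from its ctx. A sketch of the two call shapes, using names from this diff:

    // from an event handler (logger captured in NewController):
    jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))

    // from syncJob (logger derived from the sync's ctx):
    logger := klog.FromContext(ctx)
    jm.finalizerExpectations.deleteExpectations(logger, key)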

View File

@ -27,10 +27,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller/job/metrics"
)
func TestUIDTrackingExpectations(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
tracks := []struct {
job string
firstRound []string
@ -62,7 +64,7 @@ func TestUIDTrackingExpectations(t *testing.T) {
for i := range tracks {
track := tracks[i]
go func(errID int) {
errs[errID] = expectations.expectFinalizersRemoved(track.job, track.firstRound)
errs[errID] = expectations.expectFinalizersRemoved(logger, track.job, track.firstRound)
wg.Done()
}(i)
}
@ -90,12 +92,12 @@ func TestUIDTrackingExpectations(t *testing.T) {
for _, uid := range track.firstRound {
uid := uid
go func() {
expectations.finalizerRemovalObserved(track.job, uid)
expectations.finalizerRemovalObserved(logger, track.job, uid)
wg.Done()
}()
}
go func(errID int) {
errs[errID] = expectations.expectFinalizersRemoved(track.job, track.secondRound)
errs[errID] = expectations.expectFinalizersRemoved(logger, track.job, track.secondRound)
wg.Done()
}(i)
}
@ -116,7 +118,7 @@ func TestUIDTrackingExpectations(t *testing.T) {
}
}
for _, track := range tracks {
expectations.deleteExpectations(track.job)
expectations.deleteExpectations(logger, track.job)
uids := expectations.getSet(track.job)
if uids != nil {
t.Errorf("Wanted expectations for job %s to be cleared, but they were not", track.job)

View File

@ -52,7 +52,7 @@ func setup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc
if err != nil {
t.Fatalf("Error creating CronJob controller: %v", err)
}
jc := job.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
jc := job.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
return server.TearDownFn, cjc, jc, informerSet, clientSet
}

View File

@ -2086,7 +2086,7 @@ func resetMetrics() {
func createJobControllerWithSharedInformers(restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller"))
ctx, cancel := context.WithCancel(context.Background())
jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
jc := jobcontroller.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
return jc, ctx, cancel
}