diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go index 92882973db5..6ee175659d1 100644 --- a/cmd/kube-controller-manager/app/batch.go +++ b/cmd/kube-controller-manager/app/batch.go @@ -34,11 +34,12 @@ func startJobController(ctx context.Context, controllerContext ControllerContext controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Batch().V1().Jobs(), controllerContext.ClientBuilder.ClientOrDie("job-controller"), - ).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Done()) + ).Run(ctx, int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs)) return nil, true, nil } func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + cj2c, err := cronjob.NewControllerV2(controllerContext.InformerFactory.Batch().V1().Jobs(), controllerContext.InformerFactory.Batch().V1().CronJobs(), controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"), @@ -46,6 +47,7 @@ func startCronJobController(ctx context.Context, controllerContext ControllerCon if err != nil { return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err) } - go cj2c.Run(int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Done()) + + go cj2c.Run(ctx, int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs)) return nil, true, nil } diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index a86bcba9e82..94225f42baf 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -66,7 +66,7 @@ func (m *BaseControllerRefManager) CanAdopt(ctx context.Context) error { // own the object. // // No reconciliation will be attempted if the controller is being deleted. -func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object, match func(metav1.Object) bool, adopt func(context.Context, metav1.Object) error, release func(metav1.Object) error) (bool, error) { +func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object, match func(metav1.Object) bool, adopt, release func(context.Context, metav1.Object) error) (bool, error) { controllerRef := metav1.GetControllerOfNoCopy(obj) if controllerRef != nil { if controllerRef.UID != m.Controller.GetUID() { @@ -85,7 +85,7 @@ func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.O if m.Controller.GetDeletionTimestamp() != nil { return false, nil } - if err := release(obj); err != nil { + if err := release(ctx, obj); err != nil { // If the pod no longer exists, ignore the error. if errors.IsNotFound(err) { return false, nil @@ -200,8 +200,8 @@ func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, adopt := func(ctx context.Context, obj metav1.Object) error { return m.AdoptPod(ctx, obj.(*v1.Pod)) } - release := func(obj metav1.Object) error { - return m.ReleasePod(obj.(*v1.Pod)) + release := func(ctx context.Context, obj metav1.Object) error { + return m.ReleasePod(ctx, obj.(*v1.Pod)) } for _, pod := range pods { @@ -230,19 +230,19 @@ func (m *PodControllerRefManager) AdoptPod(ctx context.Context, pod *v1.Pod) err if err != nil { return err } - return m.podControl.PatchPod(pod.Namespace, pod.Name, patchBytes) + return m.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patchBytes) } // ReleasePod sends a patch to free the pod from the control of the controller. // It returns the error if the patching fails. 404 and 422 errors are ignored. -func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error { +func (m *PodControllerRefManager) ReleasePod(ctx context.Context, pod *v1.Pod) error { klog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s", pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(pod.UID, []types.UID{m.Controller.GetUID()}, m.finalizers...) if err != nil { return err } - err = m.podControl.PatchPod(pod.Namespace, pod.Name, patchBytes) + err = m.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patchBytes) if err != nil { if errors.IsNotFound(err) { // If the pod no longer exists, ignore it. @@ -326,8 +326,8 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(ctx context.Context, s adopt := func(ctx context.Context, obj metav1.Object) error { return m.AdoptReplicaSet(ctx, obj.(*apps.ReplicaSet)) } - release := func(obj metav1.Object) error { - return m.ReleaseReplicaSet(obj.(*apps.ReplicaSet)) + release := func(ctx context.Context, obj metav1.Object) error { + return m.ReleaseReplicaSet(ctx, obj.(*apps.ReplicaSet)) } for _, rs := range sets { @@ -355,19 +355,19 @@ func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(ctx context.Context, rs if err != nil { return err } - return m.rsControl.PatchReplicaSet(rs.Namespace, rs.Name, patchBytes) + return m.rsControl.PatchReplicaSet(ctx, rs.Namespace, rs.Name, patchBytes) } // ReleaseReplicaSet sends a patch to free the ReplicaSet from the control of the Deployment controller. // It returns the error if the patching fails. 404 and 422 errors are ignored. -func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *apps.ReplicaSet) error { +func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(ctx context.Context, replicaSet *apps.ReplicaSet) error { klog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s", replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(replicaSet.UID, []types.UID{m.Controller.GetUID()}) if err != nil { return err } - err = m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, patchBytes) + err = m.rsControl.PatchReplicaSet(ctx, replicaSet.Namespace, replicaSet.Name, patchBytes) if err != nil { if errors.IsNotFound(err) { // If the ReplicaSet no longer exists, ignore it. @@ -464,8 +464,8 @@ func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(ctx co adopt := func(ctx context.Context, obj metav1.Object) error { return m.AdoptControllerRevision(ctx, obj.(*apps.ControllerRevision)) } - release := func(obj metav1.Object) error { - return m.ReleaseControllerRevision(obj.(*apps.ControllerRevision)) + release := func(ctx context.Context, obj metav1.Object) error { + return m.ReleaseControllerRevision(ctx, obj.(*apps.ControllerRevision)) } for _, h := range histories { @@ -493,12 +493,12 @@ func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(ctx con if err != nil { return err } - return m.crControl.PatchControllerRevision(history.Namespace, history.Name, patchBytes) + return m.crControl.PatchControllerRevision(ctx, history.Namespace, history.Name, patchBytes) } // ReleaseControllerRevision sends a patch to free the ControllerRevision from the control of its controller. // It returns the error if the patching fails. 404 and 422 errors are ignored. -func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(history *apps.ControllerRevision) error { +func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(ctx context.Context, history *apps.ControllerRevision) error { klog.V(2).Infof("patching ControllerRevision %s_%s to remove its controllerRef to %s/%s:%s", history.Namespace, history.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(history.UID, []types.UID{m.Controller.GetUID()}) @@ -506,7 +506,7 @@ func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(histo return err } - err = m.crControl.PatchControllerRevision(history.Namespace, history.Name, patchBytes) + err = m.crControl.PatchControllerRevision(ctx, history.Namespace, history.Name, patchBytes) if err != nil { if errors.IsNotFound(err) { // If the ControllerRevision no longer exists, ignore it. diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 5faba19a89f..62242617583 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -406,7 +406,7 @@ const ( // ReplicaSets, as well as increment or decrement them. It is used // by the deployment controller to ease testing of actions that it takes. type RSControlInterface interface { - PatchReplicaSet(namespace, name string, data []byte) error + PatchReplicaSet(ctx context.Context, namespace, name string, data []byte) error } // RealRSControl is the default implementation of RSControllerInterface. @@ -417,8 +417,8 @@ type RealRSControl struct { var _ RSControlInterface = &RealRSControl{} -func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) error { - _, err := r.KubeClient.AppsV1().ReplicaSets(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) +func (r RealRSControl) PatchReplicaSet(ctx context.Context, namespace, name string, data []byte) error { + _, err := r.KubeClient.AppsV1().ReplicaSets(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) return err } @@ -427,7 +427,7 @@ func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) erro // ControllerRevisions, as well as increment or decrement them. It is used // by the daemonset controller to ease testing of actions that it takes. type ControllerRevisionControlInterface interface { - PatchControllerRevision(namespace, name string, data []byte) error + PatchControllerRevision(ctx context.Context, namespace, name string, data []byte) error } // RealControllerRevisionControl is the default implementation of ControllerRevisionControlInterface. @@ -437,8 +437,8 @@ type RealControllerRevisionControl struct { var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{} -func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name string, data []byte) error { - _, err := r.KubeClient.AppsV1().ControllerRevisions(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) +func (r RealControllerRevisionControl) PatchControllerRevision(ctx context.Context, namespace, name string, data []byte) error { + _, err := r.KubeClient.AppsV1().ControllerRevisions(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) return err } @@ -446,13 +446,13 @@ func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name s // created as an interface to allow testing. type PodControlInterface interface { // CreatePods creates new pods according to the spec, and sets object as the pod's controller. - CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error // CreatePodsWithGenerateName creates new pods according to the spec, sets object as the pod's controller and sets pod's generateName. - CreatePodsWithGenerateName(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error + CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error // DeletePod deletes the pod identified by podID. - DeletePod(namespace string, podID string, object runtime.Object) error + DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error // PatchPod patches the pod. - PatchPod(namespace, name string, data []byte) error + PatchPod(ctx context.Context, namespace, name string, data []byte) error } // RealPodControl is the default implementation of PodControlInterface. @@ -513,11 +513,11 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error { return nil } -func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { - return r.CreatePodsWithGenerateName(namespace, template, controllerObject, controllerRef, "") +func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { + return r.CreatePodsWithGenerateName(ctx, namespace, template, controllerObject, controllerRef, "") } -func (r RealPodControl) CreatePodsWithGenerateName(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error { +func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error { if err := validateControllerRef(controllerRef); err != nil { return err } @@ -528,11 +528,11 @@ func (r RealPodControl) CreatePodsWithGenerateName(namespace string, template *v if len(generateName) > 0 { pod.ObjectMeta.GenerateName = generateName } - return r.createPods(namespace, pod, controllerObject) + return r.createPods(ctx, namespace, pod, controllerObject) } -func (r RealPodControl) PatchPod(namespace, name string, data []byte) error { - _, err := r.KubeClient.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) +func (r RealPodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error { + _, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) return err } @@ -561,11 +561,11 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec return pod, nil } -func (r RealPodControl) createPods(namespace string, pod *v1.Pod, object runtime.Object) error { +func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error { if len(labels.Set(pod.Labels)) == 0 { return fmt.Errorf("unable to create pods, no labels") } - newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) if err != nil { // only send an event if the namespace isn't terminating if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { @@ -584,13 +584,13 @@ func (r RealPodControl) createPods(namespace string, pod *v1.Pod, object runtime return nil } -func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error { +func (r RealPodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error { accessor, err := meta.Accessor(object) if err != nil { return fmt.Errorf("object does not have ObjectMeta, %v", err) } klog.V(2).InfoS("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID)) - if err := r.KubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), podID, metav1.DeleteOptions{}); err != nil { + if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil { if apierrors.IsNotFound(err) { klog.V(4).Infof("pod %v/%v has already been deleted.", namespace, podID) return err @@ -616,7 +616,7 @@ type FakePodControl struct { var _ PodControlInterface = &FakePodControl{} -func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error { +func (f *FakePodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error { f.Lock() defer f.Unlock() f.Patches = append(f.Patches, data) @@ -626,11 +626,11 @@ func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error { return nil } -func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { - return f.CreatePodsWithGenerateName(namespace, spec, object, controllerRef, "") +func (f *FakePodControl) CreatePods(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { + return f.CreatePodsWithGenerateName(ctx, namespace, spec, object, controllerRef, "") } -func (f *FakePodControl) CreatePodsWithGenerateName(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateNamePrefix string) error { +func (f *FakePodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateNamePrefix string) error { f.Lock() defer f.Unlock() f.CreateCallCount++ @@ -646,7 +646,7 @@ func (f *FakePodControl) CreatePodsWithGenerateName(namespace string, spec *v1.P return nil } -func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error { +func (f *FakePodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error { f.Lock() defer f.Unlock() f.DeletePodName = append(f.DeletePodName, podID) diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index afcc1091b00..26d89e38978 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -303,7 +303,7 @@ func TestCreatePods(t *testing.T) { controllerRef := metav1.NewControllerRef(controllerSpec, v1.SchemeGroupVersion.WithKind("ReplicationController")) // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template - err := podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec, controllerRef) + err := podControl.CreatePods(context.TODO(), ns, controllerSpec.Spec.Template, controllerSpec, controllerRef) assert.NoError(t, err, "unexpected error: %v", err) expectedPod := v1.Pod{ @@ -342,7 +342,7 @@ func TestCreatePodsWithGenerateName(t *testing.T) { // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template generateName := "hello-" - err := podControl.CreatePodsWithGenerateName(ns, controllerSpec.Spec.Template, controllerSpec, controllerRef, generateName) + err := podControl.CreatePodsWithGenerateName(context.TODO(), ns, controllerSpec.Spec.Template, controllerSpec, controllerRef, generateName) assert.NoError(t, err, "unexpected error: %v", err) expectedPod := v1.Pod{ @@ -371,7 +371,7 @@ func TestDeletePodsAllowsMissing(t *testing.T) { controllerSpec := newReplicationController(1) - err := podControl.DeletePod("namespace-name", "podName", controllerSpec) + err := podControl.DeletePod(context.TODO(), "namespace-name", "podName", controllerSpec) assert.True(t, apierrors.IsNotFound(err)) } diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go index fba64961847..705de615c45 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2.go +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -17,6 +17,7 @@ limitations under the License. package cronjob import ( + "context" "fmt" "reflect" "sort" @@ -123,37 +124,37 @@ func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer } // Run starts the main goroutine responsible for watching and syncing jobs. -func (jm *ControllerV2) Run(workers int, stopCh <-chan struct{}) { +func (jm *ControllerV2) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer jm.queue.ShutDown() klog.InfoS("Starting cronjob controller v2") defer klog.InfoS("Shutting down cronjob controller v2") - if !cache.WaitForNamedCacheSync("cronjob", stopCh, jm.jobListerSynced, jm.cronJobListerSynced) { + if !cache.WaitForNamedCacheSync("cronjob", ctx.Done(), jm.jobListerSynced, jm.cronJobListerSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(jm.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, jm.worker, time.Second) } - <-stopCh + <-ctx.Done() } -func (jm *ControllerV2) worker() { - for jm.processNextWorkItem() { +func (jm *ControllerV2) worker(ctx context.Context) { + for jm.processNextWorkItem(ctx) { } } -func (jm *ControllerV2) processNextWorkItem() bool { +func (jm *ControllerV2) processNextWorkItem(ctx context.Context) bool { key, quit := jm.queue.Get() if quit { return false } defer jm.queue.Done(key) - requeueAfter, err := jm.sync(key.(string)) + requeueAfter, err := jm.sync(ctx, key.(string)) switch { case err != nil: utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err)) @@ -165,7 +166,7 @@ func (jm *ControllerV2) processNextWorkItem() bool { return true } -func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) { +func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Duration, error) { ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey) if err != nil { return nil, err @@ -187,13 +188,13 @@ func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) { return nil, err } - cronJobCopy, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled) + cronJobCopy, requeueAfter, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled) if err != nil { klog.V(2).InfoS("Error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err) return nil, err } - err = jm.cleanupFinishedJobs(cronJobCopy, jobsToBeReconciled) + err = jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled) if err != nil { klog.V(2).InfoS("Error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err) return nil, err @@ -402,6 +403,7 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) { // It returns a copy of the CronJob that is to be used by other functions // that mutates the object func (jm *ControllerV2) syncCronJob( + ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job) (*batchv1.CronJob, *time.Duration, error) { @@ -413,7 +415,7 @@ func (jm *ControllerV2) syncCronJob( childrenJobs[j.ObjectMeta.UID] = true found := inActiveList(*cj, j.ObjectMeta.UID) if !found && !IsJobFinished(j) { - cjCopy, err := jm.cronJobControl.GetCronJob(cj.Namespace, cj.Name) + cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cj.Namespace, cj.Name) if err != nil { return nil, nil, err } @@ -464,7 +466,7 @@ func (jm *ControllerV2) syncCronJob( // the job is missing in the lister but found in api-server } - updatedCJ, err := jm.cronJobControl.UpdateStatus(cj) + updatedCJ, err := jm.cronJobControl.UpdateStatus(ctx, cj) if err != nil { klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err) return cj, nil, err @@ -605,7 +607,7 @@ func (jm *ControllerV2) syncCronJob( } cj.Status.Active = append(cj.Status.Active, *jobRef) cj.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime} - if _, err := jm.cronJobControl.UpdateStatus(cj); err != nil { + if _, err := jm.cronJobControl.UpdateStatus(ctx, cj); err != nil { klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err) return cj, nil, fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err) } @@ -629,7 +631,7 @@ func nextScheduledTimeDuration(sched cron.Schedule, now time.Time) *time.Duratio } // cleanupFinishedJobs cleanups finished jobs created by a CronJob -func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1.CronJob, js []*batchv1.Job) error { +func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job) error { // If neither limits are active, there is no need to do anything. if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil { return nil @@ -660,7 +662,7 @@ func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1.CronJob, js []*batchv1.J } // Update the CronJob, in case jobs were removed from the list. - _, err := jm.cronJobControl.UpdateStatus(cj) + _, err := jm.cronJobControl.UpdateStatus(ctx, cj) return err } diff --git a/pkg/controller/cronjob/cronjob_controllerv2_test.go b/pkg/controller/cronjob/cronjob_controllerv2_test.go index 458992ad4c0..71cd1181642 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2_test.go +++ b/pkg/controller/cronjob/cronjob_controllerv2_test.go @@ -17,6 +17,7 @@ limitations under the License. package cronjob import ( + "context" "reflect" "strings" "testing" @@ -322,7 +323,7 @@ func TestControllerV2SyncCronJob(t *testing.T) { return tc.now }, } - cjCopy, requeueAfter, err := jm.syncCronJob(&cj, js) + cjCopy, requeueAfter, err := jm.syncCronJob(context.TODO(), &cj, js) if tc.expectErr && err == nil { t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter) } diff --git a/pkg/controller/cronjob/injection.go b/pkg/controller/cronjob/injection.go index 35bd0f3a5c7..56c93451d74 100644 --- a/pkg/controller/cronjob/injection.go +++ b/pkg/controller/cronjob/injection.go @@ -33,9 +33,9 @@ import ( // cjControlInterface is an interface that knows how to update CronJob status // created as an interface to allow testing. type cjControlInterface interface { - UpdateStatus(cj *batchv1.CronJob) (*batchv1.CronJob, error) + UpdateStatus(ctx context.Context, cj *batchv1.CronJob) (*batchv1.CronJob, error) // GetCronJob retrieves a CronJob. - GetCronJob(namespace, name string) (*batchv1.CronJob, error) + GetCronJob(ctx context.Context, namespace, name string) (*batchv1.CronJob, error) } // realCJControl is the default implementation of cjControlInterface. @@ -43,14 +43,14 @@ type realCJControl struct { KubeClient clientset.Interface } -func (c *realCJControl) GetCronJob(namespace, name string) (*batchv1.CronJob, error) { - return c.KubeClient.BatchV1().CronJobs(namespace).Get(context.TODO(), name, metav1.GetOptions{}) +func (c *realCJControl) GetCronJob(ctx context.Context, namespace, name string) (*batchv1.CronJob, error) { + return c.KubeClient.BatchV1().CronJobs(namespace).Get(ctx, name, metav1.GetOptions{}) } var _ cjControlInterface = &realCJControl{} -func (c *realCJControl) UpdateStatus(cj *batchv1.CronJob) (*batchv1.CronJob, error) { - return c.KubeClient.BatchV1().CronJobs(cj.Namespace).UpdateStatus(context.TODO(), cj, metav1.UpdateOptions{}) +func (c *realCJControl) UpdateStatus(ctx context.Context, cj *batchv1.CronJob) (*batchv1.CronJob, error) { + return c.KubeClient.BatchV1().CronJobs(cj.Namespace).UpdateStatus(ctx, cj, metav1.UpdateOptions{}) } // fakeCJControl is the default implementation of cjControlInterface. @@ -59,7 +59,7 @@ type fakeCJControl struct { Updates []batchv1.CronJob } -func (c *fakeCJControl) GetCronJob(namespace, name string) (*batchv1.CronJob, error) { +func (c *fakeCJControl) GetCronJob(ctx context.Context, namespace, name string) (*batchv1.CronJob, error) { if name == c.CronJob.Name && namespace == c.CronJob.Namespace { return c.CronJob, nil } @@ -71,7 +71,7 @@ func (c *fakeCJControl) GetCronJob(namespace, name string) (*batchv1.CronJob, er var _ cjControlInterface = &fakeCJControl{} -func (c *fakeCJControl) UpdateStatus(cj *batchv1.CronJob) (*batchv1.CronJob, error) { +func (c *fakeCJControl) UpdateStatus(ctx context.Context, cj *batchv1.CronJob) (*batchv1.CronJob, error) { c.Updates = append(c.Updates, *cj) return cj, nil } diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 5760efe9131..d73f1ea6572 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -933,7 +933,7 @@ func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...) // Label new pods using the hash label value of the current history when creating them - if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil { + if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil { return err } @@ -942,7 +942,7 @@ func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, // syncNodes deletes given pods and creates new daemon set pods on the given nodes // returns slice with errors if any -func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error { +func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error { // We need to set expectations before creating/deleting pods to avoid race conditions. dsKey, err := controller.KeyFunc(ds) if err != nil { @@ -996,7 +996,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity( podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix]) - err := dsc.podControl.CreatePods(ds.Namespace, podTemplate, + err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate, ds, metav1.NewControllerRef(ds, controllerKind)) if err != nil { @@ -1032,7 +1032,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod for i := 0; i < deleteDiff; i++ { go func(ix int) { defer deleteWait.Done() - if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil { + if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil { dsc.expectations.DeletionObserved(dsKey) if !apierrors.IsNotFound(err) { klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name) diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index cb7c73d6573..0e873676aeb 100644 --- a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -254,10 +254,10 @@ func newFakePodControl() *fakePodControl { } } -func (f *fakePodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { +func (f *fakePodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { f.Lock() defer f.Unlock() - if err := f.FakePodControl.CreatePods(namespace, template, object, controllerRef); err != nil { + if err := f.FakePodControl.CreatePods(ctx, namespace, template, object, controllerRef); err != nil { return fmt.Errorf("failed to create pod for DaemonSet") } @@ -282,10 +282,10 @@ func (f *fakePodControl) CreatePods(namespace string, template *v1.PodTemplateSp return nil } -func (f *fakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error { +func (f *fakePodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error { f.Lock() defer f.Unlock() - if err := f.FakePodControl.DeletePod(namespace, podID, object); err != nil { + if err := f.FakePodControl.DeletePod(ctx, namespace, podID, object); err != nil { return fmt.Errorf("failed to delete pod %q", podID) } pod, ok := f.podIDMap[podID] diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index dd487d99bbb..4f5bcafbfc5 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -123,7 +123,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae } oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...) - return dsc.syncNodes(ds, oldPodsToDelete, nil, hash) + return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash) } // When surging, we create new pods whenever an old pod is unavailable, and we can create up @@ -201,7 +201,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae } newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...) - return dsc.syncNodes(ds, oldPodsToDelete, newNodesToCreate, hash) + return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash) } // findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index a309abef23a..424350b6ae1 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -85,10 +85,9 @@ type Controller struct { podControl controller.PodControlInterface // To allow injection of the following for testing. - updateStatusHandler func(job *batch.Job) (*batch.Job, error) - patchJobHandler func(job *batch.Job, patch []byte) error - syncHandler func(jobKey string) (bool, error) - + updateStatusHandler func(ctx context.Context, job *batch.Job) (*batch.Job, error) + patchJobHandler func(ctx context.Context, job *batch.Job, patch []byte) error + syncHandler func(ctx context.Context, jobKey string) (bool, error) // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced cache.InformerSynced @@ -179,7 +178,7 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor } // Run the main goroutine responsible for watching and syncing jobs. -func (jm *Controller) Run(workers int, stopCh <-chan struct{}) { +func (jm *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer jm.queue.ShutDown() defer jm.orphanQueue.ShutDown() @@ -187,17 +186,17 @@ func (jm *Controller) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting job controller") defer klog.Infof("Shutting down job controller") - if !cache.WaitForNamedCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) { + if !cache.WaitForNamedCacheSync("job", ctx.Done(), jm.podStoreSynced, jm.jobStoreSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(jm.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, jm.worker, time.Second) } - go wait.Until(jm.orphanWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, jm.orphanWorker, time.Second) - <-stopCh + <-ctx.Done() } // getPodJobs returns a list of Jobs that potentially match a Pod. @@ -466,19 +465,19 @@ func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) { // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. -func (jm *Controller) worker() { - for jm.processNextWorkItem() { +func (jm *Controller) worker(ctx context.Context) { + for jm.processNextWorkItem(ctx) { } } -func (jm *Controller) processNextWorkItem() bool { +func (jm *Controller) processNextWorkItem(ctx context.Context) bool { key, quit := jm.queue.Get() if quit { return false } defer jm.queue.Done(key) - forget, err := jm.syncHandler(key.(string)) + forget, err := jm.syncHandler(ctx, key.(string)) if err == nil { if forget { jm.queue.Forget(key) @@ -497,18 +496,18 @@ func (jm *Controller) processNextWorkItem() bool { return true } -func (jm *Controller) orphanWorker() { - for jm.processNextOrphanPod() { +func (jm *Controller) orphanWorker(ctx context.Context) { + for jm.processNextOrphanPod(ctx) { } } -func (jm Controller) processNextOrphanPod() bool { +func (jm Controller) processNextOrphanPod(ctx context.Context) bool { key, quit := jm.orphanQueue.Get() if quit { return false } defer jm.orphanQueue.Done(key) - err := jm.syncOrphanPod(key.(string)) + err := jm.syncOrphanPod(ctx, key.(string)) if err != nil { utilruntime.HandleError(fmt.Errorf("Error syncing orphan pod: %v", err)) jm.orphanQueue.AddRateLimited(key) @@ -520,7 +519,7 @@ func (jm Controller) processNextOrphanPod() bool { } // syncOrphanPod removes the tracking finalizer from an orphan pod if found. -func (jm Controller) syncOrphanPod(key string) error { +func (jm Controller) syncOrphanPod(ctx context.Context, key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing orphan pod %q (%v)", key, time.Since(startTime)) @@ -540,7 +539,7 @@ func (jm Controller) syncOrphanPod(key string) error { return err } if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil { - if err := jm.podControl.PatchPod(ns, name, patch); err != nil && !apierrors.IsNotFound(err) { + if err := jm.podControl.PatchPod(ctx, ns, name, patch); err != nil && !apierrors.IsNotFound(err) { return err } } @@ -551,7 +550,7 @@ func (jm Controller) syncOrphanPod(key string) error { // It also reconciles ControllerRef by adopting/orphaning, adding tracking // finalizers, if enabled. // Note that the returned Pods are pointers into the cache. -func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Pod, error) { +func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job, withFinalizers bool) ([]*v1.Pod, error) { selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector) if err != nil { return nil, fmt.Errorf("couldn't convert Job selector: %v", err) @@ -565,7 +564,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Po // If any adoptions are attempted, we should first recheck for deletion // with an uncached quorum read sometime after listing Pods (see #42639). canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) { - fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{}) + fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -580,7 +579,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Po } cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, finalizers...) // When adopting Pods, this operation adds an ownerRef and finalizers. - pods, err = cm.ClaimPods(context.TODO(), pods) + pods, err = cm.ClaimPods(ctx, pods) if err != nil || !withFinalizers { return pods, err } @@ -604,7 +603,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Po // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked // concurrently with the same key. -func (jm *Controller) syncJob(key string) (forget bool, rErr error) { +func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rErr error) { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime)) @@ -671,7 +670,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key) } else if patch := removeTrackingAnnotationPatch(&job); patch != nil { - if err := jm.patchJobHandler(&job, patch); err != nil { + if err := jm.patchJobHandler(ctx, &job, patch); err != nil { return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err) } } @@ -681,7 +680,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { // the store after we've checked the expectation, the job sync is just deferred till the next relist. jobNeedsSync := jm.expectations.SatisfiedExpectations(key) - pods, err := jm.getPodsForJob(&job, uncounted != nil) + pods, err := jm.getPodsForJob(ctx, &job, uncounted != nil) if err != nil { return false, err } @@ -732,7 +731,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { suspendCondChanged := false // Remove active pods if Job failed. if finishedCondition != nil { - deleted, err := jm.deleteActivePods(&job, activePods) + deleted, err := jm.deleteActivePods(ctx, &job, activePods) if uncounted == nil { // Legacy behavior: pretend all active pods were successfully removed. deleted = active @@ -747,7 +746,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { } else { manageJobCalled := false if jobNeedsSync && job.DeletionTimestamp == nil { - active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, succeededIndexes) + active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes) manageJobCalled = true } complete := false @@ -810,7 +809,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready) job.Status.Active = active job.Status.Ready = ready - err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate) + err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate) if err != nil { return false, fmt.Errorf("tracking status: %w", err) } @@ -825,7 +824,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { // Legacy path: tracking without finalizers. // Ensure that there are no leftover tracking finalizers. - if err := jm.removeTrackingFinalizersFromAllPods(pods); err != nil { + if err := jm.removeTrackingFinalizersFromAllPods(ctx, pods); err != nil { return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err) } @@ -841,7 +840,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { job.Status.UncountedTerminatedPods = nil jm.enactJobFinished(&job, finishedCondition) - if _, err := jm.updateStatusHandler(&job); err != nil { + if _, err := jm.updateStatusHandler(ctx, &job); err != nil { return forget, err } @@ -861,7 +860,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { // The method trackJobStatusAndRemoveFinalizers removes the finalizers, after // which the objects can actually be deleted. // Returns number of successfully deletions issued. -func (jm *Controller) deleteActivePods(job *batch.Job, pods []*v1.Pod) (int32, error) { +func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, error) { errCh := make(chan error, len(pods)) successfulDeletes := int32(len(pods)) wg := sync.WaitGroup{} @@ -869,7 +868,7 @@ func (jm *Controller) deleteActivePods(job *batch.Job, pods []*v1.Pod) (int32, e for i := range pods { go func(pod *v1.Pod) { defer wg.Done() - if err := jm.podControl.DeletePod(job.Namespace, pod.Name, job); err != nil && !apierrors.IsNotFound(err) { + if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil && !apierrors.IsNotFound(err) { atomic.AddInt32(&successfulDeletes, -1) errCh <- err utilruntime.HandleError(err) @@ -882,7 +881,7 @@ func (jm *Controller) deleteActivePods(job *batch.Job, pods []*v1.Pod) (int32, e // deleteJobPods deletes the pods, returns the number of successful removals // and any error. -func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) { +func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) { errCh := make(chan error, len(pods)) successfulDeletes := int32(len(pods)) @@ -903,12 +902,12 @@ func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Po go func(pod *v1.Pod) { defer wg.Done() if patch := removeTrackingFinalizerPatch(pod); patch != nil { - if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil { + if err := jm.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patch); err != nil { failDelete(pod, fmt.Errorf("removing completion finalizer: %w", err)) return } } - if err := jm.podControl.DeletePod(job.Namespace, pod.Name, job); err != nil { + if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil { failDelete(pod, err) } }(pods[i]) @@ -919,7 +918,7 @@ func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Po // removeTrackingFinalizersFromAllPods removes finalizers from any Job Pod. This is called // when Job tracking with finalizers is disabled. -func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error { +func (jm *Controller) removeTrackingFinalizersFromAllPods(ctx context.Context, pods []*v1.Pod) error { var podsWithFinalizer []*v1.Pod for _, pod := range pods { if hasJobTrackingFinalizer(pod) { @@ -930,7 +929,7 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error return nil } // Tracking with finalizers is disabled, no need to set expectations. - _, err := jm.removeTrackingFinalizerFromPods("", podsWithFinalizer) + _, err := jm.removeTrackingFinalizerFromPods(ctx, "", podsWithFinalizer) return err } @@ -942,7 +941,7 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error // 4. Add Complete condition if satisfied with current counters. // It does this up to a limited number of Pods so that the size of .status // doesn't grow too much and this sync doesn't starve other Jobs. -func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool) error { +func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool) error { isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod uncountedStatus := job.Status.UncountedTerminatedPods @@ -1014,14 +1013,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* job.Status.CompletedIndexes = succeededIndexes.String() } var err error - if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil { + if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil { return err } if jm.enactJobFinished(job, finishedCond) { needsFlush = true } if needsFlush { - if _, err := jm.updateStatusHandler(job); err != nil { + if _, err := jm.updateStatusHandler(ctx, job); err != nil { return fmt.Errorf("removing uncounted pods from status: %w", err) } recordJobPodFinished(job, oldCounters) @@ -1038,10 +1037,10 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* // 4. (if not all removals succeeded) flush Job status again. // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. -func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) { +func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) { var err error if needsFlush { - if job, err = jm.updateStatusHandler(job); err != nil { + if job, err = jm.updateStatusHandler(ctx, job); err != nil { return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) } recordJobPodFinished(job, *oldCounters) @@ -1056,7 +1055,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe var rmErr error if len(podsToRemoveFinalizer) > 0 { var rmSucceded []bool - rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(jobKey, podsToRemoveFinalizer) + rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(ctx, jobKey, podsToRemoveFinalizer) for i, p := range podsToRemoveFinalizer { if rmSucceded[i] { uidsWithFinalizer.Delete(string(p.UID)) @@ -1069,7 +1068,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe needsFlush = true } if rmErr != nil && needsFlush { - if job, err := jm.updateStatusHandler(job); err != nil { + if job, err := jm.updateStatusHandler(ctx, job); err != nil { return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err) } } @@ -1102,7 +1101,7 @@ func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinali // returns an array of booleans where the i-th value is true if the finalizer // of the i-th Pod was successfully removed (if the pod was deleted when this // function was called, it's considered as the finalizer was removed successfully). -func (jm *Controller) removeTrackingFinalizerFromPods(jobKey string, pods []*v1.Pod) ([]bool, error) { +func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKey string, pods []*v1.Pod) ([]bool, error) { errCh := make(chan error, len(pods)) succeeded := make([]bool, len(pods)) uids := make([]string, len(pods)) @@ -1122,7 +1121,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(jobKey string, pods []*v1. pod := pods[i] defer wg.Done() if patch := removeTrackingFinalizerPatch(pod); patch != nil { - if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil { + if err := jm.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patch); err != nil { // In case of any failure, we don't expect a Pod update for the // finalizer removed. Clear expectation now. if jobKey != "" { @@ -1264,7 +1263,7 @@ func jobSuspended(job *batch.Job) bool { // manageJob is the core method responsible for managing the number of running // pods according to what is specified in the job.Spec. // Does NOT modify . -func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32, string, error) { +func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32, string, error) { active := int32(len(activePods)) parallelism := *job.Spec.Parallelism jobKey, err := controller.KeyFunc(job) @@ -1277,7 +1276,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded klog.V(4).InfoS("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active) podsToDelete := activePodsForRemoval(job, activePods, int(active)) jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) - removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) + removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed return active, metrics.JobSyncActionPodsDeleted, err } @@ -1315,7 +1314,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded if len(podsToDelete) > 0 { jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) - removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) + removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed // While it is possible for a Job to require both pod creations and // deletions at the same time (e.g. indexed Jobs with repeated indexes), we @@ -1378,7 +1377,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded generateName = podGenerateNameWithIndex(job.Name, completionIndex) } defer wait.Done() - err := jm.podControl.CreatePodsWithGenerateName(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName) + err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName) if err != nil { if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { // If the namespace is being torn down, we can safely ignore @@ -1443,13 +1442,13 @@ func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.P } // updateJobStatus calls the API to update the job status. -func (jm *Controller) updateJobStatus(job *batch.Job) (*batch.Job, error) { - return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{}) +func (jm *Controller) updateJobStatus(ctx context.Context, job *batch.Job) (*batch.Job, error) { + return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{}) } -func (jm *Controller) patchJob(job *batch.Job, data []byte) error { +func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) error { _, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch( - context.TODO(), job.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) + ctx, job.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) return err } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 52274b1f293..45ab34887c2 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -775,13 +775,13 @@ func TestControllerSyncJob(t *testing.T) { setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes) actual := job - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } // run - forget, err := manager.syncJob(testutil.GetKey(job, t)) + forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) // We need requeue syncJob task if podController error if tc.podControllerError != nil { @@ -1016,20 +1016,20 @@ func TestSyncJobLegacyTracking(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady jobPatches := 0 - manager.patchJobHandler = func(*batch.Job, []byte) error { + manager.patchJobHandler = func(context.Context, *batch.Job, []byte) error { jobPatches++ return nil } sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(&tc.job) var actual *batch.Job - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } // Run. - _, err := manager.syncJob(testutil.GetKey(&tc.job, t)) + _, err := manager.syncJob(context.TODO(), testutil.GetKey(&tc.job, t)) if err != nil { t.Fatalf("Syncing job: %v", err) } @@ -1629,7 +1629,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { metrics.JobPodsFinished.Reset() manager.podControl = &fakePodControl var statusUpdates []batch.JobStatus - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { statusUpdates = append(statusUpdates, *job.Status.DeepCopy()) return job, tc.statusUpdateErr } @@ -1639,7 +1639,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { } uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) succeededIndexes := succeededIndexesFromJob(job) - err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush) + err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %v", err, tc.wantErr) } @@ -1775,7 +1775,7 @@ func TestSyncJobPastDeadline(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } @@ -1791,7 +1791,7 @@ func TestSyncJobPastDeadline(t *testing.T) { setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0) // run - forget, err := manager.syncJob(testutil.GetKey(job, t)) + forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) } @@ -1852,7 +1852,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } @@ -1864,7 +1864,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) { job.Status.StartTime = &start job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline")) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) - forget, err := manager.syncJob(testutil.GetKey(job, t)) + forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) } @@ -1893,7 +1893,7 @@ func TestSyncJobComplete(t *testing.T) { job := newJob(1, 1, 6, batch.NonIndexedCompletion) job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", "")) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) - forget, err := manager.syncJob(testutil.GetKey(job, t)) + forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Fatalf("Unexpected error when syncing jobs %v", err) } @@ -1917,11 +1917,11 @@ func TestSyncJobDeleted(t *testing.T) { manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { return job, nil } job := newJob(2, 2, 6, batch.NonIndexedCompletion) - forget, err := manager.syncJob(testutil.GetKey(job, t)) + forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) } @@ -1965,13 +1965,13 @@ func TestSyncJobUpdateRequeue(t *testing.T) { manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { return job, tc.updateErr } job := newJob(2, 2, 6, batch.NonIndexedCompletion) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) manager.queue.Add(testutil.GetKey(job, t)) - manager.processNextWorkItem() + manager.processNextWorkItem(context.TODO()) // With DefaultJobBackOff=0, the queueing is synchronous. requeued := manager.queue.Len() > 0 if requeued != tc.wantRequeue { @@ -2148,7 +2148,7 @@ func TestGetPodsForJob(t *testing.T) { informer.Core().V1().Pods().Informer().GetIndexer().Add(p) } - pods, err := jm.getPodsForJob(job, wFinalizers) + pods, err := jm.getPodsForJob(context.TODO(), job, wFinalizers) if err != nil { t.Fatalf("getPodsForJob() error: %v", err) } @@ -2468,7 +2468,7 @@ func TestSyncJobExpectations(t *testing.T) { manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { return job, nil } @@ -2486,7 +2486,7 @@ func TestSyncJobExpectations(t *testing.T) { podIndexer.Add(pods[1]) }, } - manager.syncJob(testutil.GetKey(job, t)) + manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if len(fakePodControl.Templates) != 0 { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates)) } @@ -2508,7 +2508,7 @@ func TestWatchJobs(t *testing.T) { // The update sent through the fakeWatcher should make its way into the workqueue, // and eventually into the syncHandler. - manager.syncHandler = func(key string) (bool, error) { + manager.syncHandler = func(ctx context.Context, key string) (bool, error) { defer close(received) ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { @@ -2529,7 +2529,7 @@ func TestWatchJobs(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) sharedInformerFactory.Start(stopCh) - go manager.Run(1, stopCh) + go manager.Run(context.TODO(), 1) // We're sending new job to see if it reaches syncHandler. testJob.Namespace = "bar" @@ -2553,7 +2553,7 @@ func TestWatchPods(t *testing.T) { received := make(chan struct{}) // The pod update sent through the fakeWatcher should figure out the managing job and // send it into the syncHandler. - manager.syncHandler = func(key string) (bool, error) { + manager.syncHandler = func(ctx context.Context, key string) (bool, error) { ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { t.Errorf("Error getting namespace/name from key %v: %v", key, err) @@ -2575,7 +2575,7 @@ func TestWatchPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh) - go manager.Run(1, stopCh) + go manager.Run(context.TODO(), 1) pods := newPodList(1, v1.PodRunning, testJob) testPod := pods[0] @@ -2594,7 +2594,7 @@ func TestWatchOrphanPods(t *testing.T) { manager.jobStoreSynced = alwaysReady jobSynced := false - manager.syncHandler = func(jobKey string) (bool, error) { + manager.syncHandler = func(ctx context.Context, jobKey string) (bool, error) { jobSynced = true return true, nil } @@ -2605,7 +2605,7 @@ func TestWatchOrphanPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go sharedInformers.Core().V1().Pods().Informer().Run(stopCh) - go manager.Run(1, stopCh) + go manager.Run(context.TODO(), 1) orphanPod := buildPod().name("a").job(testJob).deletionTimestamp().trackingFinalizer().Pod orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{}) @@ -2674,7 +2674,7 @@ func TestJobBackoffReset(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } @@ -2687,7 +2687,7 @@ func TestJobBackoffReset(t *testing.T) { setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0) manager.queue.Add(key) - manager.processNextWorkItem() + manager.processNextWorkItem(context.TODO()) retries := manager.queue.NumRequeues(key) if retries != 1 { t.Errorf("%s: expected exactly 1 retry, got %d", name, retries) @@ -2696,7 +2696,7 @@ func TestJobBackoffReset(t *testing.T) { job = actual sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion) setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0) - manager.processNextWorkItem() + manager.processNextWorkItem(context.TODO()) retries = manager.queue.NumRequeues(key) if retries != 0 { t.Errorf("%s: expected exactly 0 retries, got %d", name, retries) @@ -2854,7 +2854,7 @@ func TestJobBackoffForOnFailure(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } @@ -2870,7 +2870,7 @@ func TestJobBackoffForOnFailure(t *testing.T) { } // run - forget, err := manager.syncJob(testutil.GetKey(job, t)) + forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if err != nil { t.Errorf("unexpected error syncing job. Got %#v", err) @@ -2956,7 +2956,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady var actual *batch.Job - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { actual = job return job, nil } @@ -2974,7 +2974,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { } // run - forget, err := manager.syncJob(testutil.GetKey(job, t)) + forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) if (err != nil) != tc.isExpectingAnError { t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError) @@ -3094,7 +3094,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")} - manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { return job, nil } @@ -3114,7 +3114,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) { } jobKey := testutil.GetKey(job, t) - manager.syncJob(jobKey) + manager.syncJob(context.TODO(), jobKey) gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey) if len(gotExpectedUIDs) != 0 { t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", gotExpectedUIDs.List()) @@ -3122,7 +3122,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) { // Remove failures and re-sync. manager.podControl.(*controller.FakePodControl).Err = nil - manager.syncJob(jobKey) + manager.syncJob(context.TODO(), jobKey) gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff) diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 64df1354269..2a1208ee000 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -542,7 +542,7 @@ func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool { // manageReplicas checks and updates replicas for the given ReplicaSet. // Does NOT modify . // It will requeue the replica set in case of an error while creating/deleting pods. -func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { +func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) if err != nil { @@ -570,7 +570,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps // after one of its pods fails. Conveniently, this also prevents the // event spam that those failures would generate. successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error { - err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind)) + err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind)) if err != nil { if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { // if the namespace is being terminated, we don't have to do @@ -618,7 +618,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps for _, pod := range podsToDelete { go func(targetPod *v1.Pod) { defer wg.Done() - if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil { + if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil { // Decrement the expected number of deletes because the informer won't observe this deletion podKey := controller.PodKey(targetPod) rsc.expectations.DeletionObserved(rsKey, podKey) @@ -693,7 +693,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) var manageReplicasErr error if rsNeedsSync && rs.DeletionTimestamp == nil { - manageReplicasErr = rsc.manageReplicas(filteredPods, rs) + manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs) } rs = rs.DeepCopy() newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go index 78128e89158..57eb2555468 100644 --- a/pkg/controller/replication/conversion.go +++ b/pkg/controller/replication/conversion.go @@ -337,18 +337,18 @@ type podControlAdapter struct { controller.PodControlInterface } -func (pc podControlAdapter) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { +func (pc podControlAdapter) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { rc, err := convertRStoRC(object.(*apps.ReplicaSet)) if err != nil { return err } - return pc.PodControlInterface.CreatePods(namespace, template, rc, controllerRef) + return pc.PodControlInterface.CreatePods(ctx, namespace, template, rc, controllerRef) } -func (pc podControlAdapter) DeletePod(namespace string, podID string, object runtime.Object) error { +func (pc podControlAdapter) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error { rc, err := convertRStoRC(object.(*apps.ReplicaSet)) if err != nil { return err } - return pc.PodControlInterface.DeletePod(namespace, podID, rc) + return pc.PodControlInterface.DeletePod(ctx, namespace, podID, rc) } diff --git a/test/integration/cronjob/cronjob_test.go b/test/integration/cronjob/cronjob_test.go index 4f663288dde..f793ba5fefb 100644 --- a/test/integration/cronjob/cronjob_test.go +++ b/test/integration/cronjob/cronjob_test.go @@ -159,8 +159,8 @@ func TestCronJobLaunchesPodAndCleansUp(t *testing.T) { defer close(stopCh) informerSet.Start(stopCh) - go cjc.Run(1, stopCh) - go jc.Run(1, stopCh) + go cjc.Run(context.TODO(), 1) + go jc.Run(context.TODO(), 1) _, err := cjClient.Create(context.TODO(), newCronJob(cronJobName, ns.Name, "* * * * ?"), metav1.CreateOptions{}) if err != nil { diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index b614c8c28ba..659802f421f 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1101,7 +1101,7 @@ func startJobController(restConfig *restclient.Config, clientSet clientset.Inter informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "cronjob-informers")), resyncPeriod) jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet) informerSet.Start(ctx.Done()) - go jc.Run(1, ctx.Done()) + go jc.Run(ctx, 1) return ctx, cancel }