Wire contexts to Batch controllers (#105491)

* Wire contexts to Batch controllers

* (hold) feedback + updates that overlap with Apps controllers

* fixup errors
This commit is contained in:
Mike Dame 2021-11-10 17:56:46 -05:00 committed by GitHub
parent 82379431df
commit 80c01707e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 186 additions and 182 deletions

View File

@ -34,11 +34,12 @@ func startJobController(ctx context.Context, controllerContext ControllerContext
controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Batch().V1().Jobs(), controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.ClientBuilder.ClientOrDie("job-controller"), controllerContext.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Done()) ).Run(ctx, int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs))
return nil, true, nil return nil, true, nil
} }
func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
cj2c, err := cronjob.NewControllerV2(controllerContext.InformerFactory.Batch().V1().Jobs(), cj2c, err := cronjob.NewControllerV2(controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.InformerFactory.Batch().V1().CronJobs(), controllerContext.InformerFactory.Batch().V1().CronJobs(),
controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"), controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
@ -46,6 +47,7 @@ func startCronJobController(ctx context.Context, controllerContext ControllerCon
if err != nil { if err != nil {
return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err) return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err)
} }
go cj2c.Run(int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Done())
go cj2c.Run(ctx, int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs))
return nil, true, nil return nil, true, nil
} }

View File

@ -66,7 +66,7 @@ func (m *BaseControllerRefManager) CanAdopt(ctx context.Context) error {
// own the object. // own the object.
// //
// No reconciliation will be attempted if the controller is being deleted. // No reconciliation will be attempted if the controller is being deleted.
func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object, match func(metav1.Object) bool, adopt func(context.Context, metav1.Object) error, release func(metav1.Object) error) (bool, error) { func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object, match func(metav1.Object) bool, adopt, release func(context.Context, metav1.Object) error) (bool, error) {
controllerRef := metav1.GetControllerOfNoCopy(obj) controllerRef := metav1.GetControllerOfNoCopy(obj)
if controllerRef != nil { if controllerRef != nil {
if controllerRef.UID != m.Controller.GetUID() { if controllerRef.UID != m.Controller.GetUID() {
@ -85,7 +85,7 @@ func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.O
if m.Controller.GetDeletionTimestamp() != nil { if m.Controller.GetDeletionTimestamp() != nil {
return false, nil return false, nil
} }
if err := release(obj); err != nil { if err := release(ctx, obj); err != nil {
// If the pod no longer exists, ignore the error. // If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
return false, nil return false, nil
@ -200,8 +200,8 @@ func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod,
adopt := func(ctx context.Context, obj metav1.Object) error { adopt := func(ctx context.Context, obj metav1.Object) error {
return m.AdoptPod(ctx, obj.(*v1.Pod)) return m.AdoptPod(ctx, obj.(*v1.Pod))
} }
release := func(obj metav1.Object) error { release := func(ctx context.Context, obj metav1.Object) error {
return m.ReleasePod(obj.(*v1.Pod)) return m.ReleasePod(ctx, obj.(*v1.Pod))
} }
for _, pod := range pods { for _, pod := range pods {
@ -230,19 +230,19 @@ func (m *PodControllerRefManager) AdoptPod(ctx context.Context, pod *v1.Pod) err
if err != nil { if err != nil {
return err return err
} }
return m.podControl.PatchPod(pod.Namespace, pod.Name, patchBytes) return m.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patchBytes)
} }
// ReleasePod sends a patch to free the pod from the control of the controller. // ReleasePod sends a patch to free the pod from the control of the controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored. // It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error { func (m *PodControllerRefManager) ReleasePod(ctx context.Context, pod *v1.Pod) error {
klog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s", klog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s",
pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(pod.UID, []types.UID{m.Controller.GetUID()}, m.finalizers...) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(pod.UID, []types.UID{m.Controller.GetUID()}, m.finalizers...)
if err != nil { if err != nil {
return err return err
} }
err = m.podControl.PatchPod(pod.Namespace, pod.Name, patchBytes) err = m.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patchBytes)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
// If the pod no longer exists, ignore it. // If the pod no longer exists, ignore it.
@ -326,8 +326,8 @@ func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(ctx context.Context, s
adopt := func(ctx context.Context, obj metav1.Object) error { adopt := func(ctx context.Context, obj metav1.Object) error {
return m.AdoptReplicaSet(ctx, obj.(*apps.ReplicaSet)) return m.AdoptReplicaSet(ctx, obj.(*apps.ReplicaSet))
} }
release := func(obj metav1.Object) error { release := func(ctx context.Context, obj metav1.Object) error {
return m.ReleaseReplicaSet(obj.(*apps.ReplicaSet)) return m.ReleaseReplicaSet(ctx, obj.(*apps.ReplicaSet))
} }
for _, rs := range sets { for _, rs := range sets {
@ -355,19 +355,19 @@ func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(ctx context.Context, rs
if err != nil { if err != nil {
return err return err
} }
return m.rsControl.PatchReplicaSet(rs.Namespace, rs.Name, patchBytes) return m.rsControl.PatchReplicaSet(ctx, rs.Namespace, rs.Name, patchBytes)
} }
// ReleaseReplicaSet sends a patch to free the ReplicaSet from the control of the Deployment controller. // ReleaseReplicaSet sends a patch to free the ReplicaSet from the control of the Deployment controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored. // It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *apps.ReplicaSet) error { func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(ctx context.Context, replicaSet *apps.ReplicaSet) error {
klog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s", klog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s",
replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(replicaSet.UID, []types.UID{m.Controller.GetUID()}) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(replicaSet.UID, []types.UID{m.Controller.GetUID()})
if err != nil { if err != nil {
return err return err
} }
err = m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, patchBytes) err = m.rsControl.PatchReplicaSet(ctx, replicaSet.Namespace, replicaSet.Name, patchBytes)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
// If the ReplicaSet no longer exists, ignore it. // If the ReplicaSet no longer exists, ignore it.
@ -464,8 +464,8 @@ func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(ctx co
adopt := func(ctx context.Context, obj metav1.Object) error { adopt := func(ctx context.Context, obj metav1.Object) error {
return m.AdoptControllerRevision(ctx, obj.(*apps.ControllerRevision)) return m.AdoptControllerRevision(ctx, obj.(*apps.ControllerRevision))
} }
release := func(obj metav1.Object) error { release := func(ctx context.Context, obj metav1.Object) error {
return m.ReleaseControllerRevision(obj.(*apps.ControllerRevision)) return m.ReleaseControllerRevision(ctx, obj.(*apps.ControllerRevision))
} }
for _, h := range histories { for _, h := range histories {
@ -493,12 +493,12 @@ func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(ctx con
if err != nil { if err != nil {
return err return err
} }
return m.crControl.PatchControllerRevision(history.Namespace, history.Name, patchBytes) return m.crControl.PatchControllerRevision(ctx, history.Namespace, history.Name, patchBytes)
} }
// ReleaseControllerRevision sends a patch to free the ControllerRevision from the control of its controller. // ReleaseControllerRevision sends a patch to free the ControllerRevision from the control of its controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored. // It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(history *apps.ControllerRevision) error { func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(ctx context.Context, history *apps.ControllerRevision) error {
klog.V(2).Infof("patching ControllerRevision %s_%s to remove its controllerRef to %s/%s:%s", klog.V(2).Infof("patching ControllerRevision %s_%s to remove its controllerRef to %s/%s:%s",
history.Namespace, history.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName()) history.Namespace, history.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(history.UID, []types.UID{m.Controller.GetUID()}) patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(history.UID, []types.UID{m.Controller.GetUID()})
@ -506,7 +506,7 @@ func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(histo
return err return err
} }
err = m.crControl.PatchControllerRevision(history.Namespace, history.Name, patchBytes) err = m.crControl.PatchControllerRevision(ctx, history.Namespace, history.Name, patchBytes)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
// If the ControllerRevision no longer exists, ignore it. // If the ControllerRevision no longer exists, ignore it.

View File

@ -406,7 +406,7 @@ const (
// ReplicaSets, as well as increment or decrement them. It is used // ReplicaSets, as well as increment or decrement them. It is used
// by the deployment controller to ease testing of actions that it takes. // by the deployment controller to ease testing of actions that it takes.
type RSControlInterface interface { type RSControlInterface interface {
PatchReplicaSet(namespace, name string, data []byte) error PatchReplicaSet(ctx context.Context, namespace, name string, data []byte) error
} }
// RealRSControl is the default implementation of RSControllerInterface. // RealRSControl is the default implementation of RSControllerInterface.
@ -417,8 +417,8 @@ type RealRSControl struct {
var _ RSControlInterface = &RealRSControl{} var _ RSControlInterface = &RealRSControl{}
func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) error { func (r RealRSControl) PatchReplicaSet(ctx context.Context, namespace, name string, data []byte) error {
_, err := r.KubeClient.AppsV1().ReplicaSets(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) _, err := r.KubeClient.AppsV1().ReplicaSets(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return err return err
} }
@ -427,7 +427,7 @@ func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) erro
// ControllerRevisions, as well as increment or decrement them. It is used // ControllerRevisions, as well as increment or decrement them. It is used
// by the daemonset controller to ease testing of actions that it takes. // by the daemonset controller to ease testing of actions that it takes.
type ControllerRevisionControlInterface interface { type ControllerRevisionControlInterface interface {
PatchControllerRevision(namespace, name string, data []byte) error PatchControllerRevision(ctx context.Context, namespace, name string, data []byte) error
} }
// RealControllerRevisionControl is the default implementation of ControllerRevisionControlInterface. // RealControllerRevisionControl is the default implementation of ControllerRevisionControlInterface.
@ -437,8 +437,8 @@ type RealControllerRevisionControl struct {
var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{} var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{}
func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name string, data []byte) error { func (r RealControllerRevisionControl) PatchControllerRevision(ctx context.Context, namespace, name string, data []byte) error {
_, err := r.KubeClient.AppsV1().ControllerRevisions(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) _, err := r.KubeClient.AppsV1().ControllerRevisions(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return err return err
} }
@ -446,13 +446,13 @@ func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name s
// created as an interface to allow testing. // created as an interface to allow testing.
type PodControlInterface interface { type PodControlInterface interface {
// CreatePods creates new pods according to the spec, and sets object as the pod's controller. // CreatePods creates new pods according to the spec, and sets object as the pod's controller.
CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// CreatePodsWithGenerateName creates new pods according to the spec, sets object as the pod's controller and sets pod's generateName. // CreatePodsWithGenerateName creates new pods according to the spec, sets object as the pod's controller and sets pod's generateName.
CreatePodsWithGenerateName(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error
// DeletePod deletes the pod identified by podID. // DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string, object runtime.Object) error DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error
// PatchPod patches the pod. // PatchPod patches the pod.
PatchPod(namespace, name string, data []byte) error PatchPod(ctx context.Context, namespace, name string, data []byte) error
} }
// RealPodControl is the default implementation of PodControlInterface. // RealPodControl is the default implementation of PodControlInterface.
@ -513,11 +513,11 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error {
return nil return nil
} }
func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
return r.CreatePodsWithGenerateName(namespace, template, controllerObject, controllerRef, "") return r.CreatePodsWithGenerateName(ctx, namespace, template, controllerObject, controllerRef, "")
} }
func (r RealPodControl) CreatePodsWithGenerateName(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error { func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error {
if err := validateControllerRef(controllerRef); err != nil { if err := validateControllerRef(controllerRef); err != nil {
return err return err
} }
@ -528,11 +528,11 @@ func (r RealPodControl) CreatePodsWithGenerateName(namespace string, template *v
if len(generateName) > 0 { if len(generateName) > 0 {
pod.ObjectMeta.GenerateName = generateName pod.ObjectMeta.GenerateName = generateName
} }
return r.createPods(namespace, pod, controllerObject) return r.createPods(ctx, namespace, pod, controllerObject)
} }
func (r RealPodControl) PatchPod(namespace, name string, data []byte) error { func (r RealPodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error {
_, err := r.KubeClient.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) _, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return err return err
} }
@ -561,11 +561,11 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec
return pod, nil return pod, nil
} }
func (r RealPodControl) createPods(namespace string, pod *v1.Pod, object runtime.Object) error { func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error {
if len(labels.Set(pod.Labels)) == 0 { if len(labels.Set(pod.Labels)) == 0 {
return fmt.Errorf("unable to create pods, no labels") return fmt.Errorf("unable to create pods, no labels")
} }
newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil { if err != nil {
// only send an event if the namespace isn't terminating // only send an event if the namespace isn't terminating
if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
@ -584,13 +584,13 @@ func (r RealPodControl) createPods(namespace string, pod *v1.Pod, object runtime
return nil return nil
} }
func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error { func (r RealPodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error {
accessor, err := meta.Accessor(object) accessor, err := meta.Accessor(object)
if err != nil { if err != nil {
return fmt.Errorf("object does not have ObjectMeta, %v", err) return fmt.Errorf("object does not have ObjectMeta, %v", err)
} }
klog.V(2).InfoS("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID)) klog.V(2).InfoS("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID))
if err := r.KubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), podID, metav1.DeleteOptions{}); err != nil { if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
klog.V(4).Infof("pod %v/%v has already been deleted.", namespace, podID) klog.V(4).Infof("pod %v/%v has already been deleted.", namespace, podID)
return err return err
@ -616,7 +616,7 @@ type FakePodControl struct {
var _ PodControlInterface = &FakePodControl{} var _ PodControlInterface = &FakePodControl{}
func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error { func (f *FakePodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
f.Patches = append(f.Patches, data) f.Patches = append(f.Patches, data)
@ -626,11 +626,11 @@ func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error {
return nil return nil
} }
func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { func (f *FakePodControl) CreatePods(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
return f.CreatePodsWithGenerateName(namespace, spec, object, controllerRef, "") return f.CreatePodsWithGenerateName(ctx, namespace, spec, object, controllerRef, "")
} }
func (f *FakePodControl) CreatePodsWithGenerateName(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateNamePrefix string) error { func (f *FakePodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateNamePrefix string) error {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
f.CreateCallCount++ f.CreateCallCount++
@ -646,7 +646,7 @@ func (f *FakePodControl) CreatePodsWithGenerateName(namespace string, spec *v1.P
return nil return nil
} }
func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error { func (f *FakePodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
f.DeletePodName = append(f.DeletePodName, podID) f.DeletePodName = append(f.DeletePodName, podID)

View File

@ -303,7 +303,7 @@ func TestCreatePods(t *testing.T) {
controllerRef := metav1.NewControllerRef(controllerSpec, v1.SchemeGroupVersion.WithKind("ReplicationController")) controllerRef := metav1.NewControllerRef(controllerSpec, v1.SchemeGroupVersion.WithKind("ReplicationController"))
// Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template
err := podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec, controllerRef) err := podControl.CreatePods(context.TODO(), ns, controllerSpec.Spec.Template, controllerSpec, controllerRef)
assert.NoError(t, err, "unexpected error: %v", err) assert.NoError(t, err, "unexpected error: %v", err)
expectedPod := v1.Pod{ expectedPod := v1.Pod{
@ -342,7 +342,7 @@ func TestCreatePodsWithGenerateName(t *testing.T) {
// Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template
generateName := "hello-" generateName := "hello-"
err := podControl.CreatePodsWithGenerateName(ns, controllerSpec.Spec.Template, controllerSpec, controllerRef, generateName) err := podControl.CreatePodsWithGenerateName(context.TODO(), ns, controllerSpec.Spec.Template, controllerSpec, controllerRef, generateName)
assert.NoError(t, err, "unexpected error: %v", err) assert.NoError(t, err, "unexpected error: %v", err)
expectedPod := v1.Pod{ expectedPod := v1.Pod{
@ -371,7 +371,7 @@ func TestDeletePodsAllowsMissing(t *testing.T) {
controllerSpec := newReplicationController(1) controllerSpec := newReplicationController(1)
err := podControl.DeletePod("namespace-name", "podName", controllerSpec) err := podControl.DeletePod(context.TODO(), "namespace-name", "podName", controllerSpec)
assert.True(t, apierrors.IsNotFound(err)) assert.True(t, apierrors.IsNotFound(err))
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package cronjob package cronjob
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
"sort" "sort"
@ -123,37 +124,37 @@ func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer
} }
// Run starts the main goroutine responsible for watching and syncing jobs. // Run starts the main goroutine responsible for watching and syncing jobs.
func (jm *ControllerV2) Run(workers int, stopCh <-chan struct{}) { func (jm *ControllerV2) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer jm.queue.ShutDown() defer jm.queue.ShutDown()
klog.InfoS("Starting cronjob controller v2") klog.InfoS("Starting cronjob controller v2")
defer klog.InfoS("Shutting down cronjob controller v2") defer klog.InfoS("Shutting down cronjob controller v2")
if !cache.WaitForNamedCacheSync("cronjob", stopCh, jm.jobListerSynced, jm.cronJobListerSynced) { if !cache.WaitForNamedCacheSync("cronjob", ctx.Done(), jm.jobListerSynced, jm.cronJobListerSynced) {
return return
} }
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(jm.worker, time.Second, stopCh) go wait.UntilWithContext(ctx, jm.worker, time.Second)
} }
<-stopCh <-ctx.Done()
} }
func (jm *ControllerV2) worker() { func (jm *ControllerV2) worker(ctx context.Context) {
for jm.processNextWorkItem() { for jm.processNextWorkItem(ctx) {
} }
} }
func (jm *ControllerV2) processNextWorkItem() bool { func (jm *ControllerV2) processNextWorkItem(ctx context.Context) bool {
key, quit := jm.queue.Get() key, quit := jm.queue.Get()
if quit { if quit {
return false return false
} }
defer jm.queue.Done(key) defer jm.queue.Done(key)
requeueAfter, err := jm.sync(key.(string)) requeueAfter, err := jm.sync(ctx, key.(string))
switch { switch {
case err != nil: case err != nil:
utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err)) utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err))
@ -165,7 +166,7 @@ func (jm *ControllerV2) processNextWorkItem() bool {
return true return true
} }
func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) { func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Duration, error) {
ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey) ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey)
if err != nil { if err != nil {
return nil, err return nil, err
@ -187,13 +188,13 @@ func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) {
return nil, err return nil, err
} }
cronJobCopy, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled) cronJobCopy, requeueAfter, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled)
if err != nil { if err != nil {
klog.V(2).InfoS("Error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err) klog.V(2).InfoS("Error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)
return nil, err return nil, err
} }
err = jm.cleanupFinishedJobs(cronJobCopy, jobsToBeReconciled) err = jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled)
if err != nil { if err != nil {
klog.V(2).InfoS("Error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err) klog.V(2).InfoS("Error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err)
return nil, err return nil, err
@ -402,6 +403,7 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
// It returns a copy of the CronJob that is to be used by other functions // It returns a copy of the CronJob that is to be used by other functions
// that mutates the object // that mutates the object
func (jm *ControllerV2) syncCronJob( func (jm *ControllerV2) syncCronJob(
ctx context.Context,
cj *batchv1.CronJob, cj *batchv1.CronJob,
js []*batchv1.Job) (*batchv1.CronJob, *time.Duration, error) { js []*batchv1.Job) (*batchv1.CronJob, *time.Duration, error) {
@ -413,7 +415,7 @@ func (jm *ControllerV2) syncCronJob(
childrenJobs[j.ObjectMeta.UID] = true childrenJobs[j.ObjectMeta.UID] = true
found := inActiveList(*cj, j.ObjectMeta.UID) found := inActiveList(*cj, j.ObjectMeta.UID)
if !found && !IsJobFinished(j) { if !found && !IsJobFinished(j) {
cjCopy, err := jm.cronJobControl.GetCronJob(cj.Namespace, cj.Name) cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cj.Namespace, cj.Name)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -464,7 +466,7 @@ func (jm *ControllerV2) syncCronJob(
// the job is missing in the lister but found in api-server // the job is missing in the lister but found in api-server
} }
updatedCJ, err := jm.cronJobControl.UpdateStatus(cj) updatedCJ, err := jm.cronJobControl.UpdateStatus(ctx, cj)
if err != nil { if err != nil {
klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err) klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
return cj, nil, err return cj, nil, err
@ -605,7 +607,7 @@ func (jm *ControllerV2) syncCronJob(
} }
cj.Status.Active = append(cj.Status.Active, *jobRef) cj.Status.Active = append(cj.Status.Active, *jobRef)
cj.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime} cj.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
if _, err := jm.cronJobControl.UpdateStatus(cj); err != nil { if _, err := jm.cronJobControl.UpdateStatus(ctx, cj); err != nil {
klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err) klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
return cj, nil, fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err) return cj, nil, fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err)
} }
@ -629,7 +631,7 @@ func nextScheduledTimeDuration(sched cron.Schedule, now time.Time) *time.Duratio
} }
// cleanupFinishedJobs cleanups finished jobs created by a CronJob // cleanupFinishedJobs cleanups finished jobs created by a CronJob
func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1.CronJob, js []*batchv1.Job) error { func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job) error {
// If neither limits are active, there is no need to do anything. // If neither limits are active, there is no need to do anything.
if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil { if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
return nil return nil
@ -660,7 +662,7 @@ func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1.CronJob, js []*batchv1.J
} }
// Update the CronJob, in case jobs were removed from the list. // Update the CronJob, in case jobs were removed from the list.
_, err := jm.cronJobControl.UpdateStatus(cj) _, err := jm.cronJobControl.UpdateStatus(ctx, cj)
return err return err
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package cronjob package cronjob
import ( import (
"context"
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
@ -322,7 +323,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
return tc.now return tc.now
}, },
} }
cjCopy, requeueAfter, err := jm.syncCronJob(&cj, js) cjCopy, requeueAfter, err := jm.syncCronJob(context.TODO(), &cj, js)
if tc.expectErr && err == nil { if tc.expectErr && err == nil {
t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter) t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter)
} }

View File

@ -33,9 +33,9 @@ import (
// cjControlInterface is an interface that knows how to update CronJob status // cjControlInterface is an interface that knows how to update CronJob status
// created as an interface to allow testing. // created as an interface to allow testing.
type cjControlInterface interface { type cjControlInterface interface {
UpdateStatus(cj *batchv1.CronJob) (*batchv1.CronJob, error) UpdateStatus(ctx context.Context, cj *batchv1.CronJob) (*batchv1.CronJob, error)
// GetCronJob retrieves a CronJob. // GetCronJob retrieves a CronJob.
GetCronJob(namespace, name string) (*batchv1.CronJob, error) GetCronJob(ctx context.Context, namespace, name string) (*batchv1.CronJob, error)
} }
// realCJControl is the default implementation of cjControlInterface. // realCJControl is the default implementation of cjControlInterface.
@ -43,14 +43,14 @@ type realCJControl struct {
KubeClient clientset.Interface KubeClient clientset.Interface
} }
func (c *realCJControl) GetCronJob(namespace, name string) (*batchv1.CronJob, error) { func (c *realCJControl) GetCronJob(ctx context.Context, namespace, name string) (*batchv1.CronJob, error) {
return c.KubeClient.BatchV1().CronJobs(namespace).Get(context.TODO(), name, metav1.GetOptions{}) return c.KubeClient.BatchV1().CronJobs(namespace).Get(ctx, name, metav1.GetOptions{})
} }
var _ cjControlInterface = &realCJControl{} var _ cjControlInterface = &realCJControl{}
func (c *realCJControl) UpdateStatus(cj *batchv1.CronJob) (*batchv1.CronJob, error) { func (c *realCJControl) UpdateStatus(ctx context.Context, cj *batchv1.CronJob) (*batchv1.CronJob, error) {
return c.KubeClient.BatchV1().CronJobs(cj.Namespace).UpdateStatus(context.TODO(), cj, metav1.UpdateOptions{}) return c.KubeClient.BatchV1().CronJobs(cj.Namespace).UpdateStatus(ctx, cj, metav1.UpdateOptions{})
} }
// fakeCJControl is the default implementation of cjControlInterface. // fakeCJControl is the default implementation of cjControlInterface.
@ -59,7 +59,7 @@ type fakeCJControl struct {
Updates []batchv1.CronJob Updates []batchv1.CronJob
} }
func (c *fakeCJControl) GetCronJob(namespace, name string) (*batchv1.CronJob, error) { func (c *fakeCJControl) GetCronJob(ctx context.Context, namespace, name string) (*batchv1.CronJob, error) {
if name == c.CronJob.Name && namespace == c.CronJob.Namespace { if name == c.CronJob.Name && namespace == c.CronJob.Namespace {
return c.CronJob, nil return c.CronJob, nil
} }
@ -71,7 +71,7 @@ func (c *fakeCJControl) GetCronJob(namespace, name string) (*batchv1.CronJob, er
var _ cjControlInterface = &fakeCJControl{} var _ cjControlInterface = &fakeCJControl{}
func (c *fakeCJControl) UpdateStatus(cj *batchv1.CronJob) (*batchv1.CronJob, error) { func (c *fakeCJControl) UpdateStatus(ctx context.Context, cj *batchv1.CronJob) (*batchv1.CronJob, error) {
c.Updates = append(c.Updates, *cj) c.Updates = append(c.Updates, *cj)
return cj, nil return cj, nil
} }

View File

@ -933,7 +933,7 @@ func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet,
podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...) podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
// Label new pods using the hash label value of the current history when creating them // Label new pods using the hash label value of the current history when creating them
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil { if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return err return err
} }
@ -942,7 +942,7 @@ func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet,
// syncNodes deletes given pods and creates new daemon set pods on the given nodes // syncNodes deletes given pods and creates new daemon set pods on the given nodes
// returns slice with errors if any // returns slice with errors if any
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error { func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
// We need to set expectations before creating/deleting pods to avoid race conditions. // We need to set expectations before creating/deleting pods to avoid race conditions.
dsKey, err := controller.KeyFunc(ds) dsKey, err := controller.KeyFunc(ds)
if err != nil { if err != nil {
@ -996,7 +996,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity( podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix]) podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
err := dsc.podControl.CreatePods(ds.Namespace, podTemplate, err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind)) ds, metav1.NewControllerRef(ds, controllerKind))
if err != nil { if err != nil {
@ -1032,7 +1032,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod
for i := 0; i < deleteDiff; i++ { for i := 0; i < deleteDiff; i++ {
go func(ix int) { go func(ix int) {
defer deleteWait.Done() defer deleteWait.Done()
if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil { if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
dsc.expectations.DeletionObserved(dsKey) dsc.expectations.DeletionObserved(dsKey)
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name) klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name)

View File

@ -254,10 +254,10 @@ func newFakePodControl() *fakePodControl {
} }
} }
func (f *fakePodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { func (f *fakePodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
if err := f.FakePodControl.CreatePods(namespace, template, object, controllerRef); err != nil { if err := f.FakePodControl.CreatePods(ctx, namespace, template, object, controllerRef); err != nil {
return fmt.Errorf("failed to create pod for DaemonSet") return fmt.Errorf("failed to create pod for DaemonSet")
} }
@ -282,10 +282,10 @@ func (f *fakePodControl) CreatePods(namespace string, template *v1.PodTemplateSp
return nil return nil
} }
func (f *fakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error { func (f *fakePodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
if err := f.FakePodControl.DeletePod(namespace, podID, object); err != nil { if err := f.FakePodControl.DeletePod(ctx, namespace, podID, object); err != nil {
return fmt.Errorf("failed to delete pod %q", podID) return fmt.Errorf("failed to delete pod %q", podID)
} }
pod, ok := f.podIDMap[podID] pod, ok := f.podIDMap[podID]

View File

@ -123,7 +123,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
} }
oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...) oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)
return dsc.syncNodes(ds, oldPodsToDelete, nil, hash) return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash)
} }
// When surging, we create new pods whenever an old pod is unavailable, and we can create up // When surging, we create new pods whenever an old pod is unavailable, and we can create up
@ -201,7 +201,7 @@ func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.Dae
} }
newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...) newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...)
return dsc.syncNodes(ds, oldPodsToDelete, newNodesToCreate, hash) return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
} }
// findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there // findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there

View File

@ -85,10 +85,9 @@ type Controller struct {
podControl controller.PodControlInterface podControl controller.PodControlInterface
// To allow injection of the following for testing. // To allow injection of the following for testing.
updateStatusHandler func(job *batch.Job) (*batch.Job, error) updateStatusHandler func(ctx context.Context, job *batch.Job) (*batch.Job, error)
patchJobHandler func(job *batch.Job, patch []byte) error patchJobHandler func(ctx context.Context, job *batch.Job, patch []byte) error
syncHandler func(jobKey string) (bool, error) syncHandler func(ctx context.Context, jobKey string) (bool, error)
// podStoreSynced returns true if the pod store has been synced at least once. // podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing. // Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced podStoreSynced cache.InformerSynced
@ -179,7 +178,7 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
} }
// Run the main goroutine responsible for watching and syncing jobs. // Run the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(workers int, stopCh <-chan struct{}) { func (jm *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer jm.queue.ShutDown() defer jm.queue.ShutDown()
defer jm.orphanQueue.ShutDown() defer jm.orphanQueue.ShutDown()
@ -187,17 +186,17 @@ func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting job controller") klog.Infof("Starting job controller")
defer klog.Infof("Shutting down job controller") defer klog.Infof("Shutting down job controller")
if !cache.WaitForNamedCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) { if !cache.WaitForNamedCacheSync("job", ctx.Done(), jm.podStoreSynced, jm.jobStoreSynced) {
return return
} }
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(jm.worker, time.Second, stopCh) go wait.UntilWithContext(ctx, jm.worker, time.Second)
} }
go wait.Until(jm.orphanWorker, time.Second, stopCh) go wait.UntilWithContext(ctx, jm.orphanWorker, time.Second)
<-stopCh <-ctx.Done()
} }
// getPodJobs returns a list of Jobs that potentially match a Pod. // getPodJobs returns a list of Jobs that potentially match a Pod.
@ -466,19 +465,19 @@ func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) {
// worker runs a worker thread that just dequeues items, processes them, and marks them done. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key. // It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *Controller) worker() { func (jm *Controller) worker(ctx context.Context) {
for jm.processNextWorkItem() { for jm.processNextWorkItem(ctx) {
} }
} }
func (jm *Controller) processNextWorkItem() bool { func (jm *Controller) processNextWorkItem(ctx context.Context) bool {
key, quit := jm.queue.Get() key, quit := jm.queue.Get()
if quit { if quit {
return false return false
} }
defer jm.queue.Done(key) defer jm.queue.Done(key)
forget, err := jm.syncHandler(key.(string)) forget, err := jm.syncHandler(ctx, key.(string))
if err == nil { if err == nil {
if forget { if forget {
jm.queue.Forget(key) jm.queue.Forget(key)
@ -497,18 +496,18 @@ func (jm *Controller) processNextWorkItem() bool {
return true return true
} }
func (jm *Controller) orphanWorker() { func (jm *Controller) orphanWorker(ctx context.Context) {
for jm.processNextOrphanPod() { for jm.processNextOrphanPod(ctx) {
} }
} }
func (jm Controller) processNextOrphanPod() bool { func (jm Controller) processNextOrphanPod(ctx context.Context) bool {
key, quit := jm.orphanQueue.Get() key, quit := jm.orphanQueue.Get()
if quit { if quit {
return false return false
} }
defer jm.orphanQueue.Done(key) defer jm.orphanQueue.Done(key)
err := jm.syncOrphanPod(key.(string)) err := jm.syncOrphanPod(ctx, key.(string))
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing orphan pod: %v", err)) utilruntime.HandleError(fmt.Errorf("Error syncing orphan pod: %v", err))
jm.orphanQueue.AddRateLimited(key) jm.orphanQueue.AddRateLimited(key)
@ -520,7 +519,7 @@ func (jm Controller) processNextOrphanPod() bool {
} }
// syncOrphanPod removes the tracking finalizer from an orphan pod if found. // syncOrphanPod removes the tracking finalizer from an orphan pod if found.
func (jm Controller) syncOrphanPod(key string) error { func (jm Controller) syncOrphanPod(ctx context.Context, key string) error {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
klog.V(4).Infof("Finished syncing orphan pod %q (%v)", key, time.Since(startTime)) klog.V(4).Infof("Finished syncing orphan pod %q (%v)", key, time.Since(startTime))
@ -540,7 +539,7 @@ func (jm Controller) syncOrphanPod(key string) error {
return err return err
} }
if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil { if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil {
if err := jm.podControl.PatchPod(ns, name, patch); err != nil && !apierrors.IsNotFound(err) { if err := jm.podControl.PatchPod(ctx, ns, name, patch); err != nil && !apierrors.IsNotFound(err) {
return err return err
} }
} }
@ -551,7 +550,7 @@ func (jm Controller) syncOrphanPod(key string) error {
// It also reconciles ControllerRef by adopting/orphaning, adding tracking // It also reconciles ControllerRef by adopting/orphaning, adding tracking
// finalizers, if enabled. // finalizers, if enabled.
// Note that the returned Pods are pointers into the cache. // Note that the returned Pods are pointers into the cache.
func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Pod, error) { func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job, withFinalizers bool) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err) return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
@ -565,7 +564,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Po
// If any adoptions are attempted, we should first recheck for deletion // If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing Pods (see #42639). // with an uncached quorum read sometime after listing Pods (see #42639).
canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) { canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{}) fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -580,7 +579,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Po
} }
cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, finalizers...) cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, finalizers...)
// When adopting Pods, this operation adds an ownerRef and finalizers. // When adopting Pods, this operation adds an ownerRef and finalizers.
pods, err = cm.ClaimPods(context.TODO(), pods) pods, err = cm.ClaimPods(ctx, pods)
if err != nil || !withFinalizers { if err != nil || !withFinalizers {
return pods, err return pods, err
} }
@ -604,7 +603,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Po
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key. // concurrently with the same key.
func (jm *Controller) syncJob(key string) (forget bool, rErr error) { func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rErr error) {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime)) klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
@ -671,7 +670,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key) expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
} else if patch := removeTrackingAnnotationPatch(&job); patch != nil { } else if patch := removeTrackingAnnotationPatch(&job); patch != nil {
if err := jm.patchJobHandler(&job, patch); err != nil { if err := jm.patchJobHandler(ctx, &job, patch); err != nil {
return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err) return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)
} }
} }
@ -681,7 +680,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
// the store after we've checked the expectation, the job sync is just deferred till the next relist. // the store after we've checked the expectation, the job sync is just deferred till the next relist.
jobNeedsSync := jm.expectations.SatisfiedExpectations(key) jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
pods, err := jm.getPodsForJob(&job, uncounted != nil) pods, err := jm.getPodsForJob(ctx, &job, uncounted != nil)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -732,7 +731,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
suspendCondChanged := false suspendCondChanged := false
// Remove active pods if Job failed. // Remove active pods if Job failed.
if finishedCondition != nil { if finishedCondition != nil {
deleted, err := jm.deleteActivePods(&job, activePods) deleted, err := jm.deleteActivePods(ctx, &job, activePods)
if uncounted == nil { if uncounted == nil {
// Legacy behavior: pretend all active pods were successfully removed. // Legacy behavior: pretend all active pods were successfully removed.
deleted = active deleted = active
@ -747,7 +746,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
} else { } else {
manageJobCalled := false manageJobCalled := false
if jobNeedsSync && job.DeletionTimestamp == nil { if jobNeedsSync && job.DeletionTimestamp == nil {
active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, succeededIndexes) active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes)
manageJobCalled = true manageJobCalled = true
} }
complete := false complete := false
@ -810,7 +809,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready) needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready)
job.Status.Active = active job.Status.Active = active
job.Status.Ready = ready job.Status.Ready = ready
err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate) err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
if err != nil { if err != nil {
return false, fmt.Errorf("tracking status: %w", err) return false, fmt.Errorf("tracking status: %w", err)
} }
@ -825,7 +824,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
// Legacy path: tracking without finalizers. // Legacy path: tracking without finalizers.
// Ensure that there are no leftover tracking finalizers. // Ensure that there are no leftover tracking finalizers.
if err := jm.removeTrackingFinalizersFromAllPods(pods); err != nil { if err := jm.removeTrackingFinalizersFromAllPods(ctx, pods); err != nil {
return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err) return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err)
} }
@ -841,7 +840,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
job.Status.UncountedTerminatedPods = nil job.Status.UncountedTerminatedPods = nil
jm.enactJobFinished(&job, finishedCondition) jm.enactJobFinished(&job, finishedCondition)
if _, err := jm.updateStatusHandler(&job); err != nil { if _, err := jm.updateStatusHandler(ctx, &job); err != nil {
return forget, err return forget, err
} }
@ -861,7 +860,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
// The method trackJobStatusAndRemoveFinalizers removes the finalizers, after // The method trackJobStatusAndRemoveFinalizers removes the finalizers, after
// which the objects can actually be deleted. // which the objects can actually be deleted.
// Returns number of successfully deletions issued. // Returns number of successfully deletions issued.
func (jm *Controller) deleteActivePods(job *batch.Job, pods []*v1.Pod) (int32, error) { func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, error) {
errCh := make(chan error, len(pods)) errCh := make(chan error, len(pods))
successfulDeletes := int32(len(pods)) successfulDeletes := int32(len(pods))
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@ -869,7 +868,7 @@ func (jm *Controller) deleteActivePods(job *batch.Job, pods []*v1.Pod) (int32, e
for i := range pods { for i := range pods {
go func(pod *v1.Pod) { go func(pod *v1.Pod) {
defer wg.Done() defer wg.Done()
if err := jm.podControl.DeletePod(job.Namespace, pod.Name, job); err != nil && !apierrors.IsNotFound(err) { if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil && !apierrors.IsNotFound(err) {
atomic.AddInt32(&successfulDeletes, -1) atomic.AddInt32(&successfulDeletes, -1)
errCh <- err errCh <- err
utilruntime.HandleError(err) utilruntime.HandleError(err)
@ -882,7 +881,7 @@ func (jm *Controller) deleteActivePods(job *batch.Job, pods []*v1.Pod) (int32, e
// deleteJobPods deletes the pods, returns the number of successful removals // deleteJobPods deletes the pods, returns the number of successful removals
// and any error. // and any error.
func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) { func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
errCh := make(chan error, len(pods)) errCh := make(chan error, len(pods))
successfulDeletes := int32(len(pods)) successfulDeletes := int32(len(pods))
@ -903,12 +902,12 @@ func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Po
go func(pod *v1.Pod) { go func(pod *v1.Pod) {
defer wg.Done() defer wg.Done()
if patch := removeTrackingFinalizerPatch(pod); patch != nil { if patch := removeTrackingFinalizerPatch(pod); patch != nil {
if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil { if err := jm.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patch); err != nil {
failDelete(pod, fmt.Errorf("removing completion finalizer: %w", err)) failDelete(pod, fmt.Errorf("removing completion finalizer: %w", err))
return return
} }
} }
if err := jm.podControl.DeletePod(job.Namespace, pod.Name, job); err != nil { if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil {
failDelete(pod, err) failDelete(pod, err)
} }
}(pods[i]) }(pods[i])
@ -919,7 +918,7 @@ func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Po
// removeTrackingFinalizersFromAllPods removes finalizers from any Job Pod. This is called // removeTrackingFinalizersFromAllPods removes finalizers from any Job Pod. This is called
// when Job tracking with finalizers is disabled. // when Job tracking with finalizers is disabled.
func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error { func (jm *Controller) removeTrackingFinalizersFromAllPods(ctx context.Context, pods []*v1.Pod) error {
var podsWithFinalizer []*v1.Pod var podsWithFinalizer []*v1.Pod
for _, pod := range pods { for _, pod := range pods {
if hasJobTrackingFinalizer(pod) { if hasJobTrackingFinalizer(pod) {
@ -930,7 +929,7 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error
return nil return nil
} }
// Tracking with finalizers is disabled, no need to set expectations. // Tracking with finalizers is disabled, no need to set expectations.
_, err := jm.removeTrackingFinalizerFromPods("", podsWithFinalizer) _, err := jm.removeTrackingFinalizerFromPods(ctx, "", podsWithFinalizer)
return err return err
} }
@ -942,7 +941,7 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error
// 4. Add Complete condition if satisfied with current counters. // 4. Add Complete condition if satisfied with current counters.
// It does this up to a limited number of Pods so that the size of .status // It does this up to a limited number of Pods so that the size of .status
// doesn't grow too much and this sync doesn't starve other Jobs. // doesn't grow too much and this sync doesn't starve other Jobs.
func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool) error { func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool) error {
isIndexed := isIndexedJob(job) isIndexed := isIndexedJob(job)
var podsToRemoveFinalizer []*v1.Pod var podsToRemoveFinalizer []*v1.Pod
uncountedStatus := job.Status.UncountedTerminatedPods uncountedStatus := job.Status.UncountedTerminatedPods
@ -1014,14 +1013,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
job.Status.CompletedIndexes = succeededIndexes.String() job.Status.CompletedIndexes = succeededIndexes.String()
} }
var err error var err error
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil { if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
return err return err
} }
if jm.enactJobFinished(job, finishedCond) { if jm.enactJobFinished(job, finishedCond) {
needsFlush = true needsFlush = true
} }
if needsFlush { if needsFlush {
if _, err := jm.updateStatusHandler(job); err != nil { if _, err := jm.updateStatusHandler(ctx, job); err != nil {
return fmt.Errorf("removing uncounted pods from status: %w", err) return fmt.Errorf("removing uncounted pods from status: %w", err)
} }
recordJobPodFinished(job, oldCounters) recordJobPodFinished(job, oldCounters)
@ -1038,10 +1037,10 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
// 4. (if not all removals succeeded) flush Job status again. // 4. (if not all removals succeeded) flush Job status again.
// Returns whether there are pending changes in the Job status that need to be // Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls. // flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) { func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
var err error var err error
if needsFlush { if needsFlush {
if job, err = jm.updateStatusHandler(job); err != nil { if job, err = jm.updateStatusHandler(ctx, job); err != nil {
return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
} }
recordJobPodFinished(job, *oldCounters) recordJobPodFinished(job, *oldCounters)
@ -1056,7 +1055,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe
var rmErr error var rmErr error
if len(podsToRemoveFinalizer) > 0 { if len(podsToRemoveFinalizer) > 0 {
var rmSucceded []bool var rmSucceded []bool
rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(jobKey, podsToRemoveFinalizer) rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(ctx, jobKey, podsToRemoveFinalizer)
for i, p := range podsToRemoveFinalizer { for i, p := range podsToRemoveFinalizer {
if rmSucceded[i] { if rmSucceded[i] {
uidsWithFinalizer.Delete(string(p.UID)) uidsWithFinalizer.Delete(string(p.UID))
@ -1069,7 +1068,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe
needsFlush = true needsFlush = true
} }
if rmErr != nil && needsFlush { if rmErr != nil && needsFlush {
if job, err := jm.updateStatusHandler(job); err != nil { if job, err := jm.updateStatusHandler(ctx, job); err != nil {
return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err) return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
} }
} }
@ -1102,7 +1101,7 @@ func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinali
// returns an array of booleans where the i-th value is true if the finalizer // returns an array of booleans where the i-th value is true if the finalizer
// of the i-th Pod was successfully removed (if the pod was deleted when this // of the i-th Pod was successfully removed (if the pod was deleted when this
// function was called, it's considered as the finalizer was removed successfully). // function was called, it's considered as the finalizer was removed successfully).
func (jm *Controller) removeTrackingFinalizerFromPods(jobKey string, pods []*v1.Pod) ([]bool, error) { func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKey string, pods []*v1.Pod) ([]bool, error) {
errCh := make(chan error, len(pods)) errCh := make(chan error, len(pods))
succeeded := make([]bool, len(pods)) succeeded := make([]bool, len(pods))
uids := make([]string, len(pods)) uids := make([]string, len(pods))
@ -1122,7 +1121,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(jobKey string, pods []*v1.
pod := pods[i] pod := pods[i]
defer wg.Done() defer wg.Done()
if patch := removeTrackingFinalizerPatch(pod); patch != nil { if patch := removeTrackingFinalizerPatch(pod); patch != nil {
if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil { if err := jm.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patch); err != nil {
// In case of any failure, we don't expect a Pod update for the // In case of any failure, we don't expect a Pod update for the
// finalizer removed. Clear expectation now. // finalizer removed. Clear expectation now.
if jobKey != "" { if jobKey != "" {
@ -1264,7 +1263,7 @@ func jobSuspended(job *batch.Job) bool {
// manageJob is the core method responsible for managing the number of running // manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec. // pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>. // Does NOT modify <activePods>.
func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32, string, error) { func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32, string, error) {
active := int32(len(activePods)) active := int32(len(activePods))
parallelism := *job.Spec.Parallelism parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job) jobKey, err := controller.KeyFunc(job)
@ -1277,7 +1276,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
klog.V(4).InfoS("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active) klog.V(4).InfoS("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
podsToDelete := activePodsForRemoval(job, activePods, int(active)) podsToDelete := activePodsForRemoval(job, activePods, int(active))
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed active -= removed
return active, metrics.JobSyncActionPodsDeleted, err return active, metrics.JobSyncActionPodsDeleted, err
} }
@ -1315,7 +1314,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
if len(podsToDelete) > 0 { if len(podsToDelete) > 0 {
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed active -= removed
// While it is possible for a Job to require both pod creations and // While it is possible for a Job to require both pod creations and
// deletions at the same time (e.g. indexed Jobs with repeated indexes), we // deletions at the same time (e.g. indexed Jobs with repeated indexes), we
@ -1378,7 +1377,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
generateName = podGenerateNameWithIndex(job.Name, completionIndex) generateName = podGenerateNameWithIndex(job.Name, completionIndex)
} }
defer wait.Done() defer wait.Done()
err := jm.podControl.CreatePodsWithGenerateName(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName) err := jm.podControl.CreatePodsWithGenerateName(ctx, job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
if err != nil { if err != nil {
if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore // If the namespace is being torn down, we can safely ignore
@ -1443,13 +1442,13 @@ func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.P
} }
// updateJobStatus calls the API to update the job status. // updateJobStatus calls the API to update the job status.
func (jm *Controller) updateJobStatus(job *batch.Job) (*batch.Job, error) { func (jm *Controller) updateJobStatus(ctx context.Context, job *batch.Job) (*batch.Job, error) {
return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{}) return jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(ctx, job, metav1.UpdateOptions{})
} }
func (jm *Controller) patchJob(job *batch.Job, data []byte) error { func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) error {
_, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch( _, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch(
context.TODO(), job.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) ctx, job.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return err return err
} }

View File

@ -775,13 +775,13 @@ func TestControllerSyncJob(t *testing.T) {
setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes) setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes)
actual := job actual := job
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job actual = job
return job, nil return job, nil
} }
// run // run
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
// We need requeue syncJob task if podController error // We need requeue syncJob task if podController error
if tc.podControllerError != nil { if tc.podControllerError != nil {
@ -1016,20 +1016,20 @@ func TestSyncJobLegacyTracking(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
jobPatches := 0 jobPatches := 0
manager.patchJobHandler = func(*batch.Job, []byte) error { manager.patchJobHandler = func(context.Context, *batch.Job, []byte) error {
jobPatches++ jobPatches++
return nil return nil
} }
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(&tc.job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(&tc.job)
var actual *batch.Job var actual *batch.Job
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job actual = job
return job, nil return job, nil
} }
// Run. // Run.
_, err := manager.syncJob(testutil.GetKey(&tc.job, t)) _, err := manager.syncJob(context.TODO(), testutil.GetKey(&tc.job, t))
if err != nil { if err != nil {
t.Fatalf("Syncing job: %v", err) t.Fatalf("Syncing job: %v", err)
} }
@ -1629,7 +1629,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
metrics.JobPodsFinished.Reset() metrics.JobPodsFinished.Reset()
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
var statusUpdates []batch.JobStatus var statusUpdates []batch.JobStatus
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
statusUpdates = append(statusUpdates, *job.Status.DeepCopy()) statusUpdates = append(statusUpdates, *job.Status.DeepCopy())
return job, tc.statusUpdateErr return job, tc.statusUpdateErr
} }
@ -1639,7 +1639,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
} }
uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
succeededIndexes := succeededIndexesFromJob(job) succeededIndexes := succeededIndexesFromJob(job)
err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush) err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush)
if !errors.Is(err, tc.wantErr) { if !errors.Is(err, tc.wantErr) {
t.Errorf("Got error %v, want %v", err, tc.wantErr) t.Errorf("Got error %v, want %v", err, tc.wantErr)
} }
@ -1775,7 +1775,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
var actual *batch.Job var actual *batch.Job
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job actual = job
return job, nil return job, nil
} }
@ -1791,7 +1791,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0) setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0)
// run // run
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil { if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err) t.Errorf("Unexpected error when syncing jobs %v", err)
} }
@ -1852,7 +1852,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
var actual *batch.Job var actual *batch.Job
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job actual = job
return job, nil return job, nil
} }
@ -1864,7 +1864,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
job.Status.StartTime = &start job.Status.StartTime = &start
job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline")) job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline"))
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil { if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err) t.Errorf("Unexpected error when syncing jobs %v", err)
} }
@ -1893,7 +1893,7 @@ func TestSyncJobComplete(t *testing.T) {
job := newJob(1, 1, 6, batch.NonIndexedCompletion) job := newJob(1, 1, 6, batch.NonIndexedCompletion)
job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", "")) job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", ""))
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil { if err != nil {
t.Fatalf("Unexpected error when syncing jobs %v", err) t.Fatalf("Unexpected error when syncing jobs %v", err)
} }
@ -1917,11 +1917,11 @@ func TestSyncJobDeleted(t *testing.T) {
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
return job, nil return job, nil
} }
job := newJob(2, 2, 6, batch.NonIndexedCompletion) job := newJob(2, 2, 6, batch.NonIndexedCompletion)
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil { if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err) t.Errorf("Unexpected error when syncing jobs %v", err)
} }
@ -1965,13 +1965,13 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
return job, tc.updateErr return job, tc.updateErr
} }
job := newJob(2, 2, 6, batch.NonIndexedCompletion) job := newJob(2, 2, 6, batch.NonIndexedCompletion)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
manager.queue.Add(testutil.GetKey(job, t)) manager.queue.Add(testutil.GetKey(job, t))
manager.processNextWorkItem() manager.processNextWorkItem(context.TODO())
// With DefaultJobBackOff=0, the queueing is synchronous. // With DefaultJobBackOff=0, the queueing is synchronous.
requeued := manager.queue.Len() > 0 requeued := manager.queue.Len() > 0
if requeued != tc.wantRequeue { if requeued != tc.wantRequeue {
@ -2148,7 +2148,7 @@ func TestGetPodsForJob(t *testing.T) {
informer.Core().V1().Pods().Informer().GetIndexer().Add(p) informer.Core().V1().Pods().Informer().GetIndexer().Add(p)
} }
pods, err := jm.getPodsForJob(job, wFinalizers) pods, err := jm.getPodsForJob(context.TODO(), job, wFinalizers)
if err != nil { if err != nil {
t.Fatalf("getPodsForJob() error: %v", err) t.Fatalf("getPodsForJob() error: %v", err)
} }
@ -2468,7 +2468,7 @@ func TestSyncJobExpectations(t *testing.T) {
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
return job, nil return job, nil
} }
@ -2486,7 +2486,7 @@ func TestSyncJobExpectations(t *testing.T) {
podIndexer.Add(pods[1]) podIndexer.Add(pods[1])
}, },
} }
manager.syncJob(testutil.GetKey(job, t)) manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if len(fakePodControl.Templates) != 0 { if len(fakePodControl.Templates) != 0 {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates)) t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
} }
@ -2508,7 +2508,7 @@ func TestWatchJobs(t *testing.T) {
// The update sent through the fakeWatcher should make its way into the workqueue, // The update sent through the fakeWatcher should make its way into the workqueue,
// and eventually into the syncHandler. // and eventually into the syncHandler.
manager.syncHandler = func(key string) (bool, error) { manager.syncHandler = func(ctx context.Context, key string) (bool, error) {
defer close(received) defer close(received)
ns, name, err := cache.SplitMetaNamespaceKey(key) ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {
@ -2529,7 +2529,7 @@ func TestWatchJobs(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
sharedInformerFactory.Start(stopCh) sharedInformerFactory.Start(stopCh)
go manager.Run(1, stopCh) go manager.Run(context.TODO(), 1)
// We're sending new job to see if it reaches syncHandler. // We're sending new job to see if it reaches syncHandler.
testJob.Namespace = "bar" testJob.Namespace = "bar"
@ -2553,7 +2553,7 @@ func TestWatchPods(t *testing.T) {
received := make(chan struct{}) received := make(chan struct{})
// The pod update sent through the fakeWatcher should figure out the managing job and // The pod update sent through the fakeWatcher should figure out the managing job and
// send it into the syncHandler. // send it into the syncHandler.
manager.syncHandler = func(key string) (bool, error) { manager.syncHandler = func(ctx context.Context, key string) (bool, error) {
ns, name, err := cache.SplitMetaNamespaceKey(key) ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {
t.Errorf("Error getting namespace/name from key %v: %v", key, err) t.Errorf("Error getting namespace/name from key %v: %v", key, err)
@ -2575,7 +2575,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh) go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh)
go manager.Run(1, stopCh) go manager.Run(context.TODO(), 1)
pods := newPodList(1, v1.PodRunning, testJob) pods := newPodList(1, v1.PodRunning, testJob)
testPod := pods[0] testPod := pods[0]
@ -2594,7 +2594,7 @@ func TestWatchOrphanPods(t *testing.T) {
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
jobSynced := false jobSynced := false
manager.syncHandler = func(jobKey string) (bool, error) { manager.syncHandler = func(ctx context.Context, jobKey string) (bool, error) {
jobSynced = true jobSynced = true
return true, nil return true, nil
} }
@ -2605,7 +2605,7 @@ func TestWatchOrphanPods(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
go sharedInformers.Core().V1().Pods().Informer().Run(stopCh) go sharedInformers.Core().V1().Pods().Informer().Run(stopCh)
go manager.Run(1, stopCh) go manager.Run(context.TODO(), 1)
orphanPod := buildPod().name("a").job(testJob).deletionTimestamp().trackingFinalizer().Pod orphanPod := buildPod().name("a").job(testJob).deletionTimestamp().trackingFinalizer().Pod
orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{}) orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{})
@ -2674,7 +2674,7 @@ func TestJobBackoffReset(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
var actual *batch.Job var actual *batch.Job
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job actual = job
return job, nil return job, nil
} }
@ -2687,7 +2687,7 @@ func TestJobBackoffReset(t *testing.T) {
setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0) setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0)
manager.queue.Add(key) manager.queue.Add(key)
manager.processNextWorkItem() manager.processNextWorkItem(context.TODO())
retries := manager.queue.NumRequeues(key) retries := manager.queue.NumRequeues(key)
if retries != 1 { if retries != 1 {
t.Errorf("%s: expected exactly 1 retry, got %d", name, retries) t.Errorf("%s: expected exactly 1 retry, got %d", name, retries)
@ -2696,7 +2696,7 @@ func TestJobBackoffReset(t *testing.T) {
job = actual job = actual
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion)
setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0) setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0)
manager.processNextWorkItem() manager.processNextWorkItem(context.TODO())
retries = manager.queue.NumRequeues(key) retries = manager.queue.NumRequeues(key)
if retries != 0 { if retries != 0 {
t.Errorf("%s: expected exactly 0 retries, got %d", name, retries) t.Errorf("%s: expected exactly 0 retries, got %d", name, retries)
@ -2854,7 +2854,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
var actual *batch.Job var actual *batch.Job
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job actual = job
return job, nil return job, nil
} }
@ -2870,7 +2870,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
} }
// run // run
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil { if err != nil {
t.Errorf("unexpected error syncing job. Got %#v", err) t.Errorf("unexpected error syncing job. Got %#v", err)
@ -2956,7 +2956,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
var actual *batch.Job var actual *batch.Job
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job actual = job
return job, nil return job, nil
} }
@ -2974,7 +2974,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
} }
// run // run
forget, err := manager.syncJob(testutil.GetKey(job, t)) forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if (err != nil) != tc.isExpectingAnError { if (err != nil) != tc.isExpectingAnError {
t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError) t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError)
@ -3094,7 +3094,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")} manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")}
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
return job, nil return job, nil
} }
@ -3114,7 +3114,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
} }
jobKey := testutil.GetKey(job, t) jobKey := testutil.GetKey(job, t)
manager.syncJob(jobKey) manager.syncJob(context.TODO(), jobKey)
gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey) gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey)
if len(gotExpectedUIDs) != 0 { if len(gotExpectedUIDs) != 0 {
t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", gotExpectedUIDs.List()) t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", gotExpectedUIDs.List())
@ -3122,7 +3122,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
// Remove failures and re-sync. // Remove failures and re-sync.
manager.podControl.(*controller.FakePodControl).Err = nil manager.podControl.(*controller.FakePodControl).Err = nil
manager.syncJob(jobKey) manager.syncJob(context.TODO(), jobKey)
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" {
t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff) t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff)

View File

@ -542,7 +542,7 @@ func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
// manageReplicas checks and updates replicas for the given ReplicaSet. // manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>. // Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods. // It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
diff := len(filteredPods) - int(*(rs.Spec.Replicas)) diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs) rsKey, err := controller.KeyFunc(rs)
if err != nil { if err != nil {
@ -570,7 +570,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
// after one of its pods fails. Conveniently, this also prevents the // after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate. // event spam that those failures would generate.
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error { successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind)) err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
if err != nil { if err != nil {
if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// if the namespace is being terminated, we don't have to do // if the namespace is being terminated, we don't have to do
@ -618,7 +618,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
for _, pod := range podsToDelete { for _, pod := range podsToDelete {
go func(targetPod *v1.Pod) { go func(targetPod *v1.Pod) {
defer wg.Done() defer wg.Done()
if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil { if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion // Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(targetPod) podKey := controller.PodKey(targetPod)
rsc.expectations.DeletionObserved(rsKey, podKey) rsc.expectations.DeletionObserved(rsKey, podKey)
@ -693,7 +693,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
var manageReplicasErr error var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil { if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs) manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
} }
rs = rs.DeepCopy() rs = rs.DeepCopy()
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

View File

@ -337,18 +337,18 @@ type podControlAdapter struct {
controller.PodControlInterface controller.PodControlInterface
} }
func (pc podControlAdapter) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { func (pc podControlAdapter) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
rc, err := convertRStoRC(object.(*apps.ReplicaSet)) rc, err := convertRStoRC(object.(*apps.ReplicaSet))
if err != nil { if err != nil {
return err return err
} }
return pc.PodControlInterface.CreatePods(namespace, template, rc, controllerRef) return pc.PodControlInterface.CreatePods(ctx, namespace, template, rc, controllerRef)
} }
func (pc podControlAdapter) DeletePod(namespace string, podID string, object runtime.Object) error { func (pc podControlAdapter) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error {
rc, err := convertRStoRC(object.(*apps.ReplicaSet)) rc, err := convertRStoRC(object.(*apps.ReplicaSet))
if err != nil { if err != nil {
return err return err
} }
return pc.PodControlInterface.DeletePod(namespace, podID, rc) return pc.PodControlInterface.DeletePod(ctx, namespace, podID, rc)
} }

View File

@ -159,8 +159,8 @@ func TestCronJobLaunchesPodAndCleansUp(t *testing.T) {
defer close(stopCh) defer close(stopCh)
informerSet.Start(stopCh) informerSet.Start(stopCh)
go cjc.Run(1, stopCh) go cjc.Run(context.TODO(), 1)
go jc.Run(1, stopCh) go jc.Run(context.TODO(), 1)
_, err := cjClient.Create(context.TODO(), newCronJob(cronJobName, ns.Name, "* * * * ?"), metav1.CreateOptions{}) _, err := cjClient.Create(context.TODO(), newCronJob(cronJobName, ns.Name, "* * * * ?"), metav1.CreateOptions{})
if err != nil { if err != nil {

View File

@ -1101,7 +1101,7 @@ func startJobController(restConfig *restclient.Config, clientSet clientset.Inter
informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "cronjob-informers")), resyncPeriod) informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "cronjob-informers")), resyncPeriod)
jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet) jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
informerSet.Start(ctx.Done()) informerSet.Start(ctx.Done())
go jc.Run(1, ctx.Done()) go jc.Run(ctx, 1)
return ctx, cancel return ctx, cancel
} }