Merge pull request #108752 from alculquicondor/job-orphan-pods
Fix: Clean job tracking finalizer from orphan pods
commit 4348c8ecaf
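For orientation before the diff: when a Job is deleted (or the garbage collector strips the pods' ownerReferences), its pods become orphans but can still carry the batch.kubernetes.io/job-tracking finalizer, which blocks their removal. The change below teaches the job controller to enqueue such orphan pods and strip that finalizer. The Go sketch that follows is only an illustrative, self-contained approximation of that cleanup, assuming a plain client-go clientset; the helper names are hypothetical, and the real controller's work queues, expectations tracking, and error handling are omitted.

package orphancleanup

import (
	"context"
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// hasJobTrackingFinalizer reports whether the pod still carries the
// batch.kubernetes.io/job-tracking finalizer.
func hasJobTrackingFinalizer(pod *corev1.Pod) bool {
	for _, fin := range pod.Finalizers {
		if fin == batchv1.JobTrackingFinalizer {
			return true
		}
	}
	return false
}

// cleanOrphanPodFinalizer (hypothetical helper, not the controller's code)
// removes the tracking finalizer from a pod that no longer has a controller
// owner, using a strategic merge patch. It is a simplified stand-in for the
// syncOrphanPod path shown in the diff below.
func cleanOrphanPodFinalizer(ctx context.Context, c kubernetes.Interface, pod *corev1.Pod) error {
	if metav1.GetControllerOf(pod) != nil || !hasJobTrackingFinalizer(pod) {
		// Still owned by a Job, or nothing to clean up.
		return nil
	}
	patch := []byte(fmt.Sprintf(
		`{"metadata":{"$deleteFromPrimitiveList/finalizers":[%q]}}`,
		batchv1.JobTrackingFinalizer))
	_, err := c.CoreV1().Pods(pod.Namespace).Patch(
		ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	return err
}

In the actual controller this happens asynchronously: the event handlers enqueue the pod (enqueueOrphanPod) and syncOrphanPod later applies the finalizer-removal patch, as the diff shows.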
@@ -151,9 +151,7 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
 			jm.enqueueController(obj, true)
 		},
 		UpdateFunc: jm.updateJob,
-		DeleteFunc: func(obj interface{}) {
-			jm.enqueueController(obj, true)
-		},
+		DeleteFunc: jm.deleteJob,
 	})
 	jm.jobLister = jobInformer.Lister()
 	jm.jobStoreSynced = jobInformer.Informer().HasSynced
@@ -238,7 +236,7 @@ func (jm *Controller) resolveControllerRef(namespace string, controllerRef *meta
 	return job
 }
 
-// When a pod is created, enqueue the controller that manages it and update it's expectations.
+// When a pod is created, enqueue the controller that manages it and update its expectations.
 func (jm *Controller) addPod(obj interface{}) {
 	pod := obj.(*v1.Pod)
 	if pod.DeletionTimestamp != nil {
@@ -263,7 +261,12 @@ func (jm *Controller) addPod(obj interface{}) {
 		return
 	}
 
-	// Otherwise, it's an orphan. Get a list of all matching controllers and sync
+	// Otherwise, it's an orphan.
+	// Clean the finalizer.
+	if hasJobTrackingFinalizer(pod) {
+		jm.enqueueOrphanPod(pod)
+	}
+	// Get a list of all matching controllers and sync
 	// them to see if anyone wants to adopt it.
 	// DO NOT observe creation because no controller should be waiting for an
 	// orphan.
@@ -333,7 +336,12 @@ func (jm *Controller) updatePod(old, cur interface{}) {
 		return
 	}
 
-	// Otherwise, it's an orphan. If anything changed, sync matching controllers
+	// Otherwise, it's an orphan.
+	// Clean the finalizer.
+	if hasJobTrackingFinalizer(curPod) {
+		jm.enqueueOrphanPod(curPod)
+	}
+	// If anything changed, sync matching controllers
 	// to see if anyone wants to adopt it now.
 	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
 	if labelChanged || controllerRefChanged {
@@ -366,13 +374,18 @@ func (jm *Controller) deletePod(obj interface{}, final bool) {
 	}
 
 	controllerRef := metav1.GetControllerOf(pod)
+	hasFinalizer := hasJobTrackingFinalizer(pod)
 	if controllerRef == nil {
 		// No controller should care about orphans being deleted.
+		// But this pod might have belonged to a Job and the GC removed the reference.
+		if hasFinalizer {
+			jm.enqueueOrphanPod(pod)
+		}
 		return
 	}
 	job := jm.resolveControllerRef(pod.Namespace, controllerRef)
 	if job == nil {
-		if hasJobTrackingFinalizer(pod) {
+		if hasFinalizer {
 			jm.enqueueOrphanPod(pod)
 		}
 		return
@@ -385,7 +398,7 @@ func (jm *Controller) deletePod(obj interface{}, final bool) {
 
 	// Consider the finalizer removed if this is the final delete. Otherwise,
 	// it's an update for the deletion timestamp, then check finalizer.
-	if final || !hasJobTrackingFinalizer(pod) {
+	if final || !hasFinalizer {
 		jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
 	}
 
@@ -421,6 +434,37 @@ func (jm *Controller) updateJob(old, cur interface{}) {
 	}
 }
 
+// deleteJob enqueues the job and all the pods associated with it that still
+// have a finalizer.
+func (jm *Controller) deleteJob(obj interface{}) {
+	jm.enqueueController(obj, true)
+	jobObj, ok := obj.(*batch.Job)
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
+			return
+		}
+		jobObj, ok = tombstone.Obj.(*batch.Job)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a job %+v", obj))
+			return
+		}
+	}
+	// Listing pods shouldn't really fail, as we are just querying the informer cache.
+	selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
+		return
+	}
+	pods, _ := jm.podStore.Pods(jobObj.Namespace).List(selector)
+	for _, pod := range pods {
+		if metav1.IsControlledBy(pod, jobObj) && hasJobTrackingFinalizer(pod) {
+			jm.enqueueOrphanPod(pod)
+		}
+	}
+}
+
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
 // immediate tells the controller to update the status right away, and should
 // happen ONLY when there was a successful pod run.
@@ -538,6 +582,14 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error {
 		}
 		return err
 	}
+	// Make sure the pod is still orphaned.
+	if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil {
+		job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef)
+		if job != nil {
+			// The pod was adopted. Do not remove finalizer.
+			return nil
+		}
+	}
 	if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil {
 		if err := jm.podControl.PatchPod(ctx, ns, name, patch); err != nil && !apierrors.IsNotFound(err) {
 			return err
@@ -36,20 +36,26 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/util/feature"
+	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
+	"k8s.io/client-go/metadata"
+	"k8s.io/client-go/metadata/metadatainformer"
 	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/util/retry"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	"k8s.io/controller-manager/pkg/informerfactory"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/controller/garbagecollector"
 	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/utils/pointer"
 )
 
-const waitInterval = 500 * time.Millisecond
+const waitInterval = time.Second
 
 // TestNonParallelJob tests that a Job that only executes one Pod. The test
 // recreates the Job controller at some points to make sure a new controller
@@ -61,7 +67,7 @@ func TestNonParallelJob(t *testing.T) {
 
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -79,7 +85,7 @@ func TestNonParallelJob(t *testing.T) {
 
 	// Restarting controller.
 	cancel()
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 
 	// Failed Pod is replaced.
 	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
@@ -92,7 +98,7 @@ func TestNonParallelJob(t *testing.T) {
 
 	// Restarting controller.
 	cancel()
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 
 	// No more Pods are created after the Pod succeeds.
 	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
@@ -132,7 +138,7 @@ func TestParallelJob(t *testing.T) {
 
 	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
 	defer closeFn()
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer cancel()
 
 	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -220,7 +226,7 @@ func TestParallelJobParallelism(t *testing.T) {
 
 	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
 	defer closeFn()
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer cancel()
 
 	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -296,7 +302,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
 			closeFn, restConfig, clientSet, ns := setup(t, "completions")
 			defer closeFn()
-			ctx, cancel := startJobController(restConfig, clientSet)
+			ctx, cancel := startJobController(restConfig)
 			defer cancel()
 
 			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -376,7 +382,7 @@ func TestIndexedJob(t *testing.T) {
 
 	closeFn, restConfig, clientSet, ns := setup(t, "indexed")
 	defer closeFn()
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -447,7 +453,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
 
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -477,7 +483,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
 	}
 
 	// Restart controller.
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 
 	// Ensure Job continues to be tracked and finalizers are removed.
 	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -503,7 +509,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
 	}
 
 	// Restart controller.
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 
 	// Ensure Job continues to be tracked and finalizers are removed.
 	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -513,13 +519,77 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
 	}, false)
 }
 
-func TestOrphanPodsFinalizersCleared(t *testing.T) {
+func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
+	for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground} {
+		t.Run(string(policy), func(t *testing.T) {
+			closeFn, restConfig, clientSet, ns := setup(t, "simple")
+			defer closeFn()
+			informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "controller-informers")), 0)
+			// Make the job controller significantly slower to trigger race condition.
+			restConfig.QPS = 1
+			restConfig.Burst = 1
+			jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
+			defer cancel()
+			restConfig.QPS = 200
+			restConfig.Burst = 200
+			runGC := createGC(ctx, t, restConfig, informerSet)
+			informerSet.Start(ctx.Done())
+			go jc.Run(ctx, 1)
+			runGC()
+
+			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Parallelism: pointer.Int32Ptr(5),
+				},
+			})
+			if err != nil {
+				t.Fatalf("Failed to create Job: %v", err)
+			}
+			if !hasJobTrackingAnnotation(jobObj) {
+				t.Error("apiserver didn't add the tracking annotation")
+			}
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+				Active: 5,
+			}, true)
+
+			// Delete Job. The GC should delete the pods in cascade.
+			err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{
+				PropagationPolicy: &policy,
+			})
+			if err != nil {
+				t.Fatalf("Failed to delete job: %v", err)
+			}
+			orphanPods := 0
+			if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
+				pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{
+					LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector),
+				})
+				if err != nil {
+					return false, err
+				}
+				orphanPods = 0
+				for _, pod := range pods.Items {
+					if hasJobTrackingFinalizer(&pod) {
+						orphanPods++
+					}
+				}
+				return orphanPods == 0, nil
+			}); err != nil {
+				t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err)
+				t.Logf("Last saw %d orphan pods", orphanPods)
+			}
+		})
+	}
+}
+
+func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
 	// Step 0: job created while feature is enabled.
 	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
 
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -550,15 +620,15 @@ func TestOrphanPodsFinalizersCleared(t *testing.T) {
 	}
 
 	// Restart controller.
-	ctx, cancel = startJobController(restConfig, clientSet)
+	ctx, cancel = startJobController(restConfig)
 	if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
 		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
 		if err != nil {
-			t.Fatalf("Falied to list Job Pods: %v", err)
+			t.Fatalf("Failed to list Job Pods: %v", err)
 		}
 		sawPods := false
 		for _, pod := range pods.Items {
-			if isPodOwnedByJob(&pod, jobObj) {
+			if metav1.IsControlledBy(&pod, jobObj) {
 				if hasJobTrackingFinalizer(&pod) {
 					return false, nil
 				}
@@ -600,7 +670,7 @@ func TestSuspendJob(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			closeFn, restConfig, clientSet, ns := setup(t, "suspend")
 			defer closeFn()
-			ctx, cancel := startJobController(restConfig, clientSet)
+			ctx, cancel := startJobController(restConfig)
 			defer cancel()
 			events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
 			if err != nil {
@@ -650,7 +720,7 @@ func TestSuspendJob(t *testing.T) {
 func TestSuspendJobControllerRestart(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
 	defer closeFn()
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -680,7 +750,7 @@ func TestNodeSelectorUpdate(t *testing.T) {
 
 	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
 	defer closeFn()
-	ctx, cancel := startJobController(restConfig, clientSet)
+	ctx, cancel := startJobController(restConfig)
 	defer cancel()
 
 	job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{
@@ -780,7 +850,7 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
 		active = nil
 		for _, pod := range pods.Items {
 			phase := pod.Status.Phase
-			if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
+			if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
 				p := pod
 				active = append(active, &p)
 			}
@@ -806,7 +876,7 @@ func validateFinishedPodsNoFinalizer(ctx context.Context, t *testing.T, clientSe
 	}
 	for _, pod := range pods.Items {
 		phase := pod.Status.Phase
-		if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
+		if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
 			t.Errorf("Finished pod %s still has a tracking finalizer", pod.Name)
 		}
 	}
@@ -830,7 +900,7 @@ func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clients
 	}
 	gotActive := sets.NewInt()
 	for _, pod := range pods.Items {
-		if isPodOwnedByJob(&pod, jobObj) {
+		if metav1.IsControlledBy(&pod, jobObj) {
 			if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
 				ix, err := getCompletionIndex(&pod)
 				if err != nil {
@@ -931,7 +1001,7 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job
 		if len(updates) == cnt {
 			break
 		}
-		if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
+		if p := pod.Status.Phase; metav1.IsControlledBy(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
 			if !op(&pod) {
 				continue
 			}
@@ -975,7 +1045,7 @@ func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, job
 		return fmt.Errorf("listing Job Pods: %w", err)
 	}
 	for _, pod := range pods.Items {
-		if p := pod.Status.Phase; !isPodOwnedByJob(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
+		if p := pod.Status.Phase; !metav1.IsControlledBy(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
 			continue
 		}
 		if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
@@ -1001,15 +1071,6 @@ func getCompletionIndex(p *v1.Pod) (int, error) {
 	return strconv.Atoi(v)
 }
 
-func isPodOwnedByJob(p *v1.Pod, j *batchv1.Job) bool {
-	for _, owner := range p.ObjectMeta.OwnerReferences {
-		if owner.Kind == "Job" && owner.UID == j.UID {
-			return true
-		}
-	}
-	return false
-}
-
 func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, ns string, jobObj *batchv1.Job) (*batchv1.Job, error) {
 	if jobObj.Name == "" {
 		jobObj.Name = "test-job"
@@ -1046,16 +1107,55 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co
 	return closeFn, &config, clientSet, ns
 }
 
-func startJobController(restConfig *restclient.Config, clientSet clientset.Interface) (context.Context, context.CancelFunc) {
-	ctx, cancel := context.WithCancel(context.Background())
-	resyncPeriod := 12 * time.Hour
-	informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "cronjob-informers")), resyncPeriod)
-	jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
+func startJobController(restConfig *restclient.Config) (context.Context, context.CancelFunc) {
+	informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
+	jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
 	informerSet.Start(ctx.Done())
 	go jc.Run(ctx, 1)
 	return ctx, cancel
 }
 
+func createJobControllerWithSharedInformers(restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
+	clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller"))
+	ctx, cancel := context.WithCancel(context.Background())
+	jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
+	return jc, ctx, cancel
+}
+
+func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) func() {
+	restConfig = restclient.AddUserAgent(restConfig, "gc-controller")
+	clientSet := clientset.NewForConfigOrDie(restConfig)
+	metadataClient, err := metadata.NewForConfig(restConfig)
+	if err != nil {
+		t.Fatalf("Failed to create metadataClient: %v", err)
+	}
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
+	restMapper.Reset()
+	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
+	alwaysStarted := make(chan struct{})
+	close(alwaysStarted)
+	gc, err := garbagecollector.NewGarbageCollector(
+		clientSet,
+		metadataClient,
+		restMapper,
+		garbagecollector.DefaultIgnoredResources(),
+		informerfactory.NewInformerFactory(informerSet, metadataInformers),
+		alwaysStarted,
+	)
+	if err != nil {
+		t.Fatalf("Failed creating garbage collector")
+	}
+	startGC := func() {
+		syncPeriod := 5 * time.Second
+		go wait.Until(func() {
+			restMapper.Reset()
+		}, syncPeriod, ctx.Done())
+		go gc.Run(ctx, 1)
+		go gc.Sync(clientSet.Discovery(), syncPeriod, ctx.Done())
+	}
+	return startGC
+}
+
 func hasJobTrackingFinalizer(obj metav1.Object) bool {
 	for _, fin := range obj.GetFinalizers() {
 		if fin == batchv1.JobTrackingFinalizer {