Initial Commit

Sharpz7 2023-08-15 00:20:27 +00:00
parent 297f04b74a
commit cf32ae9453
2 changed files with 98 additions and 28 deletions

View File

@@ -470,7 +470,7 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
}
// if curJob is finished, remove the finalizer as a backup check.
if curJob.Status.CompletionTime != nil {
if IsJobFinished(curJob) {
jm.backupRemovePodFinalizers(curJob)
}
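For context on this one-line change: a non-nil CompletionTime only covers jobs that finished successfully, while IsJobFinished also covers failed jobs. A sketch of that helper, assuming the upstream definition in the job controller package (it is not part of this diff):

package job

import (
	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
)

// IsJobFinished reports whether the Job carries a Complete or Failed
// condition with status True, so both success and failure count as finished.
func IsJobFinished(j *batch.Job) bool {
	for _, c := range j.Status.Conditions {
		if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}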
@@ -1873,20 +1873,16 @@ func onlyReplaceFailedPods(job *batch.Job) bool {
return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
}
func (jm *Controller) backupRemovePodFinalizers(obj interface{}) {
jobObj, ok := obj.(*batch.Job)
if !ok {
return
}
selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector)
func (jm *Controller) backupRemovePodFinalizers(job *batch.Job) {
// Listing pods shouldn't really fail, as we are just querying the informer cache.
selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
return
}
pods, _ := jm.podStore.Pods(jobObj.Namespace).List(selector)
pods, _ := jm.podStore.Pods(job.Namespace).List(selector)
for _, pod := range pods {
if metav1.IsControlledBy(pod, jobObj) && hasJobTrackingFinalizer(pod) {
if metav1.IsControlledBy(pod, job) && hasJobTrackingFinalizer(pod) {
jm.enqueueOrphanPod(pod)
}
}
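The helper above does not remove finalizers itself: enqueueOrphanPod hands each matching pod to the controller's orphan-pod queue, whose worker strips the Job tracking finalizer. The pods are selected with hasJobTrackingFinalizer; a sketch of that check, assuming the upstream definition (not part of this diff, and assuming the usual batch "k8s.io/api/batch/v1" and v1 "k8s.io/api/core/v1" imports):

// hasJobTrackingFinalizer reports whether the pod still carries the Job
// tracking finalizer (batch.JobTrackingFinalizer, "batch.kubernetes.io/job-tracking").
func hasJobTrackingFinalizer(pod *v1.Pod) bool {
	for _, fin := range pod.Finalizers {
		if fin == batch.JobTrackingFinalizer {
			return true
		}
	}
	return false
}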

View File

@@ -62,12 +62,16 @@ import (
"k8s.io/utils/pointer"
)
var realClock = &clock.RealClock{}
var alwaysReady = func() bool { return true }
var (
realClock = &clock.RealClock{}
alwaysReady = func() bool { return true }
)
const fastSyncJobBatchPeriod = 10 * time.Millisecond
const fastJobApiBackoff = 10 * time.Millisecond
const fastRequeue = 10 * time.Millisecond
const (
fastSyncJobBatchPeriod = 10 * time.Millisecond
fastJobApiBackoff = 10 * time.Millisecond
fastRequeue = 10 * time.Millisecond
)
// testFinishedAt represents time one second later than unix epoch
// this will be used in various test cases where we don't want back-off to kick in
@@ -2151,7 +2155,6 @@ func TestSingleJobFailedCondition(t *testing.T) {
if failedConditions[0].Status != v1.ConditionTrue {
t.Errorf("Unexpected status for the failed condition. Expected: %v, saw %v\n", v1.ConditionTrue, failedConditions[0].Status)
}
}
func TestSyncJobComplete(t *testing.T) {
@@ -4698,9 +4701,11 @@ func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {}
func (f *fakeRateLimitingQueue) Forget(item interface{}) {
f.requeues = 0
}
func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int {
return f.requeues
}
func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) {
f.item = item
f.duration = duration
@@ -4788,57 +4793,79 @@ func TestJobBackoffForOnFailure(t *testing.T) {
}{
"backoffLimit 0 should have 1 pod active": {
1, 1, 0,
false, []int32{0}, v1.PodRunning,
false,
[]int32{0},
v1.PodRunning,
1, 0, 0, nil, "",
},
"backoffLimit 1 with restartCount 0 should have 1 pod active": {
1, 1, 1,
false, []int32{0}, v1.PodRunning,
false,
[]int32{0},
v1.PodRunning,
1, 0, 0, nil, "",
},
"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
1, 1, 1,
false, []int32{1}, v1.PodRunning,
false,
[]int32{1},
v1.PodRunning,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
1, 1, 1,
false, []int32{1}, v1.PodPending,
false,
[]int32{1},
v1.PodPending,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podRunning - single pod": {
1, 5, 2,
false, []int32{2}, v1.PodRunning,
false,
[]int32{2},
v1.PodRunning,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podPending - single pod": {
1, 5, 2,
false, []int32{2}, v1.PodPending,
false,
[]int32{2},
v1.PodPending,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podRunning - multiple pods": {
2, 5, 2,
false, []int32{1, 1}, v1.PodRunning,
false,
[]int32{1, 1},
v1.PodRunning,
0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podPending - multiple pods": {
2, 5, 2,
false, []int32{1, 1}, v1.PodPending,
false,
[]int32{1, 1},
v1.PodPending,
0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
},
"not enough failures": {
2, 5, 3,
false, []int32{1, 1}, v1.PodRunning,
false,
[]int32{1, 1},
v1.PodRunning,
2, 0, 0, nil, "",
},
"suspending a job": {
2, 4, 6,
true, []int32{1, 1}, v1.PodRunning,
true,
[]int32{1, 1},
v1.PodRunning,
0, 0, 0, &jobConditionSuspended, "JobSuspended",
},
"finshed job": {
2, 4, 6,
true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
true,
[]int32{1, 1, 2, 0},
v1.PodSucceeded,
0, 4, 0, &jobConditionComplete, "",
},
}
@@ -4871,7 +4898,6 @@ func TestJobBackoffForOnFailure(t *testing.T) {
// run
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil {
t.Errorf("unexpected error syncing job. Got %#v", err)
}
@@ -5172,6 +5198,54 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
}
}
func TestBackupFinalizers(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
clientset := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
stopCh := make(chan struct{})
defer close(stopCh)
podInformer := sharedInformers.Core().V1().Pods().Informer()
go podInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
// 1. Create the controller but do not start the workers.
// This is done above by initializing the manager and not calling manager.Run() yet.
// 2. Create a job.
job := newJob(2, 2, 6, batch.NonIndexedCompletion)
// 3. Create the pods.
podBuilder := buildPod().name("test_pod").deletionTimestamp().trackingFinalizer().job(job)
pod, err := clientset.CoreV1().Pods("default").Create(context.Background(), podBuilder.Pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Creating pod: %v", err)
}
// 4. Finish the job.
job.Status.Conditions = append(job.Status.Conditions, batch.JobCondition{
Type: batch.JobComplete,
Status: v1.ConditionTrue,
})
// 5. Start the workers.
go manager.Run(context.TODO(), 1)
// Check if the finalizer has been removed from the pod.
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
p, err := clientset.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return !hasJobTrackingFinalizer(p), nil
}); err != nil {
t.Errorf("Waiting for Pod to get the finalizer removed: %v", err)
}
}
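Step 3 of the test builds its pod with the buildPod() chain. The real builder lives in job_controller_test.go; the sketch below is an approximate reconstruction for illustration only (the field layout, method bodies, and the controllerKind reference are assumptions, not the exact code in this repository):

package job

import (
	"time"

	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// podBuilder wraps a *v1.Pod so test cases can assemble pods fluently.
type podBuilder struct {
	*v1.Pod
}

func buildPod() podBuilder {
	return podBuilder{Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault}}}
}

func (pb podBuilder) name(n string) podBuilder {
	pb.Name = n
	return pb
}

// job marks the pod as controlled by the Job so metav1.IsControlledBy matches.
func (pb podBuilder) job(j *batch.Job) podBuilder {
	pb.OwnerReferences = append(pb.OwnerReferences, *metav1.NewControllerRef(j, controllerKind))
	return pb
}

// trackingFinalizer adds the Job tracking finalizer the controller must remove.
func (pb podBuilder) trackingFinalizer() podBuilder {
	pb.Finalizers = append(pb.Finalizers, batch.JobTrackingFinalizer)
	return pb
}

// deletionTimestamp marks the pod as terminating.
func (pb podBuilder) deletionTimestamp() podBuilder {
	now := metav1.NewTime(time.Now())
	pb.DeletionTimestamp = &now
	return pb
}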
func checkJobCompletionLabel(t *testing.T, p *v1.PodTemplateSpec) {
t.Helper()
labels := p.GetLabels()