Add integration test for orphan pods when there is GC

Change-Id: I04cd70725fd1830be8daf2dca53f67bc10a379b7
Aldo Culquicondor 2022-03-16 17:01:52 -04:00
parent cc5bf4a3f4
commit f72173e4b4

@@ -36,20 +36,26 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/util/feature"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/controller-manager/pkg/informerfactory"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
jobcontroller "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer"
)
const waitInterval = 500 * time.Millisecond
const waitInterval = time.Second
// TestNonParallelJob tests a Job that only executes one Pod. The test
// recreates the Job controller at some points to make sure a new controller
@@ -61,7 +67,7 @@ func TestNonParallelJob(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer func() {
cancel()
}()
@@ -79,7 +85,7 @@ func TestNonParallelJob(t *testing.T) {
// Restarting controller.
cancel()
ctx, cancel = startJobController(restConfig, clientSet)
ctx, cancel = startJobController(restConfig)
// Failed Pod is replaced.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
@@ -92,7 +98,7 @@ func TestNonParallelJob(t *testing.T) {
// Restarting controller.
cancel()
ctx, cancel = startJobController(restConfig, clientSet)
ctx, cancel = startJobController(restConfig)
// No more Pods are created after the Pod succeeds.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
@@ -132,7 +138,7 @@ func TestParallelJob(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer cancel()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -220,7 +226,7 @@ func TestParallelJobParallelism(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer cancel()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -296,7 +302,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
closeFn, restConfig, clientSet, ns := setup(t, "completions")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer cancel()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -376,7 +382,7 @@ func TestIndexedJob(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer func() {
cancel()
}()
@@ -447,7 +453,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer func() {
cancel()
}()
@@ -477,7 +483,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
}
// Restart controller.
ctx, cancel = startJobController(restConfig, clientSet)
ctx, cancel = startJobController(restConfig)
// Ensure Job continues to be tracked and finalizers are removed.
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -503,7 +509,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
}
// Restart controller.
ctx, cancel = startJobController(restConfig, clientSet)
ctx, cancel = startJobController(restConfig)
// Ensure Job continues to be tracked and finalizers are removed.
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -513,13 +519,72 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
}, false)
}
func TestOrphanPodsFinalizersCleared(t *testing.T) {
func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
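// The job controller is deliberately slowed down below so that, when the Job
// is deleted, its Pods still carry the job tracking finalizer. The GC then
// cascade-deletes the orphaned Pods, and the job controller must clear the
// finalizer from them; the test waits until no Pod has the finalizer left.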
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
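// This informer factory is shared by the job controller and the garbage collector created below.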
informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "controller-informers")), 0)
// Make the job controller significantly slower to trigger the race condition.
restConfig.QPS = 1
restConfig.Burst = 1
jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
defer cancel()
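// Restore the config so that clients created below (such as the GC's) are not
// throttled; the job controller's client was already created with the slow settings.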
restConfig.QPS = 200
restConfig.Burst = 200
runGC := createGC(ctx, t, restConfig, informerSet)
informerSet.Start(ctx.Done())
go jc.Run(ctx, 1)
runGC()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(5),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver didn't add the tracking annotation")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 5,
}, true)
// Delete the Job. The GC should cascade-delete the Pods.
err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
if err != nil {
t.Fatalf("Failed to delete job: %v", err)
}
orphanPods := 0
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector),
})
if err != nil {
return false, err
}
orphanPods = 0
for _, pod := range pods.Items {
if hasJobTrackingFinalizer(&pod) {
orphanPods++
}
}
return orphanPods == 0, nil
}); err != nil {
t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err)
t.Logf("Last saw %d orphan pods", orphanPods)
}
}
func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
// Step 0: job created while feature is enabled.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer func() {
cancel()
}()
@@ -550,15 +615,15 @@ func TestOrphanPodsFinalizersCleared(t *testing.T) {
}
// Restart controller.
ctx, cancel = startJobController(restConfig, clientSet)
ctx, cancel = startJobController(restConfig)
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Falied to list Job Pods: %v", err)
t.Fatalf("Failed to list Job Pods: %v", err)
}
sawPods := false
for _, pod := range pods.Items {
if isPodOwnedByJob(&pod, jobObj) {
if metav1.IsControlledBy(&pod, jobObj) {
if hasJobTrackingFinalizer(&pod) {
return false, nil
}
@@ -600,7 +665,7 @@ func TestSuspendJob(t *testing.T) {
t.Run(name, func(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer cancel()
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
if err != nil {
@@ -650,7 +715,7 @@ func TestSuspendJob(t *testing.T) {
func TestSuspendJobControllerRestart(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer func() {
cancel()
}()
@@ -680,7 +745,7 @@ func TestNodeSelectorUpdate(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
ctx, cancel := startJobController(restConfig)
defer cancel()
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{
@@ -780,7 +845,7 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
active = nil
for _, pod := range pods.Items {
phase := pod.Status.Phase
if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
p := pod
active = append(active, &p)
}
@@ -806,7 +871,7 @@ func validateFinishedPodsNoFinalizer(ctx context.Context, t *testing.T, clientSe
}
for _, pod := range pods.Items {
phase := pod.Status.Phase
if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
t.Errorf("Finished pod %s still has a tracking finalizer", pod.Name)
}
}
@@ -830,7 +895,7 @@ func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clients
}
gotActive := sets.NewInt()
for _, pod := range pods.Items {
if isPodOwnedByJob(&pod, jobObj) {
if metav1.IsControlledBy(&pod, jobObj) {
if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
ix, err := getCompletionIndex(&pod)
if err != nil {
@@ -931,7 +996,7 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job
if len(updates) == cnt {
break
}
if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
if p := pod.Status.Phase; metav1.IsControlledBy(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
if !op(&pod) {
continue
}
@@ -975,7 +1040,7 @@ func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, job
return fmt.Errorf("listing Job Pods: %w", err)
}
for _, pod := range pods.Items {
if p := pod.Status.Phase; !isPodOwnedByJob(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
if p := pod.Status.Phase; !metav1.IsControlledBy(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
continue
}
if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
@@ -1001,15 +1066,6 @@ func getCompletionIndex(p *v1.Pod) (int, error) {
return strconv.Atoi(v)
}
func isPodOwnedByJob(p *v1.Pod, j *batchv1.Job) bool {
for _, owner := range p.ObjectMeta.OwnerReferences {
if owner.Kind == "Job" && owner.UID == j.UID {
return true
}
}
return false
}
func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, ns string, jobObj *batchv1.Job) (*batchv1.Job, error) {
if jobObj.Name == "" {
jobObj.Name = "test-job"
@@ -1046,16 +1102,55 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co
return closeFn, &config, clientSet, ns
}
func startJobController(restConfig *restclient.Config, clientSet clientset.Interface) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
resyncPeriod := 12 * time.Hour
informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "cronjob-informers")), resyncPeriod)
jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
func startJobController(restConfig *restclient.Config) (context.Context, context.CancelFunc) {
informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
informerSet.Start(ctx.Done())
go jc.Run(ctx, 1)
return ctx, cancel
}
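// createJobControllerWithSharedInformers constructs a job controller wired to
// the given informer factory, without starting either of them.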
func createJobControllerWithSharedInformers(restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller"))
ctx, cancel := context.WithCancel(context.Background())
jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
return jc, ctx, cancel
}
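// createGC builds a garbage collector on top of the given informer factory and
// returns a function that starts it, so the caller can start the informers and
// the job controller first.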
func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) func() {
restConfig = restclient.AddUserAgent(restConfig, "gc-controller")
clientSet := clientset.NewForConfigOrDie(restConfig)
metadataClient, err := metadata.NewForConfig(restConfig)
if err != nil {
t.Fatalf("Failed to create metadataClient: %v", err)
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
restMapper.Reset()
metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
alwaysStarted := make(chan struct{})
close(alwaysStarted)
gc, err := garbagecollector.NewGarbageCollector(
clientSet,
metadataClient,
restMapper,
garbagecollector.DefaultIgnoredResources(),
informerfactory.NewInformerFactory(informerSet, metadataInformers),
alwaysStarted,
)
if err != nil {
t.Fatalf("Failed creating garbage collector")
}
startGC := func() {
syncPeriod := 5 * time.Second
go wait.Until(func() {
restMapper.Reset()
}, syncPeriod, ctx.Done())
go gc.Run(ctx, 1)
go gc.Sync(clientSet.Discovery(), syncPeriod, ctx.Done())
}
return startGC
}
func hasJobTrackingFinalizer(obj metav1.Object) bool {
for _, fin := range obj.GetFinalizers() {
if fin == batchv1.JobTrackingFinalizer {