From 3bb965b651a52759a073acfb13e8c0db7bb0f3e6 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Fri, 6 May 2022 10:58:10 +0200 Subject: [PATCH] merge test integration scheduler util --- test/integration/scheduler/framework_test.go | 38 +- test/integration/scheduler/predicates_test.go | 18 +- test/integration/scheduler/preemption_test.go | 82 +-- test/integration/scheduler/priorities_test.go | 12 +- test/integration/scheduler/scheduler_test.go | 14 +- test/integration/scheduler/util.go | 547 +----------------- test/integration/util/util.go | 494 ++++++++++++++++ 7 files changed, 605 insertions(+), 600 deletions(-) diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index ef1d82903c0..b8faeee5a5f 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -563,7 +563,7 @@ func TestPreFilterPlugin(t *testing.T) { preFilterPlugin.rejectPreFilter = test.reject // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -704,7 +704,7 @@ func TestPostFilterPlugin(t *testing.T) { defer testutils.CleanupTest(t, testCtx) // Create a best effort pod. - pod, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -771,7 +771,7 @@ func TestScorePlugin(t *testing.T) { scorePlugin.failScore = test.fail // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Fatalf("Error while creating a test pod: %v", err) } @@ -817,7 +817,7 @@ func TestNormalizeScorePlugin(t *testing.T) { // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Fatalf("Error while creating a test pod: %v", err) } @@ -867,7 +867,7 @@ func TestReservePluginReserve(t *testing.T) { reservePlugin.failReserve = test.fail // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1000,7 +1000,7 @@ func TestPrebindPlugin(t *testing.T) { preBindPlugin.succeedOnRetry = test.succeedOnRetry // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1146,7 +1146,7 @@ func TestUnReserveReservePlugins(t *testing.T) { // Create a best effort pod. podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1242,7 +1242,7 @@ func TestUnReservePermitPlugins(t *testing.T) { // Create a best effort pod. podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1318,7 +1318,7 @@ func TestUnReservePreBindPlugins(t *testing.T) { // Create a pause pod. podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1394,7 +1394,7 @@ func TestUnReserveBindPlugins(t *testing.T) { // Create a pause pod. podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1539,7 +1539,7 @@ func TestBindPlugin(t *testing.T) { // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1649,7 +1649,7 @@ func TestPostBindPlugin(t *testing.T) { // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1746,7 +1746,7 @@ func TestPermitPlugin(t *testing.T) { // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1796,7 +1796,7 @@ func TestMultiplePermitPlugins(t *testing.T) { // Create a test pod. podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1851,7 +1851,7 @@ func TestPermitPluginsCancelled(t *testing.T) { // Create a test pod. podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -1914,12 +1914,12 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) { // Create two pods. First pod to enter Permit() will wait and a second one will either // reject or allow first one. podA, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "pod-a", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "pod-a", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating the first pod: %v", err) } podB, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "pod-b", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "pod-b", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating the second pod: %v", err) } @@ -1991,7 +1991,7 @@ func TestFilterPlugin(t *testing.T) { filterPlugin.failFilter = test.fail // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } @@ -2049,7 +2049,7 @@ func TestPreScorePlugin(t *testing.T) { preScorePlugin.failPreScore = test.fail // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, - initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) + initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } diff --git a/test/integration/scheduler/predicates_test.go b/test/integration/scheduler/predicates_test.go index 9716406ef29..90e8a78155c 100644 --- a/test/integration/scheduler/predicates_test.go +++ b/test/integration/scheduler/predicates_test.go @@ -1283,12 +1283,12 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) { tests := []struct { name string init func(kubernetes.Interface, string) error - pod *pausePodConfig + pod *testutils.PausePodConfig update func(kubernetes.Interface, string) error }{ { name: "node gets added", - pod: &pausePodConfig{ + pod: &testutils.PausePodConfig{ Name: "pod-1", }, update: func(cs kubernetes.Interface, _ string) error { @@ -1312,7 +1312,7 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) { } return nil }, - pod: &pausePodConfig{ + pod: &testutils.PausePodConfig{ Name: "pod-1", }, update: func(cs kubernetes.Interface, _ string) error { @@ -1331,13 +1331,13 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) { if err != nil { return fmt.Errorf("cannot create node: %v", err) } - _, err = createPausePod(cs, initPausePod(&pausePodConfig{Name: "pod-to-be-deleted", Namespace: ns})) + _, err = createPausePod(cs, initPausePod(&testutils.PausePodConfig{Name: "pod-to-be-deleted", Namespace: ns})) if err != nil { return fmt.Errorf("cannot create pod: %v", err) } return nil }, - pod: &pausePodConfig{ + pod: &testutils.PausePodConfig{ Name: "pod-1", }, update: func(cs kubernetes.Interface, ns string) error { @@ -1356,7 +1356,7 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) { } return nil }, - pod: &pausePodConfig{ + pod: &testutils.PausePodConfig{ Name: "pod-1", Affinity: &v1.Affinity{ PodAffinity: &v1.PodAffinity{ @@ -1374,7 +1374,7 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) { }, }, update: func(cs kubernetes.Interface, ns string) error { - podConfig := &pausePodConfig{ + podConfig := &testutils.PausePodConfig{ Name: "pod-with-affinity", Namespace: ns, Labels: map[string]string{ @@ -1394,12 +1394,12 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) { if err != nil { return fmt.Errorf("cannot create node: %v", err) } - if _, err := createPausePod(cs, initPausePod(&pausePodConfig{Name: "pod-to-be-updated", Namespace: ns})); err != nil { + if _, err := createPausePod(cs, initPausePod(&testutils.PausePodConfig{Name: "pod-to-be-updated", Namespace: ns})); err != nil { return fmt.Errorf("cannot create pod: %v", err) } return nil }, - pod: &pausePodConfig{ + pod: &testutils.PausePodConfig{ Name: "pod-1", Affinity: &v1.Affinity{ PodAffinity: &v1.PodAffinity{ diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index aad4698c87e..f88a9450913 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -178,7 +178,7 @@ func TestPreemption(t *testing.T) { name: "basic pod preemption", initTokens: maxTokens, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "victim-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -188,7 +188,7 @@ func TestPreemption(t *testing.T) { }, }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -203,7 +203,7 @@ func TestPreemption(t *testing.T) { name: "basic pod preemption with filter", initTokens: 1, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "victim-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -213,7 +213,7 @@ func TestPreemption(t *testing.T) { }, }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -230,7 +230,7 @@ func TestPreemption(t *testing.T) { initTokens: 1, unresolvable: true, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "victim-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -240,7 +240,7 @@ func TestPreemption(t *testing.T) { }, }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -255,13 +255,13 @@ func TestPreemption(t *testing.T) { name: "preemption is performed to satisfy anti-affinity", initTokens: maxTokens, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "pod-0", Namespace: testCtx.NS.Name, Priority: &mediumPriority, Labels: map[string]string{"pod": "p0"}, Resources: defaultPodRes, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "pod-1", Namespace: testCtx.NS.Name, Priority: &lowPriority, Labels: map[string]string{"pod": "p1"}, @@ -287,7 +287,7 @@ func TestPreemption(t *testing.T) { }), }, // A higher priority pod with anti-affinity. - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -319,13 +319,13 @@ func TestPreemption(t *testing.T) { name: "preemption is not performed when anti-affinity is not satisfied", initTokens: maxTokens, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "pod-0", Namespace: testCtx.NS.Name, Priority: &mediumPriority, Labels: map[string]string{"pod": "p0"}, Resources: defaultPodRes, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "pod-1", Namespace: testCtx.NS.Name, Priority: &highPriority, Labels: map[string]string{"pod": "p1"}, @@ -351,7 +351,7 @@ func TestPreemption(t *testing.T) { }), }, // A higher priority pod with anti-affinity. - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -454,7 +454,7 @@ func TestNonPreemption(t *testing.T) { PreemptionPolicy: &preemptNever, }, } - victim := initPausePod(&pausePodConfig{ + victim := initPausePod(&testutils.PausePodConfig{ Name: "victim-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -464,7 +464,7 @@ func TestNonPreemption(t *testing.T) { }, }) - preemptor := initPausePod(&pausePodConfig{ + preemptor := initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -528,7 +528,7 @@ func TestDisablePreemption(t *testing.T) { { name: "pod preemption will not happen", existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "victim-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -538,7 +538,7 @@ func TestDisablePreemption(t *testing.T) { }, }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -628,7 +628,7 @@ func TestPodPriorityResolution(t *testing.T) { Name: "SystemNodeCritical priority class", PriorityClass: scheduling.SystemNodeCritical, ExpectedPriority: scheduling.SystemCriticalPriority + 1000, - Pod: initPausePod(&pausePodConfig{ + Pod: initPausePod(&testutils.PausePodConfig{ Name: fmt.Sprintf("pod1-%v", scheduling.SystemNodeCritical), Namespace: metav1.NamespaceSystem, PriorityClassName: scheduling.SystemNodeCritical, @@ -638,7 +638,7 @@ func TestPodPriorityResolution(t *testing.T) { Name: "SystemClusterCritical priority class", PriorityClass: scheduling.SystemClusterCritical, ExpectedPriority: scheduling.SystemCriticalPriority, - Pod: initPausePod(&pausePodConfig{ + Pod: initPausePod(&testutils.PausePodConfig{ Name: fmt.Sprintf("pod2-%v", scheduling.SystemClusterCritical), Namespace: metav1.NamespaceSystem, PriorityClassName: scheduling.SystemClusterCritical, @@ -648,7 +648,7 @@ func TestPodPriorityResolution(t *testing.T) { Name: "Invalid priority class should result in error", PriorityClass: "foo", ExpectedPriority: scheduling.SystemCriticalPriority, - Pod: initPausePod(&pausePodConfig{ + Pod: initPausePod(&testutils.PausePodConfig{ Name: fmt.Sprintf("pod3-%v", scheduling.SystemClusterCritical), Namespace: metav1.NamespaceSystem, PriorityClassName: "foo", @@ -702,7 +702,7 @@ func mkPriorityPodWithGrace(tc *testutils.TestContext, name string, priority int v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)}, } - pod := initPausePod(&pausePodConfig{ + pod := initPausePod(&testutils.PausePodConfig{ Name: name, Namespace: tc.NS.Name, Priority: &priority, @@ -735,7 +735,7 @@ func TestPreemptionStarvation(t *testing.T) { name: "starvation test: higher priority pod is scheduled before the lower priority ones", numExistingPod: 10, numExpectedPending: 5, - preemptor: initPausePod(&pausePodConfig{ + preemptor: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -836,7 +836,7 @@ func TestPreemptionRaces(t *testing.T) { numInitialPods: 2, numAdditionalPods: 20, numRepetitions: 5, - preemptor: initPausePod(&pausePodConfig{ + preemptor: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -1198,28 +1198,28 @@ func TestPDBInPreemption(t *testing.T) { }, pdbPodNum: []int32{2}, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod1", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: defaultPodRes, Labels: map[string]string{"foo": "bar"}, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod2", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: defaultPodRes, Labels: map[string]string{"foo": "bar"}, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "mid-pod3", Namespace: testCtx.NS.Name, Priority: &mediumPriority, Resources: defaultPodRes, }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -1238,7 +1238,7 @@ func TestPDBInPreemption(t *testing.T) { }, pdbPodNum: []int32{1}, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod1", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -1246,7 +1246,7 @@ func TestPDBInPreemption(t *testing.T) { NodeName: "node-1", Labels: map[string]string{"foo": "bar"}, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "mid-pod2", Namespace: testCtx.NS.Name, Priority: &mediumPriority, @@ -1254,7 +1254,7 @@ func TestPDBInPreemption(t *testing.T) { Resources: defaultPodRes, }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -1274,7 +1274,7 @@ func TestPDBInPreemption(t *testing.T) { }, pdbPodNum: []int32{1, 5}, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod1", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -1282,14 +1282,14 @@ func TestPDBInPreemption(t *testing.T) { NodeName: "node-1", Labels: map[string]string{"foo1": "bar"}, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "mid-pod1", Namespace: testCtx.NS.Name, Priority: &mediumPriority, Resources: defaultPodRes, NodeName: "node-1", }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod2", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -1297,7 +1297,7 @@ func TestPDBInPreemption(t *testing.T) { NodeName: "node-2", Labels: map[string]string{"foo2": "bar"}, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "mid-pod2", Namespace: testCtx.NS.Name, Priority: &mediumPriority, @@ -1305,7 +1305,7 @@ func TestPDBInPreemption(t *testing.T) { NodeName: "node-2", Labels: map[string]string{"foo2": "bar"}, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod4", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -1313,7 +1313,7 @@ func TestPDBInPreemption(t *testing.T) { NodeName: "node-3", Labels: map[string]string{"foo2": "bar"}, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod5", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -1321,7 +1321,7 @@ func TestPDBInPreemption(t *testing.T) { NodeName: "node-3", Labels: map[string]string{"foo2": "bar"}, }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod6", Namespace: testCtx.NS.Name, Priority: &lowPriority, @@ -1330,7 +1330,7 @@ func TestPDBInPreemption(t *testing.T) { Labels: map[string]string{"foo2": "bar"}, }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, @@ -1454,14 +1454,14 @@ func TestPreferNominatedNode(t *testing.T) { name: "nominated node released all resource, preemptor is scheduled to the nominated node", nodeNames: []string{"node-1", "node-2"}, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod1", Priority: &lowPriority, NodeName: "node-2", Resources: defaultPodRes, }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod", Priority: &highPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ @@ -1475,14 +1475,14 @@ func TestPreferNominatedNode(t *testing.T) { name: "nominated node cannot pass all the filters, preemptor should find a different node", nodeNames: []string{"node-1", "node-2"}, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "low-pod", Priority: &lowPriority, Resources: defaultPodRes, NodeName: "node-1", }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "preemptor-pod1", Priority: &highPriority, Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ diff --git a/test/integration/scheduler/priorities_test.go b/test/integration/scheduler/priorities_test.go index dca57d17e8f..08d84c4be9d 100644 --- a/test/integration/scheduler/priorities_test.go +++ b/test/integration/scheduler/priorities_test.go @@ -192,7 +192,7 @@ func TestNodeAffinityScoring(t *testing.T) { // Create a pod with node affinity. podName := "pod-with-node-affinity" - pod, err := runPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{ + pod, err := runPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{ Name: podName, Namespace: testCtx.NS.Name, Affinity: &v1.Affinity{ @@ -233,11 +233,11 @@ func TestPodAffinityScoring(t *testing.T) { topologyValue := "topologyvalue" tests := []struct { name string - podConfig *pausePodConfig + podConfig *testutils.PausePodConfig }{ { name: "pod affinity", - podConfig: &pausePodConfig{ + podConfig: &testutils.PausePodConfig{ Name: "pod1", Namespace: "ns1", Affinity: &v1.Affinity{ @@ -265,7 +265,7 @@ func TestPodAffinityScoring(t *testing.T) { }, { name: "pod affinity with namespace selector", - podConfig: &pausePodConfig{ + podConfig: &testutils.PausePodConfig{ Name: "pod1", Namespace: "ns2", Affinity: &v1.Affinity{ @@ -306,7 +306,7 @@ func TestPodAffinityScoring(t *testing.T) { t.Fatal(err) } // Add a pod with a label and wait for it to schedule. - _, err = runPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{ + _, err = runPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{ Name: "attractor-pod", Namespace: "ns1", Labels: map[string]string{labelKey: labelValue}, @@ -363,7 +363,7 @@ func TestImageLocalityScoring(t *testing.T) { // Create a pod with containers each having the specified image. podName := "pod-using-large-image" - pod, err := runPodWithContainers(testCtx.ClientSet, initPodWithContainers(testCtx.ClientSet, &podWithContainersConfig{ + pod, err := runPodWithContainers(testCtx.ClientSet, initPodWithContainers(testCtx.ClientSet, &testutils.PodWithContainersConfig{ Name: podName, Namespace: testCtx.NS.Name, Containers: makeContainersWithImages([]string{imageName}), diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 285788815a2..5e2a6112b31 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -218,13 +218,13 @@ func TestMultipleSchedulers(t *testing.T) { } defaultScheduler := "default-scheduler" - testPodFitsDefault, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "pod-fits-default", Namespace: testCtx.NS.Name, SchedulerName: defaultScheduler})) + testPodFitsDefault, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "pod-fits-default", Namespace: testCtx.NS.Name, SchedulerName: defaultScheduler})) if err != nil { t.Fatalf("Failed to create pod: %v", err) } fooScheduler := "foo-scheduler" - testPodFitsFoo, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "pod-fits-foo", Namespace: testCtx.NS.Name, SchedulerName: fooScheduler})) + testPodFitsFoo, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "pod-fits-foo", Namespace: testCtx.NS.Name, SchedulerName: fooScheduler})) if err != nil { t.Fatalf("Failed to create pod: %v", err) } @@ -357,7 +357,7 @@ func TestMultipleSchedulingProfiles(t *testing.T) { } defer evs.Stop() - for _, pc := range []*pausePodConfig{ + for _, pc := range []*testutils.PausePodConfig{ {Name: "foo", Namespace: testCtx.NS.Name}, {Name: "bar", Namespace: testCtx.NS.Name, SchedulerName: "unknown-scheduler"}, {Name: "baz", Namespace: testCtx.NS.Name, SchedulerName: "default-scheduler"}, @@ -503,7 +503,7 @@ func TestSchedulerInformers(t *testing.T) { name: "Pod cannot be scheduled when node is occupied by pods scheduled by other schedulers", nodes: []*nodeConfig{{name: "node-1", res: defaultNodeRes}}, existingPods: []*v1.Pod{ - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "pod1", Namespace: testCtx.NS.Name, Resources: defaultPodRes, @@ -511,7 +511,7 @@ func TestSchedulerInformers(t *testing.T) { NodeName: "node-1", SchedulerName: "foo-scheduler", }), - initPausePod(&pausePodConfig{ + initPausePod(&testutils.PausePodConfig{ Name: "pod2", Namespace: testCtx.NS.Name, Resources: defaultPodRes, @@ -520,7 +520,7 @@ func TestSchedulerInformers(t *testing.T) { SchedulerName: "bar-scheduler", }), }, - pod: initPausePod(&pausePodConfig{ + pod: initPausePod(&testutils.PausePodConfig{ Name: "unschedulable-pod", Namespace: testCtx.NS.Name, Resources: defaultPodRes, @@ -562,7 +562,7 @@ func TestSchedulerInformers(t *testing.T) { // Cleanup pods = append(pods, unschedulable) testutils.CleanupPods(cs, t, pods) - cs.PolicyV1beta1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) + cs.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) cs.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{}) }) } diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 96343f1d4f1..4f1d6087217 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -17,524 +17,35 @@ limitations under the License. package scheduler import ( - "context" - "fmt" - "testing" - "time" - - v1 "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - cacheddiscovery "k8s.io/client-go/discovery/cached/memory" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/informers" - clientset "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" - restclient "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" - "k8s.io/client-go/scale" - "k8s.io/kube-scheduler/config/v1beta3" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/controller/disruption" - "k8s.io/kubernetes/pkg/scheduler" - configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" - "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption" - st "k8s.io/kubernetes/pkg/scheduler/testing" testutils "k8s.io/kubernetes/test/integration/util" - imageutils "k8s.io/kubernetes/test/utils/image" - "k8s.io/utils/pointer" ) -// initDisruptionController initializes and runs a Disruption Controller to properly -// update PodDisuptionBudget objects. -func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *disruption.DisruptionController { - informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour) - - discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery()) - mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) - - config := restclient.Config{Host: testCtx.HTTPServer.URL} - scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery()) - scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) - if err != nil { - t.Fatalf("Error in create scaleClient: %v", err) - } - - dc := disruption.NewDisruptionController( - informers.Core().V1().Pods(), - informers.Policy().V1().PodDisruptionBudgets(), - informers.Core().V1().ReplicationControllers(), - informers.Apps().V1().ReplicaSets(), - informers.Apps().V1().Deployments(), - informers.Apps().V1().StatefulSets(), - testCtx.ClientSet, - mapper, - scaleClient, - testCtx.ClientSet.Discovery()) - - informers.Start(testCtx.Scheduler.StopEverything) - informers.WaitForCacheSync(testCtx.Scheduler.StopEverything) - go dc.Run(testCtx.Ctx) - return dc -} - -// initTest initializes a test environment and creates API server and scheduler with default -// configuration. -func initTest(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext { - testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), opts...) - testutils.SyncInformerFactory(testCtx) - go testCtx.Scheduler.Run(testCtx.Ctx) - return testCtx -} - -// initTestDisablePreemption initializes a test environment and creates API server and scheduler with default -// configuration but with pod preemption disabled. -func initTestDisablePreemption(t *testing.T, nsPrefix string) *testutils.TestContext { - cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{ - Profiles: []v1beta3.KubeSchedulerProfile{{ - SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), - Plugins: &v1beta3.Plugins{ - PostFilter: v1beta3.PluginSet{ - Disabled: []v1beta3.Plugin{ - {Name: defaultpreemption.Name}, - }, - }, - }, - }}, - }) - testCtx := testutils.InitTestSchedulerWithOptions( - t, testutils.InitTestAPIServer(t, nsPrefix, nil), - scheduler.WithProfiles(cfg.Profiles...)) - testutils.SyncInformerFactory(testCtx) - go testCtx.Scheduler.Run(testCtx.Ctx) - return testCtx -} - -// waitForReflection waits till the passFunc confirms that the object it expects -// to see is in the store. Used to observe reflected events. -func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string, - passFunc func(n interface{}) bool) error { - var nodes []*v1.Node - err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) { - n, err := nodeLister.Get(key) - - switch { - case err == nil && passFunc(n): - return true, nil - case apierrors.IsNotFound(err): - nodes = append(nodes, nil) - case err != nil: - t.Errorf("Unexpected error: %v", err) - default: - nodes = append(nodes, n) - } - - return false, nil - }) - if err != nil { - t.Logf("Logging consecutive node versions received from store:") - for i, n := range nodes { - t.Logf("%d: %#v", i, n) - } - } - return err -} - -func updateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) { - return cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) -} - -func createNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) { - return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) -} - -// createNodes creates `numNodes` nodes. The created node names will be in the -// form of "`prefix`-X" where X is an ordinal. -// DEPRECATED -// use createAndWaitForNodesInCache instead, which ensures the created nodes -// to be present in scheduler cache. -func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { - nodes := make([]*v1.Node, numNodes) - for i := 0; i < numNodes; i++ { - nodeName := fmt.Sprintf("%v-%d", prefix, i) - node, err := createNode(cs, wrapper.Name(nodeName).Obj()) - if err != nil { - return nodes[:], err - } - nodes[i] = node - } - return nodes[:], nil -} - -// createAndWaitForNodesInCache calls createNodes(), and wait for the created -// nodes to be present in scheduler cache. -func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { - existingNodes := testCtx.Scheduler.Cache.NodeCount() - nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes) - if err != nil { - return nodes, fmt.Errorf("cannot create nodes: %v", err) - } - return nodes, waitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes) -} - -// waitForNodesInCache ensures at least nodes are present in scheduler cache -// within 30 seconds; otherwise returns false. -func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error { - err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { - return sched.Cache.NodeCount() >= nodeCount, nil - }) - if err != nil { - return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err) - } - return nil -} - -type pausePodConfig struct { - Name string - Namespace string - Affinity *v1.Affinity - Annotations, Labels, NodeSelector map[string]string - Resources *v1.ResourceRequirements - Tolerations []v1.Toleration - NodeName string - SchedulerName string - Priority *int32 - PreemptionPolicy *v1.PreemptionPolicy - PriorityClassName string -} - -// initPausePod initializes a pod API object from the given config. It is used -// mainly in pod creation process. -func initPausePod(conf *pausePodConfig) *v1.Pod { - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: conf.Name, - Namespace: conf.Namespace, - Labels: conf.Labels, - Annotations: conf.Annotations, - }, - Spec: v1.PodSpec{ - NodeSelector: conf.NodeSelector, - Affinity: conf.Affinity, - Containers: []v1.Container{ - { - Name: conf.Name, - Image: imageutils.GetPauseImageName(), - }, - }, - Tolerations: conf.Tolerations, - NodeName: conf.NodeName, - SchedulerName: conf.SchedulerName, - Priority: conf.Priority, - PreemptionPolicy: conf.PreemptionPolicy, - PriorityClassName: conf.PriorityClassName, - }, - } - if conf.Resources != nil { - pod.Spec.Containers[0].Resources = *conf.Resources - } - return pod -} - -// createPausePod creates a pod with "Pause" image and the given config and -// return its pointer and error status. -func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) { - return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{}) -} - -// createPausePodWithResource creates a pod with "Pause" image and the given -// resources and returns its pointer and error status. The resource list can be -// nil. -func createPausePodWithResource(cs clientset.Interface, podName string, - nsName string, res *v1.ResourceList) (*v1.Pod, error) { - var conf pausePodConfig - if res == nil { - conf = pausePodConfig{ - Name: podName, - Namespace: nsName, - } - } else { - conf = pausePodConfig{ - Name: podName, - Namespace: nsName, - Resources: &v1.ResourceRequirements{ - Requests: *res, - }, - } - } - return createPausePod(cs, initPausePod(&conf)) -} - -// runPausePod creates a pod with "Pause" image and the given config and waits -// until it is scheduled. It returns its pointer and error status. -func runPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) { - pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to create pause pod: %v", err) - } - if err = testutils.WaitForPodToSchedule(cs, pod); err != nil { - return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err) - } - if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { - return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err) - } - return pod, nil -} - -type podWithContainersConfig struct { - Name string - Namespace string - Containers []v1.Container -} - -// initPodWithContainers initializes a pod API object from the given config. This is used primarily for generating -// pods with containers each having a specific image. -func initPodWithContainers(cs clientset.Interface, conf *podWithContainersConfig) *v1.Pod { - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: conf.Name, - Namespace: conf.Namespace, - }, - Spec: v1.PodSpec{ - Containers: conf.Containers, - }, - } - return pod -} - -// runPodWithContainers creates a pod with given config and containers and waits -// until it is scheduled. It returns its pointer and error status. -func runPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) { - pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to create pod-with-containers: %v", err) - } - if err = testutils.WaitForPodToSchedule(cs, pod); err != nil { - return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err) - } - if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { - return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err) - } - return pod, nil -} - -// podIsGettingEvicted returns true if the pod's deletion timestamp is set. -func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - return false, err - } - if pod.DeletionTimestamp != nil { - return true, nil - } - return false, nil - } -} - -// podScheduledIn returns true if a given pod is placed onto one of the expected nodes. -func podScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - // This could be a connection error so we want to retry. - return false, nil - } - if pod.Spec.NodeName == "" { - return false, nil - } - for _, nodeName := range nodeNames { - if pod.Spec.NodeName == nodeName { - return true, nil - } - } - return false, nil - } -} - -// podUnschedulable returns a condition function that returns true if the given pod -// gets unschedulable status. -func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - // This could be a connection error so we want to retry. - return false, nil - } - _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) - return cond != nil && cond.Status == v1.ConditionFalse && - cond.Reason == v1.PodReasonUnschedulable && pod.Spec.NodeName == "", nil - } -} - -// podSchedulingError returns a condition function that returns true if the given pod -// gets unschedulable status for reasons other than "Unschedulable". The scheduler -// records such reasons in case of error. -func podSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - // This could be a connection error so we want to retry. - return false, nil - } - _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) - return cond != nil && cond.Status == v1.ConditionFalse && - cond.Reason != v1.PodReasonUnschedulable, nil - } -} - -// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns -// an error if it does not become unschedulable within the given timeout. -func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { - return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name)) -} - -// waitForPodUnschedule waits for a pod to fail scheduling and returns -// an error if it does not become unschedulable within the timeout duration (30 seconds). -func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error { - return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second) -} - -// waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns -// an error if it does not scheduled within the given timeout. -func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { - return wait.Poll(100*time.Millisecond, timeout, podScheduled(cs, pod.Namespace, pod.Name)) -} - -// waitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to -// the expected values. -func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error { - return wait.Poll(time.Second, 60*time.Second, func() (bool, error) { - pdbList, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return false, err - } - if len(pdbList.Items) != len(pdbs) { - return false, nil - } - for i, pdb := range pdbs { - found := false - for _, cpdb := range pdbList.Items { - if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace { - found = true - if cpdb.Status.CurrentHealthy != pdbPodNum[i] { - return false, nil - } - } - } - if !found { - return false, nil - } - } - return true, nil - }) -} - -// waitCachedPodsStable waits until scheduler cache has the given pods. -func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error { - return wait.Poll(time.Second, 30*time.Second, func() (bool, error) { - cachedPods, err := testCtx.Scheduler.Cache.PodCount() - if err != nil { - return false, err - } - if len(pods) != cachedPods { - return false, nil - } - for _, p := range pods { - actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{}) - if err1 != nil { - return false, err1 - } - cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod) - if err2 != nil || cachedPod == nil { - return false, err2 - } - } - return true, nil - }) -} - -// deletePod deletes the given pod in the given namespace. -func deletePod(cs clientset.Interface, podName string, nsName string) error { - return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0)) -} - -func getPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) { - return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) -} - -// podScheduled returns true if a node is assigned to the given pod. -func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { - return func() (bool, error) { - pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - // This could be a connection error so we want to retry. - return false, nil - } - return pod.Spec.NodeName != "", nil - } -} - -func createNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error { - for _, n := range namespaces { - ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}} - if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil { - return err - } - } - return nil -} - -// timeout returns a timeout error if the given `f` function doesn't -// complete within `d` duration; otherwise it returns nil. -func timeout(ctx context.Context, d time.Duration, f func()) error { - ctx, cancel := context.WithTimeout(ctx, d) - defer cancel() - - done := make(chan struct{}) - go func() { - f() - done <- struct{}{} - }() - - select { - case <-done: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -// nextPodOrDie returns the next Pod in the scheduler queue. -// The operation needs to be completed within 5 seconds; otherwise the test gets aborted. -func nextPodOrDie(t *testing.T, testCtx *testutils.TestContext) *framework.QueuedPodInfo { - t.Helper() - - var podInfo *framework.QueuedPodInfo - // NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on - // default go testing timeout (10m) to abort. - if err := timeout(testCtx.Ctx, time.Second*5, func() { - podInfo = testCtx.Scheduler.NextPod() - }); err != nil { - t.Fatalf("Timed out waiting for the Pod to be popped: %v", err) - } - return podInfo -} - -// nextPod returns the next Pod in the scheduler queue, with a 5 seconds timeout. -func nextPod(t *testing.T, testCtx *testutils.TestContext) *framework.QueuedPodInfo { - t.Helper() - - var podInfo *framework.QueuedPodInfo - // NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on - // default go testing timeout (10m) to abort. - if err := timeout(testCtx.Ctx, time.Second*5, func() { - podInfo = testCtx.Scheduler.NextPod() - }); err != nil { - return nil - } - return podInfo -} +var ( + createPausePod = testutils.CreatePausePod + createPausePodWithResource = testutils.CreatePausePodWithResource + createNode = testutils.CreateNode + initPausePod = testutils.InitPausePod + runPausePod = testutils.RunPausePod + waitForPodUnschedulable = testutils.WaitForPodUnschedulable + waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout + waitCachedPodsStable = testutils.WaitCachedPodsStable + waitForPDBsStable = testutils.WaitForPDBsStable + waitForReflection = testutils.WaitForReflection + waitForNodesInCache = testutils.WaitForNodesInCache + createAndWaitForNodesInCache = testutils.CreateAndWaitForNodesInCache + getPod = testutils.GetPod + deletePod = testutils.DeletePod + updateNode = testutils.UpdateNode + podSchedulingError = testutils.PodSchedulingError + podScheduledIn = testutils.PodScheduledIn + podUnschedulable = testutils.PodUnschedulable + podIsGettingEvicted = testutils.PodIsGettingEvicted + initTest = testutils.InitTestSchedulerWithNS + initTestDisablePreemption = testutils.InitTestDisablePreemption + initDisruptionController = testutils.InitDisruptionController + createNamespacesWithLabels = testutils.CreateNamespacesWithLabels + runPodWithContainers = testutils.RunPodWithContainers + initPodWithContainers = testutils.InitPodWithContainers + nextPodOrDie = testutils.NextPodOrDie + nextPod = testutils.NextPod +) diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 7b34f2dc341..dfb6ca265bc 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -26,27 +26,41 @@ import ( "time" v1 "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" + cacheddiscovery "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" restclient "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + "k8s.io/client-go/scale" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" pvutil "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2" + "k8s.io/kube-scheduler/config/v1beta3" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/pkg/scheduler" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption" "k8s.io/kubernetes/pkg/scheduler/profile" + st "k8s.io/kubernetes/pkg/scheduler/testing" taintutils "k8s.io/kubernetes/pkg/util/taints" "k8s.io/kubernetes/test/integration/framework" + imageutils "k8s.io/kubernetes/test/utils/image" + "k8s.io/utils/pointer" ) // ShutdownFunc represents the function handle to be called, typically in a defer handler, to shutdown a running module @@ -456,3 +470,483 @@ func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond return true, nil } } + +// InitDisruptionController initializes and runs a Disruption Controller to properly +// update PodDisuptionBudget objects. +func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController { + informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour) + + discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery()) + mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + + config := restclient.Config{Host: testCtx.HTTPServer.URL} + scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery()) + scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) + if err != nil { + t.Fatalf("Error in create scaleClient: %v", err) + } + + dc := disruption.NewDisruptionController( + informers.Core().V1().Pods(), + informers.Policy().V1().PodDisruptionBudgets(), + informers.Core().V1().ReplicationControllers(), + informers.Apps().V1().ReplicaSets(), + informers.Apps().V1().Deployments(), + informers.Apps().V1().StatefulSets(), + testCtx.ClientSet, + mapper, + scaleClient, + testCtx.ClientSet.Discovery()) + + informers.Start(testCtx.Scheduler.StopEverything) + informers.WaitForCacheSync(testCtx.Scheduler.StopEverything) + go dc.Run(testCtx.Ctx) + return dc +} + +// InitTestSchedulerWithNS initializes a test environment and creates API server and scheduler with default +// configuration. +func InitTestSchedulerWithNS(t *testing.T, nsPrefix string, opts ...scheduler.Option) *TestContext { + testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), opts...) + SyncInformerFactory(testCtx) + go testCtx.Scheduler.Run(testCtx.Ctx) + return testCtx +} + +// InitTestDisablePreemption initializes a test environment and creates API server and scheduler with default +// configuration but with pod preemption disabled. +func InitTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext { + cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{ + Profiles: []v1beta3.KubeSchedulerProfile{{ + SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), + Plugins: &v1beta3.Plugins{ + PostFilter: v1beta3.PluginSet{ + Disabled: []v1beta3.Plugin{ + {Name: defaultpreemption.Name}, + }, + }, + }, + }}, + }) + testCtx := InitTestSchedulerWithOptions( + t, InitTestAPIServer(t, nsPrefix, nil), + scheduler.WithProfiles(cfg.Profiles...)) + SyncInformerFactory(testCtx) + go testCtx.Scheduler.Run(testCtx.Ctx) + return testCtx +} + +// WaitForReflection waits till the passFunc confirms that the object it expects +// to see is in the store. Used to observe reflected events. +func WaitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string, + passFunc func(n interface{}) bool) error { + var nodes []*v1.Node + err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) { + n, err := nodeLister.Get(key) + + switch { + case err == nil && passFunc(n): + return true, nil + case apierrors.IsNotFound(err): + nodes = append(nodes, nil) + case err != nil: + t.Errorf("Unexpected error: %v", err) + default: + nodes = append(nodes, n) + } + + return false, nil + }) + if err != nil { + t.Logf("Logging consecutive node versions received from store:") + for i, n := range nodes { + t.Logf("%d: %#v", i, n) + } + } + return err +} + +func UpdateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) { + return cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) +} + +func CreateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) { + return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) +} + +func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { + nodes := make([]*v1.Node, numNodes) + for i := 0; i < numNodes; i++ { + nodeName := fmt.Sprintf("%v-%d", prefix, i) + node, err := CreateNode(cs, wrapper.Name(nodeName).Obj()) + if err != nil { + return nodes[:], err + } + nodes[i] = node + } + return nodes[:], nil +} + +// CreateAndWaitForNodesInCache calls createNodes(), and wait for the created +// nodes to be present in scheduler cache. +func CreateAndWaitForNodesInCache(testCtx *TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) { + existingNodes := testCtx.Scheduler.Cache.NodeCount() + nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes) + if err != nil { + return nodes, fmt.Errorf("cannot create nodes: %v", err) + } + return nodes, WaitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes) +} + +// WaitForNodesInCache ensures at least nodes are present in scheduler cache +// within 30 seconds; otherwise returns false. +func WaitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error { + err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + return sched.Cache.NodeCount() >= nodeCount, nil + }) + if err != nil { + return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err) + } + return nil +} + +type PausePodConfig struct { + Name string + Namespace string + Affinity *v1.Affinity + Annotations, Labels, NodeSelector map[string]string + Resources *v1.ResourceRequirements + Tolerations []v1.Toleration + NodeName string + SchedulerName string + Priority *int32 + PreemptionPolicy *v1.PreemptionPolicy + PriorityClassName string +} + +// InitPausePod initializes a pod API object from the given config. It is used +// mainly in pod creation process. +func InitPausePod(conf *PausePodConfig) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: conf.Name, + Namespace: conf.Namespace, + Labels: conf.Labels, + Annotations: conf.Annotations, + }, + Spec: v1.PodSpec{ + NodeSelector: conf.NodeSelector, + Affinity: conf.Affinity, + Containers: []v1.Container{ + { + Name: conf.Name, + Image: imageutils.GetPauseImageName(), + }, + }, + Tolerations: conf.Tolerations, + NodeName: conf.NodeName, + SchedulerName: conf.SchedulerName, + Priority: conf.Priority, + PreemptionPolicy: conf.PreemptionPolicy, + PriorityClassName: conf.PriorityClassName, + }, + } + if conf.Resources != nil { + pod.Spec.Containers[0].Resources = *conf.Resources + } + return pod +} + +// CreatePausePod creates a pod with "Pause" image and the given config and +// return its pointer and error status. +func CreatePausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) { + return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{}) +} + +// CreatePausePodWithResource creates a pod with "Pause" image and the given +// resources and returns its pointer and error status. The resource list can be +// nil. +func CreatePausePodWithResource(cs clientset.Interface, podName string, + nsName string, res *v1.ResourceList) (*v1.Pod, error) { + var conf PausePodConfig + if res == nil { + conf = PausePodConfig{ + Name: podName, + Namespace: nsName, + } + } else { + conf = PausePodConfig{ + Name: podName, + Namespace: nsName, + Resources: &v1.ResourceRequirements{ + Requests: *res, + }, + } + } + return CreatePausePod(cs, InitPausePod(&conf)) +} + +// RunPausePod creates a pod with "Pause" image and the given config and waits +// until it is scheduled. It returns its pointer and error status. +func RunPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) { + pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create pause pod: %v", err) + } + if err = WaitForPodToSchedule(cs, pod); err != nil { + return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err) + } + if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { + return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err) + } + return pod, nil +} + +type PodWithContainersConfig struct { + Name string + Namespace string + Containers []v1.Container +} + +// InitPodWithContainers initializes a pod API object from the given config. This is used primarily for generating +// pods with containers each having a specific image. +func InitPodWithContainers(cs clientset.Interface, conf *PodWithContainersConfig) *v1.Pod { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: conf.Name, + Namespace: conf.Namespace, + }, + Spec: v1.PodSpec{ + Containers: conf.Containers, + }, + } + return pod +} + +// RunPodWithContainers creates a pod with given config and containers and waits +// until it is scheduled. It returns its pointer and error status. +func RunPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) { + pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create pod-with-containers: %v", err) + } + if err = WaitForPodToSchedule(cs, pod); err != nil { + return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err) + } + if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { + return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err) + } + return pod, nil +} + +// PodIsGettingEvicted returns true if the pod's deletion timestamp is set. +func PodIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + return false, err + } + if pod.DeletionTimestamp != nil { + return true, nil + } + return false, nil + } +} + +// PodScheduledIn returns true if a given pod is placed onto one of the expected nodes. +func PodScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + // This could be a connection error so we want to retry. + return false, nil + } + if pod.Spec.NodeName == "" { + return false, nil + } + for _, nodeName := range nodeNames { + if pod.Spec.NodeName == nodeName { + return true, nil + } + } + return false, nil + } +} + +// PodUnschedulable returns a condition function that returns true if the given pod +// gets unschedulable status. +func PodUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + // This could be a connection error so we want to retry. + return false, nil + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) + return cond != nil && cond.Status == v1.ConditionFalse && + cond.Reason == v1.PodReasonUnschedulable && pod.Spec.NodeName == "", nil + } +} + +// PodSchedulingError returns a condition function that returns true if the given pod +// gets unschedulable status for reasons other than "Unschedulable". The scheduler +// records such reasons in case of error. +func PodSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + // This could be a connection error so we want to retry. + return false, nil + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled) + return cond != nil && cond.Status == v1.ConditionFalse && + cond.Reason != v1.PodReasonUnschedulable, nil + } +} + +// waitForPodUnscheduleWithTimeout waits for a pod to fail scheduling and returns +// an error if it does not become unschedulable within the given timeout. +func WaitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error { + return wait.Poll(100*time.Millisecond, timeout, PodUnschedulable(cs, pod.Namespace, pod.Name)) +} + +// waitForPodUnschedule waits for a pod to fail scheduling and returns +// an error if it does not become unschedulable within the timeout duration (30 seconds). +func WaitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error { + return WaitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second) +} + +// WaitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to +// the expected values. +func WaitForPDBsStable(testCtx *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error { + return wait.Poll(time.Second, 60*time.Second, func() (bool, error) { + pdbList, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return false, err + } + if len(pdbList.Items) != len(pdbs) { + return false, nil + } + for i, pdb := range pdbs { + found := false + for _, cpdb := range pdbList.Items { + if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace { + found = true + if cpdb.Status.CurrentHealthy != pdbPodNum[i] { + return false, nil + } + } + } + if !found { + return false, nil + } + } + return true, nil + }) +} + +// WaitCachedPodsStable waits until scheduler cache has the given pods. +func WaitCachedPodsStable(testCtx *TestContext, pods []*v1.Pod) error { + return wait.Poll(time.Second, 30*time.Second, func() (bool, error) { + cachedPods, err := testCtx.Scheduler.Cache.PodCount() + if err != nil { + return false, err + } + if len(pods) != cachedPods { + return false, nil + } + for _, p := range pods { + actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{}) + if err1 != nil { + return false, err1 + } + cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod) + if err2 != nil || cachedPod == nil { + return false, err2 + } + } + return true, nil + }) +} + +// DeletePod deletes the given pod in the given namespace. +func DeletePod(cs clientset.Interface, podName string, nsName string) error { + return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0)) +} + +func GetPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) { + return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) +} + +// podScheduled returns true if a node is assigned to the given pod. +func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + // This could be a connection error so we want to retry. + return false, nil + } + return pod.Spec.NodeName != "", nil + } +} + +func CreateNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error { + for _, n := range namespaces { + ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}} + if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil { + return err + } + } + return nil +} + +// timeout returns a timeout error if the given `f` function doesn't +// complete within `d` duration; otherwise it returns nil. +func timeout(ctx context.Context, d time.Duration, f func()) error { + ctx, cancel := context.WithTimeout(ctx, d) + defer cancel() + + done := make(chan struct{}) + go func() { + f() + done <- struct{}{} + }() + + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// NextPodOrDie returns the next Pod in the scheduler queue. +// The operation needs to be completed within 5 seconds; otherwise the test gets aborted. +func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo { + t.Helper() + + var podInfo *schedulerframework.QueuedPodInfo + // NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on + // default go testing timeout (10m) to abort. + if err := timeout(testCtx.Ctx, time.Second*5, func() { + podInfo = testCtx.Scheduler.NextPod() + }); err != nil { + t.Fatalf("Timed out waiting for the Pod to be popped: %v", err) + } + return podInfo +} + +// NextPod returns the next Pod in the scheduler queue, with a 5 seconds timeout. +func NextPod(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo { + t.Helper() + + var podInfo *schedulerframework.QueuedPodInfo + // NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on + // default go testing timeout (10m) to abort. + if err := timeout(testCtx.Ctx, time.Second*5, func() { + podInfo = testCtx.Scheduler.NextPod() + }); err != nil { + return nil + } + return podInfo +}