Merge pull request #109834 from aojea/split_scheduling_integration

integration: refactor and split scheduler tests
Kubernetes Prow Robot 2022-05-06 09:07:32 -07:00 committed by GitHub
commit 83b25e371f
15 changed files with 828 additions and 611 deletions
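
The change applies the same pattern throughout: the monolithic test/integration/scheduler package is split into per-area packages (extender, filters, plugins, preemption, scoring, taint), each new package gets its own main_test.go that boots the shared etcd-backed test framework, and the helpers the tests share are promoted to exported functions in test/integration/util and re-aliased locally so existing call sites keep compiling. A minimal sketch of that pattern (the package name "mysuite" is illustrative, not taken from the diff):

// main_test.go of a split package ("mysuite" is an illustrative name):
package mysuite

import (
	"testing"

	"k8s.io/kubernetes/test/integration/framework"
)

func TestMain(m *testing.M) {
	framework.EtcdMain(m.Run)
}

// In the package's test files, shared helpers are aliased from the exported
// versions in test/integration/util, so calls such as createPausePod(...)
// read exactly as they did before the split:
package mysuite

import (
	testutils "k8s.io/kubernetes/test/integration/util"
)

var (
	createPausePod = testutils.CreatePausePod
	initPausePod   = testutils.InitPausePod
)

Package-level aliases, rather than renaming every call site, keep the per-test diffs limited to the pausePodConfig to testutils.PausePodConfig type rename visible in the hunks below.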

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package extender
// This file tests scheduler extender.
@ -40,6 +40,11 @@ import (
imageutils "k8s.io/kubernetes/test/utils/image"
)
// imported from testutils
var (
createNode = testutils.CreateNode
)
const (
filter = "filter"
prioritize = "prioritize"

View File

@ -0,0 +1,27 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package extender
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package filters
import (
"context"
@ -36,6 +36,20 @@ import (
"k8s.io/utils/pointer"
)
var (
createAndWaitForNodesInCache = testutils.CreateAndWaitForNodesInCache
createNamespacesWithLabels = testutils.CreateNamespacesWithLabels
createNode = testutils.CreateNode
createPausePod = testutils.CreatePausePod
deletePod = testutils.DeletePod
getPod = testutils.GetPod
initPausePod = testutils.InitPausePod
initTest = testutils.InitTestSchedulerWithNS
podScheduledIn = testutils.PodScheduledIn
podUnschedulable = testutils.PodUnschedulable
waitForPodUnschedulable = testutils.WaitForPodUnschedulable
)
// This file tests the scheduler predicates functionality.
const pollInterval = 100 * time.Millisecond
@ -1276,19 +1290,18 @@ func TestPodTopologySpreadFilter(t *testing.T) {
var (
hardSpread = v1.DoNotSchedule
softSpread = v1.ScheduleAnyway
)
func TestUnschedulablePodBecomesSchedulable(t *testing.T) {
tests := []struct {
name string
init func(kubernetes.Interface, string) error
pod *pausePodConfig
pod *testutils.PausePodConfig
update func(kubernetes.Interface, string) error
}{
{
name: "node gets added",
pod: &pausePodConfig{
pod: &testutils.PausePodConfig{
Name: "pod-1",
},
update: func(cs kubernetes.Interface, _ string) error {
@ -1312,7 +1325,7 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) {
}
return nil
},
pod: &pausePodConfig{
pod: &testutils.PausePodConfig{
Name: "pod-1",
},
update: func(cs kubernetes.Interface, _ string) error {
@ -1331,13 +1344,13 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) {
if err != nil {
return fmt.Errorf("cannot create node: %v", err)
}
_, err = createPausePod(cs, initPausePod(&pausePodConfig{Name: "pod-to-be-deleted", Namespace: ns}))
_, err = createPausePod(cs, initPausePod(&testutils.PausePodConfig{Name: "pod-to-be-deleted", Namespace: ns}))
if err != nil {
return fmt.Errorf("cannot create pod: %v", err)
}
return nil
},
pod: &pausePodConfig{
pod: &testutils.PausePodConfig{
Name: "pod-1",
},
update: func(cs kubernetes.Interface, ns string) error {
@ -1356,7 +1369,7 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) {
}
return nil
},
pod: &pausePodConfig{
pod: &testutils.PausePodConfig{
Name: "pod-1",
Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
@ -1374,7 +1387,7 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) {
},
},
update: func(cs kubernetes.Interface, ns string) error {
podConfig := &pausePodConfig{
podConfig := &testutils.PausePodConfig{
Name: "pod-with-affinity",
Namespace: ns,
Labels: map[string]string{
@ -1394,12 +1407,12 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) {
if err != nil {
return fmt.Errorf("cannot create node: %v", err)
}
if _, err := createPausePod(cs, initPausePod(&pausePodConfig{Name: "pod-to-be-updated", Namespace: ns})); err != nil {
if _, err := createPausePod(cs, initPausePod(&testutils.PausePodConfig{Name: "pod-to-be-updated", Namespace: ns})); err != nil {
return fmt.Errorf("cannot create pod: %v", err)
}
return nil
},
pod: &pausePodConfig{
pod: &testutils.PausePodConfig{
Name: "pod-1",
Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{

View File

@ -0,0 +1,27 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugins
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package plugins
import (
"context"
@ -47,6 +47,19 @@ import (
"k8s.io/utils/pointer"
)
// imported from testutils
var (
createPausePod = testutils.CreatePausePod
initPausePod = testutils.InitPausePod
getPod = testutils.GetPod
deletePod = testutils.DeletePod
podUnschedulable = testutils.PodUnschedulable
podSchedulingError = testutils.PodSchedulingError
createAndWaitForNodesInCache = testutils.CreateAndWaitForNodesInCache
waitForPodUnschedulable = testutils.WaitForPodUnschedulable
waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout
)
type PreFilterPlugin struct {
numPreFilterCalled int
failPreFilter bool
@ -563,7 +576,7 @@ func TestPreFilterPlugin(t *testing.T) {
preFilterPlugin.rejectPreFilter = test.reject
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -704,7 +717,7 @@ func TestPostFilterPlugin(t *testing.T) {
defer testutils.CleanupTest(t, testCtx)
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -771,7 +784,7 @@ func TestScorePlugin(t *testing.T) {
scorePlugin.failScore = test.fail
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Fatalf("Error while creating a test pod: %v", err)
}
@ -817,7 +830,7 @@ func TestNormalizeScorePlugin(t *testing.T) {
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Fatalf("Error while creating a test pod: %v", err)
}
@ -867,7 +880,7 @@ func TestReservePluginReserve(t *testing.T) {
reservePlugin.failReserve = test.fail
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1000,7 +1013,7 @@ func TestPrebindPlugin(t *testing.T) {
preBindPlugin.succeedOnRetry = test.succeedOnRetry
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1146,7 +1159,7 @@ func TestUnReserveReservePlugins(t *testing.T) {
// Create a best effort pod.
podName := "test-pod"
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1242,7 +1255,7 @@ func TestUnReservePermitPlugins(t *testing.T) {
// Create a best effort pod.
podName := "test-pod"
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1318,7 +1331,7 @@ func TestUnReservePreBindPlugins(t *testing.T) {
// Create a pause pod.
podName := "test-pod"
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1394,7 +1407,7 @@ func TestUnReserveBindPlugins(t *testing.T) {
// Create a pause pod.
podName := "test-pod"
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1539,7 +1552,7 @@ func TestBindPlugin(t *testing.T) {
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1649,7 +1662,7 @@ func TestPostBindPlugin(t *testing.T) {
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1746,7 +1759,7 @@ func TestPermitPlugin(t *testing.T) {
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1796,7 +1809,7 @@ func TestMultiplePermitPlugins(t *testing.T) {
// Create a test pod.
podName := "test-pod"
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1851,7 +1864,7 @@ func TestPermitPluginsCancelled(t *testing.T) {
// Create a test pod.
podName := "test-pod"
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -1914,12 +1927,12 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
// Create two pods. First pod to enter Permit() will wait and a second one will either
// reject or allow first one.
podA, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "pod-a", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "pod-a", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating the first pod: %v", err)
}
podB, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "pod-b", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "pod-b", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating the second pod: %v", err)
}
@ -1991,7 +2004,7 @@ func TestFilterPlugin(t *testing.T) {
filterPlugin.failFilter = test.fail
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
@ -2049,7 +2062,7 @@ func TestPreScorePlugin(t *testing.T) {
preScorePlugin.failPreScore = test.fail
// Create a best effort pod.
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package preemption
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -16,7 +16,7 @@ limitations under the License.
// This file tests preemption functionality of the scheduler.
package scheduler
package preemption
import (
"context"
@ -26,7 +26,7 @@ import (
"time"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -52,6 +52,26 @@ import (
"k8s.io/utils/pointer"
)
// imported from testutils
var (
initPausePod = testutils.InitPausePod
createNode = testutils.CreateNode
createPausePod = testutils.CreatePausePod
runPausePod = testutils.RunPausePod
deletePod = testutils.DeletePod
initTest = testutils.InitTestSchedulerWithNS
initTestDisablePreemption = testutils.InitTestDisablePreemption
initDisruptionController = testutils.InitDisruptionController
waitCachedPodsStable = testutils.WaitCachedPodsStable
podIsGettingEvicted = testutils.PodIsGettingEvicted
podUnschedulable = testutils.PodUnschedulable
waitForPDBsStable = testutils.WaitForPDBsStable
waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout
waitForPodUnschedulable = testutils.WaitForPodUnschedulable
)
const filterPluginName = "filter-plugin"
var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)
func waitForNominatedNodeNameWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
@ -178,7 +198,7 @@ func TestPreemption(t *testing.T) {
name: "basic pod preemption",
initTokens: maxTokens,
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "victim-pod",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -188,7 +208,7 @@ func TestPreemption(t *testing.T) {
},
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -203,7 +223,7 @@ func TestPreemption(t *testing.T) {
name: "basic pod preemption with filter",
initTokens: 1,
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "victim-pod",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -213,7 +233,7 @@ func TestPreemption(t *testing.T) {
},
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -230,7 +250,7 @@ func TestPreemption(t *testing.T) {
initTokens: 1,
unresolvable: true,
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "victim-pod",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -240,7 +260,7 @@ func TestPreemption(t *testing.T) {
},
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -255,13 +275,13 @@ func TestPreemption(t *testing.T) {
name: "preemption is performed to satisfy anti-affinity",
initTokens: maxTokens,
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "pod-0", Namespace: testCtx.NS.Name,
Priority: &mediumPriority,
Labels: map[string]string{"pod": "p0"},
Resources: defaultPodRes,
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "pod-1", Namespace: testCtx.NS.Name,
Priority: &lowPriority,
Labels: map[string]string{"pod": "p1"},
@ -287,7 +307,7 @@ func TestPreemption(t *testing.T) {
}),
},
// A higher priority pod with anti-affinity.
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -319,13 +339,13 @@ func TestPreemption(t *testing.T) {
name: "preemption is not performed when anti-affinity is not satisfied",
initTokens: maxTokens,
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "pod-0", Namespace: testCtx.NS.Name,
Priority: &mediumPriority,
Labels: map[string]string{"pod": "p0"},
Resources: defaultPodRes,
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "pod-1", Namespace: testCtx.NS.Name,
Priority: &highPriority,
Labels: map[string]string{"pod": "p1"},
@ -351,7 +371,7 @@ func TestPreemption(t *testing.T) {
}),
},
// A higher priority pod with anti-affinity.
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -454,7 +474,7 @@ func TestNonPreemption(t *testing.T) {
PreemptionPolicy: &preemptNever,
},
}
victim := initPausePod(&pausePodConfig{
victim := initPausePod(&testutils.PausePodConfig{
Name: "victim-pod",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -464,7 +484,7 @@ func TestNonPreemption(t *testing.T) {
},
})
preemptor := initPausePod(&pausePodConfig{
preemptor := initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -528,7 +548,7 @@ func TestDisablePreemption(t *testing.T) {
{
name: "pod preemption will not happen",
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "victim-pod",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -538,7 +558,7 @@ func TestDisablePreemption(t *testing.T) {
},
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -628,7 +648,7 @@ func TestPodPriorityResolution(t *testing.T) {
Name: "SystemNodeCritical priority class",
PriorityClass: scheduling.SystemNodeCritical,
ExpectedPriority: scheduling.SystemCriticalPriority + 1000,
Pod: initPausePod(&pausePodConfig{
Pod: initPausePod(&testutils.PausePodConfig{
Name: fmt.Sprintf("pod1-%v", scheduling.SystemNodeCritical),
Namespace: metav1.NamespaceSystem,
PriorityClassName: scheduling.SystemNodeCritical,
@ -638,7 +658,7 @@ func TestPodPriorityResolution(t *testing.T) {
Name: "SystemClusterCritical priority class",
PriorityClass: scheduling.SystemClusterCritical,
ExpectedPriority: scheduling.SystemCriticalPriority,
Pod: initPausePod(&pausePodConfig{
Pod: initPausePod(&testutils.PausePodConfig{
Name: fmt.Sprintf("pod2-%v", scheduling.SystemClusterCritical),
Namespace: metav1.NamespaceSystem,
PriorityClassName: scheduling.SystemClusterCritical,
@ -648,7 +668,7 @@ func TestPodPriorityResolution(t *testing.T) {
Name: "Invalid priority class should result in error",
PriorityClass: "foo",
ExpectedPriority: scheduling.SystemCriticalPriority,
Pod: initPausePod(&pausePodConfig{
Pod: initPausePod(&testutils.PausePodConfig{
Name: fmt.Sprintf("pod3-%v", scheduling.SystemClusterCritical),
Namespace: metav1.NamespaceSystem,
PriorityClassName: "foo",
@ -702,7 +722,7 @@ func mkPriorityPodWithGrace(tc *testutils.TestContext, name string, priority int
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
}
pod := initPausePod(&pausePodConfig{
pod := initPausePod(&testutils.PausePodConfig{
Name: name,
Namespace: tc.NS.Name,
Priority: &priority,
@ -735,7 +755,7 @@ func TestPreemptionStarvation(t *testing.T) {
name: "starvation test: higher priority pod is scheduled before the lower priority ones",
numExistingPod: 10,
numExpectedPending: 5,
preemptor: initPausePod(&pausePodConfig{
preemptor: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -836,7 +856,7 @@ func TestPreemptionRaces(t *testing.T) {
numInitialPods: 2,
numAdditionalPods: 20,
numRepetitions: 5,
preemptor: initPausePod(&pausePodConfig{
preemptor: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -1198,28 +1218,28 @@ func TestPDBInPreemption(t *testing.T) {
},
pdbPodNum: []int32{2},
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod1",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
Labels: map[string]string{"foo": "bar"},
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod2",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
Resources: defaultPodRes,
Labels: map[string]string{"foo": "bar"},
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "mid-pod3",
Namespace: testCtx.NS.Name,
Priority: &mediumPriority,
Resources: defaultPodRes,
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -1238,7 +1258,7 @@ func TestPDBInPreemption(t *testing.T) {
},
pdbPodNum: []int32{1},
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod1",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -1246,7 +1266,7 @@ func TestPDBInPreemption(t *testing.T) {
NodeName: "node-1",
Labels: map[string]string{"foo": "bar"},
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "mid-pod2",
Namespace: testCtx.NS.Name,
Priority: &mediumPriority,
@ -1254,7 +1274,7 @@ func TestPDBInPreemption(t *testing.T) {
Resources: defaultPodRes,
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -1274,7 +1294,7 @@ func TestPDBInPreemption(t *testing.T) {
},
pdbPodNum: []int32{1, 5},
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod1",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -1282,14 +1302,14 @@ func TestPDBInPreemption(t *testing.T) {
NodeName: "node-1",
Labels: map[string]string{"foo1": "bar"},
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "mid-pod1",
Namespace: testCtx.NS.Name,
Priority: &mediumPriority,
Resources: defaultPodRes,
NodeName: "node-1",
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod2",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -1297,7 +1317,7 @@ func TestPDBInPreemption(t *testing.T) {
NodeName: "node-2",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "mid-pod2",
Namespace: testCtx.NS.Name,
Priority: &mediumPriority,
@ -1305,7 +1325,7 @@ func TestPDBInPreemption(t *testing.T) {
NodeName: "node-2",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod4",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -1313,7 +1333,7 @@ func TestPDBInPreemption(t *testing.T) {
NodeName: "node-3",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod5",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -1321,7 +1341,7 @@ func TestPDBInPreemption(t *testing.T) {
NodeName: "node-3",
Labels: map[string]string{"foo2": "bar"},
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod6",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
@ -1330,7 +1350,7 @@ func TestPDBInPreemption(t *testing.T) {
Labels: map[string]string{"foo2": "bar"},
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
@ -1374,7 +1394,7 @@ func TestPDBInPreemption(t *testing.T) {
// Create PDBs.
for _, pdb := range test.pdbs {
_, err := testCtx.ClientSet.PolicyV1beta1().PodDisruptionBudgets(testCtx.NS.Name).Create(context.TODO(), pdb, metav1.CreateOptions{})
_, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create PDB: %v", err)
}
@ -1411,7 +1431,7 @@ func TestPDBInPreemption(t *testing.T) {
// Cleanup
pods = append(pods, preemptor)
testutils.CleanupPods(cs, t, pods)
cs.PolicyV1beta1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
cs.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
cs.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
})
}
@ -1454,14 +1474,14 @@ func TestPreferNominatedNode(t *testing.T) {
name: "nominated node released all resource, preemptor is scheduled to the nominated node",
nodeNames: []string{"node-1", "node-2"},
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod1",
Priority: &lowPriority,
NodeName: "node-2",
Resources: defaultPodRes,
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
@ -1475,14 +1495,14 @@ func TestPreferNominatedNode(t *testing.T) {
name: "nominated node cannot pass all the filters, preemptor should find a different node",
nodeNames: []string{"node-1", "node-2"},
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "low-pod",
Priority: &lowPriority,
Resources: defaultPodRes,
NodeName: "node-1",
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod1",
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{

View File

@ -218,13 +218,13 @@ func TestMultipleSchedulers(t *testing.T) {
}
defaultScheduler := "default-scheduler"
testPodFitsDefault, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "pod-fits-default", Namespace: testCtx.NS.Name, SchedulerName: defaultScheduler}))
testPodFitsDefault, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "pod-fits-default", Namespace: testCtx.NS.Name, SchedulerName: defaultScheduler}))
if err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
fooScheduler := "foo-scheduler"
testPodFitsFoo, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "pod-fits-foo", Namespace: testCtx.NS.Name, SchedulerName: fooScheduler}))
testPodFitsFoo, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "pod-fits-foo", Namespace: testCtx.NS.Name, SchedulerName: fooScheduler}))
if err != nil {
t.Fatalf("Failed to create pod: %v", err)
}
@ -357,7 +357,7 @@ func TestMultipleSchedulingProfiles(t *testing.T) {
}
defer evs.Stop()
for _, pc := range []*pausePodConfig{
for _, pc := range []*testutils.PausePodConfig{
{Name: "foo", Namespace: testCtx.NS.Name},
{Name: "bar", Namespace: testCtx.NS.Name, SchedulerName: "unknown-scheduler"},
{Name: "baz", Namespace: testCtx.NS.Name, SchedulerName: "default-scheduler"},
@ -503,7 +503,7 @@ func TestSchedulerInformers(t *testing.T) {
name: "Pod cannot be scheduled when node is occupied by pods scheduled by other schedulers",
nodes: []*nodeConfig{{name: "node-1", res: defaultNodeRes}},
existingPods: []*v1.Pod{
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "pod1",
Namespace: testCtx.NS.Name,
Resources: defaultPodRes,
@ -511,7 +511,7 @@ func TestSchedulerInformers(t *testing.T) {
NodeName: "node-1",
SchedulerName: "foo-scheduler",
}),
initPausePod(&pausePodConfig{
initPausePod(&testutils.PausePodConfig{
Name: "pod2",
Namespace: testCtx.NS.Name,
Resources: defaultPodRes,
@ -520,7 +520,7 @@ func TestSchedulerInformers(t *testing.T) {
SchedulerName: "bar-scheduler",
}),
},
pod: initPausePod(&pausePodConfig{
pod: initPausePod(&testutils.PausePodConfig{
Name: "unschedulable-pod",
Namespace: testCtx.NS.Name,
Resources: defaultPodRes,
@ -562,7 +562,7 @@ func TestSchedulerInformers(t *testing.T) {
// Cleanup
pods = append(pods, unschedulable)
testutils.CleanupPods(cs, t, pods)
cs.PolicyV1beta1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
cs.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
cs.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
})
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scoring
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -14,13 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package scoring
import (
"context"
"fmt"
"strings"
"testing"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -42,8 +43,27 @@ import (
"k8s.io/utils/pointer"
)
// imported from testutils
var (
runPausePod = testutils.RunPausePod
createAndWaitForNodesInCache = testutils.CreateAndWaitForNodesInCache
createNode = testutils.CreateNode
createNamespacesWithLabels = testutils.CreateNamespacesWithLabels
runPodWithContainers = testutils.RunPodWithContainers
initPausePod = testutils.InitPausePod
initPodWithContainers = testutils.InitPodWithContainers
podScheduledIn = testutils.PodScheduledIn
podUnschedulable = testutils.PodUnschedulable
)
var (
hardSpread = v1.DoNotSchedule
softSpread = v1.ScheduleAnyway
)
const (
resourceGPU = "example.com/gpu"
resourceGPU = "example.com/gpu"
pollInterval = 100 * time.Millisecond
)
// This file tests the scheduler priority functions.
@ -192,7 +212,7 @@ func TestNodeAffinityScoring(t *testing.T) {
// Create a pod with node affinity.
podName := "pod-with-node-affinity"
pod, err := runPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{
pod, err := runPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{
Name: podName,
Namespace: testCtx.NS.Name,
Affinity: &v1.Affinity{
@ -233,11 +253,11 @@ func TestPodAffinityScoring(t *testing.T) {
topologyValue := "topologyvalue"
tests := []struct {
name string
podConfig *pausePodConfig
podConfig *testutils.PausePodConfig
}{
{
name: "pod affinity",
podConfig: &pausePodConfig{
podConfig: &testutils.PausePodConfig{
Name: "pod1",
Namespace: "ns1",
Affinity: &v1.Affinity{
@ -265,7 +285,7 @@ func TestPodAffinityScoring(t *testing.T) {
},
{
name: "pod affinity with namespace selector",
podConfig: &pausePodConfig{
podConfig: &testutils.PausePodConfig{
Name: "pod1",
Namespace: "ns2",
Affinity: &v1.Affinity{
@ -306,7 +326,7 @@ func TestPodAffinityScoring(t *testing.T) {
t.Fatal(err)
}
// Add a pod with a label and wait for it to schedule.
_, err = runPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{
_, err = runPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{
Name: "attractor-pod",
Namespace: "ns1",
Labels: map[string]string{labelKey: labelValue},
@ -363,7 +383,7 @@ func TestImageLocalityScoring(t *testing.T) {
// Create a pod with containers each having the specified image.
podName := "pod-using-large-image"
pod, err := runPodWithContainers(testCtx.ClientSet, initPodWithContainers(testCtx.ClientSet, &podWithContainersConfig{
pod, err := runPodWithContainers(testCtx.ClientSet, initPodWithContainers(testCtx.ClientSet, &testutils.PodWithContainersConfig{
Name: podName,
Namespace: testCtx.NS.Name,
Containers: makeContainersWithImages([]string{imageName}),

View File

@ -0,0 +1,27 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package taint
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
package taint
// This file tests the Taint feature.
@ -37,6 +37,11 @@ import (
testutils "k8s.io/kubernetes/test/integration/util"
)
// imported from testutils
var (
waitForPodUnschedulable = testutils.WaitForPodUnschedulable
)
func newPod(nsName, name string, req, limit v1.ResourceList) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{

View File

@ -17,524 +17,21 @@ limitations under the License.
package scheduler
import (
"context"
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
"k8s.io/kube-scheduler/config/v1beta3"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/scheduler"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testutils "k8s.io/kubernetes/test/integration/util"
imageutils "k8s.io/kubernetes/test/utils/image"
"k8s.io/utils/pointer"
)
// initDisruptionController initializes and runs a Disruption Controller to properly
// update PodDisruptionBudget objects.
func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *disruption.DisruptionController {
informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour)
discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery())
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
config := restclient.Config{Host: testCtx.HTTPServer.URL}
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery())
scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
t.Fatalf("Error in create scaleClient: %v", err)
}
dc := disruption.NewDisruptionController(
informers.Core().V1().Pods(),
informers.Policy().V1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(),
informers.Apps().V1().ReplicaSets(),
informers.Apps().V1().Deployments(),
informers.Apps().V1().StatefulSets(),
testCtx.ClientSet,
mapper,
scaleClient,
testCtx.ClientSet.Discovery())
informers.Start(testCtx.Scheduler.StopEverything)
informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)
go dc.Run(testCtx.Ctx)
return dc
}
// initTest initializes a test environment and creates API server and scheduler with default
// configuration.
func initTest(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext {
testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), opts...)
testutils.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
return testCtx
}
// initTestDisablePreemption initializes a test environment and creates API server and scheduler with default
// configuration but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *testutils.TestContext {
cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{
Profiles: []v1beta3.KubeSchedulerProfile{{
SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
Plugins: &v1beta3.Plugins{
PostFilter: v1beta3.PluginSet{
Disabled: []v1beta3.Plugin{
{Name: defaultpreemption.Name},
},
},
},
}},
})
testCtx := testutils.InitTestSchedulerWithOptions(
t, testutils.InitTestAPIServer(t, nsPrefix, nil),
scheduler.WithProfiles(cfg.Profiles...))
testutils.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
return testCtx
}
// waitForReflection waits till the passFunc confirms that the object it expects
// to see is in the store. Used to observe reflected events.
func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
passFunc func(n interface{}) bool) error {
var nodes []*v1.Node
err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
n, err := nodeLister.Get(key)
switch {
case err == nil && passFunc(n):
return true, nil
case apierrors.IsNotFound(err):
nodes = append(nodes, nil)
case err != nil:
t.Errorf("Unexpected error: %v", err)
default:
nodes = append(nodes, n)
}
return false, nil
})
if err != nil {
t.Logf("Logging consecutive node versions received from store:")
for i, n := range nodes {
t.Logf("%d: %#v", i, n)
}
}
return err
}
func updateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
return cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
}
func createNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
}
// createNodes creates `numNodes` nodes. The created node names will be in the
// form of "`prefix`-X" where X is an ordinal.
// DEPRECATED
// use createAndWaitForNodesInCache instead, which ensures the created nodes
// are present in the scheduler cache.
func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
nodes := make([]*v1.Node, numNodes)
for i := 0; i < numNodes; i++ {
nodeName := fmt.Sprintf("%v-%d", prefix, i)
node, err := createNode(cs, wrapper.Name(nodeName).Obj())
if err != nil {
return nodes[:], err
}
nodes[i] = node
}
return nodes[:], nil
}
// createAndWaitForNodesInCache calls createNodes(), and wait for the created
// nodes to be present in scheduler cache.
func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
existingNodes := testCtx.Scheduler.Cache.NodeCount()
nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
if err != nil {
return nodes, fmt.Errorf("cannot create nodes: %v", err)
}
return nodes, waitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes)
}
// waitForNodesInCache ensures at least <nodeCount> nodes are present in scheduler cache
// within 30 seconds; otherwise returns false.
func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
return sched.Cache.NodeCount() >= nodeCount, nil
})
if err != nil {
return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
}
return nil
}
type pausePodConfig struct {
Name string
Namespace string
Affinity *v1.Affinity
Annotations, Labels, NodeSelector map[string]string
Resources *v1.ResourceRequirements
Tolerations []v1.Toleration
NodeName string
SchedulerName string
Priority *int32
PreemptionPolicy *v1.PreemptionPolicy
PriorityClassName string
}
// initPausePod initializes a pod API object from the given config. It is used
// mainly in pod creation process.
func initPausePod(conf *pausePodConfig) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: conf.Name,
Namespace: conf.Namespace,
Labels: conf.Labels,
Annotations: conf.Annotations,
},
Spec: v1.PodSpec{
NodeSelector: conf.NodeSelector,
Affinity: conf.Affinity,
Containers: []v1.Container{
{
Name: conf.Name,
Image: imageutils.GetPauseImageName(),
},
},
Tolerations: conf.Tolerations,
NodeName: conf.NodeName,
SchedulerName: conf.SchedulerName,
Priority: conf.Priority,
PreemptionPolicy: conf.PreemptionPolicy,
PriorityClassName: conf.PriorityClassName,
},
}
if conf.Resources != nil {
pod.Spec.Containers[0].Resources = *conf.Resources
}
return pod
}
// createPausePod creates a pod with "Pause" image and the given config and
// return its pointer and error status.
func createPausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{})
}
// createPausePodWithResource creates a pod with "Pause" image and the given
// resources and returns its pointer and error status. The resource list can be
// nil.
func createPausePodWithResource(cs clientset.Interface, podName string,
nsName string, res *v1.ResourceList) (*v1.Pod, error) {
var conf pausePodConfig
if res == nil {
conf = pausePodConfig{
Name: podName,
Namespace: nsName,
}
} else {
conf = pausePodConfig{
Name: podName,
Namespace: nsName,
Resources: &v1.ResourceRequirements{
Requests: *res,
},
}
}
return createPausePod(cs, initPausePod(&conf))
}
// runPausePod creates a pod with "Pause" image and the given config and waits
// until it is scheduled. It returns its pointer and error status.
func runPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create pause pod: %v", err)
}
if err = testutils.WaitForPodToSchedule(cs, pod); err != nil {
return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err)
}
if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err)
}
return pod, nil
}
type podWithContainersConfig struct {
Name string
Namespace string
Containers []v1.Container
}
// initPodWithContainers initializes a pod API object from the given config. This is used primarily for generating
// pods with containers each having a specific image.
func initPodWithContainers(cs clientset.Interface, conf *podWithContainersConfig) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: conf.Name,
Namespace: conf.Namespace,
},
Spec: v1.PodSpec{
Containers: conf.Containers,
},
}
return pod
}
// runPodWithContainers creates a pod with given config and containers and waits
// until it is scheduled. It returns its pointer and error status.
func runPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create pod-with-containers: %v", err)
}
if err = testutils.WaitForPodToSchedule(cs, pod); err != nil {
return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
}
if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err)
}
return pod, nil
}
// podIsGettingEvicted returns true if the pod's deletion timestamp is set.
func podIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
return false, err
}
if pod.DeletionTimestamp != nil {
return true, nil
}
return false, nil
}
}
// podScheduledIn returns true if a given pod is placed onto one of the expected nodes.
func podScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
if pod.Spec.NodeName == "" {
return false, nil
}
for _, nodeName := range nodeNames {
if pod.Spec.NodeName == nodeName {
return true, nil
}
}
return false, nil
}
}
// podUnschedulable returns a condition function that returns true if the given pod
// gets unschedulable status.
func podUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse &&
cond.Reason == v1.PodReasonUnschedulable && pod.Spec.NodeName == "", nil
}
}
// podSchedulingError returns a condition function that returns true if the given pod
// gets unschedulable status for reasons other than "Unschedulable". The scheduler
// records such reasons in case of error.
func podSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse &&
cond.Reason != v1.PodReasonUnschedulable, nil
}
}
// waitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the given timeout.
func waitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
return wait.Poll(100*time.Millisecond, timeout, podUnschedulable(cs, pod.Namespace, pod.Name))
}
// waitForPodUnschedulable waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the timeout duration (30 seconds).
func waitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
return waitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
}
// waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
an error if it does not get scheduled within the given timeout.
func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
return wait.Poll(100*time.Millisecond, timeout, podScheduled(cs, pod.Namespace, pod.Name))
}
// waitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
// the expected values.
func waitForPDBsStable(testCtx *testutils.TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
pdbList, err := testCtx.ClientSet.PolicyV1beta1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
if len(pdbList.Items) != len(pdbs) {
return false, nil
}
for i, pdb := range pdbs {
found := false
for _, cpdb := range pdbList.Items {
if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
found = true
if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
return false, nil
}
}
}
if !found {
return false, nil
}
}
return true, nil
})
}
// waitCachedPodsStable waits until scheduler cache has the given pods.
func waitCachedPodsStable(testCtx *testutils.TestContext, pods []*v1.Pod) error {
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
cachedPods, err := testCtx.Scheduler.Cache.PodCount()
if err != nil {
return false, err
}
if len(pods) != cachedPods {
return false, nil
}
for _, p := range pods {
actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
if err1 != nil {
return false, err1
}
cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod)
if err2 != nil || cachedPod == nil {
return false, err2
}
}
return true, nil
})
}
// deletePod deletes the given pod in the given namespace.
func deletePod(cs clientset.Interface, podName string, nsName string) error {
return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0))
}
func getPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) {
return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
}
// podScheduled returns true if a node is assigned to the given pod.
func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
return pod.Spec.NodeName != "", nil
}
}
func createNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error {
for _, n := range namespaces {
ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}}
if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil {
return err
}
}
return nil
}
// timeout returns a timeout error if the given `f` function doesn't
// complete within `d` duration; otherwise it returns nil.
func timeout(ctx context.Context, d time.Duration, f func()) error {
ctx, cancel := context.WithTimeout(ctx, d)
defer cancel()
done := make(chan struct{})
go func() {
f()
done <- struct{}{}
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// nextPodOrDie returns the next Pod in the scheduler queue.
// The operation needs to be completed within 5 seconds; otherwise the test gets aborted.
func nextPodOrDie(t *testing.T, testCtx *testutils.TestContext) *framework.QueuedPodInfo {
t.Helper()
var podInfo *framework.QueuedPodInfo
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
// default go testing timeout (10m) to abort.
if err := timeout(testCtx.Ctx, time.Second*5, func() {
podInfo = testCtx.Scheduler.NextPod()
}); err != nil {
t.Fatalf("Timed out waiting for the Pod to be popped: %v", err)
}
return podInfo
}
// nextPod returns the next Pod in the scheduler queue, with a 5 seconds timeout.
func nextPod(t *testing.T, testCtx *testutils.TestContext) *framework.QueuedPodInfo {
t.Helper()
var podInfo *framework.QueuedPodInfo
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
// default go testing timeout (10m) to abort.
if err := timeout(testCtx.Ctx, time.Second*5, func() {
podInfo = testCtx.Scheduler.NextPod()
}); err != nil {
return nil
}
return podInfo
}
var (
createNode = testutils.CreateNode
createPausePod = testutils.CreatePausePod
createPausePodWithResource = testutils.CreatePausePodWithResource
deletePod = testutils.DeletePod
initPausePod = testutils.InitPausePod
initTest = testutils.InitTestSchedulerWithNS
nextPod = testutils.NextPod
nextPodOrDie = testutils.NextPodOrDie
runPausePod = testutils.RunPausePod
updateNode = testutils.UpdateNode
waitForNodesInCache = testutils.WaitForNodesInCache
waitForPodUnschedulable = testutils.WaitForPodUnschedulable
waitForReflection = testutils.WaitForReflection
)
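
For the tests that remain in package scheduler, this alias block preserves the old helper names while the implementations now live in test/integration/util. A hypothetical remaining test keeps reading as before (sketch only; TestExample is not part of the diff, the helpers it calls are):

func TestExample(t *testing.T) {
	// initTest is the alias for testutils.InitTestSchedulerWithNS declared above.
	testCtx := initTest(t, "example")
	defer testutils.CleanupTest(t, testCtx)

	// createPausePod/initPausePod resolve to the exported testutils helpers.
	pod, err := createPausePod(testCtx.ClientSet,
		initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
	if err != nil {
		t.Fatalf("Error while creating a test pod: %v", err)
	}
	defer deletePod(testCtx.ClientSet, pod.Name, pod.Namespace)
}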

View File

@ -26,27 +26,41 @@ import (
"time"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
pvutil "k8s.io/component-helpers/storage/volume"
"k8s.io/klog/v2"
"k8s.io/kube-scheduler/config/v1beta3"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/scheduler"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
"k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/test/integration/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
"k8s.io/utils/pointer"
)
// ShutdownFunc represents the function handle to be called, typically in a defer handler, to shutdown a running module
@ -456,3 +470,471 @@ func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond
return true, nil
}
}
// InitDisruptionController initializes and runs a Disruption Controller to properly
// update PodDisruptionBudget objects.
func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController {
informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour)
discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery())
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
config := restclient.Config{Host: testCtx.HTTPServer.URL}
scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery())
scaleClient, err := scale.NewForConfig(&config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
if err != nil {
t.Fatalf("Error in create scaleClient: %v", err)
}
dc := disruption.NewDisruptionController(
informers.Core().V1().Pods(),
informers.Policy().V1().PodDisruptionBudgets(),
informers.Core().V1().ReplicationControllers(),
informers.Apps().V1().ReplicaSets(),
informers.Apps().V1().Deployments(),
informers.Apps().V1().StatefulSets(),
testCtx.ClientSet,
mapper,
scaleClient,
testCtx.ClientSet.Discovery())
informers.Start(testCtx.Scheduler.StopEverything)
informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)
go dc.Run(testCtx.Ctx)
return dc
}
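// Illustrative usage sketch (not part of the original change; the namespace
// prefix is assumed): preemption tests that depend on up-to-date
// PodDisruptionBudget status typically start the controller right after the
// test context is created.
//
//	testCtx := InitTestSchedulerWithNS(t, "pdb-preemption")
//	InitDisruptionController(t, testCtx)
//	// Create PDBs and pods, then use WaitForPDBsStable to wait for
//	// CurrentHealthy to match the expected counts before triggering preemption.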
// InitTestSchedulerWithNS initializes a test environment and creates an API server and scheduler with the default
// configuration.
func InitTestSchedulerWithNS(t *testing.T, nsPrefix string, opts ...scheduler.Option) *TestContext {
testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), opts...)
SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
return testCtx
}
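// Illustrative usage sketch (not part of the original change; the namespace
// prefix and the CleanupTest helper are assumed):
//
//	testCtx := InitTestSchedulerWithNS(t, "my-feature")
//	defer CleanupTest(t, testCtx)
//	// The scheduler is now running against the test API server and schedules
//	// any pods created through testCtx.ClientSet.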
// InitTestDisablePreemption initializes a test environment and creates an API server and scheduler with the default
// configuration, but with pod preemption disabled.
func InitTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
cfg := configtesting.V1beta3ToInternalWithDefaults(t, v1beta3.KubeSchedulerConfiguration{
Profiles: []v1beta3.KubeSchedulerProfile{{
SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
Plugins: &v1beta3.Plugins{
PostFilter: v1beta3.PluginSet{
Disabled: []v1beta3.Plugin{
{Name: defaultpreemption.Name},
},
},
},
}},
})
testCtx := InitTestSchedulerWithOptions(
t, InitTestAPIServer(t, nsPrefix, nil),
scheduler.WithProfiles(cfg.Profiles...))
SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
return testCtx
}
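// Illustrative usage sketch (not part of the original change; names are
// assumed): with the PostFilter plugin disabled, a pending high-priority pod
// must not evict running victims.
//
//	testCtx := InitTestDisablePreemption(t, "disable-preemption")
//	// Fill the only node with a low-priority pod, create a high-priority pod
//	// that does not fit, and assert with WaitForPodUnschedulableWithTimeout
//	// that it stays pending instead of preempting.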
// WaitForReflection waits until the passFunc confirms that the object it expects
// to see is in the store. Used to observe reflected events.
func WaitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
passFunc func(n interface{}) bool) error {
var nodes []*v1.Node
err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
n, err := nodeLister.Get(key)
switch {
case err == nil && passFunc(n):
return true, nil
case apierrors.IsNotFound(err):
nodes = append(nodes, nil)
case err != nil:
t.Errorf("Unexpected error: %v", err)
default:
nodes = append(nodes, n)
}
return false, nil
})
if err != nil {
t.Logf("Logging consecutive node versions received from store:")
for i, n := range nodes {
t.Logf("%d: %#v", i, n)
}
}
return err
}
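// Illustrative usage sketch (not part of the original change; the label values
// and the nodeLister obtained from the test informers are assumed):
//
//	node.Labels = map[string]string{"zone": "zone-1"}
//	if _, err := UpdateNode(testCtx.ClientSet, node); err != nil {
//		t.Fatalf("cannot update node: %v", err)
//	}
//	if err := WaitForReflection(t, nodeLister, node.Name, func(n interface{}) bool {
//		return n.(*v1.Node).Labels["zone"] == "zone-1"
//	}); err != nil {
//		t.Fatalf("node update never reached the scheduler's lister: %v", err)
//	}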
func UpdateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
return cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
}
func CreateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
}
func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
nodes := make([]*v1.Node, numNodes)
for i := 0; i < numNodes; i++ {
nodeName := fmt.Sprintf("%v-%d", prefix, i)
node, err := CreateNode(cs, wrapper.Name(nodeName).Obj())
if err != nil {
return nodes[:], err
}
nodes[i] = node
}
return nodes[:], nil
}
// CreateAndWaitForNodesInCache calls createNodes(), and waits for the created
// nodes to be present in the scheduler cache.
func CreateAndWaitForNodesInCache(testCtx *TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
existingNodes := testCtx.Scheduler.Cache.NodeCount()
nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
if err != nil {
return nodes, fmt.Errorf("cannot create nodes: %v", err)
}
return nodes, WaitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes)
}
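// Illustrative usage sketch (not part of the original change; the node prefix
// and count are assumed):
//
//	nodes, err := CreateAndWaitForNodesInCache(testCtx, "worker", st.MakeNode(), 3)
//	if err != nil {
//		t.Fatal(err)
//	}
//	// nodes holds worker-0..worker-2, and the scheduler cache is guaranteed
//	// to have observed all of them.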
// WaitForNodesInCache ensures at least <nodeCount> nodes are present in the scheduler cache
// within 30 seconds; otherwise it returns an error.
func WaitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
return sched.Cache.NodeCount() >= nodeCount, nil
})
if err != nil {
return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
}
return nil
}
type PausePodConfig struct {
Name string
Namespace string
Affinity *v1.Affinity
Annotations, Labels, NodeSelector map[string]string
Resources *v1.ResourceRequirements
Tolerations []v1.Toleration
NodeName string
SchedulerName string
Priority *int32
PreemptionPolicy *v1.PreemptionPolicy
PriorityClassName string
}
// InitPausePod initializes a pod API object from the given config. It is used
// mainly in the pod creation process.
func InitPausePod(conf *PausePodConfig) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: conf.Name,
Namespace: conf.Namespace,
Labels: conf.Labels,
Annotations: conf.Annotations,
},
Spec: v1.PodSpec{
NodeSelector: conf.NodeSelector,
Affinity: conf.Affinity,
Containers: []v1.Container{
{
Name: conf.Name,
Image: imageutils.GetPauseImageName(),
},
},
Tolerations: conf.Tolerations,
NodeName: conf.NodeName,
SchedulerName: conf.SchedulerName,
Priority: conf.Priority,
PreemptionPolicy: conf.PreemptionPolicy,
PriorityClassName: conf.PriorityClassName,
},
}
if conf.Resources != nil {
pod.Spec.Containers[0].Resources = *conf.Resources
}
return pod
}
// CreatePausePod creates a pod with the "Pause" image and the given config and
// returns its pointer and error status.
func CreatePausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{})
}
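// Illustrative usage sketch (not part of the original change; the clientset cs,
// namespace ns, and labels are assumed):
//
//	pod, err := CreatePausePod(cs, InitPausePod(&PausePodConfig{
//		Name:      "example-pod",
//		Namespace: ns,
//		Labels:    map[string]string{"app": "example"},
//	}))
//	if err != nil {
//		t.Fatalf("cannot create pause pod: %v", err)
//	}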
// CreatePausePodWithResource creates a pod with "Pause" image and the given
// resources and returns its pointer and error status. The resource list can be
// nil.
func CreatePausePodWithResource(cs clientset.Interface, podName string,
nsName string, res *v1.ResourceList) (*v1.Pod, error) {
var conf PausePodConfig
if res == nil {
conf = PausePodConfig{
Name: podName,
Namespace: nsName,
}
} else {
conf = PausePodConfig{
Name: podName,
Namespace: nsName,
Resources: &v1.ResourceRequirements{
Requests: *res,
},
}
}
return CreatePausePod(cs, InitPausePod(&conf))
}
// RunPausePod creates a pod with "Pause" image and the given config and waits
// until it is scheduled. It returns its pointer and error status.
func RunPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create pause pod: %v", err)
}
if err = WaitForPodToSchedule(cs, pod); err != nil {
return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err)
}
if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err)
}
return pod, nil
}
type PodWithContainersConfig struct {
Name string
Namespace string
Containers []v1.Container
}
// InitPodWithContainers initializes a pod API object from the given config. This is used primarily for generating
// pods with containers each having a specific image.
func InitPodWithContainers(cs clientset.Interface, conf *PodWithContainersConfig) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: conf.Name,
Namespace: conf.Namespace,
},
Spec: v1.PodSpec{
Containers: conf.Containers,
},
}
return pod
}
// RunPodWithContainers creates a pod with the given config and containers and waits
// until it is scheduled. It returns its pointer and error status.
func RunPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create pod-with-containers: %v", err)
}
if err = WaitForPodToSchedule(cs, pod); err != nil {
return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
}
if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err)
}
return pod, nil
}
// PodIsGettingEvicted returns a condition function that returns true if the given pod's deletion timestamp is set.
func PodIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
return false, err
}
if pod.DeletionTimestamp != nil {
return true, nil
}
return false, nil
}
}
// PodScheduledIn returns a condition function that returns true once the given pod is placed onto one of the expected nodes.
func PodScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
if pod.Spec.NodeName == "" {
return false, nil
}
for _, nodeName := range nodeNames {
if pod.Spec.NodeName == nodeName {
return true, nil
}
}
return false, nil
}
}
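// Illustrative usage sketch (not part of the original change; node names are
// assumed): condition functions such as PodScheduledIn are meant to be polled.
//
//	err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout,
//		PodScheduledIn(cs, pod.Namespace, pod.Name, []string{"node-1", "node-2"}))
//	if err != nil {
//		t.Errorf("pod was not scheduled onto one of the expected nodes: %v", err)
//	}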
// PodUnschedulable returns a condition function that returns true if the given pod
// gets unschedulable status.
func PodUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse &&
cond.Reason == v1.PodReasonUnschedulable && pod.Spec.NodeName == "", nil
}
}
// PodSchedulingError returns a condition function that returns true if the given pod
// gets unschedulable status for reasons other than "Unschedulable". The scheduler
// records such reasons in case of error.
func PodSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
// This could be a connection error so we want to retry.
return false, nil
}
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse &&
cond.Reason != v1.PodReasonUnschedulable, nil
}
}
// WaitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the given timeout.
func WaitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
return wait.Poll(100*time.Millisecond, timeout, PodUnschedulable(cs, pod.Namespace, pod.Name))
}
// WaitForPodUnschedulable waits for a pod to fail scheduling and returns
// an error if it does not become unschedulable within the timeout duration (30 seconds).
func WaitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
return WaitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
}
// WaitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
// the expected values.
func WaitForPDBsStable(testCtx *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
pdbList, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
if len(pdbList.Items) != len(pdbs) {
return false, nil
}
for i, pdb := range pdbs {
found := false
for _, cpdb := range pdbList.Items {
if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
found = true
if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
return false, nil
}
}
}
if !found {
return false, nil
}
}
return true, nil
})
}
// WaitCachedPodsStable waits until the scheduler cache has the given pods.
func WaitCachedPodsStable(testCtx *TestContext, pods []*v1.Pod) error {
return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
cachedPods, err := testCtx.Scheduler.Cache.PodCount()
if err != nil {
return false, err
}
if len(pods) != cachedPods {
return false, nil
}
for _, p := range pods {
actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
if err1 != nil {
return false, err1
}
cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod)
if err2 != nil || cachedPod == nil {
return false, err2
}
}
return true, nil
})
}
// DeletePod deletes the given pod in the given namespace.
func DeletePod(cs clientset.Interface, podName string, nsName string) error {
return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0))
}
func GetPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) {
return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
}
func CreateNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error {
for _, n := range namespaces {
ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}}
if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil {
return err
}
}
return nil
}
// timeout returns a timeout error if the given `f` function doesn't
// complete within `d` duration; otherwise it returns nil.
func timeout(ctx context.Context, d time.Duration, f func()) error {
ctx, cancel := context.WithTimeout(ctx, d)
defer cancel()
done := make(chan struct{})
go func() {
f()
done <- struct{}{}
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// NextPodOrDie returns the next Pod in the scheduler queue.
// The operation needs to be completed within 5 seconds; otherwise the test gets aborted.
func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo {
t.Helper()
var podInfo *schedulerframework.QueuedPodInfo
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
// default go testing timeout (10m) to abort.
if err := timeout(testCtx.Ctx, time.Second*5, func() {
podInfo = testCtx.Scheduler.NextPod()
}); err != nil {
t.Fatalf("Timed out waiting for the Pod to be popped: %v", err)
}
return podInfo
}
// NextPod returns the next Pod in the scheduler queue, with a 5-second timeout;
// it returns nil if no Pod is popped within that time.
func NextPod(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo {
t.Helper()
var podInfo *schedulerframework.QueuedPodInfo
// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
// default go testing timeout (10m) to abort.
if err := timeout(testCtx.Ctx, time.Second*5, func() {
podInfo = testCtx.Scheduler.NextPod()
}); err != nil {
return nil
}
return podInfo
}