Fix: FitError in Permit plugin not handled properly

We only set the failed plugin on the status, but this will not work unless
we build the status with a FitError, because the failed plugins are only
copied to podInfo when the error is a FitError

Signed-off-by: kerthcet <kerthcet@gmail.com>
kerthcet 2023-07-07 10:35:59 +08:00
parent 960830bc66
commit 278a8376e1
8 changed files with 344 additions and 127 deletions
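The mechanism the commit message refers to lives in handleSchedulingFailure (see the last hunk of the scheduler file below): failed plugins reach the pod's queue entry only through a *framework.FitError type assertion. A minimal, self-contained sketch of that gate, using stand-in types rather than the real scheduler structs:

package main

import "fmt"

// Stand-ins for the scheduler types involved (assumptions, not the real API).
type FitError struct{ UnschedulablePlugins []string }

func (f *FitError) Error() string { return "fit error" }

type PodInfo struct{ UnschedulablePlugins []string }

func main() {
	var err error = &FitError{UnschedulablePlugins: []string{"Permit-1"}}
	podInfo := &PodInfo{}
	// Only a *FitError propagates its failed plugins into PodInfo. A plain
	// Unschedulable status carries no FitError, so before this fix the pod's
	// UnschedulablePlugins stayed empty and the pod was never re-queued on
	// events relevant to the rejecting Permit plugin.
	if fitErr, ok := err.(*FitError); ok {
		podInfo.UnschedulablePlugins = fitErr.UnschedulablePlugins
	}
	fmt.Println(podInfo.UnschedulablePlugins) // [Permit-1]
}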

View File

@@ -1333,8 +1333,7 @@ func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.C
 		if !status.IsSuccess() {
 			if status.IsUnschedulable() {
 				logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
-				status.SetFailedPlugin(pl.Name())
-				return status
+				return status.WithFailedPlugin(pl.Name())
 			}
 			if status.IsWait() {
 				// Not allowed to be greater than maxTimeout.
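The one-line return works because WithFailedPlugin, unlike SetFailedPlugin, returns its receiver. A small sketch of that fluent-setter pattern with a stand-in Status type (assumption: the real framework.Status behaves analogously):

package main

import "fmt"

// Stand-in for framework.Status.
type Status struct {
	message      string
	failedPlugin string
}

// WithFailedPlugin records the plugin name and returns the same *Status,
// so rejection sites can set the plugin and return in one expression.
func (s *Status) WithFailedPlugin(name string) *Status {
	s.failedPlugin = name
	return s
}

func main() {
	st := &Status{message: "rejected"}
	fmt.Println(st.WithFailedPlugin("Permit-1").failedPlugin) // Permit-1
}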

View File

@@ -2669,7 +2669,6 @@ func TestPermitPlugins(t *testing.T) {
}
status := f.RunPermitPlugins(ctx, nil, pod, "")
if !reflect.DeepEqual(status, tt.want) {
t.Errorf("wrong status code. got %v, want %v", status, tt.want)
}

View File

@@ -330,6 +330,7 @@ func (f *FitError) Error() string {
if postFilterMsg != "" {
reasonMsg += fmt.Sprintf(SeparatorFormat, postFilterMsg)
}
return reasonMsg
}
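Judging from the expected messages in the test below ("0/3 nodes are available: 1 Node(s) failed Filter plugin FalseFilter-2, ..."), Error() counts identical per-node reasons and joins them after a "0/N nodes are available:" prefix. A rough, self-contained sketch of that aggregation (stand-in code; the real method also appends the PostFilter message, as the hunk above shows):

package main

import (
	"fmt"
	"sort"
	"strings"
)

// reasonMessage aggregates per-node failure reasons into one message.
func reasonMessage(numAllNodes int, nodeReasons map[string]string) string {
	histogram := map[string]int{}
	for _, reason := range nodeReasons {
		histogram[reason]++
	}
	parts := make([]string, 0, len(histogram))
	for reason, count := range histogram {
		parts = append(parts, fmt.Sprintf("%d %s", count, reason))
	}
	sort.Strings(parts)
	return fmt.Sprintf("0/%d nodes are available: %s.", numAllNodes, strings.Join(parts, ", "))
}

func main() {
	// Mirrors the new test case: one node, rejected by a Permit plugin.
	fmt.Println(reasonMessage(1, map[string]string{
		"node1": "Node failed Permit plugin Permit-1",
	}))
	// Output: 0/1 nodes are available: 1 Node failed Permit plugin Permit-1.
}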

View File

@@ -1447,6 +1447,17 @@ func TestFitError_Error(t *testing.T) {
 			},
 			wantReasonMsg: "0/3 nodes are available: 1 Node(s) failed Filter plugin FalseFilter-2, 2 Node(s) failed Filter plugin FalseFilter-1. Error running PostFilter plugin FailedPostFilter.",
 		},
+		{
+			name:        "failed to Permit on node",
+			numAllNodes: 1,
+			diagnosis: Diagnosis{
+				NodeToStatusMap: NodeToStatusMap{
+					// There should be only one node here.
+					"node1": NewStatus(Unschedulable, "Node failed Permit plugin Permit-1"),
+				},
+			},
+			wantReasonMsg: "0/1 nodes are available: 1 Node failed Permit plugin Permit-1.",
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {

View File

@@ -184,9 +184,7 @@ func (sched *Scheduler) schedulingCycle(
 		// This relies on the fact that Error will check if the pod has been bound
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
-		return ScheduleResult{nominatingInfo: clearNominatedNode},
-			assumedPodInfo,
-			framework.AsStatus(err)
+		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.AsStatus(err)
 	}
 
 	// Run the Reserve method of reserve plugins.
@@ -197,9 +195,7 @@ func (sched *Scheduler) schedulingCycle(
 			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
 		}
-		return ScheduleResult{nominatingInfo: clearNominatedNode},
-			assumedPodInfo,
-			sts
+		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
 	}
 
 	// Run "permit" plugins.
@@ -211,9 +207,19 @@ func (sched *Scheduler) schedulingCycle(
 			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
 		}
-		return ScheduleResult{nominatingInfo: clearNominatedNode},
-			assumedPodInfo,
-			runPermitStatus
+
+		if runPermitStatus.IsUnschedulable() {
+			fitErr := &framework.FitError{
+				NumAllNodes: 1,
+				Pod:         pod,
+				Diagnosis: framework.Diagnosis{
+					NodeToStatusMap:      framework.NodeToStatusMap{scheduleResult.SuggestedHost: runPermitStatus},
+					UnschedulablePlugins: sets.New(runPermitStatus.FailedPlugin()),
+				},
+			}
+			return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(runPermitStatus.Code()).WithError(fitErr)
+		}
+		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus
 	}
 
 	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
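NumAllNodes is 1 because at Permit time only the suggested host is in play, hence the single-entry NodeToStatusMap. Wrapping with framework.NewStatus(runPermitStatus.Code()).WithError(fitErr) keeps the original Unschedulable code while making the underlying error a *FitError that handleSchedulingFailure can detect. A toy model of that wrap, with stand-in types:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for framework.Status and framework.FitError (simplified).
type FitError struct{ NumAllNodes int }

func (f *FitError) Error() string {
	return fmt.Sprintf("0/%d nodes are available", f.NumAllNodes)
}

type Status struct {
	code int
	err  error
}

func NewStatus(code int) *Status { return &Status{code: code} }

// WithError attaches err while keeping the original code, mirroring the hunk
// above: the code still reads Unschedulable, but the underlying error is now
// a *FitError that downstream code can type-assert.
func (s *Status) WithError(err error) *Status { s.err = err; return s }

func (s *Status) AsError() error { return s.err }

func main() {
	const Unschedulable = 2
	st := NewStatus(Unschedulable).WithError(&FitError{NumAllNodes: 1})
	var fitErr *FitError
	fmt.Println(st.code, errors.As(st.AsError(), &fitErr), fitErr.NumAllNodes) // 2 true 1
}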
@@ -923,7 +929,6 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
errMsg := status.Message()
if err == ErrNoNodesAvailable {
logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
} else if fitError, ok := err.(*framework.FitError); ok { // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
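Those injected UnschedulablePlugins are what the scheduling queue consults when cluster events arrive: a pod is retried only if an event matches one that a rejecting plugin registered for via EventsToRegister (the new integration test below registers Node/Add with QueueImmediately). A toy model of that gating, heavily simplified from the real queue; the names here are illustrative, not the real API:

package main

import "fmt"

type event string

// What each plugin registered for via EventsToRegister.
var pluginEvents = map[string][]event{
	"permit": {"Node/Add"},
}

// shouldRetry reports whether an incoming cluster event is one that a
// plugin which rejected the pod has registered interest in.
func shouldRetry(unschedulablePlugins map[string]bool, ev event) bool {
	for plugin := range unschedulablePlugins {
		for _, registered := range pluginEvents[plugin] {
			if registered == ev {
				return true
			}
		}
	}
	return false
}

func main() {
	failed := map[string]bool{"permit": true} // from podInfo.UnschedulablePlugins
	fmt.Println(shouldRetry(failed, "Node/Add")) // true: node added, retry the pod
	fmt.Println(shouldRetry(failed, "Pod/Add"))  // false: unrelated event, keep waiting
}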

View File

@@ -47,6 +47,7 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	st "k8s.io/kubernetes/pkg/scheduler/testing"
+	schedulerutils "k8s.io/kubernetes/test/integration/scheduler"
 	testutils "k8s.io/kubernetes/test/integration/util"
 	imageutils "k8s.io/kubernetes/test/utils/image"
 	"k8s.io/utils/pointer"
@@ -64,6 +65,9 @@ var (
 	waitForPodUnschedulable         = testutils.WaitForPodUnschedulable
 	waitForPodSchedulingGated       = testutils.WaitForPodSchedulingGated
 	waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout
+	initRegistryAndConfig           = func(t *testing.T, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) {
+		return schedulerutils.InitRegistryAndConfig(t, newPlugin, plugins...)
+	}
 )
 
 type PreEnqueuePlugin struct {
@@ -659,7 +663,7 @@ func TestPreFilterPlugin(t *testing.T) {
 	preFilterPlugin := &PreFilterPlugin{}
 	registry, prof := initRegistryAndConfig(t, preFilterPlugin)
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer teardown()
@@ -837,7 +841,7 @@ func TestPostFilterPlugin(t *testing.T) {
 				},
 			}}})
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes),
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, int(tt.numNodes),
 				scheduler.WithProfiles(cfg.Profiles...),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry),
 			)
@@ -904,7 +908,7 @@ func TestScorePlugin(t *testing.T) {
 	scorePlugin := &ScorePlugin{}
 	registry, prof := initRegistryAndConfig(t, scorePlugin)
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 10,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 10,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer teardown()
@@ -947,7 +951,7 @@ func TestNormalizeScorePlugin(t *testing.T) {
 	scoreWithNormalizePlugin := &ScoreWithNormalizePlugin{}
 	registry, prof := initRegistryAndConfig(t, scoreWithNormalizePlugin)
 
-	testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10,
+	testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
@@ -995,7 +999,7 @@ func TestReservePluginReserve(t *testing.T) {
 	reservePlugin := &ReservePlugin{}
 	registry, prof := initRegistryAndConfig(t, reservePlugin)
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer teardown()
@@ -1110,7 +1114,7 @@ func TestPrebindPlugin(t *testing.T) {
 				},
 			})
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, nodesNum,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, nodesNum,
 				scheduler.WithProfiles(cfg.Profiles...),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1265,7 +1269,7 @@ func TestUnReserveReservePlugins(t *testing.T) {
 			}
 			registry, prof := initRegistryAndConfig(t, pls...)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1358,7 +1362,7 @@ func TestUnReservePermitPlugins(t *testing.T) {
 			}
 			registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(profile),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1430,7 +1434,7 @@ func TestUnReservePreBindPlugins(t *testing.T) {
 			}
 			registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(profile),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1501,7 +1505,7 @@ func TestUnReserveBindPlugins(t *testing.T) {
 			test.plugin.client = testContext.ClientSet
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(profile),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1634,7 +1638,7 @@ func TestBindPlugin(t *testing.T) {
 		}},
 	})
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 		scheduler.WithProfiles(cfg.Profiles...),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry),
 	)
@@ -1755,7 +1759,7 @@ func TestPostBindPlugin(t *testing.T) {
 			}
 			registry, prof := initRegistryAndConfig(t, preBindPlugin, postBindPlugin)
 
-			testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+			testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 				scheduler.WithProfiles(prof),
 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
 			defer teardown()
@@ -1846,7 +1850,7 @@ func TestPermitPlugin(t *testing.T) {
 	perPlugin := &PermitPlugin{name: permitPluginName}
 	registry, prof := initRegistryAndConfig(t, perPlugin)
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer teardown()
@@ -1895,7 +1899,7 @@ func TestMultiplePermitPlugins(t *testing.T) {
 	registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2)
 
 	// Create the API server and the scheduler with the test plugin set.
-	testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2,
+	testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
@@ -1947,7 +1951,7 @@ func TestPermitPluginsCancelled(t *testing.T) {
 	registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2)
 
 	// Create the API server and the scheduler with the test plugin set.
-	testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2,
+	testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
@@ -2010,7 +2014,7 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
 	permitPlugin := &PermitPlugin{name: permitPluginName}
 	registry, prof := initRegistryAndConfig(t, permitPlugin)
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer teardown()
@@ -2092,7 +2096,7 @@ func TestFilterPlugin(t *testing.T) {
 	filterPlugin := &FilterPlugin{}
 	registry, prof := initRegistryAndConfig(t, filterPlugin)
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 1,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer teardown()
@@ -2148,7 +2152,7 @@ func TestPreScorePlugin(t *testing.T) {
 	preScorePlugin := &PreScorePlugin{}
 	registry, prof := initRegistryAndConfig(t, preScorePlugin)
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 2,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 2,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer teardown()
@@ -2209,7 +2213,7 @@ func TestPreEnqueuePlugin(t *testing.T) {
 	preFilterPlugin := &PreFilterPlugin{}
 	registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin)
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 1,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 1,
 		scheduler.WithProfiles(prof),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
 	defer teardown()
@@ -2338,7 +2342,7 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 		},
 	})
 
-	testCtx, teardown := initTestSchedulerForFrameworkTest(t, testContext, 0,
+	testCtx, teardown := schedulerutils.InitTestSchedulerForFrameworkTest(t, testContext, 0,
 		scheduler.WithProfiles(cfg.Profiles...),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry),
 	)
@@ -2518,7 +2522,7 @@ func TestActivatePods(t *testing.T) {
 	})
 
 	// Create the API server and the scheduler with the test plugin set.
-	testCtx, _ := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
+	testCtx, _ := schedulerutils.InitTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
 		scheduler.WithProfiles(cfg.Profiles...),
 		scheduler.WithFrameworkOutOfTreeRegistry(registry))
@@ -2563,96 +2567,3 @@ func TestActivatePods(t *testing.T) {
t.Errorf("JobPlugin's pods activation logic is not called")
}
}
// The returned shutdown func will delete created resources and scheduler, resources should be those
// that will affect the scheduling result, like nodes, pods, etc.. Namespaces should not be
// deleted here because it's created together with the apiserver, they should be deleted
// simultaneously or we'll have no namespace.
// This should only be called when you want to kill the scheduler alone, away from apiserver.
// For example, in scheduler integration tests, recreating apiserver is performance consuming,
// then shutdown the scheduler and recreate it between each test case is a better approach.
func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) {
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...)
testutils.SyncSchedulerInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
if nodeCount > 0 {
if _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
// Make sure to cleanup the resources when initializing error.
testutils.CleanupTest(t, testCtx)
t.Fatal(err)
}
}
teardown := func() {
err := testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
if err != nil {
t.Errorf("error while deleting all nodes: %v", err)
}
err = testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
if err != nil {
t.Errorf("error while deleting pod: %v", err)
}
// Wait for all pods to be deleted, or will failed to create same name pods
// required in other test cases.
err = wait.PollUntilContextTimeout(testCtx.SchedulerCtx, time.Millisecond, wait.ForeverTestTimeout, true,
testutils.PodsCleanedUp(testCtx.SchedulerCtx, testCtx.ClientSet, testCtx.NS.Name))
if err != nil {
t.Errorf("error while waiting for all pods to be deleted: %v", err)
}
// Kill the scheduler.
testCtx.SchedulerCloseFn()
}
return testCtx, teardown
}
// initRegistryAndConfig returns registry and plugins config based on give plugins.
func initRegistryAndConfig(t *testing.T, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) {
if len(plugins) == 0 {
return frameworkruntime.Registry{}, schedulerconfig.KubeSchedulerProfile{}
}
registry := frameworkruntime.Registry{}
pls := &configv1.Plugins{}
for _, p := range plugins {
registry.Register(p.Name(), newPlugin(p))
plugin := configv1.Plugin{Name: p.Name()}
switch p.(type) {
case *PreEnqueuePlugin:
pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin)
case *PreFilterPlugin:
pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin)
case *FilterPlugin:
pls.Filter.Enabled = append(pls.Filter.Enabled, plugin)
case *PreScorePlugin:
pls.PreScore.Enabled = append(pls.PreScore.Enabled, plugin)
case *ScorePlugin, *ScoreWithNormalizePlugin:
pls.Score.Enabled = append(pls.Score.Enabled, plugin)
case *ReservePlugin:
pls.Reserve.Enabled = append(pls.Reserve.Enabled, plugin)
case *PreBindPlugin:
pls.PreBind.Enabled = append(pls.PreBind.Enabled, plugin)
case *BindPlugin:
pls.Bind.Enabled = append(pls.Bind.Enabled, plugin)
// It's intentional to disable the DefaultBind plugin. Otherwise, DefaultBinder's failure would fail
// a pod's scheduling, as well as the test BindPlugin's execution.
pls.Bind.Disabled = []configv1.Plugin{{Name: defaultbinder.Name}}
case *PostBindPlugin:
pls.PostBind.Enabled = append(pls.PostBind.Enabled, plugin)
case *PermitPlugin:
pls.Permit.Enabled = append(pls.Permit.Enabled, plugin)
}
}
versionedCfg := configv1.KubeSchedulerConfiguration{
Profiles: []configv1.KubeSchedulerProfile{{
SchedulerName: pointer.String(v1.DefaultSchedulerName),
Plugins: pls,
}},
}
cfg := configtesting.V1ToInternalWithDefaults(t, versionedCfg)
return registry, cfg.Profiles[0]
}

View File

@@ -0,0 +1,171 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/framework"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testutils "k8s.io/kubernetes/test/integration/util"
)
var _ framework.PermitPlugin = &PermitPlugin{}
var _ framework.EnqueueExtensions = &PermitPlugin{}
type PermitPlugin struct {
name string
statusCode framework.Code
numPermitCalled int
}
func (pp *PermitPlugin) Name() string {
return pp.name
}
func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
pp.numPermitCalled += 1
if pp.statusCode == framework.Error {
return framework.NewStatus(framework.Error, "failed to permit"), 0
}
if pp.statusCode == framework.Unschedulable {
if pp.numPermitCalled <= 1 {
return framework.NewStatus(framework.Unschedulable, "reject to permit"), 0
}
}
return nil, 0
}
func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{
{
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
QueueingHintFn: func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
return framework.QueueImmediately
},
},
}
}
func TestReScheduling(t *testing.T) {
testContext := testutils.InitTestAPIServer(t, "permit-plugin", nil)
tests := []struct {
name string
plugins []framework.Plugin
action func() error
// The first time for pod scheduling, we make pod scheduled error or unschedulable on purpose.
// This is controlled by wantFirstSchedulingError. By default, pod is unschedulable.
wantFirstSchedulingError bool
// wantScheduled/wantError means the final expected scheduling result.
wantScheduled bool
wantError bool
}{
{
name: "Rescheduling pod rejected by Permit Plugin",
plugins: []framework.Plugin{
&PermitPlugin{name: "permit", statusCode: framework.Unschedulable},
},
action: func() error {
_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj())
return err
},
wantScheduled: true,
},
{
name: "Rescheduling pod rejected by Permit Plugin with unrelated event",
plugins: []framework.Plugin{
&PermitPlugin{name: "permit", statusCode: framework.Unschedulable},
},
action: func() error {
_, err := createPausePod(testContext.ClientSet,
initPausePod(&testutils.PausePodConfig{Name: "test-pod-2", Namespace: testContext.NS.Name}))
return err
},
wantScheduled: false,
},
{
name: "Rescheduling pod failed by Permit Plugin",
plugins: []framework.Plugin{
&PermitPlugin{name: "permit", statusCode: framework.Error},
},
action: func() error {
_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj())
return err
},
wantFirstSchedulingError: true,
wantError: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
registry, prof := InitRegistryAndConfig(t, nil, test.plugins...)
testCtx, teardown := InitTestSchedulerForFrameworkTest(t, testContext, 2,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer teardown()
pod, err := createPausePod(testCtx.ClientSet,
initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
// The first time for scheduling, pod is error or unschedulable, controlled by wantFirstSchedulingError
if test.wantFirstSchedulingError {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, testutils.PodSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Expected a scheduling error, but got: %v", err)
}
} else {
if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
}
}
if test.action() != nil {
if err = test.action(); err != nil {
t.Errorf("Perform action() error: %v", err)
}
}
if test.wantScheduled {
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
t.Errorf("Didn't expect the pod to be unschedulable. error: %v", err)
}
} else if test.wantError {
if err = wait.Poll(10*time.Millisecond, 30*time.Second, testutils.PodSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
t.Errorf("Expected a scheduling error, but got: %v", err)
}
} else {
if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
}
}
})
}
}

View File

@@ -17,7 +17,23 @@ limitations under the License.
 package scheduler
 
 import (
 	"testing"
+	"time"
 
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	configv1 "k8s.io/kube-scheduler/config/v1"
+	"k8s.io/kubernetes/pkg/scheduler"
+	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
+	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
+	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
+	st "k8s.io/kubernetes/pkg/scheduler/testing"
 	testutils "k8s.io/kubernetes/test/integration/util"
+	"k8s.io/utils/pointer"
 )
var (
@@ -35,3 +51,107 @@ var (
waitForPodUnschedulable = testutils.WaitForPodUnschedulable
waitForReflection = testutils.WaitForReflection
)
// The returned shutdown func will delete created resources and scheduler, resources should be those
// that will affect the scheduling result, like nodes, pods, etc.. Namespaces should not be
// deleted here because it's created together with the apiserver, they should be deleted
// simultaneously or we'll have no namespace.
// This should only be called when you want to kill the scheduler alone, away from apiserver.
// For example, in scheduler integration tests, recreating apiserver is performance consuming,
// then shutdown the scheduler and recreate it between each test case is a better approach.
func InitTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) (*testutils.TestContext, testutils.ShutdownFunc) {
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...)
testutils.SyncSchedulerInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
if nodeCount > 0 {
if _, err := testutils.CreateAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
// Make sure to cleanup the resources when initializing error.
testutils.CleanupTest(t, testCtx)
t.Fatal(err)
}
}
teardown := func() {
err := testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
if err != nil {
t.Errorf("error while deleting all nodes: %v", err)
}
err = testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).DeleteCollection(testCtx.SchedulerCtx, *metav1.NewDeleteOptions(0), metav1.ListOptions{})
if err != nil {
t.Errorf("error while deleting pod: %v", err)
}
// Wait for all pods to be deleted, or will failed to create same name pods
// required in other test cases.
err = wait.PollUntilContextTimeout(testCtx.SchedulerCtx, time.Millisecond, wait.ForeverTestTimeout, true,
testutils.PodsCleanedUp(testCtx.SchedulerCtx, testCtx.ClientSet, testCtx.NS.Name))
if err != nil {
t.Errorf("error while waiting for all pods to be deleted: %v", err)
}
// Kill the scheduler.
testCtx.SchedulerCloseFn()
}
return testCtx, teardown
}
// NewPlugin returns a plugin factory with specified Plugin.
func NewPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory {
return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return plugin, nil
}
}
// InitRegistryAndConfig returns registry and plugins config based on give plugins.
func InitRegistryAndConfig(t *testing.T, factory func(plugin framework.Plugin) frameworkruntime.PluginFactory, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) {
if len(plugins) == 0 {
return frameworkruntime.Registry{}, schedulerconfig.KubeSchedulerProfile{}
}
if factory == nil {
factory = NewPlugin
}
registry := frameworkruntime.Registry{}
pls := &configv1.Plugins{}
for _, p := range plugins {
registry.Register(p.Name(), factory(p))
plugin := configv1.Plugin{Name: p.Name()}
switch p.(type) {
case framework.PreEnqueuePlugin:
pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin)
case framework.PreFilterPlugin:
pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin)
case framework.FilterPlugin:
pls.Filter.Enabled = append(pls.Filter.Enabled, plugin)
case framework.PreScorePlugin:
pls.PreScore.Enabled = append(pls.PreScore.Enabled, plugin)
case framework.ScorePlugin:
pls.Score.Enabled = append(pls.Score.Enabled, plugin)
case framework.ReservePlugin:
pls.Reserve.Enabled = append(pls.Reserve.Enabled, plugin)
case framework.PreBindPlugin:
pls.PreBind.Enabled = append(pls.PreBind.Enabled, plugin)
case framework.BindPlugin:
pls.Bind.Enabled = append(pls.Bind.Enabled, plugin)
// It's intentional to disable the DefaultBind plugin. Otherwise, DefaultBinder's failure would fail
// a pod's scheduling, as well as the test BindPlugin's execution.
pls.Bind.Disabled = []configv1.Plugin{{Name: defaultbinder.Name}}
case framework.PostBindPlugin:
pls.PostBind.Enabled = append(pls.PostBind.Enabled, plugin)
case framework.PermitPlugin:
pls.Permit.Enabled = append(pls.Permit.Enabled, plugin)
}
}
versionedCfg := configv1.KubeSchedulerConfiguration{
Profiles: []configv1.KubeSchedulerProfile{{
SchedulerName: pointer.String(v1.DefaultSchedulerName),
Plugins: pls,
}},
}
cfg := configtesting.V1ToInternalWithDefaults(t, versionedCfg)
return registry, cfg.Profiles[0]
}
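A hedged usage sketch of the new factory parameter (this test does not exist in the commit; it only illustrates the new signature using symbols defined above, in the same package): passing nil falls back to NewPlugin, which is how the initRegistryAndConfig closure in the plugin tests keeps old call sites unchanged, while a custom factory lets a caller control plugin construction.

// Hypothetical usage; PermitPlugin here is the test plugin from the new
// rescheduling test file, which lives in the same package.
func TestInitRegistryAndConfigUsage(t *testing.T) {
	// nil factory falls back to NewPlugin, preserving the old behavior.
	registry, prof := InitRegistryAndConfig(t, nil, &PermitPlugin{name: "permit"})
	_, _ = registry, prof

	// A custom factory lets callers intercept plugin construction.
	wrap := func(p framework.Plugin) frameworkruntime.PluginFactory {
		return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
			return p, nil // could wrap p with instrumentation here
		}
	}
	registry, prof = InitRegistryAndConfig(t, wrap, &PermitPlugin{name: "permit"})
	_, _ = registry, prof
}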