/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugins

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/core/v1"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	configv1 "k8s.io/kube-scheduler/config/v1"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler"
	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	testutils "k8s.io/kubernetes/test/integration/util"
	imageutils "k8s.io/kubernetes/test/utils/image"
	"k8s.io/utils/pointer"
)

// Short local aliases for helpers imported from testutils.
var (
	createPausePod                  = testutils.CreatePausePod
	initPausePod                    = testutils.InitPausePod
	getPod                          = testutils.GetPod
	deletePod                       = testutils.DeletePod
	podUnschedulable                = testutils.PodUnschedulable
	podSchedulingError              = testutils.PodSchedulingError
	createAndWaitForNodesInCache    = testutils.CreateAndWaitForNodesInCache
	waitForPodUnschedulable         = testutils.WaitForPodUnschedulable
	waitForPodSchedulingGated       = testutils.WaitForPodSchedulingGated
	waitForPodToScheduleWithTimeout = testutils.WaitForPodToScheduleWithTimeout
)

// PreEnqueuePlugin is a test PreEnqueue plugin whose verdict is controlled by
// the admit flag.
type PreEnqueuePlugin struct {
	// called is incremented on every PreEnqueue invocation.
	called int32
	// admit makes PreEnqueue return nil when true; otherwise PreEnqueue
	// returns an UnschedulableAndUnresolvable status.
	admit bool
}

// PreFilterPlugin is a test PreFilter plugin with injectable outcomes.
type PreFilterPlugin struct {
	// numPreFilterCalled counts PreFilter invocations.
	numPreFilterCalled int
	// failPreFilter makes PreFilter return an Error status.
	failPreFilter bool
	// rejectPreFilter makes PreFilter return an Unschedulable status
	// (checked after failPreFilter).
	rejectPreFilter bool
}

// ScorePlugin is a test Score plugin that gives the maximum score to the
// first node it is asked about and a low score to every other node.
type ScorePlugin struct {
	// failScore makes Score return an Error status.
	failScore bool
	// numScoreCalled counts Score invocations; Score updates it atomically.
	numScoreCalled int32
	// highScoreNode is the name of the node that received
	// framework.MaxNodeScore.
	highScoreNode string
}

// ScoreWithNormalizePlugin is a test Score plugin that also acts as its own
// ScoreExtensions, so that NormalizeScore invocations can be counted.
type ScoreWithNormalizePlugin struct {
	// numScoreCalled counts Score invocations.
	numScoreCalled int
	// numNormalizeScoreCalled counts NormalizeScore invocations.
	numNormalizeScoreCalled int
}

// FilterPlugin is a test Filter plugin with injectable outcomes and
// per-pod invocation counting.
type FilterPlugin struct {
	// numFilterCalled counts Filter invocations; Filter updates it atomically.
	numFilterCalled int32
	// failFilter makes Filter return an Error status.
	failFilter bool
	// rejectFilter makes Filter return an Unschedulable status
	// (checked after failFilter).
	rejectFilter bool
	// numCalledPerPod counts Filter invocations keyed by "namespace/name".
	// It is only updated when non-nil; Filter holds the embedded lock while
	// writing to it.
	numCalledPerPod map[string]int
	sync.RWMutex
}

// PostFilterPlugin is a test PostFilter plugin with injectable outcomes.
type PostFilterPlugin struct {
	// name is the value returned by Name.
	name string
	// fh is the framework handle; it is injected by the factory returned
	// from newPlugin.
	fh framework.Handle
	// numPostFilterCalled counts PostFilter invocations.
	numPostFilterCalled int
	// failPostFilter makes PostFilter return an Error status.
	failPostFilter bool
	// rejectPostFilter makes PostFilter return an Unschedulable status.
	rejectPostFilter bool
	// breakPostFilter makes PostFilter return an
	// UnschedulableAndUnresolvable status.
	breakPostFilter bool
}

// ReservePlugin is a test Reserve plugin that counts Reserve and Unreserve
// invocations.
type ReservePlugin struct {
	// name is the value returned by Name.
	name string
	// numReserveCalled counts Reserve invocations.
	numReserveCalled int
	// failReserve makes Reserve return an Error status.
	failReserve bool
	// numUnreserveCalled counts Unreserve invocations.
	numUnreserveCalled int
	// pluginInvokeEventChan, when non-nil, receives an event on every
	// Unreserve invocation.
	pluginInvokeEventChan chan pluginInvokeEvent
}

// PreScorePlugin is a test PreScore plugin with an injectable failure.
type PreScorePlugin struct {
	// numPreScoreCalled counts PreScore invocations.
	numPreScoreCalled int
	// failPreScore makes PreScore return an Error status.
	failPreScore bool
}

// PreBindPlugin is a test PreBind plugin with injectable outcomes.
type PreBindPlugin struct {
	// numPreBindCalled counts PreBind invocations.
	numPreBindCalled int
	// failPreBind makes PreBind return an Error status.
	failPreBind bool
	// rejectPreBind makes PreBind return an Unschedulable status
	// (checked after failPreBind).
	rejectPreBind bool
	// If set to true, always succeed on non-first scheduling attempt.
	succeedOnRetry bool
	// Record the pod UIDs that have been tried scheduling.
	podUIDs map[types.UID]struct{}
}

// BindPlugin is a test Bind plugin that returns a preconfigured status.
type BindPlugin struct {
	// name is the value returned by Name.
	name string
	// numBindCalled counts Bind invocations.
	numBindCalled int
	// bindStatus is returned by Bind; when it reports success, Bind first
	// creates the binding through client.
	bindStatus *framework.Status
	// client is used to create the Binding object.
	client clientset.Interface
	// pluginInvokeEventChan, when non-nil, receives an event on every Bind
	// invocation.
	pluginInvokeEventChan chan pluginInvokeEvent
}

// PostBindPlugin is a test PostBind plugin that counts invocations.
type PostBindPlugin struct {
	// name is the value returned by Name.
	name string
	// numPostBindCalled counts PostBind invocations.
	numPostBindCalled int
	// pluginInvokeEventChan, when non-nil, receives an event on every
	// PostBind invocation.
	pluginInvokeEventChan chan pluginInvokeEvent
}

// PermitPlugin is a test Permit plugin with injectable outcomes, including
// waiting for, rejecting and allowing pods.
type PermitPlugin struct {
	// name is the value returned by Name and the plugin name passed to
	// WaitingPod.Allow / WaitingPod.Reject.
	name string
	// numPermitCalled counts Permit invocations.
	numPermitCalled int
	// failPermit makes Permit return an Error status.
	failPermit bool
	// rejectPermit makes Permit return an Unschedulable status.
	rejectPermit bool
	// timeoutPermit makes Permit return a Wait status with a 3s timeout.
	timeoutPermit bool
	// waitAndRejectPermit makes the first pod wait and lets a later,
	// different pod reject all waiting pods.
	waitAndRejectPermit bool
	// waitAndAllowPermit makes the first pod wait and lets a later,
	// different pod allow all waiting pods.
	waitAndAllowPermit bool
	// cancelled is set once the context passed to Permit is done while
	// timeoutPermit is enabled.
	cancelled bool
	// waitingPod is the name of the pod that was told to wait.
	waitingPod string
	// rejectingPod is the name of the pod whose Permit call rejected the
	// waiting pods.
	rejectingPod string
	// allowingPod is the name of the pod whose Permit call allowed the
	// waiting pods.
	allowingPod string
	// fh is the framework handle; it is injected by the factory returned
	// from newPlugin.
	fh framework.Handle
}

// Names under which the test plugins with a fixed name are registered.
const (
	enqueuePluginName            = "enqueue-plugin"
	prefilterPluginName          = "prefilter-plugin"
	postfilterPluginName         = "postfilter-plugin"
	scorePluginName              = "score-plugin"
	scoreWithNormalizePluginName = "score-with-normalize-plugin"
	filterPluginName             = "filter-plugin"
	preScorePluginName           = "prescore-plugin"
	reservePluginName            = "reserve-plugin"
	preBindPluginName            = "prebind-plugin"
	postBindPluginName           = "postbind-plugin"
	permitPluginName             = "permit-plugin"
)

// Compile-time checks that the test plugins implement the framework
// extension-point interfaces.
var _ framework.PreEnqueuePlugin = &PreEnqueuePlugin{}
var _ framework.PreFilterPlugin = &PreFilterPlugin{}
var _ framework.PostFilterPlugin = &PostFilterPlugin{}
var _ framework.ScorePlugin = &ScorePlugin{}
var _ framework.FilterPlugin = &FilterPlugin{}
var _ framework.ScorePlugin = &ScorePlugin{}
var _ framework.ScorePlugin = &ScoreWithNormalizePlugin{}
var _ framework.ReservePlugin = &ReservePlugin{}
var _ framework.PreScorePlugin = &PreScorePlugin{}
var _ framework.PreBindPlugin = &PreBindPlugin{}
var _ framework.BindPlugin = &BindPlugin{}
var _ framework.PostBindPlugin = &PostBindPlugin{}
var _ framework.PermitPlugin = &PermitPlugin{}

// newPlugin returns a plugin factory with specified Plugin.
func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory { return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { switch pl := plugin.(type) { case *PermitPlugin: pl.fh = fh case *PostFilterPlugin: pl.fh = fh } return plugin, nil } } func (ep *PreEnqueuePlugin) Name() string { return enqueuePluginName } func (ep *PreEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { ep.called++ if ep.admit { return nil } return framework.NewStatus(framework.UnschedulableAndUnresolvable, "not ready for scheduling") } // Name returns name of the score plugin. func (sp *ScorePlugin) Name() string { return scorePluginName } // reset returns name of the score plugin. func (sp *ScorePlugin) reset() { sp.failScore = false sp.numScoreCalled = 0 sp.highScoreNode = "" } // Score returns the score of scheduling a pod on a specific node. func (sp *ScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { curCalled := atomic.AddInt32(&sp.numScoreCalled, 1) if sp.failScore { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name)) } score := int64(1) if curCalled == 1 { // The first node is scored the highest, the rest is scored lower. sp.highScoreNode = nodeName score = framework.MaxNodeScore } return score, nil } func (sp *ScorePlugin) ScoreExtensions() framework.ScoreExtensions { return nil } // Name returns name of the score plugin. func (sp *ScoreWithNormalizePlugin) Name() string { return scoreWithNormalizePluginName } // reset returns name of the score plugin. func (sp *ScoreWithNormalizePlugin) reset() { sp.numScoreCalled = 0 sp.numNormalizeScoreCalled = 0 } // Score returns the score of scheduling a pod on a specific node. 
func (sp *ScoreWithNormalizePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { sp.numScoreCalled++ score := int64(10) return score, nil } func (sp *ScoreWithNormalizePlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { sp.numNormalizeScoreCalled++ return nil } func (sp *ScoreWithNormalizePlugin) ScoreExtensions() framework.ScoreExtensions { return sp } // Name returns name of the plugin. func (fp *FilterPlugin) Name() string { return filterPluginName } // reset is used to reset filter plugin. func (fp *FilterPlugin) reset() { fp.numFilterCalled = 0 fp.failFilter = false if fp.numCalledPerPod != nil { fp.numCalledPerPod = make(map[string]int) } } // Filter is a test function that returns an error or nil, depending on the // value of "failFilter". func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { atomic.AddInt32(&fp.numFilterCalled, 1) if fp.numCalledPerPod != nil { fp.Lock() fp.numCalledPerPod[fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)]++ fp.Unlock() } if fp.failFilter { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) } if fp.rejectFilter { return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) } return nil } // Name returns name of the plugin. func (rp *ReservePlugin) Name() string { return rp.name } // Reserve is a test function that increments an intenral counter and returns // an error or nil, depending on the value of "failReserve". 
func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { rp.numReserveCalled++ if rp.failReserve { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) } return nil } // Unreserve is a test function that increments an internal counter and emits // an event to a channel. While Unreserve implementations should normally be // idempotent, we relax that requirement here for testing purposes. func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { rp.numUnreserveCalled++ if rp.pluginInvokeEventChan != nil { rp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: rp.Name(), val: rp.numUnreserveCalled} } } // reset used to reset internal counters. func (rp *ReservePlugin) reset() { rp.numReserveCalled = 0 rp.numUnreserveCalled = 0 rp.failReserve = false } // Name returns name of the plugin. func (*PreScorePlugin) Name() string { return preScorePluginName } // PreScore is a test function. func (pfp *PreScorePlugin) PreScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, _ []*v1.Node) *framework.Status { pfp.numPreScoreCalled++ if pfp.failPreScore { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) } return nil } // reset used to reset prescore plugin. func (pfp *PreScorePlugin) reset() { pfp.numPreScoreCalled = 0 pfp.failPreScore = false } // Name returns name of the plugin. func (pp *PreBindPlugin) Name() string { return preBindPluginName } // PreBind is a test function that returns (true, nil) or errors for testing. 
func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { pp.numPreBindCalled++ if _, tried := pp.podUIDs[pod.UID]; tried && pp.succeedOnRetry { return nil } pp.podUIDs[pod.UID] = struct{}{} if pp.failPreBind { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) } if pp.rejectPreBind { return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) } return nil } // reset used to reset prebind plugin. func (pp *PreBindPlugin) reset() { pp.numPreBindCalled = 0 pp.failPreBind = false pp.rejectPreBind = false pp.succeedOnRetry = false pp.podUIDs = make(map[types.UID]struct{}) } const bindPluginAnnotation = "bindPluginName" func (bp *BindPlugin) Name() string { return bp.name } func (bp *BindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { bp.numBindCalled++ if bp.pluginInvokeEventChan != nil { bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled} } if bp.bindStatus.IsSuccess() { if err := bp.client.CoreV1().Pods(p.Namespace).Bind(context.TODO(), &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: map[string]string{bindPluginAnnotation: bp.Name()}}, Target: v1.ObjectReference{ Kind: "Node", Name: nodeName, }, }, metav1.CreateOptions{}); err != nil { return framework.NewStatus(framework.Error, fmt.Sprintf("bind failed: %v", err)) } } return bp.bindStatus } // reset used to reset numBindCalled. func (bp *BindPlugin) reset() { bp.numBindCalled = 0 } // Name returns name of the plugin. func (pp *PostBindPlugin) Name() string { return pp.name } // PostBind is a test function, which counts the number of times called. 
func (pp *PostBindPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { pp.numPostBindCalled++ if pp.pluginInvokeEventChan != nil { pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostBindCalled} } } // reset used to reset postbind plugin. func (pp *PostBindPlugin) reset() { pp.numPostBindCalled = 0 } // Name returns name of the plugin. func (pp *PreFilterPlugin) Name() string { return prefilterPluginName } // Extensions returns the PreFilterExtensions interface. func (pp *PreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions { return nil } // PreFilter is a test function that returns (true, nil) or errors for testing. func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { pp.numPreFilterCalled++ if pp.failPreFilter { return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) } if pp.rejectPreFilter { return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) } return nil, nil } // reset used to reset prefilter plugin. func (pp *PreFilterPlugin) reset() { pp.numPreFilterCalled = 0 pp.failPreFilter = false pp.rejectPreFilter = false } // Name returns name of the plugin. 
func (pp *PostFilterPlugin) Name() string { return pp.name } func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { pp.numPostFilterCalled++ nodeInfos, err := pp.fh.SnapshotSharedLister().NodeInfos().List() if err != nil { return nil, framework.NewStatus(framework.Error, err.Error()) } for _, nodeInfo := range nodeInfos { pp.fh.RunFilterPlugins(ctx, state, pod, nodeInfo) } var nodes []*v1.Node for _, nodeInfo := range nodeInfos { nodes = append(nodes, nodeInfo.Node()) } pp.fh.RunScorePlugins(ctx, state, pod, nodes) if pp.failPostFilter { return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) } if pp.rejectPostFilter { return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("injecting unschedulable for pod %v", pod.Name)) } if pp.breakPostFilter { return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("injecting unresolvable for pod %v", pod.Name)) } return nil, framework.NewStatus(framework.Success, fmt.Sprintf("make room for pod %v to be schedulable", pod.Name)) } // Name returns name of the plugin. func (pp *PermitPlugin) Name() string { return pp.name } // Permit implements the permit test plugin. 
func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { pp.numPermitCalled++ if pp.failPermit { return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0 } if pp.rejectPermit { return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0 } if pp.timeoutPermit { go func() { select { case <-ctx.Done(): pp.cancelled = true } }() return framework.NewStatus(framework.Wait, ""), 3 * time.Second } if pp.waitAndRejectPermit || pp.waitAndAllowPermit { if pp.waitingPod == "" || pp.waitingPod == pod.Name { pp.waitingPod = pod.Name return framework.NewStatus(framework.Wait, ""), 30 * time.Second } if pp.waitAndRejectPermit { pp.rejectingPod = pod.Name pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject(pp.name, fmt.Sprintf("reject pod %v", wp.GetPod().Name)) }) return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0 } if pp.waitAndAllowPermit { pp.allowingPod = pod.Name pp.allowAllPods() return nil, 0 } } return nil, 0 } // allowAllPods allows all waiting pods. func (pp *PermitPlugin) allowAllPods() { pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) }) } // rejectAllPods rejects all waiting pods. func (pp *PermitPlugin) rejectAllPods() { pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject(pp.name, "rejectAllPods") }) } // reset used to reset permit plugin. func (pp *PermitPlugin) reset() { pp.numPermitCalled = 0 pp.failPermit = false pp.rejectPermit = false pp.timeoutPermit = false pp.waitAndRejectPermit = false pp.waitAndAllowPermit = false pp.cancelled = false pp.waitingPod = "" pp.allowingPod = "" pp.rejectingPod = "" } // TestPreFilterPlugin tests invocation of prefilter plugins. func TestPreFilterPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a pre-filter plugin. 
preFilterPlugin := &PreFilterPlugin{} registry, prof := initRegistryAndConfig(t, preFilterPlugin) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prefilter-plugin", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) tests := []struct { name string fail bool reject bool }{ { name: "disable fail and reject flags", fail: false, reject: false, }, { name: "enable fail and disable reject flags", fail: true, reject: false, }, { name: "disable fail and enable reject flags", fail: false, reject: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { preFilterPlugin.failPreFilter = test.fail preFilterPlugin.rejectPreFilter = test.reject // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.reject { if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) } } else if test.fail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a scheduling error, but got: %v", err) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } } if preFilterPlugin.numPreFilterCalled == 0 { t.Errorf("Expected the prefilter plugin to be called.") } preFilterPlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestPostFilterPlugin tests invocation of postFilter plugins. 
func TestPostFilterPlugin(t *testing.T) { var numNodes int32 = 1 tests := []struct { name string numNodes int32 rejectFilter bool failScore bool rejectPostFilter bool rejectPostFilter2 bool breakPostFilter bool breakPostFilter2 bool expectFilterNumCalled int32 expectScoreNumCalled int32 expectPostFilterNumCalled int }{ { name: "Filter passed and Score success", numNodes: 30, rejectFilter: false, failScore: false, rejectPostFilter: false, expectFilterNumCalled: 30, expectScoreNumCalled: 30, expectPostFilterNumCalled: 0, }, { name: "Filter failed and PostFilter passed", numNodes: numNodes, rejectFilter: true, failScore: false, rejectPostFilter: false, expectFilterNumCalled: numNodes * 2, expectScoreNumCalled: 1, expectPostFilterNumCalled: 1, }, { name: "Filter failed and PostFilter failed", numNodes: numNodes, rejectFilter: true, failScore: false, rejectPostFilter: true, expectFilterNumCalled: numNodes * 2, expectScoreNumCalled: 1, expectPostFilterNumCalled: 2, }, { name: "Score failed and PostFilter failed", numNodes: numNodes, rejectFilter: true, failScore: true, rejectPostFilter: true, expectFilterNumCalled: numNodes * 2, expectScoreNumCalled: 1, expectPostFilterNumCalled: 2, }, { name: "Filter failed and first PostFilter broken", numNodes: numNodes, rejectFilter: true, breakPostFilter: true, expectFilterNumCalled: numNodes * 2, expectScoreNumCalled: 0, expectPostFilterNumCalled: 1, }, { name: "Filter failed and second PostFilter broken", numNodes: numNodes, rejectFilter: true, rejectPostFilter: true, rejectPostFilter2: true, breakPostFilter2: true, expectFilterNumCalled: numNodes * 2, expectScoreNumCalled: 0, expectPostFilterNumCalled: 2, }, } var postFilterPluginName2 = postfilterPluginName + "2" for i, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Create a plugin registry for testing. Register a combination of filter and postFilter plugin. 
var ( filterPlugin = &FilterPlugin{} scorePlugin = &ScorePlugin{} postFilterPlugin = &PostFilterPlugin{name: postfilterPluginName} postFilterPlugin2 = &PostFilterPlugin{name: postFilterPluginName2} ) filterPlugin.rejectFilter = tt.rejectFilter scorePlugin.failScore = tt.failScore postFilterPlugin.rejectPostFilter = tt.rejectPostFilter postFilterPlugin2.rejectPostFilter = tt.rejectPostFilter2 postFilterPlugin.breakPostFilter = tt.breakPostFilter postFilterPlugin2.breakPostFilter = tt.breakPostFilter2 registry := frameworkruntime.Registry{ filterPluginName: newPlugin(filterPlugin), scorePluginName: newPlugin(scorePlugin), postfilterPluginName: newPlugin(postFilterPlugin), postFilterPluginName2: newPlugin(postFilterPlugin2), } // Setup plugins for testing. cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ Profiles: []configv1.KubeSchedulerProfile{{ SchedulerName: pointer.String(v1.DefaultSchedulerName), Plugins: &configv1.Plugins{ Filter: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: filterPluginName}, }, }, Score: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: scorePluginName}, }, // disable default in-tree Score plugins // to make it easy to control configured ScorePlugins failure Disabled: []configv1.Plugin{ {Name: "*"}, }, }, PostFilter: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: postfilterPluginName}, {Name: postFilterPluginName2}, }, // Need to disable default in-tree PostFilter plugins, as they will // call RunPostFilterPlugins and hence impact the "numPostFilterCalled". Disabled: []configv1.Plugin{ {Name: "*"}, }, }, }, }}}) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest( t, testutils.InitTestAPIServer(t, fmt.Sprintf("postfilter%v-", i), nil), int(tt.numNodes), scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry), ) defer testutils.CleanupTest(t, testCtx) // Create a best effort pod. 
pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if tt.rejectFilter { if err = wait.Poll(10*time.Millisecond, 10*time.Second, podUnschedulable(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Didn't expect the pod to be scheduled.") } if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled < tt.expectFilterNumCalled { t.Errorf("Expected the filter plugin to be called at least %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled) } if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled < tt.expectScoreNumCalled { t.Errorf("Expected the score plugin to be called at least %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled != tt.expectFilterNumCalled { t.Errorf("Expected the filter plugin to be called %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled) } if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled != tt.expectScoreNumCalled { t.Errorf("Expected the score plugin to be called %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled) } } numPostFilterCalled := postFilterPlugin.numPostFilterCalled + postFilterPlugin2.numPostFilterCalled if numPostFilterCalled != tt.expectPostFilterNumCalled { t.Errorf("Expected the postfilter plugin to be called %v times, but got %v.", tt.expectPostFilterNumCalled, numPostFilterCalled) } }) } } // TestScorePlugin tests invocation of score plugins. func TestScorePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a score plugin. 
scorePlugin := &ScorePlugin{} registry, prof := initRegistryAndConfig(t, scorePlugin) testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) tests := []struct { name string fail bool }{ { name: "fail score plugin", fail: true, }, { name: "do not fail score plugin", fail: false, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { scorePlugin.failScore = test.fail // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Fatalf("Error while creating a test pod: %v", err) } if test.fail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a scheduling error, but got: %v", err) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } else { p, err := getPod(testCtx.ClientSet, pod.Name, pod.Namespace) if err != nil { t.Errorf("Failed to retrieve the pod. error: %v", err) } else if p.Spec.NodeName != scorePlugin.highScoreNode { t.Errorf("Expected the pod to be scheduled on node %q, got %q", scorePlugin.highScoreNode, p.Spec.NodeName) } } } if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled == 0 { t.Errorf("Expected the score plugin to be called.") } scorePlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestNormalizeScorePlugin tests invocation of normalize score plugins. func TestNormalizeScorePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a normalize score plugin. 
scoreWithNormalizePlugin := &ScoreWithNormalizePlugin{} registry, prof := initRegistryAndConfig(t, scoreWithNormalizePlugin) testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Fatalf("Error while creating a test pod: %v", err) } if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } if scoreWithNormalizePlugin.numScoreCalled == 0 { t.Errorf("Expected the score plugin to be called.") } if scoreWithNormalizePlugin.numNormalizeScoreCalled == 0 { t.Error("Expected the normalize score plugin to be called") } scoreWithNormalizePlugin.reset() } // TestReservePlugin tests invocation of reserve plugins. func TestReservePluginReserve(t *testing.T) { // Create a plugin registry for testing. Register only a reserve plugin. reservePlugin := &ReservePlugin{} registry, prof := initRegistryAndConfig(t, reservePlugin) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "reserve-plugin-reserve", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) tests := []struct { name string fail bool }{ { name: "fail reserve plugin", fail: true, }, { name: "do not fail reserve plugin", fail: false, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { reservePlugin.failReserve = test.fail // Create a best effort pod. 
pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.fail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } } if reservePlugin.numReserveCalled == 0 { t.Errorf("Expected the reserve plugin to be called.") } reservePlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestPrebindPlugin tests invocation of prebind plugins. func TestPrebindPlugin(t *testing.T) { // Create a plugin registry for testing. Register a prebind and a filter plugin. preBindPlugin := &PreBindPlugin{podUIDs: make(map[types.UID]struct{})} filterPlugin := &FilterPlugin{} registry := frameworkruntime.Registry{ preBindPluginName: newPlugin(preBindPlugin), filterPluginName: newPlugin(filterPlugin), } // Setup initial prebind and filter plugin in different profiles. // The second profile ensures the embedded filter plugin is exclusively called, and hence // we can use its internal `numFilterCalled` to perform some precise checking logic. cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ Profiles: []configv1.KubeSchedulerProfile{ { SchedulerName: pointer.String(v1.DefaultSchedulerName), Plugins: &configv1.Plugins{ PreBind: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: preBindPluginName}, }, }, }, }, { SchedulerName: pointer.String("2nd-scheduler"), Plugins: &configv1.Plugins{ Filter: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: filterPluginName}, }, }, }, }, }, }) // Create the API server and the scheduler with the test plugin set. 
nodesNum := 2 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prebind-plugin", nil), nodesNum, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) tests := []struct { name string fail bool reject bool succeedOnRetry bool unschedulablePod *v1.Pod }{ { name: "disable fail and reject flags", fail: false, reject: false, }, { name: "enable fail and disable reject flags", fail: true, reject: false, }, { name: "disable fail and enable reject flags", fail: false, reject: true, }, { name: "enable fail and reject flags", fail: true, reject: true, }, { name: "fail on 1st try but succeed on retry", fail: true, reject: false, succeedOnRetry: true, }, { name: "failure on preBind moves unschedulable pods", fail: true, unschedulablePod: st.MakePod().Name("unschedulable-pod").Namespace(testCtx.NS.Name).Container(imageutils.GetPauseImageName()).Obj(), }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { if p := test.unschedulablePod; p != nil { p.Spec.SchedulerName = "2nd-scheduler" filterPlugin.rejectFilter = true if _, err := createPausePod(testCtx.ClientSet, p); err != nil { t.Fatalf("Error while creating an unschedulable pod: %v", err) } defer filterPlugin.reset() } preBindPlugin.failPreBind = test.fail preBindPlugin.rejectPreBind = test.reject preBindPlugin.succeedOnRetry = test.succeedOnRetry // Create a best effort pod. 
pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.fail { if test.succeedOnRetry { if err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil { t.Errorf("Expected the pod to be schedulable on retry, but got an error: %v", err) } } else if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err) } } else if test.reject { if err = testutils.WaitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be unschedulable") } } else if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } if preBindPlugin.numPreBindCalled == 0 { t.Errorf("Expected the prebind plugin to be called.") } if test.unschedulablePod != nil { if err := wait.Poll(10*time.Millisecond, 15*time.Second, func() (bool, error) { // 2 means the unschedulable pod is expected to be retried at least twice. // (one initial attempt plus the one moved by the preBind pod) return int(filterPlugin.numFilterCalled) >= 2*nodesNum, nil }); err != nil { t.Errorf("Timed out waiting for the unschedulable Pod to be retried at least twice.") } } preBindPlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestUnReserveReservePlugins tests invocation of the Unreserve operation in // reserve plugins through failures in execution points such as pre-bind. Also // tests that the order of invocation of Unreserve operation is executed in the // reverse order of invocation of the Reserve operation. 
func TestUnReserveReservePlugins(t *testing.T) { tests := []struct { name string plugins []*ReservePlugin fail bool failPluginIdx int }{ { name: "The first Reserve plugin fails", failPluginIdx: 0, plugins: []*ReservePlugin{ { name: "failedReservePlugin1", failReserve: true, }, { name: "succeedReservePlugin", failReserve: false, }, { name: "failedReservePlugin2", failReserve: true, }, }, fail: true, }, { name: "The middle Reserve plugin fails", failPluginIdx: 1, plugins: []*ReservePlugin{ { name: "succeedReservePlugin1", failReserve: false, }, { name: "failedReservePlugin1", failReserve: true, }, { name: "succeedReservePlugin2", failReserve: false, }, }, fail: true, }, { name: "The last Reserve plugin fails", failPluginIdx: 2, plugins: []*ReservePlugin{ { name: "succeedReservePlugin1", failReserve: false, }, { name: "succeedReservePlugin2", failReserve: false, }, { name: "failedReservePlugin1", failReserve: true, }, }, fail: true, }, { name: "The Reserve plugins succeed", failPluginIdx: -1, plugins: []*ReservePlugin{ { name: "succeedReservePlugin1", failReserve: false, }, { name: "succeedReservePlugin2", failReserve: false, }, { name: "succeedReservePlugin3", failReserve: false, }, }, fail: false, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { var pls []framework.Plugin for _, pl := range test.plugins { pls = append(pls, pl) } registry, prof := initRegistryAndConfig(t, pls...) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest( t, testutils.InitTestAPIServer(t, "unreserve-reserve-plugin", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) // Create a best effort pod. 
podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.fail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a reasons other than Unschedulable, but got: %v", err) } for i, pl := range test.plugins { if i <= test.failPluginIdx { if pl.numReserveCalled != 1 { t.Errorf("Reserve Plugins %s numReserveCalled = %d, want 1.", pl.name, pl.numReserveCalled) } } else { if pl.numReserveCalled != 0 { t.Errorf("Reserve Plugins %s numReserveCalled = %d, want 0.", pl.name, pl.numReserveCalled) } } if pl.numUnreserveCalled != 1 { t.Errorf("Reserve Plugin %s numUnreserveCalled = %d, want 1.", pl.name, pl.numUnreserveCalled) } } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } for _, pl := range test.plugins { if pl.numReserveCalled != 1 { t.Errorf("Reserve Plugin %s numReserveCalled = %d, want 1.", pl.name, pl.numReserveCalled) } if pl.numUnreserveCalled != 0 { t.Errorf("Reserve Plugin %s numUnreserveCalled = %d, want 0.", pl.name, pl.numUnreserveCalled) } } } testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestUnReservePermitPlugins tests unreserve of Permit plugins. 
func TestUnReservePermitPlugins(t *testing.T) { tests := []struct { name string plugin *PermitPlugin reject bool }{ { name: "All Reserve plugins passed, but a Permit plugin was rejected", reject: true, plugin: &PermitPlugin{ name: "rejectedPermitPlugin", rejectPermit: true, }, }, { name: "All Reserve plugins passed, but a Permit plugin timeout in waiting", reject: true, plugin: &PermitPlugin{ name: "timeoutPermitPlugin", timeoutPermit: true, }, }, { name: "The Permit plugin succeed", reject: false, plugin: &PermitPlugin{ name: "succeedPermitPlugin", }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { reservePlugin := &ReservePlugin{ name: "reservePlugin", failReserve: false, } registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest( t, testutils.InitTestAPIServer(t, "unreserve-reserve-plugin", nil), 2, scheduler.WithProfiles(profile), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) // Create a best effort pod. podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.reject { if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) } // Verify the Reserve Plugins if reservePlugin.numUnreserveCalled != 1 { t.Errorf("Reserve Plugin %s numUnreserveCalled = %d, want 1.", reservePlugin.name, reservePlugin.numUnreserveCalled) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. 
error: %v", err) } // Verify the Reserve Plugins if reservePlugin.numUnreserveCalled != 0 { t.Errorf("Reserve Plugin %s numUnreserveCalled = %d, want 0.", reservePlugin.name, reservePlugin.numUnreserveCalled) } } if test.plugin.numPermitCalled != 1 { t.Errorf("Expected the Permit plugin to be called.") } testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestUnReservePreBindPlugins tests unreserve of Prebind plugins. func TestUnReservePreBindPlugins(t *testing.T) { tests := []struct { name string plugin *PreBindPlugin wantReject bool }{ { name: "All Reserve plugins passed, but a PreBind plugin failed", wantReject: true, plugin: &PreBindPlugin{ podUIDs: make(map[types.UID]struct{}), rejectPreBind: true, }, }, { name: "All Reserve plugins passed, and PreBind plugin succeed", wantReject: false, plugin: &PreBindPlugin{podUIDs: make(map[types.UID]struct{})}, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { reservePlugin := &ReservePlugin{ name: "reservePlugin", failReserve: false, } registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest( t, testutils.InitTestAPIServer(t, "unreserve-prebind-plugin", nil), 2, scheduler.WithProfiles(profile), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) // Create a pause pod. 
podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.wantReject { if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected a reasons other than Unschedulable, but got: %v", err) } // Verify the Reserve Plugins if reservePlugin.numUnreserveCalled != 1 { t.Errorf("Reserve Plugin %s numUnreserveCalled = %d, want 1.", reservePlugin.name, reservePlugin.numUnreserveCalled) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } // Verify the Reserve Plugins if reservePlugin.numUnreserveCalled != 0 { t.Errorf("Reserve Plugin %s numUnreserveCalled = %d, want 0.", reservePlugin.name, reservePlugin.numUnreserveCalled) } } if test.plugin.numPreBindCalled != 1 { t.Errorf("Expected the Prebind plugin to be called.") } testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestUnReserveBindPlugins tests unreserve of Bind plugins. func TestUnReserveBindPlugins(t *testing.T) { tests := []struct { name string plugin *BindPlugin fail bool }{ { name: "All Reserve plugins passed, and Bind plugin succeed", fail: false, plugin: &BindPlugin{name: "SucceedBindPlugin"}, }, { name: "All Reserve plugins passed, but a Bind plugin failed", fail: false, plugin: &BindPlugin{name: "FailedBindPlugin"}, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { reservePlugin := &ReservePlugin{ name: "reservePlugin", failReserve: false, } registry, profile := initRegistryAndConfig(t, []framework.Plugin{test.plugin, reservePlugin}...) apiCtx := testutils.InitTestAPIServer(t, "unreserve-bind-plugin", nil) test.plugin.client = apiCtx.ClientSet // Create the scheduler with the test plugin set. 
testCtx := initTestSchedulerForFrameworkTest( t, apiCtx, 2, scheduler.WithProfiles(profile), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) // Create a pause pod. podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.fail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a reasons other than Unschedulable, but got: %v", err) } // Verify the Reserve Plugins if reservePlugin.numUnreserveCalled != 1 { t.Errorf("Reserve Plugin %s numUnreserveCalled = %d, want 1.", reservePlugin.name, reservePlugin.numUnreserveCalled) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } // Verify the Reserve Plugins if reservePlugin.numUnreserveCalled != 0 { t.Errorf("Reserve Plugin %s numUnreserveCalled = %d, want 0.", reservePlugin.name, reservePlugin.numUnreserveCalled) } } if test.plugin.numBindCalled != 1 { t.Errorf("Expected the Bind plugin to be called.") } testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } type pluginInvokeEvent struct { pluginName string val int } // TestBindPlugin tests invocation of bind plugins. func TestBindPlugin(t *testing.T) { testContext := testutils.InitTestAPIServer(t, "bind-plugin", nil) bindPlugin1 := &BindPlugin{name: "bind-plugin-1", client: testContext.ClientSet} bindPlugin2 := &BindPlugin{name: "bind-plugin-2", client: testContext.ClientSet} reservePlugin := &ReservePlugin{name: "mock-reserve-plugin"} postBindPlugin := &PostBindPlugin{name: "mock-post-bind-plugin"} // Create a plugin registry for testing. Register reserve, bind, and // postBind plugins. 
registry := frameworkruntime.Registry{ reservePlugin.Name(): newPlugin(reservePlugin), bindPlugin1.Name(): newPlugin(bindPlugin1), bindPlugin2.Name(): newPlugin(bindPlugin2), postBindPlugin.Name(): newPlugin(postBindPlugin), } // Setup initial unreserve and bind plugins for testing. cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ Profiles: []configv1.KubeSchedulerProfile{{ SchedulerName: pointer.String(v1.DefaultSchedulerName), Plugins: &configv1.Plugins{ MultiPoint: configv1.PluginSet{ Disabled: []configv1.Plugin{ {Name: defaultbinder.Name}, }, }, Reserve: configv1.PluginSet{ Enabled: []configv1.Plugin{{Name: reservePlugin.Name()}}, }, Bind: configv1.PluginSet{ // Put DefaultBinder last. Enabled: []configv1.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}, {Name: defaultbinder.Name}}, Disabled: []configv1.Plugin{{Name: defaultbinder.Name}}, }, PostBind: configv1.PluginSet{ Enabled: []configv1.Plugin{{Name: postBindPlugin.Name()}}, }, }, }}, }) // Create the scheduler with the test plugin set. testCtx := testutils.InitTestSchedulerWithOptions(t, testContext, 0, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) defer testutils.CleanupTest(t, testCtx) // Add a few nodes. 
_, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), 2) if err != nil { t.Fatal(err) } tests := []struct { name string bindPluginStatuses []*framework.Status expectBoundByScheduler bool // true means this test case expecting scheduler would bind pods expectBoundByPlugin bool // true means this test case expecting a plugin would bind pods expectBindPluginName string // expecting plugin name to bind pods expectInvokeEvents []pluginInvokeEvent }{ { name: "bind plugins skipped to bind the pod and scheduler bond the pod", bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Skip, "")}, expectBoundByScheduler: true, expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postBindPlugin.Name(), val: 1}}, }, { name: "bindplugin2 succeeded to bind the pod", bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Success, "")}, expectBoundByPlugin: true, expectBindPluginName: bindPlugin2.Name(), expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postBindPlugin.Name(), val: 1}}, }, { name: "bindplugin1 succeeded to bind the pod", bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Success, ""), framework.NewStatus(framework.Success, "")}, expectBoundByPlugin: true, expectBindPluginName: bindPlugin1.Name(), expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: postBindPlugin.Name(), val: 1}}, }, { name: "bind plugin fails to bind the pod", bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Error, "failed to bind"), framework.NewStatus(framework.Success, "")}, expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: reservePlugin.Name(), val: 1}}, }, } var 
pluginInvokeEventChan chan pluginInvokeEvent for _, test := range tests { t.Run(test.name, func(t *testing.T) { pluginInvokeEventChan = make(chan pluginInvokeEvent, 10) bindPlugin1.bindStatus = test.bindPluginStatuses[0] bindPlugin2.bindStatus = test.bindPluginStatuses[1] bindPlugin1.pluginInvokeEventChan = pluginInvokeEventChan bindPlugin2.pluginInvokeEventChan = pluginInvokeEventChan reservePlugin.pluginInvokeEventChan = pluginInvokeEventChan postBindPlugin.pluginInvokeEventChan = pluginInvokeEventChan // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.expectBoundByScheduler || test.expectBoundByPlugin { // bind plugins skipped to bind the pod if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Fatalf("Expected the pod to be scheduled. error: %v", err) } pod, err = testCtx.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) if err != nil { t.Errorf("can't get pod: %v", err) } if test.expectBoundByScheduler { if pod.Annotations[bindPluginAnnotation] != "" { t.Errorf("Expected the pod to be bound by scheduler instead of by bindplugin %s", pod.Annotations[bindPluginAnnotation]) } if bindPlugin1.numBindCalled != 1 || bindPlugin2.numBindCalled != 1 { t.Errorf("Expected each bind plugin to be called once, was called %d and %d times.", bindPlugin1.numBindCalled, bindPlugin2.numBindCalled) } } else { if pod.Annotations[bindPluginAnnotation] != test.expectBindPluginName { t.Errorf("Expected the pod to be bound by bindplugin %s instead of by bindplugin %s", test.expectBindPluginName, pod.Annotations[bindPluginAnnotation]) } if bindPlugin1.numBindCalled != 1 { t.Errorf("Expected %s to be called once, was called %d times.", bindPlugin1.Name(), bindPlugin1.numBindCalled) } if test.expectBindPluginName == bindPlugin1.Name() 
&& bindPlugin2.numBindCalled > 0 { // expect bindplugin1 succeeded to bind the pod and bindplugin2 should not be called. t.Errorf("Expected %s not to be called, was called %d times.", bindPlugin2.Name(), bindPlugin1.numBindCalled) } } if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) { return postBindPlugin.numPostBindCalled == 1, nil }); err != nil { t.Errorf("Expected the postbind plugin to be called once, was called %d times.", postBindPlugin.numPostBindCalled) } if reservePlugin.numUnreserveCalled != 0 { t.Errorf("Expected unreserve to not be called, was called %d times.", reservePlugin.numUnreserveCalled) } } else { // bind plugin fails to bind the pod if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err) } if postBindPlugin.numPostBindCalled > 0 { t.Errorf("Didn't expect the postbind plugin to be called %d times.", postBindPlugin.numPostBindCalled) } } for j := range test.expectInvokeEvents { expectEvent := test.expectInvokeEvents[j] select { case event := <-pluginInvokeEventChan: if event.pluginName != expectEvent.pluginName { t.Errorf("Expect invoke event %d from plugin %s instead of %s", j, expectEvent.pluginName, event.pluginName) } if event.val != expectEvent.val { t.Errorf("Expect val of invoke event %d to be %d instead of %d", j, expectEvent.val, event.val) } case <-time.After(time.Second * 30): t.Errorf("Waiting for invoke event %d timeout.", j) } } postBindPlugin.reset() bindPlugin1.reset() bindPlugin2.reset() reservePlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestPostBindPlugin tests invocation of postbind plugins. 
func TestPostBindPlugin(t *testing.T) { tests := []struct { name string preBindFail bool }{ { name: "plugin preBind fail", preBindFail: true, }, { name: "plugin preBind do not fail", preBindFail: false, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Create a plugin registry for testing. Register a prebind and a postbind plugin. preBindPlugin := &PreBindPlugin{ failPreBind: test.preBindFail, podUIDs: make(map[types.UID]struct{}), } postBindPlugin := &PostBindPlugin{ name: postBindPluginName, pluginInvokeEventChan: make(chan pluginInvokeEvent, 1), } registry, prof := initRegistryAndConfig(t, preBindPlugin, postBindPlugin) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "postbind-plugin", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.preBindFail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err) } if postBindPlugin.numPostBindCalled > 0 { t.Errorf("Didn't expect the postbind plugin to be called %d times.", postBindPlugin.numPostBindCalled) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. 
error: %v", err) } select { case <-postBindPlugin.pluginInvokeEventChan: case <-time.After(time.Second * 15): t.Errorf("pluginInvokeEventChan timed out") } if postBindPlugin.numPostBindCalled == 0 { t.Errorf("Expected the postbind plugin to be called, was called %d times.", postBindPlugin.numPostBindCalled) } } testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestPermitPlugin tests invocation of permit plugins. func TestPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. perPlugin := &PermitPlugin{name: permitPluginName} registry, prof := initRegistryAndConfig(t, perPlugin) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugin", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) tests := []struct { name string fail bool reject bool timeout bool }{ { name: "disable fail, reject and timeout flags", fail: false, reject: false, timeout: false, }, { name: "enable fail, disable reject and timeout flags", fail: true, reject: false, timeout: false, }, { name: "disable fail and timeout, enable reject flags", fail: false, reject: true, timeout: false, }, { name: "enable fail and reject, disable timeout flags", fail: true, reject: true, timeout: false, }, { name: "disable fail and reject, disable timeout flags", fail: false, reject: false, timeout: true, }, { name: "disable fail and reject, enable timeout flags", fail: false, reject: false, timeout: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { perPlugin.failPermit = test.fail perPlugin.rejectPermit = test.reject perPlugin.timeoutPermit = test.timeout perPlugin.waitAndRejectPermit = false perPlugin.waitAndAllowPermit = false // Create a best effort pod. 
pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.fail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err) } } else { if test.reject || test.timeout { if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } } } if perPlugin.numPermitCalled == 0 { t.Errorf("Expected the permit plugin to be called.") } perPlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestMultiplePermitPlugins tests multiple permit plugins returning wait for a same pod. func TestMultiplePermitPlugins(t *testing.T) { // Create a plugin registry for testing. perPlugin1 := &PermitPlugin{name: "permit-plugin-1"} perPlugin2 := &PermitPlugin{name: "permit-plugin-2"} registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) // Both permit plugins will return Wait for permitting perPlugin1.timeoutPermit = true perPlugin2.timeoutPermit = true // Create a test pod. 
podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } var waitingPod framework.WaitingPod // Wait until the test pod is actually waiting. wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID) return waitingPod != nil, nil }) // Check the number of pending permits if l := len(waitingPod.GetPendingPlugins()); l != 2 { t.Errorf("Expected the number of pending plugins is 2, but got %d", l) } perPlugin1.allowAllPods() // Check the number of pending permits if l := len(waitingPod.GetPendingPlugins()); l != 1 { t.Errorf("Expected the number of pending plugins is 1, but got %d", l) } perPlugin2.allowAllPods() if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } if perPlugin1.numPermitCalled == 0 || perPlugin2.numPermitCalled == 0 { t.Errorf("Expected the permit plugin to be called.") } testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) } // TestPermitPluginsCancelled tests whether all permit plugins are cancelled when pod is rejected. func TestPermitPluginsCancelled(t *testing.T) { // Create a plugin registry for testing. perPlugin1 := &PermitPlugin{name: "permit-plugin-1"} perPlugin2 := &PermitPlugin{name: "permit-plugin-2"} registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) // Both permit plugins will return Wait for permitting perPlugin1.timeoutPermit = true perPlugin2.timeoutPermit = true // Create a test pod. 
podName := "test-pod" pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } var waitingPod framework.WaitingPod // Wait until the test pod is actually waiting. wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID) return waitingPod != nil, nil }) perPlugin1.rejectAllPods() // Wait some time for the permit plugins to be cancelled err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { return perPlugin1.cancelled && perPlugin2.cancelled, nil }) if err != nil { t.Errorf("Expected all permit plugins to be cancelled") } } // TestCoSchedulingWithPermitPlugin tests invocation of permit plugins. func TestCoSchedulingWithPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. permitPlugin := &PermitPlugin{name: permitPluginName} registry, prof := initRegistryAndConfig(t, permitPlugin) // Create the API server and the scheduler with the test plugin set. // TODO Make the subtests not share scheduler instances. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugin", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) tests := []struct { name string waitReject bool waitAllow bool }{ { name: "having wait reject true and wait allow false", waitReject: true, waitAllow: false, }, { name: "having wait reject false and wait allow true", waitReject: false, waitAllow: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { permitPlugin.failPermit = false permitPlugin.rejectPermit = false permitPlugin.timeoutPermit = false permitPlugin.waitAndRejectPermit = test.waitReject permitPlugin.waitAndAllowPermit = test.waitAllow // Create two pods. 
First pod to enter Permit() will wait and a second one will either // reject or allow first one. podA, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "pod-a", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating the first pod: %v", err) } podB, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "pod-b", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating the second pod: %v", err) } if test.waitReject { if err = waitForPodUnschedulable(testCtx.ClientSet, podA); err != nil { t.Errorf("Didn't expect the first pod to be scheduled. error: %v", err) } if err = waitForPodUnschedulable(testCtx.ClientSet, podB); err != nil { t.Errorf("Didn't expect the second pod to be scheduled. error: %v", err) } if !((permitPlugin.waitingPod == podA.Name && permitPlugin.rejectingPod == podB.Name) || (permitPlugin.waitingPod == podB.Name && permitPlugin.rejectingPod == podA.Name)) { t.Errorf("Expect one pod to wait and another pod to reject instead %s waited and %s rejected.", permitPlugin.waitingPod, permitPlugin.rejectingPod) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, podA); err != nil { t.Errorf("Expected the first pod to be scheduled. error: %v", err) } if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, podB); err != nil { t.Errorf("Expected the second pod to be scheduled. 
error: %v", err) } if !((permitPlugin.waitingPod == podA.Name && permitPlugin.allowingPod == podB.Name) || (permitPlugin.waitingPod == podB.Name && permitPlugin.allowingPod == podA.Name)) { t.Errorf("Expect one pod to wait and another pod to allow instead %s waited and %s allowed.", permitPlugin.waitingPod, permitPlugin.allowingPod) } } if permitPlugin.numPermitCalled == 0 { t.Errorf("Expected the permit plugin to be called.") } permitPlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{podA, podB}) }) } } // TestFilterPlugin tests invocation of filter plugins. func TestFilterPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a filter plugin. filterPlugin := &FilterPlugin{} registry, prof := initRegistryAndConfig(t, filterPlugin) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "filter-plugin", nil), 1, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) tests := []struct { name string fail bool }{ { name: "fail filter plugin", fail: true, }, { name: "do not fail filter plugin", fail: false, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { filterPlugin.failFilter = test.fail // Create a best effort pod. 
pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.fail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a scheduling error, but got: %v", err) } if filterPlugin.numFilterCalled < 1 { t.Errorf("Expected the filter plugin to be called at least 1 time, but got %v.", filterPlugin.numFilterCalled) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } if filterPlugin.numFilterCalled != 1 { t.Errorf("Expected the filter plugin to be called 1 time, but got %v.", filterPlugin.numFilterCalled) } } filterPlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestPreScorePlugin tests invocation of pre-score plugins. func TestPreScorePlugin(t *testing.T) { // Create a plugin registry for testing. Register only a pre-score plugin. preScorePlugin := &PreScorePlugin{} registry, prof := initRegistryAndConfig(t, preScorePlugin) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "pre-score-plugin", nil), 2, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) tests := []struct { name string fail bool }{ { name: "fail preScore plugin", fail: true, }, { name: "do not fail preScore plugin", fail: false, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { preScorePlugin.failPreScore = test.fail // Create a best effort pod. 
pod, err := createPausePod(testCtx.ClientSet, initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if test.fail { if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { t.Errorf("Expected a scheduling error, but got: %v", err) } } else { if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { t.Errorf("Expected the pod to be scheduled. error: %v", err) } } if preScorePlugin.numPreScoreCalled == 0 { t.Errorf("Expected the pre-score plugin to be called.") } preScorePlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestPreEnqueuePlugin tests invocation of enqueue plugins. func TestPreEnqueuePlugin(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodSchedulingReadiness, true)() // Create a plugin registry for testing. Register only a filter plugin. enqueuePlugin := &PreEnqueuePlugin{} // Plumb a preFilterPlugin to verify if it's called or not. preFilterPlugin := &PreFilterPlugin{} registry, prof := initRegistryAndConfig(t, enqueuePlugin, preFilterPlugin) // Create the API server and the scheduler with the test plugin set. 
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "enqueue-plugin", nil), 1, scheduler.WithProfiles(prof), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) tests := []struct { name string pod *v1.Pod admitEnqueue bool }{ { name: "pod is admitted to enqueue", pod: st.MakePod().Name("p").Namespace(testCtx.NS.Name).Container("pause").Obj(), admitEnqueue: true, }, { name: "pod is not admitted to enqueue", pod: st.MakePod().Name("p").Namespace(testCtx.NS.Name).SchedulingGates([]string{"foo"}).Container("pause").Obj(), admitEnqueue: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { enqueuePlugin.admit = tt.admitEnqueue // Create a best effort pod. pod, err := createPausePod(testCtx.ClientSet, tt.pod) if err != nil { t.Errorf("Error while creating a test pod: %v", err) } if tt.admitEnqueue { if err := waitForPodToScheduleWithTimeout(testCtx.ClientSet, pod, 10*time.Second); err != nil { t.Errorf("Expected the pod to be schedulable, but got: %v", err) } // Also verify enqueuePlugin is called. if enqueuePlugin.called == 0 { t.Errorf("Expected the enqueuePlugin plugin to be called at least once, but got 0") } } else { if err := waitForPodSchedulingGated(testCtx.ClientSet, pod, 10*time.Second); err != nil { t.Errorf("Expected the pod to be scheduling waiting, but got: %v", err) } // Also verify preFilterPlugin is not called. if preFilterPlugin.numPreFilterCalled != 0 { t.Errorf("Expected the preFilter plugin not to be called, but got %v", preFilterPlugin.numPreFilterCalled) } } preFilterPlugin.reset() testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) }) } } // TestPreemptWithPermitPlugin tests preempt with permit plugins. 
// It verifies how waitingPods behave in different scenarios: // - when waitingPods get preempted // - they should be removed from internal waitingPods map, but not physically deleted // - it'd trigger moving unschedulable Pods, but not the waitingPods themselves // // - when waitingPods get deleted externally, it'd trigger moving unschedulable Pods func TestPreemptWithPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register a permit and a filter plugin. permitPlugin := &PermitPlugin{} // Inject a fake filter plugin to use its internal `numFilterCalled` to verify // how many times a Pod gets tried scheduling. filterPlugin := &FilterPlugin{numCalledPerPod: make(map[string]int)} registry := frameworkruntime.Registry{ permitPluginName: newPlugin(permitPlugin), filterPluginName: newPlugin(filterPlugin), } // Setup initial permit and filter plugins in the profile. cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ Profiles: []configv1.KubeSchedulerProfile{ { SchedulerName: pointer.String(v1.DefaultSchedulerName), Plugins: &configv1.Plugins{ Permit: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: permitPluginName}, }, }, Filter: configv1.PluginSet{ // Ensure the fake filter plugin is always called; otherwise noderesources // would fail first and exit the Filter phase. Enabled: []configv1.Plugin{ {Name: filterPluginName}, {Name: noderesources.Name}, }, Disabled: []configv1.Plugin{ {Name: noderesources.Name}, }, }, }, }, }, }) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "preempt-with-permit-plugin", nil), 0, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) // Add one node. 
nodeRes := map[v1.ResourceName]string{ v1.ResourcePods: "32", v1.ResourceCPU: "500m", v1.ResourceMemory: "500", } _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode().Capacity(nodeRes), 1) if err != nil { t.Fatal(err) } ns := testCtx.NS.Name lowPriority, highPriority := int32(100), int32(300) resReq := map[v1.ResourceName]string{ v1.ResourceCPU: "200m", v1.ResourceMemory: "200", } preemptorReq := map[v1.ResourceName]string{ v1.ResourceCPU: "400m", v1.ResourceMemory: "400", } tests := []struct { name string deleteWaitingPod bool maxNumWaitingPodCalled int runningPod *v1.Pod waitingPod *v1.Pod preemptor *v1.Pod }{ { name: "waiting pod is not physically deleted upon preemption", maxNumWaitingPodCalled: 2, runningPod: st.MakePod().Name("running-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(), waitingPod: st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(), preemptor: st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(highPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(), }, { name: "rejecting a waiting pod to trigger retrying unschedulable pods immediately, but the waiting pod itself won't be retried", maxNumWaitingPodCalled: 1, waitingPod: st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(), preemptor: st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(highPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(), }, { name: "deleting a waiting pod to trigger retrying unschedulable pods immediately", deleteWaitingPod: true, maxNumWaitingPodCalled: 1, waitingPod: st.MakePod().Name("waiting-pod").Namespace(ns).Priority(lowPriority).Req(resReq).ZeroTerminationGracePeriod().Obj(), preemptor: st.MakePod().Name("preemptor-pod").Namespace(ns).Priority(lowPriority).Req(preemptorReq).ZeroTerminationGracePeriod().Obj(), }, } for _, tt := range tests { 
t.Run(tt.name, func(t *testing.T) { defer func() { permitPlugin.reset() filterPlugin.reset() var pods []*v1.Pod for _, p := range []*v1.Pod{tt.runningPod, tt.waitingPod, tt.preemptor} { if p != nil { pods = append(pods, p) } } testutils.CleanupPods(testCtx.ClientSet, t, pods) }() permitPlugin.waitAndAllowPermit = true permitPlugin.waitingPod = "waiting-pod" if r := tt.runningPod; r != nil { if _, err := createPausePod(testCtx.ClientSet, r); err != nil { t.Fatalf("Error while creating the running pod: %v", err) } // Wait until the pod to be scheduled. if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, r); err != nil { t.Fatalf("The running pod is expected to be scheduled: %v", err) } } if w := tt.waitingPod; w != nil { if _, err := createPausePod(testCtx.ClientSet, w); err != nil { t.Fatalf("Error while creating the waiting pod: %v", err) } // Wait until the waiting-pod is actually waiting. if err := wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { w := false permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true }) return w, nil }); err != nil { t.Fatalf("The waiting pod is expected to be waiting: %v", err) } } if p := tt.preemptor; p != nil { if _, err := createPausePod(testCtx.ClientSet, p); err != nil { t.Fatalf("Error while creating the preemptor pod: %v", err) } // Delete the waiting pod if specified. if w := tt.waitingPod; w != nil && tt.deleteWaitingPod { if err := deletePod(testCtx.ClientSet, w.Name, w.Namespace); err != nil { t.Fatalf("Error while deleting the waiting pod: %v", err) } } if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, p); err != nil { t.Fatalf("Expected the preemptor pod to be scheduled. 
error: %v", err) } } if w := tt.waitingPod; w != nil { if err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { w := false permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true }) return !w, nil }); err != nil { t.Fatalf("Expected the waiting pod to get preempted.") } filterPlugin.RLock() waitingPodCalled := filterPlugin.numCalledPerPod[fmt.Sprintf("%v/%v", w.Namespace, w.Name)] filterPlugin.RUnlock() if waitingPodCalled > tt.maxNumWaitingPodCalled { t.Fatalf("Expected the waiting pod to be called %v times at most, but got %v", tt.maxNumWaitingPodCalled, waitingPodCalled) } if !tt.deleteWaitingPod { // Expect the waitingPod to be still present. if _, err := getPod(testCtx.ClientSet, w.Name, w.Namespace); err != nil { t.Error("Get waiting pod in waiting pod failed.") } } if permitPlugin.numPermitCalled == 0 { t.Errorf("Expected the permit plugin to be called.") } } if r := tt.runningPod; r != nil { // Expect the runningPod to be deleted physically. 
if _, err = getPod(testCtx.ClientSet, r.Name, r.Namespace); err == nil { t.Error("The running pod still exists.") } else if !errors.IsNotFound(err) { t.Errorf("Get running pod failed: %v", err) } } }) } } const ( jobPluginName = "job plugin" ) var _ framework.PreFilterPlugin = &JobPlugin{} var _ framework.PostBindPlugin = &PostBindPlugin{} type JobPlugin struct { podLister listersv1.PodLister podsActivated bool } func (j *JobPlugin) Name() string { return jobPluginName } func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) { labelSelector := labels.SelectorFromSet(labels.Set{"driver": ""}) driverPods, err := j.podLister.Pods(p.Namespace).List(labelSelector) if err != nil { return nil, framework.AsStatus(err) } if len(driverPods) == 0 { return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod") } return nil, nil } func (j *JobPlugin) PreFilterExtensions() framework.PreFilterExtensions { return nil } func (j *JobPlugin) PostBind(_ context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) { if _, ok := p.Labels["driver"]; !ok { return } // If it's a driver pod, move other executor pods proactively to accelerating the scheduling. labelSelector := labels.SelectorFromSet(labels.Set{"executor": ""}) podsToActivate, err := j.podLister.Pods(p.Namespace).List(labelSelector) if err == nil && len(podsToActivate) != 0 { c, err := state.Read(framework.PodsToActivateKey) if err == nil { if s, ok := c.(*framework.PodsToActivate); ok { s.Lock() for _, pod := range podsToActivate { namespacedName := fmt.Sprintf("%v/%v", pod.Namespace, pod.Name) s.Map[namespacedName] = pod } s.Unlock() j.podsActivated = true } } } } // This test simulates a typical spark job workflow. 
// - N executor pods are created, but kept pending due to missing the driver pod // - when the driver pod gets created and scheduled, proactively move the executors to activeQ // and thus accelerate the entire job workflow. func TestActivatePods(t *testing.T) { var jobPlugin *JobPlugin // Create a plugin registry for testing. Register a Job plugin. registry := frameworkruntime.Registry{jobPluginName: func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { jobPlugin = &JobPlugin{podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister()} return jobPlugin, nil }} // Setup initial filter plugin for testing. cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ Profiles: []configv1.KubeSchedulerProfile{{ SchedulerName: pointer.String(v1.DefaultSchedulerName), Plugins: &configv1.Plugins{ PreFilter: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: jobPluginName}, }, }, PostBind: configv1.PluginSet{ Enabled: []configv1.Plugin{ {Name: jobPluginName}, }, }, }, }}, }) // Create the API server and the scheduler with the test plugin set. testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1, scheduler.WithProfiles(cfg.Profiles...), scheduler.WithFrameworkOutOfTreeRegistry(registry)) defer testutils.CleanupTest(t, testCtx) cs := testCtx.ClientSet ns := testCtx.NS.Name pause := imageutils.GetPauseImageName() // Firstly create 2 executor pods. var pods []*v1.Pod for i := 1; i <= 2; i++ { name := fmt.Sprintf("executor-%v", i) executor := st.MakePod().Name(name).Namespace(ns).Label("executor", "").Container(pause).Obj() pods = append(pods, executor) if _, err := cs.CoreV1().Pods(executor.Namespace).Create(context.TODO(), executor, metav1.CreateOptions{}); err != nil { t.Fatalf("Failed to create pod %v: %v", executor.Name, err) } } // Wait for the 2 executor pods to be unschedulable. 
for _, pod := range pods { if err := waitForPodUnschedulable(cs, pod); err != nil { t.Errorf("Failed to wait for Pod %v to be unschedulable: %v", pod.Name, err) } } // Create a driver pod. driver := st.MakePod().Name("driver").Namespace(ns).Label("driver", "").Container(pause).Obj() pods = append(pods, driver) if _, err := cs.CoreV1().Pods(driver.Namespace).Create(context.TODO(), driver, metav1.CreateOptions{}); err != nil { t.Fatalf("Failed to create pod %v: %v", driver.Name, err) } // Verify all pods to be scheduled. for _, pod := range pods { if err := waitForPodToScheduleWithTimeout(cs, pod, wait.ForeverTestTimeout); err != nil { t.Fatalf("Failed to wait for Pod %v to be schedulable: %v", pod.Name, err) } } // Lastly verify the pods activation logic is really called. if jobPlugin.podsActivated == false { t.Errorf("JobPlugin's pods activation logic is not called") } } func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext { testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...) testutils.SyncInformerFactory(testCtx) go testCtx.Scheduler.Run(testCtx.Ctx) if nodeCount > 0 { if _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil { // Make sure to cleanup the resources when initializing error. testutils.CleanupTest(t, testCtx) t.Fatal(err) } } return testCtx } // initRegistryAndConfig returns registry and plugins config based on give plugins. 
func initRegistryAndConfig(t *testing.T, plugins ...framework.Plugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) { if len(plugins) == 0 { return frameworkruntime.Registry{}, schedulerconfig.KubeSchedulerProfile{} } registry := frameworkruntime.Registry{} pls := &configv1.Plugins{} for _, p := range plugins { registry.Register(p.Name(), newPlugin(p)) plugin := configv1.Plugin{Name: p.Name()} switch p.(type) { case *PreEnqueuePlugin: pls.PreEnqueue.Enabled = append(pls.PreEnqueue.Enabled, plugin) case *PreFilterPlugin: pls.PreFilter.Enabled = append(pls.PreFilter.Enabled, plugin) case *FilterPlugin: pls.Filter.Enabled = append(pls.Filter.Enabled, plugin) case *PreScorePlugin: pls.PreScore.Enabled = append(pls.PreScore.Enabled, plugin) case *ScorePlugin, *ScoreWithNormalizePlugin: pls.Score.Enabled = append(pls.Score.Enabled, plugin) case *ReservePlugin: pls.Reserve.Enabled = append(pls.Reserve.Enabled, plugin) case *PreBindPlugin: pls.PreBind.Enabled = append(pls.PreBind.Enabled, plugin) case *BindPlugin: pls.Bind.Enabled = append(pls.Bind.Enabled, plugin) // It's intentional to disable the DefaultBind plugin. Otherwise, DefaultBinder's failure would fail // a pod's scheduling, as well as the test BindPlugin's execution. pls.Bind.Disabled = []configv1.Plugin{{Name: defaultbinder.Name}} case *PostBindPlugin: pls.PostBind.Enabled = append(pls.PostBind.Enabled, plugin) case *PermitPlugin: pls.Permit.Enabled = append(pls.Permit.Enabled, plugin) } } versionedCfg := configv1.KubeSchedulerConfiguration{ Profiles: []configv1.KubeSchedulerProfile{{ SchedulerName: pointer.String(v1.DefaultSchedulerName), Plugins: pls, }}, } cfg := configtesting.V1ToInternalWithDefaults(t, versionedCfg) return registry, cfg.Profiles[0] }