From 070773c3996e12c62455c9a920914945a4efe724 Mon Sep 17 00:00:00 2001 From: drfish Date: Sun, 20 Dec 2020 00:18:40 +0800 Subject: [PATCH] Remove scheduler plugins package's dependency from core package --- pkg/scheduler/core/BUILD | 1 - pkg/scheduler/core/generic_scheduler.go | 107 +----------- pkg/scheduler/factory.go | 1 + pkg/scheduler/framework/interface.go | 3 + .../framework/plugins/defaultpreemption/BUILD | 1 - .../defaultpreemption/default_preemption.go | 19 +- .../default_preemption_test.go | 4 +- pkg/scheduler/framework/runtime/BUILD | 2 + pkg/scheduler/framework/runtime/framework.go | 85 +++++++++ .../framework/runtime/framework_test.go | 165 ++++++++++++++++++ test/e2e/framework/.import-restrictions | 2 + 11 files changed, 277 insertions(+), 113 deletions(-) diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index ec49c76dfbd..d6447dac312 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -20,7 +20,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", - "//staging/src/k8s.io/component-helpers/scheduling/corev1:go_default_library", "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", "//vendor/k8s.io/utils/trace:go_default_library", diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 961038dbe1b..9451dd4ff54 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -29,7 +29,6 @@ import ( "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" - corev1helpers "k8s.io/component-helpers/scheduling/corev1" extenderv1 "k8s.io/kube-scheduler/extender/v1" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" @@ -297,12 +296,12 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk fra // We check the nodes starting from where we left off in the previous scheduling cycle, // this is to make sure all nodes have the same chance of being examined across pods. nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)] - fits, status, err := PodPassesFiltersOnNode(ctx, fwk.PreemptHandle(), state, pod, nodeInfo) - if err != nil { - errCh.SendErrorWithCancel(err, cancel) + status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo) + if status.Code() == framework.Error { + errCh.SendErrorWithCancel(status.AsError(), cancel) return } - if fits { + if status.IsSuccess() { length := atomic.AddInt32(&feasibleNodesLen, 1) if length > numNodesToFind { cancel() @@ -311,11 +310,9 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk fra feasibleNodes[length-1] = nodeInfo.Node() } } else { - if !status.IsSuccess() { - statusesLock.Lock() - statuses[nodeInfo.Node().Name] = status - statusesLock.Unlock() - } + statusesLock.Lock() + statuses[nodeInfo.Node().Name] = status + statusesLock.Unlock() } } @@ -372,96 +369,6 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes return feasibleNodes, nil } -// addNominatedPods adds pods with equal or greater priority which are nominated -// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState, -// 3) augmented nodeInfo. -func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) { - if ph == nil || nodeInfo == nil || nodeInfo.Node() == nil { - // This may happen only in tests. - return false, state, nodeInfo, nil - } - nominatedPods := ph.NominatedPodsForNode(nodeInfo.Node().Name) - if len(nominatedPods) == 0 { - return false, state, nodeInfo, nil - } - nodeInfoOut := nodeInfo.Clone() - stateOut := state.Clone() - podsAdded := false - for _, p := range nominatedPods { - if corev1helpers.PodPriority(p) >= corev1helpers.PodPriority(pod) && p.UID != pod.UID { - nodeInfoOut.AddPod(p) - status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut) - if !status.IsSuccess() { - return false, state, nodeInfo, status.AsError() - } - podsAdded = true - } - } - return podsAdded, stateOut, nodeInfoOut, nil -} - -// PodPassesFiltersOnNode checks whether a node given by NodeInfo satisfies the -// filter plugins. -// This function is called from two different places: Schedule and Preempt. -// When it is called from Schedule, we want to test whether the pod is -// schedulable on the node with all the existing pods on the node plus higher -// and equal priority pods nominated to run on the node. -// When it is called from Preempt, we should remove the victims of preemption -// and add the nominated pods. Removal of the victims is done by -// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and -// NodeInfo before calling this function. -// TODO: move this out so that plugins don't need to depend on pkg. -func PodPassesFiltersOnNode( - ctx context.Context, - ph framework.PreemptHandle, - state *framework.CycleState, - pod *v1.Pod, - info *framework.NodeInfo, -) (bool, *framework.Status, error) { - var status *framework.Status - - podsAdded := false - // We run filters twice in some cases. If the node has greater or equal priority - // nominated pods, we run them when those pods are added to PreFilter state and nodeInfo. - // If all filters succeed in this pass, we run them again when these - // nominated pods are not added. This second pass is necessary because some - // filters such as inter-pod affinity may not pass without the nominated pods. - // If there are no nominated pods for the node or if the first run of the - // filters fail, we don't run the second pass. - // We consider only equal or higher priority pods in the first pass, because - // those are the current "pod" must yield to them and not take a space opened - // for running them. It is ok if the current "pod" take resources freed for - // lower priority pods. - // Requiring that the new pod is schedulable in both circumstances ensures that - // we are making a conservative decision: filters like resources and inter-pod - // anti-affinity are more likely to fail when the nominated pods are treated - // as running, while filters like pod affinity are more likely to fail when - // the nominated pods are treated as not running. We can't just assume the - // nominated pods are running because they are not running right now and in fact, - // they may end up getting scheduled to a different node. - for i := 0; i < 2; i++ { - stateToUse := state - nodeInfoToUse := info - if i == 0 { - var err error - podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info) - if err != nil { - return false, nil, err - } - } else if !podsAdded || !status.IsSuccess() { - break - } - - statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse) - status = statusMap.Merge() - if !status.IsSuccess() && !status.IsUnschedulable() { - return false, status, status.AsError() - } - } - - return status.IsSuccess(), status, nil -} - // prioritizeNodes prioritizes the nodes by running the score plugins, // which return a score for each node from the call to RunScorePlugins(). // The scores from each plugin are added together to make the score for that node, then diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index cf3887dd22f..f499a0fa1ba 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -23,6 +23,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index a0ab27c719e..c1a392135b6 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -539,6 +539,9 @@ type Handle interface { SharedInformerFactory() informers.SharedInformerFactory + // RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node. + RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status + // TODO: unroll the wrapped interfaces to Handle. PreemptHandle() PreemptHandle } diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/BUILD b/pkg/scheduler/framework/plugins/defaultpreemption/BUILD index a9a43db7dd2..c1a55260282 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/BUILD +++ b/pkg/scheduler/framework/plugins/defaultpreemption/BUILD @@ -12,7 +12,6 @@ go_library( "//pkg/features:go_default_library", "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config/validation:go_default_library", - "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/framework:go_default_library", "//pkg/scheduler/internal/parallelize:go_default_library", "//pkg/scheduler/metrics:go_default_library", diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index 234142cd59b..787ae60c542 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -42,7 +42,6 @@ import ( kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" - "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" "k8s.io/kubernetes/pkg/scheduler/metrics" @@ -191,7 +190,7 @@ func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framewor return nil, err } if len(allNodes) == 0 { - return nil, core.ErrNoNodesAvailable + return nil, fmt.Errorf("no nodes available") } potentialNodes := nodesWherePreemptionMightHelp(allNodes, m) @@ -219,7 +218,7 @@ func (pl *DefaultPreemption) FindCandidates(ctx context.Context, state *framewor klog.Infof("from a pool of %d nodes (offset: %d, sample %d nodes: %v), ~%d candidates will be chosen", len(potentialNodes), offset, len(sample), sample, numCandidates) } - return dryRunPreemption(ctx, pl.fh.PreemptHandle(), state, pod, potentialNodes, pdbs, offset, numCandidates), nil + return dryRunPreemption(ctx, pl.fh, state, pod, potentialNodes, pdbs, offset, numCandidates), nil } // PodEligibleToPreemptOthers determines whether this pod should be considered @@ -308,7 +307,7 @@ func (cl *candidateList) get() []Candidate { // and returns preemption candidates. The number of candidates depends on the // constraints defined in the plugin's args. In the returned list of // candidates, ones that do not violate PDB are preferred over ones that do. -func dryRunPreemption(ctx context.Context, fh framework.PreemptHandle, +func dryRunPreemption(ctx context.Context, fh framework.Handle, state *framework.CycleState, pod *v1.Pod, potentialNodes []*framework.NodeInfo, pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) []Candidate { nonViolatingCandidates := newCandidateList(numCandidates) @@ -577,7 +576,7 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str // these predicates can be satisfied by removing more pods from the node. func selectVictimsOnNode( ctx context.Context, - ph framework.PreemptHandle, + fh framework.Handle, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo, @@ -585,6 +584,7 @@ func selectVictimsOnNode( ) ([]*v1.Pod, int, bool) { var potentialVictims []*v1.Pod + ph := fh.PreemptHandle() removePod := func(rp *v1.Pod) error { if err := nodeInfo.RemovePod(rp); err != nil { return err @@ -626,9 +626,9 @@ func selectVictimsOnNode( // inter-pod affinity to one or more victims, but we have decided not to // support this case for performance reasons. Having affinity to lower // priority pods is not a recommended configuration anyway. - if fits, _, err := core.PodPassesFiltersOnNode(ctx, ph, state, pod, nodeInfo); !fits { - if err != nil { - klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) + if status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() { + if status.Code() == framework.Error { + klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, status.AsError()) } return nil, 0, false @@ -644,7 +644,8 @@ func selectVictimsOnNode( if err := addPod(p); err != nil { return false, err } - fits, _, _ := core.PodPassesFiltersOnNode(ctx, ph, state, pod, nodeInfo) + status := fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo) + fits := status.IsSuccess() if !fits { if err := removePod(p); err != nil { return false, err diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 8c07823bbb6..32f7057b93b 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -978,7 +978,7 @@ func TestDryRunPreemption(t *testing.T) { t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status) } offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos))) - got := dryRunPreemption(context.Background(), fwk.PreemptHandle(), state, pod, nodeInfos, tt.pdbs, offset, numCandidates) + got := dryRunPreemption(context.Background(), fwk, state, pod, nodeInfos, tt.pdbs, offset, numCandidates) if err != nil { t.Fatal(err) } @@ -1201,7 +1201,7 @@ func TestSelectBestCandidate(t *testing.T) { pl := &DefaultPreemption{args: *getDefaultDefaultPreemptionArgs()} offset, numCandidates := pl.getOffsetAndNumCandidates(int32(len(nodeInfos))) - candidates := dryRunPreemption(context.Background(), fwk.PreemptHandle(), state, tt.pod, nodeInfos, nil, offset, numCandidates) + candidates := dryRunPreemption(context.Background(), fwk, state, tt.pod, nodeInfos, nil, offset, numCandidates) s := SelectCandidate(candidates) found := false for _, nodeName := range tt.expected { diff --git a/pkg/scheduler/framework/runtime/BUILD b/pkg/scheduler/framework/runtime/BUILD index 4b8e776d8a3..ab012fbeb30 100644 --- a/pkg/scheduler/framework/runtime/BUILD +++ b/pkg/scheduler/framework/runtime/BUILD @@ -25,6 +25,7 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/component-base/metrics:go_default_library", + "//staging/src/k8s.io/component-helpers/scheduling/corev1:go_default_library", "//staging/src/k8s.io/kube-scheduler/config/v1beta1:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", "//vendor/sigs.k8s.io/yaml:go_default_library", @@ -41,6 +42,7 @@ go_test( deps = [ "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/framework:go_default_library", + "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 87e0afd2b13..8c97c94f036 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/events" + "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" "k8s.io/kube-scheduler/config/v1beta1" "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -594,6 +595,90 @@ func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.Po return r, s } +// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins +// for nominated pod on the given node. +// This function is called from two different places: Schedule and Preempt. +// When it is called from Schedule, we want to test whether the pod is +// schedulable on the node with all the existing pods on the node plus higher +// and equal priority pods nominated to run on the node. +// When it is called from Preempt, we should remove the victims of preemption +// and add the nominated pods. Removal of the victims is done by +// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and +// NodeInfo before calling this function. +func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status { + var status *framework.Status + + ph := f.PreemptHandle() + podsAdded := false + // We run filters twice in some cases. If the node has greater or equal priority + // nominated pods, we run them when those pods are added to PreFilter state and nodeInfo. + // If all filters succeed in this pass, we run them again when these + // nominated pods are not added. This second pass is necessary because some + // filters such as inter-pod affinity may not pass without the nominated pods. + // If there are no nominated pods for the node or if the first run of the + // filters fail, we don't run the second pass. + // We consider only equal or higher priority pods in the first pass, because + // those are the current "pod" must yield to them and not take a space opened + // for running them. It is ok if the current "pod" take resources freed for + // lower priority pods. + // Requiring that the new pod is schedulable in both circumstances ensures that + // we are making a conservative decision: filters like resources and inter-pod + // anti-affinity are more likely to fail when the nominated pods are treated + // as running, while filters like pod affinity are more likely to fail when + // the nominated pods are treated as not running. We can't just assume the + // nominated pods are running because they are not running right now and in fact, + // they may end up getting scheduled to a different node. + for i := 0; i < 2; i++ { + stateToUse := state + nodeInfoToUse := info + if i == 0 { + var err error + podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info) + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + } else if !podsAdded || !status.IsSuccess() { + break + } + + statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse) + status = statusMap.Merge() + if !status.IsSuccess() && !status.IsUnschedulable() { + return status + } + } + + return status +} + +// addNominatedPods adds pods with equal or greater priority which are nominated +// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState, +// 3) augmented nodeInfo. +func addNominatedPods(ctx context.Context, ph framework.PreemptHandle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) { + if ph == nil || nodeInfo.Node() == nil { + // This may happen only in tests. + return false, state, nodeInfo, nil + } + nominatedPods := ph.NominatedPodsForNode(nodeInfo.Node().Name) + if len(nominatedPods) == 0 { + return false, state, nodeInfo, nil + } + nodeInfoOut := nodeInfo.Clone() + stateOut := state.Clone() + podsAdded := false + for _, p := range nominatedPods { + if corev1.PodPriority(p) >= corev1.PodPriority(pod) && p.UID != pod.UID { + nodeInfoOut.AddPod(p) + status := ph.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut) + if !status.IsSuccess() { + return false, state, nodeInfo, status.AsError() + } + podsAdded = true + } + } + return podsAdded, stateOut, nodeInfoOut, nil +} + // RunPreScorePlugins runs the set of configured pre-score plugins. If any // of these plugins returns any status other than "Success", the given pod is rejected. func (f *frameworkImpl) RunPreScorePlugins( diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index f3b76705c28..d62f2e75744 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -28,12 +28,14 @@ import ( "github.com/google/go-cmp/cmp" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" + internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" ) @@ -51,6 +53,7 @@ const ( bindPlugin = "bind-plugin" testProfileName = "test-profile" + nodeName = "testNode" ) // TestScoreWithNormalizePlugin implements ScoreWithNormalizePlugin interface. @@ -341,6 +344,20 @@ var state = &framework.CycleState{} // Pod is only used for logging errors. var pod = &v1.Pod{} +var node = &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + }, +} +var lowPriority, highPriority = int32(0), int32(1000) +var lowPriorityPod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{UID: "low"}, + Spec: v1.PodSpec{Priority: &lowPriority}, +} +var highPriorityPod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{UID: "high"}, + Spec: v1.PodSpec{Priority: &highPriority}, +} var nodes = []*v1.Node{ {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, {ObjectMeta: metav1.ObjectMeta{Name: "node2"}}, @@ -1144,6 +1161,154 @@ func TestPostFilterPlugins(t *testing.T) { } } +func TestFilterPluginsWithNominatedPods(t *testing.T) { + tests := []struct { + name string + preFilterPlugin *TestPlugin + filterPlugin *TestPlugin + pod *v1.Pod + nominatedPod *v1.Pod + node *v1.Node + nodeInfo *framework.NodeInfo + wantStatus *framework.Status + }{ + { + name: "node has no nominated pod", + preFilterPlugin: nil, + filterPlugin: nil, + pod: lowPriorityPod, + nominatedPod: nil, + node: node, + nodeInfo: framework.NewNodeInfo(pod), + wantStatus: nil, + }, + { + name: "node has a high-priority nominated pod and all filters succeed", + preFilterPlugin: &TestPlugin{ + name: "TestPlugin1", + inj: injectedResult{ + PreFilterAddPodStatus: int(framework.Success), + }, + }, + filterPlugin: &TestPlugin{ + name: "TestPlugin2", + inj: injectedResult{ + FilterStatus: int(framework.Success), + }, + }, + pod: lowPriorityPod, + nominatedPod: highPriorityPod, + node: node, + nodeInfo: framework.NewNodeInfo(pod), + wantStatus: nil, + }, + { + name: "node has a high-priority nominated pod and pre filters fail", + preFilterPlugin: &TestPlugin{ + name: "TestPlugin1", + inj: injectedResult{ + PreFilterAddPodStatus: int(framework.Error), + }, + }, + filterPlugin: nil, + pod: lowPriorityPod, + nominatedPod: highPriorityPod, + node: node, + nodeInfo: framework.NewNodeInfo(pod), + wantStatus: framework.NewStatus(framework.Error, `running AddPod on PreFilter plugin "TestPlugin1": injected status`), + }, + { + name: "node has a high-priority nominated pod and filters fail", + preFilterPlugin: &TestPlugin{ + name: "TestPlugin1", + inj: injectedResult{ + PreFilterAddPodStatus: int(framework.Success), + }, + }, + filterPlugin: &TestPlugin{ + name: "TestPlugin2", + inj: injectedResult{ + FilterStatus: int(framework.Error), + }, + }, + pod: lowPriorityPod, + nominatedPod: highPriorityPod, + node: node, + nodeInfo: framework.NewNodeInfo(pod), + wantStatus: framework.NewStatus(framework.Error, `running "TestPlugin2" filter plugin for pod "": injected filter status`), + }, + { + name: "node has a low-priority nominated pod and pre filters return unschedulable", + preFilterPlugin: &TestPlugin{ + name: "TestPlugin1", + inj: injectedResult{ + PreFilterAddPodStatus: int(framework.Unschedulable), + }, + }, + filterPlugin: &TestPlugin{ + name: "TestPlugin2", + inj: injectedResult{ + FilterStatus: int(framework.Success), + }, + }, + pod: highPriorityPod, + nominatedPod: lowPriorityPod, + node: node, + nodeInfo: framework.NewNodeInfo(pod), + wantStatus: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registry := Registry{} + cfgPls := &config.Plugins{ + PreFilter: &config.PluginSet{}, + Filter: &config.PluginSet{}, + } + + if tt.preFilterPlugin != nil { + if err := registry.Register(tt.preFilterPlugin.name, + func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return tt.preFilterPlugin, nil + }); err != nil { + t.Fatalf("fail to register preFilter plugin (%s)", tt.preFilterPlugin.name) + } + cfgPls.PreFilter.Enabled = append( + cfgPls.PreFilter.Enabled, + config.Plugin{Name: tt.preFilterPlugin.name}, + ) + } + if tt.filterPlugin != nil { + if err := registry.Register(tt.filterPlugin.name, + func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return tt.filterPlugin, nil + }); err != nil { + t.Fatalf("fail to register filter plugin (%s)", tt.filterPlugin.name) + } + cfgPls.Filter.Enabled = append( + cfgPls.Filter.Enabled, + config.Plugin{Name: tt.filterPlugin.name}, + ) + } + + podNominator := internalqueue.NewPodNominator() + if tt.nominatedPod != nil { + podNominator.AddNominatedPod(tt.nominatedPod, nodeName) + } + f, err := newFrameworkWithQueueSortAndBind(registry, cfgPls, emptyArgs, WithPodNominator(podNominator)) + if err != nil { + t.Fatalf("fail to create framework: %s", err) + } + tt.nodeInfo.SetNode(tt.node) + gotStatus := f.RunFilterPluginsWithNominatedPods(context.TODO(), nil, tt.pod, tt.nodeInfo) + if !reflect.DeepEqual(gotStatus, tt.wantStatus) { + t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus) + } + }) + } +} + func TestPreBindPlugins(t *testing.T) { tests := []struct { name string diff --git a/test/e2e/framework/.import-restrictions b/test/e2e/framework/.import-restrictions index 1db4ed4230e..2df6a81ccd1 100644 --- a/test/e2e/framework/.import-restrictions +++ b/test/e2e/framework/.import-restrictions @@ -189,7 +189,9 @@ rules: - k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports - k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources - k8s.io/kubernetes/pkg/scheduler/framework/runtime + - k8s.io/kubernetes/pkg/scheduler/internal/heap - k8s.io/kubernetes/pkg/scheduler/internal/parallelize + - k8s.io/kubernetes/pkg/scheduler/internal/queue - k8s.io/kubernetes/pkg/scheduler/listers - k8s.io/kubernetes/pkg/scheduler/metrics - k8s.io/kubernetes/pkg/scheduler/nodeinfo