Updated the scheduler framework's PreFilter interface to return a PreFilterResult

This commit is contained in:
Abdullah Gharaibeh 2022-03-10 17:48:33 -05:00
parent 45f2c63d6a
commit 5b20b68bc9
24 changed files with 370 additions and 164 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
@ -337,8 +338,10 @@ type PreFilterExtensions interface {
type PreFilterPlugin interface { type PreFilterPlugin interface {
Plugin Plugin
// PreFilter is called at the beginning of the scheduling cycle. All PreFilter // PreFilter is called at the beginning of the scheduling cycle. All PreFilter
// plugins must return success or the pod will be rejected. // plugins must return success or the pod will be rejected. PreFilter could optionally
PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status // return a PreFilterResult to influence which nodes to evaluate downstream. This is useful
// for cases where it is possible to determine the subset of nodes to process in O(1) time.
PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one, // PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one,
// or nil if it does not. A Pre-filter plugin can provide extensions to incrementally // or nil if it does not. A Pre-filter plugin can provide extensions to incrementally
// modify its pre-processed info. The framework guarantees that the extensions // modify its pre-processed info. The framework guarantees that the extensions
@ -498,7 +501,9 @@ type Framework interface {
// *Status and its code is set to non-success if any of the plugins returns // *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling // anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted. // cycle is aborted.
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status // It also returns a PreFilterResult, which may influence what or how many nodes to
// evaluate downstream.
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
// RunPostFilterPlugins runs the set of configured PostFilter plugins. // RunPostFilterPlugins runs the set of configured PostFilter plugins.
// PostFilter plugins can either be informational, in which case should be configured // PostFilter plugins can either be informational, in which case should be configured
@ -608,6 +613,36 @@ type Handle interface {
Parallelizer() parallelize.Parallelizer Parallelizer() parallelize.Parallelizer
} }
// PreFilterResult wraps needed info for scheduler framework to act upon PreFilter phase.
type PreFilterResult struct {
// The set of nodes that should be considered downstream; if nil then
// all nodes are eligible.
NodeNames sets.String
}
func (p *PreFilterResult) AllNodes() bool {
return p == nil || p.NodeNames == nil
}
func (p *PreFilterResult) Merge(in *PreFilterResult) *PreFilterResult {
if p.AllNodes() && in.AllNodes() {
return nil
}
r := PreFilterResult{}
if p.AllNodes() {
r.NodeNames = sets.NewString(in.NodeNames.UnsortedList()...)
return &r
}
if in.AllNodes() {
r.NodeNames = sets.NewString(p.NodeNames.UnsortedList()...)
return &r
}
r.NodeNames = p.NodeNames.Intersection(in.NodeNames)
return &r
}
type NominatingMode int type NominatingMode int
const ( const (

View File

@ -22,6 +22,7 @@ import (
"testing" "testing"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/util/sets"
) )
var errorStatus = NewStatus(Error, "internal error") var errorStatus = NewStatus(Error, "internal error")
@ -139,6 +140,61 @@ func TestPluginToStatusMerge(t *testing.T) {
} }
} }
func TestPreFilterResultMerge(t *testing.T) {
tests := map[string]struct {
receiver *PreFilterResult
in *PreFilterResult
want *PreFilterResult
}{
"all nil": {},
"nil receiver empty input": {
in: &PreFilterResult{NodeNames: sets.NewString()},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"empty receiver nil input": {
receiver: &PreFilterResult{NodeNames: sets.NewString()},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"empty receiver empty input": {
receiver: &PreFilterResult{NodeNames: sets.NewString()},
in: &PreFilterResult{NodeNames: sets.NewString()},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"nil receiver populated input": {
in: &PreFilterResult{NodeNames: sets.NewString("node1")},
want: &PreFilterResult{NodeNames: sets.NewString("node1")},
},
"empty receiver populated input": {
receiver: &PreFilterResult{NodeNames: sets.NewString()},
in: &PreFilterResult{NodeNames: sets.NewString("node1")},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"populated receiver nil input": {
receiver: &PreFilterResult{NodeNames: sets.NewString("node1")},
want: &PreFilterResult{NodeNames: sets.NewString("node1")},
},
"populated receiver empty input": {
receiver: &PreFilterResult{NodeNames: sets.NewString("node1")},
in: &PreFilterResult{NodeNames: sets.NewString()},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"populated receiver and input": {
receiver: &PreFilterResult{NodeNames: sets.NewString("node1", "node2")},
in: &PreFilterResult{NodeNames: sets.NewString("node2", "node3")},
want: &PreFilterResult{NodeNames: sets.NewString("node2")},
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
got := test.receiver.Merge(test.in)
if diff := cmp.Diff(test.want, got); diff != "" {
t.Errorf("unexpected diff (-want, +got):\n%s", diff)
}
})
}
}
func TestIsStatusEqual(t *testing.T) { func TestIsStatusEqual(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -128,8 +128,8 @@ func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
return pl return pl
} }
func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status { func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil return nil, nil
} }
func (pl *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { func (pl *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
@ -368,7 +368,7 @@ func TestPostFilter(t *testing.T) {
state := framework.NewCycleState() state := framework.NewCycleState()
// Ensure <state> is populated. // Ensure <state> is populated.
if status := f.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() { if _, status := f.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
t.Errorf("Unexpected PreFilter Status: %v", status) t.Errorf("Unexpected PreFilter Status: %v", status)
} }
@ -1123,7 +1123,7 @@ func TestDryRunPreemption(t *testing.T) {
for cycle, pod := range tt.testPods { for cycle, pod := range tt.testPods {
state := framework.NewCycleState() state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState. // Some tests rely on PreFilter plugin to compute its CycleState.
if status := fwk.RunPreFilterPlugins(context.Background(), state, pod); !status.IsSuccess() { if _, status := fwk.RunPreFilterPlugins(context.Background(), state, pod); !status.IsSuccess() {
t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status) t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
} }
pe := preemption.Evaluator{ pe := preemption.Evaluator{
@ -1348,7 +1348,7 @@ func TestSelectBestCandidate(t *testing.T) {
state := framework.NewCycleState() state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState. // Some tests rely on PreFilter plugin to compute its CycleState.
if status := fwk.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() { if _, status := fwk.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
t.Errorf("Unexpected PreFilter Status: %v", status) t.Errorf("Unexpected PreFilter Status: %v", status)
} }
nodeInfos, err := snapshot.NodeInfos().List() nodeInfos, err := snapshot.NodeInfos().List()
@ -1689,9 +1689,8 @@ func TestPreempt(t *testing.T) {
state := framework.NewCycleState() state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState. // Some tests rely on PreFilter plugin to compute its CycleState.
preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod) if _, s := fwk.RunPreFilterPlugins(context.Background(), state, test.pod); !s.IsSuccess() {
if !preFilterStatus.IsSuccess() { t.Errorf("Unexpected preFilterStatus: %v", s)
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
} }
// Call preempt and check the expected results. // Call preempt and check the expected results.
pl := DefaultPreemption{ pl := DefaultPreemption{

View File

@ -227,32 +227,32 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Co
} }
// PreFilter invoked at the prefilter extension point. // PreFilter invoked at the prefilter extension point.
func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
var allNodes []*framework.NodeInfo var allNodes []*framework.NodeInfo
var nodesWithRequiredAntiAffinityPods []*framework.NodeInfo var nodesWithRequiredAntiAffinityPods []*framework.NodeInfo
var err error var err error
if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil { if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil {
return framework.AsStatus(fmt.Errorf("failed to list NodeInfos: %w", err)) return nil, framework.AsStatus(fmt.Errorf("failed to list NodeInfos: %w", err))
} }
if nodesWithRequiredAntiAffinityPods, err = pl.sharedLister.NodeInfos().HavePodsWithRequiredAntiAffinityList(); err != nil { if nodesWithRequiredAntiAffinityPods, err = pl.sharedLister.NodeInfos().HavePodsWithRequiredAntiAffinityList(); err != nil {
return framework.AsStatus(fmt.Errorf("failed to list NodeInfos with pods with affinity: %w", err)) return nil, framework.AsStatus(fmt.Errorf("failed to list NodeInfos with pods with affinity: %w", err))
} }
s := &preFilterState{} s := &preFilterState{}
s.podInfo = framework.NewPodInfo(pod) s.podInfo = framework.NewPodInfo(pod)
if s.podInfo.ParseError != nil { if s.podInfo.ParseError != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("parsing pod: %+v", s.podInfo.ParseError)) return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("parsing pod: %+v", s.podInfo.ParseError))
} }
for i := range s.podInfo.RequiredAffinityTerms { for i := range s.podInfo.RequiredAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAffinityTerms[i]); err != nil { if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAffinityTerms[i]); err != nil {
return framework.AsStatus(err) return nil, framework.AsStatus(err)
} }
} }
for i := range s.podInfo.RequiredAntiAffinityTerms { for i := range s.podInfo.RequiredAntiAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAntiAffinityTerms[i]); err != nil { if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAntiAffinityTerms[i]); err != nil {
return framework.AsStatus(err) return nil, framework.AsStatus(err)
} }
} }
s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister) s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)
@ -261,7 +261,7 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(ctx, s.podInfo, allNodes) s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(ctx, s.podInfo, allNodes)
cycleState.Write(preFilterStateKey, s) cycleState.Write(preFilterStateKey, s)
return nil return nil, nil
} }
// PreFilterExtensions returns prefilter extensions, pod add and remove. // PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@ -997,7 +997,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
snapshot := cache.NewSnapshot(test.pods, []*v1.Node{test.node}) snapshot := cache.NewSnapshot(test.pods, []*v1.Node{test.node})
p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, namespaces) p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, namespaces)
state := framework.NewCycleState() state := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod) _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod)
if !preFilterStatus.IsSuccess() { if !preFilterStatus.IsSuccess() {
if !strings.Contains(preFilterStatus.Message(), test.wantStatus.Message()) { if !strings.Contains(preFilterStatus.Message(), test.wantStatus.Message()) {
t.Errorf("prefilter failed with status: %v", preFilterStatus) t.Errorf("prefilter failed with status: %v", preFilterStatus)
@ -1866,7 +1866,7 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
}) })
for indexNode, node := range test.nodes { for indexNode, node := range test.nodes {
state := framework.NewCycleState() state := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod) _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod)
if !preFilterStatus.IsSuccess() { if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus) t.Errorf("prefilter failed with status: %v", preFilterStatus)
} }
@ -2158,7 +2158,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
defer cancel() defer cancel()
p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, nil) p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, nil)
cycleState := framework.NewCycleState() cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pendingPod) _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pendingPod)
if !preFilterStatus.IsSuccess() { if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus) t.Errorf("prefilter failed with status: %v", preFilterStatus)
} }

View File

@ -83,10 +83,10 @@ func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEvent {
} }
// PreFilter builds and writes cycle state used by Filter. // PreFilter builds and writes cycle state used by Filter.
func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)} state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
cycleState.Write(preFilterStateKey, state) cycleState.Write(preFilterStateKey, state)
return nil return nil, nil
} }
// PreFilterExtensions not necessary for this plugin as state doesn't depend on pod additions or deletions. // PreFilterExtensions not necessary for this plugin as state doesn't depend on pod additions or deletions.

View File

@ -855,7 +855,7 @@ func TestNodeAffinity(t *testing.T) {
state := framework.NewCycleState() state := framework.NewCycleState()
var gotStatus *framework.Status var gotStatus *framework.Status
if !test.disablePreFilter { if !test.disablePreFilter {
gotStatus = p.(framework.PreFilterPlugin).PreFilter(context.Background(), state, test.pod) _, gotStatus = p.(framework.PreFilterPlugin).PreFilter(context.Background(), state, test.pod)
if !gotStatus.IsSuccess() { if !gotStatus.IsSuccess() {
t.Errorf("unexpected error: %v", gotStatus) t.Errorf("unexpected error: %v", gotStatus)
} }

View File

@ -74,10 +74,10 @@ func getContainerPorts(pods ...*v1.Pod) []*v1.ContainerPort {
} }
// PreFilter invoked at the prefilter extension point. // PreFilter invoked at the prefilter extension point.
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
s := getContainerPorts(pod) s := getContainerPorts(pod)
cycleState.Write(preFilterStateKey, preFilterState(s)) cycleState.Write(preFilterStateKey, preFilterState(s))
return nil return nil, nil
} }
// PreFilterExtensions do not exist for this plugin. // PreFilterExtensions do not exist for this plugin.

View File

@ -151,7 +151,7 @@ func TestNodePorts(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
p, _ := New(nil, nil) p, _ := New(nil, nil)
cycleState := framework.NewCycleState() cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
if !preFilterStatus.IsSuccess() { if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus) t.Errorf("prefilter failed with status: %v", preFilterStatus)
} }

View File

@ -178,9 +178,9 @@ func computePodResourceRequest(pod *v1.Pod, enablePodOverhead bool) *preFilterSt
} }
// PreFilter invoked at the prefilter extension point. // PreFilter invoked at the prefilter extension point.
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
cycleState.Write(preFilterStateKey, computePodResourceRequest(pod, f.enablePodOverhead)) cycleState.Write(preFilterStateKey, computePodResourceRequest(pod, f.enablePodOverhead))
return nil return nil, nil
} }
// PreFilterExtensions returns prefilter extensions, pod add and remove. // PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@ -477,7 +477,7 @@ func TestEnoughRequests(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
cycleState := framework.NewCycleState() cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
if !preFilterStatus.IsSuccess() { if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus) t.Errorf("prefilter failed with status: %v", preFilterStatus)
} }
@ -555,7 +555,7 @@ func TestNotEnoughRequests(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
cycleState := framework.NewCycleState() cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
if !preFilterStatus.IsSuccess() { if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus) t.Errorf("prefilter failed with status: %v", preFilterStatus)
} }
@ -627,7 +627,7 @@ func TestStorageRequests(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
cycleState := framework.NewCycleState() cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
if !preFilterStatus.IsSuccess() { if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus) t.Errorf("prefilter failed with status: %v", preFilterStatus)
} }

View File

@ -140,13 +140,13 @@ func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v
} }
// PreFilter invoked at the prefilter extension point. // PreFilter invoked at the prefilter extension point.
func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
s, err := pl.calPreFilterState(ctx, pod) s, err := pl.calPreFilterState(ctx, pod)
if err != nil { if err != nil {
return framework.AsStatus(err) return nil, framework.AsStatus(err)
} }
cycleState.Write(preFilterStateKey, s) cycleState.Write(preFilterStateKey, s)
return nil return nil, nil
} }
// PreFilterExtensions returns prefilter extensions, pod add and remove. // PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@ -520,7 +520,7 @@ func TestPreFilterState(t *testing.T) {
} }
p := plugintesting.SetupPluginWithInformers(ctx, t, New, args, cache.NewSnapshot(tt.existingPods, tt.nodes), tt.objs) p := plugintesting.SetupPluginWithInformers(ctx, t, New, args, cache.NewSnapshot(tt.existingPods, tt.nodes), tt.objs)
cs := framework.NewCycleState() cs := framework.NewCycleState()
if s := p.(*PodTopologySpread).PreFilter(ctx, cs, tt.pod); !s.IsSuccess() { if _, s := p.(*PodTopologySpread).PreFilter(ctx, cs, tt.pod); !s.IsSuccess() {
t.Fatal(s.AsError()) t.Fatal(s.AsError())
} }
got, err := getPreFilterState(cs) got, err := getPreFilterState(cs)
@ -826,7 +826,7 @@ func TestPreFilterStateAddPod(t *testing.T) {
pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
cs := framework.NewCycleState() cs := framework.NewCycleState()
if s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() { if _, s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() {
t.Fatal(s.AsError()) t.Fatal(s.AsError())
} }
nodeInfo, err := snapshot.Get(tt.nodes[tt.nodeIdx].Name) nodeInfo, err := snapshot.Get(tt.nodes[tt.nodeIdx].Name)
@ -1030,8 +1030,7 @@ func TestPreFilterStateRemovePod(t *testing.T) {
pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
cs := framework.NewCycleState() cs := framework.NewCycleState()
s := p.PreFilter(ctx, cs, tt.preemptor) if _, s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() {
if !s.IsSuccess() {
t.Fatal(s.AsError()) t.Fatal(s.AsError())
} }
@ -1106,8 +1105,7 @@ func BenchmarkFilter(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
state = framework.NewCycleState() state = framework.NewCycleState()
s := p.PreFilter(ctx, state, tt.pod) if _, s := p.PreFilter(ctx, state, tt.pod); !s.IsSuccess() {
if !s.IsSuccess() {
b.Fatal(s.AsError()) b.Fatal(s.AsError())
} }
filterNode := func(i int) { filterNode := func(i int) {
@ -1432,9 +1430,8 @@ func TestSingleConstraint(t *testing.T) {
pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
state := framework.NewCycleState() state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, tt.pod) if _, s := p.PreFilter(context.Background(), state, tt.pod); !s.IsSuccess() {
if !preFilterStatus.IsSuccess() { t.Errorf("preFilter failed with status: %v", s)
t.Errorf("preFilter failed with status: %v", preFilterStatus)
} }
for _, node := range tt.nodes { for _, node := range tt.nodes {
@ -1659,9 +1656,8 @@ func TestMultipleConstraints(t *testing.T) {
pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot) pl := plugintesting.SetupPlugin(t, New, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread) p := pl.(*PodTopologySpread)
state := framework.NewCycleState() state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, tt.pod) if _, s := p.PreFilter(context.Background(), state, tt.pod); !s.IsSuccess() {
if !preFilterStatus.IsSuccess() { t.Errorf("preFilter failed with status: %v", s)
t.Errorf("preFilter failed with status: %v", preFilterStatus)
} }
for _, node := range tt.nodes { for _, node := range tt.nodes {

View File

@ -169,17 +169,17 @@ func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) (bool, error) {
// PreFilter invoked at the prefilter extension point to check if pod has all // PreFilter invoked at the prefilter extension point to check if pod has all
// immediate PVCs bound. If not all immediate PVCs are bound, an // immediate PVCs bound. If not all immediate PVCs are bound, an
// UnschedulableAndUnresolvable is returned. // UnschedulableAndUnresolvable is returned.
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status { func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
// If pod does not reference any PVC, we don't need to do anything. // If pod does not reference any PVC, we don't need to do anything.
if hasPVC, err := pl.podHasPVCs(pod); err != nil { if hasPVC, err := pl.podHasPVCs(pod); err != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
} else if !hasPVC { } else if !hasPVC {
state.Write(stateKey, &stateData{skip: true}) state.Write(stateKey, &stateData{skip: true})
return nil return nil, nil
} }
boundClaims, claimsToBind, unboundClaimsImmediate, err := pl.Binder.GetPodVolumes(pod) boundClaims, claimsToBind, unboundClaimsImmediate, err := pl.Binder.GetPodVolumes(pod)
if err != nil { if err != nil {
return framework.AsStatus(err) return nil, framework.AsStatus(err)
} }
if len(unboundClaimsImmediate) > 0 { if len(unboundClaimsImmediate) > 0 {
// Return UnschedulableAndUnresolvable error if immediate claims are // Return UnschedulableAndUnresolvable error if immediate claims are
@ -187,10 +187,10 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt
// claims are bound by PV controller. // claims are bound by PV controller.
status := framework.NewStatus(framework.UnschedulableAndUnresolvable) status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
status.AppendReason("pod has unbound immediate PersistentVolumeClaims") status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
return status return nil, status
} }
state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind, podVolumesByNode: make(map[string]*PodVolumes)}) state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind, podVolumesByNode: make(map[string]*PodVolumes)})
return nil return nil, nil
} }
// PreFilterExtensions returns prefilter extensions, pod add and remove. // PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@ -653,7 +653,7 @@ func TestVolumeBinding(t *testing.T) {
state := framework.NewCycleState() state := framework.NewCycleState()
t.Logf("Verify: call PreFilter and check status") t.Logf("Verify: call PreFilter and check status")
gotPreFilterStatus := p.PreFilter(ctx, state, item.pod) _, gotPreFilterStatus := p.PreFilter(ctx, state, item.pod)
if !reflect.DeepEqual(gotPreFilterStatus, item.wantPreFilterStatus) { if !reflect.DeepEqual(gotPreFilterStatus, item.wantPreFilterStatus) {
t.Errorf("filter prefilter status does not match: %v, want: %v", gotPreFilterStatus, item.wantPreFilterStatus) t.Errorf("filter prefilter status does not match: %v, want: %v", gotPreFilterStatus, item.wantPreFilterStatus)
} }

View File

@ -120,11 +120,11 @@ func haveOverlap(a1, a2 []string) bool {
return false return false
} }
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
if pl.enableReadWriteOncePod { if pl.enableReadWriteOncePod {
return pl.isReadWriteOncePodAccessModeConflict(ctx, pod) return nil, pl.isReadWriteOncePodAccessModeConflict(ctx, pod)
} }
return framework.NewStatus(framework.Success) return nil, framework.NewStatus(framework.Success)
} }
// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode. // isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode.

View File

@ -366,7 +366,7 @@ func TestAccessModeConflicts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
p := newPluginWithListers(ctx, t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod) p := newPluginWithListers(ctx, t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod)
gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod) _, gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod)
if !reflect.DeepEqual(gotStatus, test.wantStatus) { if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus) t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus)
} }

View File

@ -561,33 +561,46 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
// *Status and its code is set to non-success if any of the plugins returns // *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling // anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted. // cycle is aborted.
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (status *framework.Status) { func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}() }()
var result *framework.PreFilterResult
var pluginsWithNodes []string
for _, pl := range f.preFilterPlugins { for _, pl := range f.preFilterPlugins {
status = f.runPreFilterPlugin(ctx, pl, state, pod) r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
if !status.IsSuccess() { if !s.IsSuccess() {
status.SetFailedPlugin(pl.Name()) s.SetFailedPlugin(pl.Name())
if status.IsUnschedulable() { if s.IsUnschedulable() {
return status return nil, s
} }
return framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), status.AsError())).WithFailedPlugin(pl.Name()) return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), status.AsError())).WithFailedPlugin(pl.Name())
}
if !r.AllNodes() {
pluginsWithNodes = append(pluginsWithNodes, pl.Name())
}
result = result.Merge(r)
if !result.AllNodes() && len(result.NodeNames) == 0 {
msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", pluginsWithNodes)
if len(pluginsWithNodes) == 1 {
msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", pluginsWithNodes[0])
}
return nil, framework.NewStatus(framework.Unschedulable, msg)
} }
}
return nil }
return result, nil
} }
func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) *framework.Status { func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
if !state.ShouldRecordPluginMetrics() { if !state.ShouldRecordPluginMetrics() {
return pl.PreFilter(ctx, state, pod) return pl.PreFilter(ctx, state, pod)
} }
startTime := time.Now() startTime := time.Now()
status := pl.PreFilter(ctx, state, pod) result, status := pl.PreFilter(ctx, state, pod)
f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime)) f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime))
return status return result, status
} }
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured // RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured

View File

@ -171,8 +171,8 @@ func (pl *TestPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil return nil
} }
func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status { func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), "injected status") return nil, framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), "injected status")
} }
func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions { func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
@ -222,9 +222,9 @@ func (pl *TestPreFilterPlugin) Name() string {
return preFilterPluginName return preFilterPluginName
} }
func (pl *TestPreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status { func (pl *TestPreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
pl.PreFilterCalled++ pl.PreFilterCalled++
return nil return nil, nil
} }
func (pl *TestPreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions { func (pl *TestPreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions {
@ -242,9 +242,9 @@ func (pl *TestPreFilterWithExtensionsPlugin) Name() string {
return preFilterWithExtensionsPluginName return preFilterWithExtensionsPluginName
} }
func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status { func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
pl.PreFilterCalled++ pl.PreFilterCalled++
return nil return nil, nil
} }
func (pl *TestPreFilterWithExtensionsPlugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, func (pl *TestPreFilterWithExtensionsPlugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
@ -270,8 +270,8 @@ func (dp *TestDuplicatePlugin) Name() string {
return duplicatePluginName return duplicatePluginName
} }
func (dp *TestDuplicatePlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status { func (dp *TestDuplicatePlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil return nil, nil
} }
func (dp *TestDuplicatePlugin) PreFilterExtensions() framework.PreFilterExtensions { func (dp *TestDuplicatePlugin) PreFilterExtensions() framework.PreFilterExtensions {

View File

@ -140,14 +140,14 @@ type schedulerOptions struct {
// Option configures a Scheduler // Option configures a Scheduler
type Option func(*schedulerOptions) type Option func(*schedulerOptions)
// ScheduleResult represents the result of one pod scheduled. It will contain // ScheduleResult represents the result of scheduling a pod.
// the final selected Node, along with the selected intermediate information.
type ScheduleResult struct { type ScheduleResult struct {
// Name of the scheduler suggest host // Name of the selected node.
SuggestedHost string SuggestedHost string
// Number of nodes scheduler evaluated on one pod scheduled // The number of nodes the scheduler evaluated the pod against in the filtering
// phase and beyond.
EvaluatedNodes int EvaluatedNodes int
// Number of feasible nodes on one pod scheduled // The number of nodes out of the evaluated ones that fit the pod.
FeasibleNodes int FeasibleNodes int
} }
@ -900,7 +900,7 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
} }
// Run "prefilter" plugins. // Run "prefilter" plugins.
s := fwk.RunPreFilterPlugins(ctx, state, pod) preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List() allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
if err != nil { if err != nil {
return nil, diagnosis, err return nil, diagnosis, err
@ -915,7 +915,9 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
diagnosis.NodeToStatusMap[n.Node().Name] = s diagnosis.NodeToStatusMap[n.Node().Name] = s
} }
// Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins. // Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin()) if s.FailedPlugin() != "" {
diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
}
return nil, diagnosis, nil return nil, diagnosis, nil
} }
@ -931,7 +933,19 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
return feasibleNodes, diagnosis, nil return feasibleNodes, diagnosis, nil
} }
} }
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
nodes := allNodes
if !preRes.AllNodes() {
nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
for n := range preRes.NodeNames {
nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
if err != nil {
return nil, diagnosis, err
}
nodes = append(nodes, nInfo)
}
}
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
if err != nil { if err != nil {
return nil, diagnosis, err return nil, diagnosis, err
} }

View File

@ -64,6 +64,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/profile" "k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing" st "k8s.io/kubernetes/pkg/scheduler/testing"
schedutil "k8s.io/kubernetes/pkg/scheduler/util" schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/utils/pointer"
) )
func podWithID(id, desiredHost string) *v1.Pod { func podWithID(id, desiredHost string) *v1.Pod {
@ -1792,14 +1793,15 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
func TestSchedulerSchedulePod(t *testing.T) { func TestSchedulerSchedulePod(t *testing.T) {
fts := feature.Features{} fts := feature.Features{}
tests := []struct { tests := []struct {
name string name string
registerPlugins []st.RegisterPluginFunc registerPlugins []st.RegisterPluginFunc
nodes []string nodes []string
pvcs []v1.PersistentVolumeClaim pvcs []v1.PersistentVolumeClaim
pod *v1.Pod pod *v1.Pod
pods []*v1.Pod pods []*v1.Pod
expectedHosts sets.String wantNodes sets.String
wErr error wantEvaluatedNodes *int32
wErr error
}{ }{
{ {
registerPlugins: []st.RegisterPluginFunc{ registerPlugins: []st.RegisterPluginFunc{
@ -1828,11 +1830,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"machine1", "machine2"}, nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
expectedHosts: sets.NewString("machine1", "machine2"), wantNodes: sets.NewString("machine1", "machine2"),
name: "test 2", name: "test 2",
wErr: nil, wErr: nil,
}, },
{ {
// Fits on a machine where the pod ID matches the machine name // Fits on a machine where the pod ID matches the machine name
@ -1841,11 +1843,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterFilterPlugin("MatchFilter", st.NewMatchFilterPlugin), st.RegisterFilterPlugin("MatchFilter", st.NewMatchFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"machine1", "machine2"}, nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine2", UID: types.UID("machine2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine2", UID: types.UID("machine2")}},
expectedHosts: sets.NewString("machine2"), wantNodes: sets.NewString("machine2"),
name: "test 3", name: "test 3",
wErr: nil, wErr: nil,
}, },
{ {
registerPlugins: []st.RegisterPluginFunc{ registerPlugins: []st.RegisterPluginFunc{
@ -1854,11 +1856,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
expectedHosts: sets.NewString("3"), wantNodes: sets.NewString("3"),
name: "test 4", name: "test 4",
wErr: nil, wErr: nil,
}, },
{ {
registerPlugins: []st.RegisterPluginFunc{ registerPlugins: []st.RegisterPluginFunc{
@ -1867,11 +1869,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("2"), wantNodes: sets.NewString("2"),
name: "test 5", name: "test 5",
wErr: nil, wErr: nil,
}, },
{ {
registerPlugins: []st.RegisterPluginFunc{ registerPlugins: []st.RegisterPluginFunc{
@ -1881,11 +1883,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("ReverseNumericMap", newReverseNumericMapPlugin(), 2), st.RegisterScorePlugin("ReverseNumericMap", newReverseNumericMapPlugin(), 2),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("1"), wantNodes: sets.NewString("1"),
name: "test 6", name: "test 6",
wErr: nil, wErr: nil,
}, },
{ {
registerPlugins: []st.RegisterPluginFunc{ registerPlugins: []st.RegisterPluginFunc{
@ -1974,9 +1976,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
}, },
}, },
}, },
expectedHosts: sets.NewString("machine1", "machine2"), wantNodes: sets.NewString("machine1", "machine2"),
name: "existing PVC", name: "existing PVC",
wErr: nil, wErr: nil,
}, },
{ {
// Pod with non existing PVC // Pod with non existing PVC
@ -2134,8 +2136,8 @@ func TestSchedulerSchedulePod(t *testing.T) {
}, },
}, },
}, },
expectedHosts: sets.NewString("machine2"), wantNodes: sets.NewString("machine2"),
wErr: nil, wErr: nil,
}, },
{ {
name: "test podtopologyspread plugin - 3 nodes with maxskew=2", name: "test podtopologyspread plugin - 3 nodes with maxskew=2",
@ -2199,8 +2201,8 @@ func TestSchedulerSchedulePod(t *testing.T) {
}, },
}, },
}, },
expectedHosts: sets.NewString("machine2", "machine3"), wantNodes: sets.NewString("machine2", "machine3"),
wErr: nil, wErr: nil,
}, },
{ {
name: "test with filter plugin returning Unschedulable status", name: "test with filter plugin returning Unschedulable status",
@ -2213,9 +2215,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"3"}, nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil, wantNodes: nil,
wErr: &framework.FitError{ wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
NumAllNodes: 1, NumAllNodes: 1,
@ -2238,9 +2240,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"3"}, nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil, wantNodes: nil,
wErr: &framework.FitError{ wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
NumAllNodes: 1, NumAllNodes: 1,
@ -2263,10 +2265,10 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1), st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"1", "2"}, nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil, wantNodes: nil,
wErr: nil, wErr: nil,
}, },
{ {
name: "test prefilter plugin returning Unschedulable status", name: "test prefilter plugin returning Unschedulable status",
@ -2274,13 +2276,13 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin( st.RegisterPreFilterPlugin(
"FakePreFilter", "FakePreFilter",
st.NewFakePreFilterPlugin(framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status")), st.NewFakePreFilterPlugin("FakePreFilter", nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status")),
), ),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"1", "2"}, nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
expectedHosts: nil, wantNodes: nil,
wErr: &framework.FitError{ wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}}, Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
NumAllNodes: 2, NumAllNodes: 2,
@ -2299,14 +2301,97 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin( st.RegisterPreFilterPlugin(
"FakePreFilter", "FakePreFilter",
st.NewFakePreFilterPlugin(framework.NewStatus(framework.Error, "injected error status")), st.NewFakePreFilterPlugin("FakePreFilter", nil, framework.NewStatus(framework.Error, "injected error status")),
), ),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}, },
nodes: []string{"1", "2"}, nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
expectedHosts: nil, wantNodes: nil,
wErr: fmt.Errorf(`running PreFilter plugin "FakePreFilter": %w`, errors.New("injected error status")), wErr: fmt.Errorf(`running PreFilter plugin "FakePreFilter": %w`, errors.New("injected error status")),
},
{
name: "test prefilter plugin returning node",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter1",
st.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter2",
st.NewFakePreFilterPlugin("FakePreFilter2", &framework.PreFilterResult{NodeNames: sets.NewString("node2")}, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter3",
st.NewFakePreFilterPlugin("FakePreFilter3", &framework.PreFilterResult{NodeNames: sets.NewString("node1", "node2")}, nil),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2", "node3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
wantNodes: sets.NewString("node2"),
wantEvaluatedNodes: pointer.Int32Ptr(1),
},
{
name: "test prefilter plugin returning non-intersecting nodes",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter1",
st.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter2",
st.NewFakePreFilterPlugin("FakePreFilter2", &framework.PreFilterResult{NodeNames: sets.NewString("node2")}, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter3",
st.NewFakePreFilterPlugin("FakePreFilter3", &framework.PreFilterResult{NodeNames: sets.NewString("node1")}, nil),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2", "node3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
NumAllNodes: 3,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node2": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node3": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
},
UnschedulablePlugins: sets.String{},
},
},
},
{
name: "test prefilter plugin returning empty node set",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter1",
st.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter2",
st.NewFakePreFilterPlugin("FakePreFilter2", &framework.PreFilterResult{NodeNames: sets.NewString()}, nil),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
NumAllNodes: 1,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin FakePreFilter2"),
},
UnschedulablePlugins: sets.String{},
},
},
}, },
} }
for _, test := range tests { for _, test := range tests {
@ -2371,11 +2456,15 @@ func TestSchedulerSchedulePod(t *testing.T) {
} }
} }
} }
if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) { if test.wantNodes != nil && !test.wantNodes.Has(result.SuggestedHost) {
t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost) t.Errorf("Expected: %s, got: %s", test.wantNodes, result.SuggestedHost)
} }
if test.wErr == nil && len(test.nodes) != result.EvaluatedNodes { wantEvaluatedNodes := len(test.nodes)
t.Errorf("Expected EvaluatedNodes: %d, got: %d", len(test.nodes), result.EvaluatedNodes) if test.wantEvaluatedNodes != nil {
wantEvaluatedNodes = int(*test.wantEvaluatedNodes)
}
if test.wErr == nil && wantEvaluatedNodes != result.EvaluatedNodes {
t.Errorf("Expected EvaluatedNodes: %d, got: %d", wantEvaluatedNodes, result.EvaluatedNodes)
} }
}) })
} }

View File

@ -22,7 +22,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
v1 "k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -127,17 +127,19 @@ func NewMatchFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugi
// FakePreFilterPlugin is a test filter plugin. // FakePreFilterPlugin is a test filter plugin.
type FakePreFilterPlugin struct { type FakePreFilterPlugin struct {
Result *framework.PreFilterResult
Status *framework.Status Status *framework.Status
name string
} }
// Name returns name of the plugin. // Name returns name of the plugin.
func (pl *FakePreFilterPlugin) Name() string { func (pl *FakePreFilterPlugin) Name() string {
return "FakePreFilter" return pl.name
} }
// PreFilter invoked at the PreFilter extension point. // PreFilter invoked at the PreFilter extension point.
func (pl *FakePreFilterPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) *framework.Status { func (pl *FakePreFilterPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return pl.Status return pl.Result, pl.Status
} }
// PreFilterExtensions no extensions implemented by this plugin. // PreFilterExtensions no extensions implemented by this plugin.
@ -146,10 +148,12 @@ func (pl *FakePreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensio
} }
// NewFakePreFilterPlugin initializes a fakePreFilterPlugin and returns it. // NewFakePreFilterPlugin initializes a fakePreFilterPlugin and returns it.
func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFactory { func NewFakePreFilterPlugin(name string, result *framework.PreFilterResult, status *framework.Status) frameworkruntime.PluginFactory {
return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &FakePreFilterPlugin{ return &FakePreFilterPlugin{
Result: result,
Status: status, Status: status,
name: name,
}, nil }, nil
} }
} }

View File

@ -407,15 +407,15 @@ func (pp *PreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions {
} }
// PreFilter is a test function that returns (true, nil) or errors for testing. // PreFilter is a test function that returns (true, nil) or errors for testing.
func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status { func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
pp.numPreFilterCalled++ pp.numPreFilterCalled++
if pp.failPreFilter { if pp.failPreFilter {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
} }
if pp.rejectPreFilter { if pp.rejectPreFilter {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
} }
return nil return nil, nil
} }
// reset used to reset prefilter plugin. // reset used to reset prefilter plugin.
@ -2288,16 +2288,16 @@ func (j *JobPlugin) Name() string {
return jobPluginName return jobPluginName
} }
func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) *framework.Status { func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
labelSelector := labels.SelectorFromSet(labels.Set{"driver": ""}) labelSelector := labels.SelectorFromSet(labels.Set{"driver": ""})
driverPods, err := j.podLister.Pods(p.Namespace).List(labelSelector) driverPods, err := j.podLister.Pods(p.Namespace).List(labelSelector)
if err != nil { if err != nil {
return framework.AsStatus(err) return nil, framework.AsStatus(err)
} }
if len(driverPods) == 0 { if len(driverPods) == 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod") return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod")
} }
return nil return nil, nil
} }
func (j *JobPlugin) PreFilterExtensions() framework.PreFilterExtensions { func (j *JobPlugin) PreFilterExtensions() framework.PreFilterExtensions {

View File

@ -99,8 +99,8 @@ func (fp *tokenFilter) Filter(ctx context.Context, state *framework.CycleState,
return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name)) return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name))
} }
func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status { func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil return nil, nil
} }
func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod, func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,