Merge pull request #108648 from ahg-g/ahg-ds

Update PreFilter interface to return a PreFilterResult
This commit is contained in:
Kubernetes Prow Robot 2022-03-14 18:50:20 -07:00 committed by GitHub
commit d7bd0d4f52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 527 additions and 198 deletions

View File

@ -30,6 +30,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
@ -337,8 +338,10 @@ type PreFilterExtensions interface {
type PreFilterPlugin interface {
Plugin
// PreFilter is called at the beginning of the scheduling cycle. All PreFilter
// plugins must return success or the pod will be rejected.
PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status
// plugins must return success or the pod will be rejected. PreFilter could optionally
// return a PreFilterResult to influence which nodes to evaluate downstream. This is useful
// for cases where it is possible to determine the subset of nodes to process in O(1) time.
PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one,
// or nil if it does not. A Pre-filter plugin can provide extensions to incrementally
// modify its pre-processed info. The framework guarantees that the extensions
@ -498,7 +501,9 @@ type Framework interface {
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
// It also returns a PreFilterResult, which may influence what or how many nodes to
// evaluate downstream.
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
// RunPostFilterPlugins runs the set of configured PostFilter plugins.
// PostFilter plugins can either be informational, in which case should be configured
@ -608,6 +613,36 @@ type Handle interface {
Parallelizer() parallelize.Parallelizer
}
// PreFilterResult wraps needed info for scheduler framework to act upon PreFilter phase.
type PreFilterResult struct {
// The set of nodes that should be considered downstream; if nil then
// all nodes are eligible.
NodeNames sets.String
}
func (p *PreFilterResult) AllNodes() bool {
return p == nil || p.NodeNames == nil
}
func (p *PreFilterResult) Merge(in *PreFilterResult) *PreFilterResult {
if p.AllNodes() && in.AllNodes() {
return nil
}
r := PreFilterResult{}
if p.AllNodes() {
r.NodeNames = sets.NewString(in.NodeNames.UnsortedList()...)
return &r
}
if in.AllNodes() {
r.NodeNames = sets.NewString(p.NodeNames.UnsortedList()...)
return &r
}
r.NodeNames = p.NodeNames.Intersection(in.NodeNames)
return &r
}
type NominatingMode int
const (

View File

@ -22,6 +22,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/util/sets"
)
var errorStatus = NewStatus(Error, "internal error")
@ -139,6 +140,61 @@ func TestPluginToStatusMerge(t *testing.T) {
}
}
func TestPreFilterResultMerge(t *testing.T) {
tests := map[string]struct {
receiver *PreFilterResult
in *PreFilterResult
want *PreFilterResult
}{
"all nil": {},
"nil receiver empty input": {
in: &PreFilterResult{NodeNames: sets.NewString()},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"empty receiver nil input": {
receiver: &PreFilterResult{NodeNames: sets.NewString()},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"empty receiver empty input": {
receiver: &PreFilterResult{NodeNames: sets.NewString()},
in: &PreFilterResult{NodeNames: sets.NewString()},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"nil receiver populated input": {
in: &PreFilterResult{NodeNames: sets.NewString("node1")},
want: &PreFilterResult{NodeNames: sets.NewString("node1")},
},
"empty receiver populated input": {
receiver: &PreFilterResult{NodeNames: sets.NewString()},
in: &PreFilterResult{NodeNames: sets.NewString("node1")},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"populated receiver nil input": {
receiver: &PreFilterResult{NodeNames: sets.NewString("node1")},
want: &PreFilterResult{NodeNames: sets.NewString("node1")},
},
"populated receiver empty input": {
receiver: &PreFilterResult{NodeNames: sets.NewString("node1")},
in: &PreFilterResult{NodeNames: sets.NewString()},
want: &PreFilterResult{NodeNames: sets.NewString()},
},
"populated receiver and input": {
receiver: &PreFilterResult{NodeNames: sets.NewString("node1", "node2")},
in: &PreFilterResult{NodeNames: sets.NewString("node2", "node3")},
want: &PreFilterResult{NodeNames: sets.NewString("node2")},
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
got := test.receiver.Merge(test.in)
if diff := cmp.Diff(test.want, got); diff != "" {
t.Errorf("unexpected diff (-want, +got):\n%s", diff)
}
})
}
}
func TestIsStatusEqual(t *testing.T) {
tests := []struct {
name string

View File

@ -128,8 +128,8 @@ func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
return pl
}
func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status {
return nil
func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil, nil
}
func (pl *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
@ -368,7 +368,7 @@ func TestPostFilter(t *testing.T) {
state := framework.NewCycleState()
// Ensure <state> is populated.
if status := f.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
if _, status := f.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
t.Errorf("Unexpected PreFilter Status: %v", status)
}
@ -1123,7 +1123,7 @@ func TestDryRunPreemption(t *testing.T) {
for cycle, pod := range tt.testPods {
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if status := fwk.RunPreFilterPlugins(context.Background(), state, pod); !status.IsSuccess() {
if _, status := fwk.RunPreFilterPlugins(context.Background(), state, pod); !status.IsSuccess() {
t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
}
pe := preemption.Evaluator{
@ -1348,7 +1348,7 @@ func TestSelectBestCandidate(t *testing.T) {
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if status := fwk.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
if _, status := fwk.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
t.Errorf("Unexpected PreFilter Status: %v", status)
}
nodeInfos, err := snapshot.NodeInfos().List()
@ -1689,9 +1689,8 @@ func TestPreempt(t *testing.T) {
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
if _, s := fwk.RunPreFilterPlugins(context.Background(), state, test.pod); !s.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", s)
}
// Call preempt and check the expected results.
pl := DefaultPreemption{

View File

@ -227,32 +227,32 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Co
}
// PreFilter invoked at the prefilter extension point.
func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
var allNodes []*framework.NodeInfo
var nodesWithRequiredAntiAffinityPods []*framework.NodeInfo
var err error
if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil {
return framework.AsStatus(fmt.Errorf("failed to list NodeInfos: %w", err))
return nil, framework.AsStatus(fmt.Errorf("failed to list NodeInfos: %w", err))
}
if nodesWithRequiredAntiAffinityPods, err = pl.sharedLister.NodeInfos().HavePodsWithRequiredAntiAffinityList(); err != nil {
return framework.AsStatus(fmt.Errorf("failed to list NodeInfos with pods with affinity: %w", err))
return nil, framework.AsStatus(fmt.Errorf("failed to list NodeInfos with pods with affinity: %w", err))
}
s := &preFilterState{}
s.podInfo = framework.NewPodInfo(pod)
if s.podInfo.ParseError != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("parsing pod: %+v", s.podInfo.ParseError))
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("parsing pod: %+v", s.podInfo.ParseError))
}
for i := range s.podInfo.RequiredAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAffinityTerms[i]); err != nil {
return framework.AsStatus(err)
return nil, framework.AsStatus(err)
}
}
for i := range s.podInfo.RequiredAntiAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAntiAffinityTerms[i]); err != nil {
return framework.AsStatus(err)
return nil, framework.AsStatus(err)
}
}
s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)
@ -261,7 +261,7 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(ctx, s.podInfo, allNodes)
cycleState.Write(preFilterStateKey, s)
return nil
return nil, nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@ -997,7 +997,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
snapshot := cache.NewSnapshot(test.pods, []*v1.Node{test.node})
p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, namespaces)
state := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod)
_, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod)
if !preFilterStatus.IsSuccess() {
if !strings.Contains(preFilterStatus.Message(), test.wantStatus.Message()) {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
@ -1866,7 +1866,7 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
})
for indexNode, node := range test.nodes {
state := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod)
_, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}
@ -2158,7 +2158,7 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
defer cancel()
p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, nil)
cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pendingPod)
_, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pendingPod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}

View File

@ -20,8 +20,10 @@ import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
@ -58,6 +60,9 @@ const (
// errReasonEnforced is the reason for added node affinity not matching.
errReasonEnforced = "node(s) didn't match scheduler-enforced node affinity"
// errReasonConflict is the reason for pod's conflicting affinity rules.
errReasonConflict = "pod affinity terms conflict"
)
// Name returns name of the plugin. It is used in logs, etc.
@ -83,10 +88,51 @@ func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEvent {
}
// PreFilter builds and writes cycle state used by Filter.
func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
cycleState.Write(preFilterStateKey, state)
return nil
affinity := pod.Spec.Affinity
if affinity == nil ||
affinity.NodeAffinity == nil ||
affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil ||
len(affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 {
return nil, nil
}
// Check if there is affinity to a specific node and return it.
terms := affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
var nodeNames sets.String
for _, t := range terms {
var termNodeNames sets.String
for _, r := range t.MatchFields {
if r.Key == metav1.ObjectNameField && r.Operator == v1.NodeSelectorOpIn {
// The requirements represent ANDed constraints, and so we need to
// find the intersection of nodes.
s := sets.NewString(r.Values...)
if termNodeNames == nil {
termNodeNames = s
} else {
termNodeNames = termNodeNames.Intersection(s)
}
}
}
if termNodeNames == nil {
// If this term has no node.Name field affinity,
// then all nodes are eligible because the terms are ORed.
return nil, nil
}
// If the set is empty, it means the terms had affinity to different
// sets of nodes, and since they are ANDed, then the pod will not match any node.
if len(termNodeNames) == 0 {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, errReasonConflict)
}
nodeNames = nodeNames.Union(termNodeNames)
}
if nodeNames != nil {
return &framework.PreFilterResult{NodeNames: nodeNames}, nil
}
return nil, nil
}
// PreFilterExtensions not necessary for this plugin as state doesn't depend on pod additions or deletions.

View File

@ -18,12 +18,12 @@ package nodeaffinity
import (
"context"
"reflect"
"testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -33,13 +33,15 @@ import (
// TODO: Add test case for RequiredDuringSchedulingRequiredDuringExecution after it's implemented.
func TestNodeAffinity(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
labels map[string]string
nodeName string
wantStatus *framework.Status
args config.NodeAffinityArgs
disablePreFilter bool
name string
pod *v1.Pod
labels map[string]string
nodeName string
wantStatus *framework.Status
wantPreFilterStatus *framework.Status
wantPreFilterResult *framework.PreFilterResult
args config.NodeAffinityArgs
disablePreFilter bool
}{
{
name: "no selector",
@ -513,7 +515,7 @@ func TestNodeAffinity(t *testing.T) {
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node_1"},
Values: []string{"node1"},
},
},
},
@ -523,7 +525,8 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
nodeName: "node_1",
nodeName: "node1",
wantPreFilterResult: &framework.PreFilterResult{NodeNames: sets.NewString("node1")},
},
{
name: "Pod with matchFields using In operator that does not match the existing node",
@ -538,7 +541,7 @@ func TestNodeAffinity(t *testing.T) {
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node_1"},
Values: []string{"node1"},
},
},
},
@ -548,8 +551,9 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
nodeName: "node_2",
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod),
nodeName: "node2",
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod),
wantPreFilterResult: &framework.PreFilterResult{NodeNames: sets.NewString("node1")},
},
{
name: "Pod with two terms: matchFields does not match, but matchExpressions matches",
@ -564,7 +568,7 @@ func TestNodeAffinity(t *testing.T) {
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node_1"},
Values: []string{"node1"},
},
},
},
@ -583,7 +587,7 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
nodeName: "node_2",
nodeName: "node2",
labels: map[string]string{"foo": "bar"},
},
{
@ -599,7 +603,7 @@ func TestNodeAffinity(t *testing.T) {
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node_1"},
Values: []string{"node1"},
},
},
MatchExpressions: []v1.NodeSelectorRequirement{
@ -616,9 +620,10 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
nodeName: "node_2",
labels: map[string]string{"foo": "bar"},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod),
nodeName: "node2",
labels: map[string]string{"foo": "bar"},
wantPreFilterResult: &framework.PreFilterResult{NodeNames: sets.NewString("node1")},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod),
},
{
name: "Pod with one term: both matchFields and matchExpressions match",
@ -633,7 +638,7 @@ func TestNodeAffinity(t *testing.T) {
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node_1"},
Values: []string{"node1"},
},
},
MatchExpressions: []v1.NodeSelectorRequirement{
@ -650,8 +655,9 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
nodeName: "node_1",
labels: map[string]string{"foo": "bar"},
nodeName: "node1",
labels: map[string]string{"foo": "bar"},
wantPreFilterResult: &framework.PreFilterResult{NodeNames: sets.NewString("node1")},
},
{
name: "Pod with two terms: both matchFields and matchExpressions do not match",
@ -666,7 +672,7 @@ func TestNodeAffinity(t *testing.T) {
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node_1"},
Values: []string{"node1"},
},
},
},
@ -685,10 +691,78 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
nodeName: "node_2",
nodeName: "node2",
labels: map[string]string{"foo": "bar"},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod),
},
{
name: "Pod with two terms of node.Name affinity",
pod: &v1.Pod{
Spec: v1.PodSpec{
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchFields: []v1.NodeSelectorRequirement{
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node1"},
},
},
},
{
MatchFields: []v1.NodeSelectorRequirement{
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node2"},
},
},
},
},
},
},
},
},
},
nodeName: "node2",
wantPreFilterResult: &framework.PreFilterResult{NodeNames: sets.NewString("node1", "node2")},
},
{
name: "Pod with two conflicting mach field requirements",
pod: &v1.Pod{
Spec: v1.PodSpec{
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchFields: []v1.NodeSelectorRequirement{
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node1"},
},
{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node2"},
},
},
},
},
},
},
},
},
},
nodeName: "node2",
labels: map[string]string{"foo": "bar"},
wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, errReasonConflict),
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod),
},
{
name: "Matches added affinity and Pod's node affinity",
pod: &v1.Pod{
@ -712,7 +786,7 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
nodeName: "node_2",
nodeName: "node2",
labels: map[string]string{"zone": "foo"},
args: config.NodeAffinityArgs{
AddedAffinity: &v1.NodeAffinity{
@ -721,7 +795,7 @@ func TestNodeAffinity(t *testing.T) {
MatchFields: []v1.NodeSelectorRequirement{{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node_2"},
Values: []string{"node2"},
}},
}},
},
@ -751,7 +825,7 @@ func TestNodeAffinity(t *testing.T) {
},
},
},
nodeName: "node_2",
nodeName: "node2",
labels: map[string]string{"zone": "foo"},
args: config.NodeAffinityArgs{
AddedAffinity: &v1.NodeAffinity{
@ -760,7 +834,7 @@ func TestNodeAffinity(t *testing.T) {
MatchFields: []v1.NodeSelectorRequirement{{
Key: metav1.ObjectNameField,
Operator: v1.NodeSelectorOpIn,
Values: []string{"node_2"},
Values: []string{"node2"},
}},
}},
},
@ -771,7 +845,7 @@ func TestNodeAffinity(t *testing.T) {
{
name: "Doesn't match added affinity",
pod: &v1.Pod{},
nodeName: "node_2",
nodeName: "node2",
labels: map[string]string{"zone": "foo"},
args: config.NodeAffinityArgs{
AddedAffinity: &v1.NodeAffinity{
@ -855,14 +929,17 @@ func TestNodeAffinity(t *testing.T) {
state := framework.NewCycleState()
var gotStatus *framework.Status
if !test.disablePreFilter {
gotStatus = p.(framework.PreFilterPlugin).PreFilter(context.Background(), state, test.pod)
if !gotStatus.IsSuccess() {
t.Errorf("unexpected error: %v", gotStatus)
gotPreFilterResult, gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), state, test.pod)
if diff := cmp.Diff(test.wantPreFilterStatus, gotStatus); diff != "" {
t.Errorf("unexpected PreFilter Status (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(test.wantPreFilterResult, gotPreFilterResult); diff != "" {
t.Errorf("unexpected PreFilterResult (-want,+got):\n%s", diff)
}
}
gotStatus = p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
if diff := cmp.Diff(test.wantStatus, gotStatus); diff != "" {
t.Errorf("unexpected Filter Status (-want,+got):\n%s", diff)
}
})
}

View File

@ -74,10 +74,10 @@ func getContainerPorts(pods ...*v1.Pod) []*v1.ContainerPort {
}
// PreFilter invoked at the prefilter extension point.
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
s := getContainerPorts(pod)
cycleState.Write(preFilterStateKey, preFilterState(s))
return nil
return nil, nil
}
// PreFilterExtensions do not exist for this plugin.

View File

@ -151,7 +151,7 @@ func TestNodePorts(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
p, _ := New(nil, nil)
cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
_, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}

View File

@ -178,9 +178,9 @@ func computePodResourceRequest(pod *v1.Pod, enablePodOverhead bool) *preFilterSt
}
// PreFilter invoked at the prefilter extension point.
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
cycleState.Write(preFilterStateKey, computePodResourceRequest(pod, f.enablePodOverhead))
return nil
return nil, nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@ -477,7 +477,7 @@ func TestEnoughRequests(t *testing.T) {
t.Fatal(err)
}
cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
_, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}
@ -555,7 +555,7 @@ func TestNotEnoughRequests(t *testing.T) {
t.Fatal(err)
}
cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
_, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}
@ -627,7 +627,7 @@ func TestStorageRequests(t *testing.T) {
t.Fatal(err)
}
cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
_, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}

View File

@ -140,13 +140,13 @@ func (s *preFilterState) updateWithPod(updatedPod, preemptorPod *v1.Pod, node *v
}
// PreFilter invoked at the prefilter extension point.
func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
s, err := pl.calPreFilterState(ctx, pod)
if err != nil {
return framework.AsStatus(err)
return nil, framework.AsStatus(err)
}
cycleState.Write(preFilterStateKey, s)
return nil
return nil, nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@ -541,7 +541,7 @@ func TestPreFilterState(t *testing.T) {
}
p := plugintesting.SetupPluginWithInformers(ctx, t, topologySpreadFunc, args, cache.NewSnapshot(tt.existingPods, tt.nodes), tt.objs)
cs := framework.NewCycleState()
if s := p.(*PodTopologySpread).PreFilter(ctx, cs, tt.pod); !s.IsSuccess() {
if _, s := p.(*PodTopologySpread).PreFilter(ctx, cs, tt.pod); !s.IsSuccess() {
t.Fatal(s.AsError())
}
got, err := getPreFilterState(cs)
@ -850,7 +850,7 @@ func TestPreFilterStateAddPod(t *testing.T) {
pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread)
cs := framework.NewCycleState()
if s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() {
if _, s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() {
t.Fatal(s.AsError())
}
nodeInfo, err := snapshot.Get(tt.nodes[tt.nodeIdx].Name)
@ -1055,8 +1055,7 @@ func TestPreFilterStateRemovePod(t *testing.T) {
pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread)
cs := framework.NewCycleState()
s := p.PreFilter(ctx, cs, tt.preemptor)
if !s.IsSuccess() {
if _, s := p.PreFilter(ctx, cs, tt.preemptor); !s.IsSuccess() {
t.Fatal(s.AsError())
}
@ -1131,8 +1130,7 @@ func BenchmarkFilter(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
state = framework.NewCycleState()
s := p.PreFilter(ctx, state, tt.pod)
if !s.IsSuccess() {
if _, s := p.PreFilter(ctx, state, tt.pod); !s.IsSuccess() {
b.Fatal(s.AsError())
}
filterNode := func(i int) {
@ -1457,9 +1455,8 @@ func TestSingleConstraint(t *testing.T) {
pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread)
state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, tt.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("preFilter failed with status: %v", preFilterStatus)
if _, s := p.PreFilter(context.Background(), state, tt.pod); !s.IsSuccess() {
t.Errorf("preFilter failed with status: %v", s)
}
for _, node := range tt.nodes {
@ -1684,9 +1681,8 @@ func TestMultipleConstraints(t *testing.T) {
pl := plugintesting.SetupPlugin(t, topologySpreadFunc, &config.PodTopologySpreadArgs{DefaultingType: config.ListDefaulting}, snapshot)
p := pl.(*PodTopologySpread)
state := framework.NewCycleState()
preFilterStatus := p.PreFilter(context.Background(), state, tt.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("preFilter failed with status: %v", preFilterStatus)
if _, s := p.PreFilter(context.Background(), state, tt.pod); !s.IsSuccess() {
t.Errorf("preFilter failed with status: %v", s)
}
for _, node := range tt.nodes {

View File

@ -169,17 +169,17 @@ func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) (bool, error) {
// PreFilter invoked at the prefilter extension point to check if pod has all
// immediate PVCs bound. If not all immediate PVCs are bound, an
// UnschedulableAndUnresolvable is returned.
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
// If pod does not reference any PVC, we don't need to do anything.
if hasPVC, err := pl.podHasPVCs(pod); err != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
} else if !hasPVC {
state.Write(stateKey, &stateData{skip: true})
return nil
return nil, nil
}
boundClaims, claimsToBind, unboundClaimsImmediate, err := pl.Binder.GetPodVolumes(pod)
if err != nil {
return framework.AsStatus(err)
return nil, framework.AsStatus(err)
}
if len(unboundClaimsImmediate) > 0 {
// Return UnschedulableAndUnresolvable error if immediate claims are
@ -187,10 +187,10 @@ func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleSt
// claims are bound by PV controller.
status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
return status
return nil, status
}
state.Write(stateKey, &stateData{boundClaims: boundClaims, claimsToBind: claimsToBind, podVolumesByNode: make(map[string]*PodVolumes)})
return nil
return nil, nil
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@ -653,7 +653,7 @@ func TestVolumeBinding(t *testing.T) {
state := framework.NewCycleState()
t.Logf("Verify: call PreFilter and check status")
gotPreFilterStatus := p.PreFilter(ctx, state, item.pod)
_, gotPreFilterStatus := p.PreFilter(ctx, state, item.pod)
if !reflect.DeepEqual(gotPreFilterStatus, item.wantPreFilterStatus) {
t.Errorf("filter prefilter status does not match: %v, want: %v", gotPreFilterStatus, item.wantPreFilterStatus)
}

View File

@ -120,11 +120,11 @@ func haveOverlap(a1, a2 []string) bool {
return false
}
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
if pl.enableReadWriteOncePod {
return pl.isReadWriteOncePodAccessModeConflict(ctx, pod)
return nil, pl.isReadWriteOncePodAccessModeConflict(ctx, pod)
}
return framework.NewStatus(framework.Success)
return nil, framework.NewStatus(framework.Success)
}
// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode.

View File

@ -366,7 +366,7 @@ func TestAccessModeConflicts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := newPluginWithListers(ctx, t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod)
gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod)
_, gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus)
}

View File

@ -561,33 +561,46 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (status *framework.Status) {
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
var result *framework.PreFilterResult
var pluginsWithNodes []string
for _, pl := range f.preFilterPlugins {
status = f.runPreFilterPlugin(ctx, pl, state, pod)
if !status.IsSuccess() {
status.SetFailedPlugin(pl.Name())
if status.IsUnschedulable() {
return status
r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
if !s.IsSuccess() {
s.SetFailedPlugin(pl.Name())
if s.IsUnschedulable() {
return nil, s
}
return framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), status.AsError())).WithFailedPlugin(pl.Name())
return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), status.AsError())).WithFailedPlugin(pl.Name())
}
if !r.AllNodes() {
pluginsWithNodes = append(pluginsWithNodes, pl.Name())
}
result = result.Merge(r)
if !result.AllNodes() && len(result.NodeNames) == 0 {
msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", pluginsWithNodes)
if len(pluginsWithNodes) == 1 {
msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", pluginsWithNodes[0])
}
return nil, framework.NewStatus(framework.Unschedulable, msg)
}
}
return nil
}
return result, nil
}
func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) *framework.Status {
func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
if !state.ShouldRecordPluginMetrics() {
return pl.PreFilter(ctx, state, pod)
}
startTime := time.Now()
status := pl.PreFilter(ctx, state, pod)
result, status := pl.PreFilter(ctx, state, pod)
f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime))
return status
return result, status
}
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured

View File

@ -171,8 +171,8 @@ func (pl *TestPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}
func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status {
return framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), "injected status")
func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil, framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), "injected status")
}
func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
@ -222,9 +222,9 @@ func (pl *TestPreFilterPlugin) Name() string {
return preFilterPluginName
}
func (pl *TestPreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status {
func (pl *TestPreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
pl.PreFilterCalled++
return nil
return nil, nil
}
func (pl *TestPreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions {
@ -242,9 +242,9 @@ func (pl *TestPreFilterWithExtensionsPlugin) Name() string {
return preFilterWithExtensionsPluginName
}
func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status {
func (pl *TestPreFilterWithExtensionsPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
pl.PreFilterCalled++
return nil
return nil, nil
}
func (pl *TestPreFilterWithExtensionsPlugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
@ -270,8 +270,8 @@ func (dp *TestDuplicatePlugin) Name() string {
return duplicatePluginName
}
func (dp *TestDuplicatePlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status {
return nil
func (dp *TestDuplicatePlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil, nil
}
func (dp *TestDuplicatePlugin) PreFilterExtensions() framework.PreFilterExtensions {

View File

@ -140,14 +140,14 @@ type schedulerOptions struct {
// Option configures a Scheduler
type Option func(*schedulerOptions)
// ScheduleResult represents the result of one pod scheduled. It will contain
// the final selected Node, along with the selected intermediate information.
// ScheduleResult represents the result of scheduling a pod.
type ScheduleResult struct {
// Name of the scheduler suggest host
// Name of the selected node.
SuggestedHost string
// Number of nodes scheduler evaluated on one pod scheduled
// The number of nodes the scheduler evaluated the pod against in the filtering
// phase and beyond.
EvaluatedNodes int
// Number of feasible nodes on one pod scheduled
// The number of nodes out of the evaluated ones that fit the pod.
FeasibleNodes int
}
@ -900,7 +900,7 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
}
// Run "prefilter" plugins.
s := fwk.RunPreFilterPlugins(ctx, state, pod)
preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, diagnosis, err
@ -915,7 +915,9 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
diagnosis.NodeToStatusMap[n.Node().Name] = s
}
// Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
if s.FailedPlugin() != "" {
diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
}
return nil, diagnosis, nil
}
@ -931,7 +933,19 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
return feasibleNodes, diagnosis, nil
}
}
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
nodes := allNodes
if !preRes.AllNodes() {
nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
for n := range preRes.NodeNames {
nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
if err != nil {
return nil, diagnosis, err
}
nodes = append(nodes, nInfo)
}
}
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
if err != nil {
return nil, diagnosis, err
}

View File

@ -64,6 +64,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/utils/pointer"
)
var podTopologySpreadFunc = frameworkruntime.FactoryAdapter(feature.Features{}, podtopologyspread.New)
@ -1794,14 +1795,15 @@ func TestFindNodesThatPassExtenders(t *testing.T) {
func TestSchedulerSchedulePod(t *testing.T) {
fts := feature.Features{}
tests := []struct {
name string
registerPlugins []st.RegisterPluginFunc
nodes []string
pvcs []v1.PersistentVolumeClaim
pod *v1.Pod
pods []*v1.Pod
expectedHosts sets.String
wErr error
name string
registerPlugins []st.RegisterPluginFunc
nodes []string
pvcs []v1.PersistentVolumeClaim
pod *v1.Pod
pods []*v1.Pod
wantNodes sets.String
wantEvaluatedNodes *int32
wErr error
}{
{
registerPlugins: []st.RegisterPluginFunc{
@ -1830,11 +1832,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
expectedHosts: sets.NewString("machine1", "machine2"),
name: "test 2",
wErr: nil,
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
wantNodes: sets.NewString("machine1", "machine2"),
name: "test 2",
wErr: nil,
},
{
// Fits on a machine where the pod ID matches the machine name
@ -1843,11 +1845,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterFilterPlugin("MatchFilter", st.NewMatchFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine2", UID: types.UID("machine2")}},
expectedHosts: sets.NewString("machine2"),
name: "test 3",
wErr: nil,
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine2", UID: types.UID("machine2")}},
wantNodes: sets.NewString("machine2"),
name: "test 3",
wErr: nil,
},
{
registerPlugins: []st.RegisterPluginFunc{
@ -1856,11 +1858,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
expectedHosts: sets.NewString("3"),
name: "test 4",
wErr: nil,
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
wantNodes: sets.NewString("3"),
name: "test 4",
wErr: nil,
},
{
registerPlugins: []st.RegisterPluginFunc{
@ -1869,11 +1871,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("2"),
name: "test 5",
wErr: nil,
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
wantNodes: sets.NewString("2"),
name: "test 5",
wErr: nil,
},
{
registerPlugins: []st.RegisterPluginFunc{
@ -1883,11 +1885,11 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("ReverseNumericMap", newReverseNumericMapPlugin(), 2),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("1"),
name: "test 6",
wErr: nil,
nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
wantNodes: sets.NewString("1"),
name: "test 6",
wErr: nil,
},
{
registerPlugins: []st.RegisterPluginFunc{
@ -1976,9 +1978,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
},
},
},
expectedHosts: sets.NewString("machine1", "machine2"),
name: "existing PVC",
wErr: nil,
wantNodes: sets.NewString("machine1", "machine2"),
name: "existing PVC",
wErr: nil,
},
{
// Pod with non existing PVC
@ -2136,8 +2138,8 @@ func TestSchedulerSchedulePod(t *testing.T) {
},
},
},
expectedHosts: sets.NewString("machine2"),
wErr: nil,
wantNodes: sets.NewString("machine2"),
wErr: nil,
},
{
name: "test podtopologyspread plugin - 3 nodes with maxskew=2",
@ -2201,8 +2203,8 @@ func TestSchedulerSchedulePod(t *testing.T) {
},
},
},
expectedHosts: sets.NewString("machine2", "machine3"),
wErr: nil,
wantNodes: sets.NewString("machine2", "machine3"),
wErr: nil,
},
{
name: "test with filter plugin returning Unschedulable status",
@ -2215,9 +2217,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
wantNodes: nil,
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
NumAllNodes: 1,
@ -2240,9 +2242,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
wantNodes: nil,
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
NumAllNodes: 1,
@ -2265,10 +2267,10 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
wErr: nil,
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
wantNodes: nil,
wErr: nil,
},
{
name: "test prefilter plugin returning Unschedulable status",
@ -2276,13 +2278,13 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter",
st.NewFakePreFilterPlugin(framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status")),
st.NewFakePreFilterPlugin("FakePreFilter", nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status")),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
expectedHosts: nil,
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
wantNodes: nil,
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
NumAllNodes: 2,
@ -2301,14 +2303,97 @@ func TestSchedulerSchedulePod(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter",
st.NewFakePreFilterPlugin(framework.NewStatus(framework.Error, "injected error status")),
st.NewFakePreFilterPlugin("FakePreFilter", nil, framework.NewStatus(framework.Error, "injected error status")),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
expectedHosts: nil,
wErr: fmt.Errorf(`running PreFilter plugin "FakePreFilter": %w`, errors.New("injected error status")),
nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
wantNodes: nil,
wErr: fmt.Errorf(`running PreFilter plugin "FakePreFilter": %w`, errors.New("injected error status")),
},
{
name: "test prefilter plugin returning node",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter1",
st.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter2",
st.NewFakePreFilterPlugin("FakePreFilter2", &framework.PreFilterResult{NodeNames: sets.NewString("node2")}, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter3",
st.NewFakePreFilterPlugin("FakePreFilter3", &framework.PreFilterResult{NodeNames: sets.NewString("node1", "node2")}, nil),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2", "node3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
wantNodes: sets.NewString("node2"),
wantEvaluatedNodes: pointer.Int32Ptr(1),
},
{
name: "test prefilter plugin returning non-intersecting nodes",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter1",
st.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter2",
st.NewFakePreFilterPlugin("FakePreFilter2", &framework.PreFilterResult{NodeNames: sets.NewString("node2")}, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter3",
st.NewFakePreFilterPlugin("FakePreFilter3", &framework.PreFilterResult{NodeNames: sets.NewString("node1")}, nil),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2", "node3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
NumAllNodes: 3,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node2": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node3": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
},
UnschedulablePlugins: sets.String{},
},
},
},
{
name: "test prefilter plugin returning empty node set",
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(
"FakePreFilter1",
st.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
),
st.RegisterPreFilterPlugin(
"FakePreFilter2",
st.NewFakePreFilterPlugin("FakePreFilter2", &framework.PreFilterResult{NodeNames: sets.NewString()}, nil),
),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
wErr: &framework.FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
NumAllNodes: 1,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin FakePreFilter2"),
},
UnschedulablePlugins: sets.String{},
},
},
},
}
for _, test := range tests {
@ -2373,11 +2458,15 @@ func TestSchedulerSchedulePod(t *testing.T) {
}
}
}
if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost)
if test.wantNodes != nil && !test.wantNodes.Has(result.SuggestedHost) {
t.Errorf("Expected: %s, got: %s", test.wantNodes, result.SuggestedHost)
}
if test.wErr == nil && len(test.nodes) != result.EvaluatedNodes {
t.Errorf("Expected EvaluatedNodes: %d, got: %d", len(test.nodes), result.EvaluatedNodes)
wantEvaluatedNodes := len(test.nodes)
if test.wantEvaluatedNodes != nil {
wantEvaluatedNodes = int(*test.wantEvaluatedNodes)
}
if test.wErr == nil && wantEvaluatedNodes != result.EvaluatedNodes {
t.Errorf("Expected EvaluatedNodes: %d, got: %d", wantEvaluatedNodes, result.EvaluatedNodes)
}
})
}

View File

@ -22,7 +22,7 @@ import (
"sync/atomic"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@ -127,17 +127,19 @@ func NewMatchFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugi
// FakePreFilterPlugin is a test filter plugin.
type FakePreFilterPlugin struct {
Result *framework.PreFilterResult
Status *framework.Status
name string
}
// Name returns name of the plugin.
func (pl *FakePreFilterPlugin) Name() string {
return "FakePreFilter"
return pl.name
}
// PreFilter invoked at the PreFilter extension point.
func (pl *FakePreFilterPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) *framework.Status {
return pl.Status
func (pl *FakePreFilterPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return pl.Result, pl.Status
}
// PreFilterExtensions no extensions implemented by this plugin.
@ -146,10 +148,12 @@ func (pl *FakePreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensio
}
// NewFakePreFilterPlugin initializes a fakePreFilterPlugin and returns it.
func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFactory {
func NewFakePreFilterPlugin(name string, result *framework.PreFilterResult, status *framework.Status) frameworkruntime.PluginFactory {
return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &FakePreFilterPlugin{
Result: result,
Status: status,
name: name,
}, nil
}
}

View File

@ -407,15 +407,15 @@ func (pp *PreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions {
}
// PreFilter is a test function that returns (true, nil) or errors for testing.
func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
pp.numPreFilterCalled++
if pp.failPreFilter {
return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}
if pp.rejectPreFilter {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
}
return nil
return nil, nil
}
// reset used to reset prefilter plugin.
@ -2288,16 +2288,16 @@ func (j *JobPlugin) Name() string {
return jobPluginName
}
func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) *framework.Status {
func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
labelSelector := labels.SelectorFromSet(labels.Set{"driver": ""})
driverPods, err := j.podLister.Pods(p.Namespace).List(labelSelector)
if err != nil {
return framework.AsStatus(err)
return nil, framework.AsStatus(err)
}
if len(driverPods) == 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod")
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod")
}
return nil
return nil, nil
}
func (j *JobPlugin) PreFilterExtensions() framework.PreFilterExtensions {

View File

@ -99,8 +99,8 @@ func (fp *tokenFilter) Filter(ctx context.Context, state *framework.CycleState,
return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name))
}
func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
return nil
func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil, nil
}
func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,