mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Merge pull request #119779 from sanposhiho/prefilterexec
run all PreFilter when the preemption will happen later in the same scheduling cycle
This commit is contained in:
commit
8609b8c22c
@ -400,6 +400,9 @@ type PreFilterPlugin interface {
|
||||
// plugins must return success or the pod will be rejected. PreFilter could optionally
|
||||
// return a PreFilterResult to influence which nodes to evaluate downstream. This is useful
|
||||
// for cases where it is possible to determine the subset of nodes to process in O(1) time.
|
||||
// When PreFilterResult filters out some Nodes, the framework considers Nodes that are filtered out as getting "UnschedulableAndUnresolvable".
|
||||
// i.e., those Nodes will be out of the candidates of the preemption.
|
||||
//
|
||||
// When it returns Skip status, returned PreFilterResult and other fields in status are just ignored,
|
||||
// and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle.
|
||||
PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
|
||||
|
@ -670,6 +670,7 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
|
||||
if verboseLogs {
|
||||
logger = klog.LoggerWithName(logger, "PreFilter")
|
||||
}
|
||||
var returnStatus *framework.Status
|
||||
for _, pl := range f.preFilterPlugins {
|
||||
ctx := ctx
|
||||
if verboseLogs {
|
||||
@ -683,9 +684,18 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
|
||||
}
|
||||
if !s.IsSuccess() {
|
||||
s.SetPlugin(pl.Name())
|
||||
if s.IsRejected() {
|
||||
if s.Code() == framework.UnschedulableAndUnresolvable {
|
||||
// In this case, the preemption shouldn't happen in this scheduling cycle.
|
||||
// So, no need to execute all PreFilter.
|
||||
return nil, s
|
||||
}
|
||||
if s.Code() == framework.Unschedulable {
|
||||
// In this case, the preemption should happen later in this scheduling cycle.
|
||||
// So we need to execute all PreFilter.
|
||||
// https://github.com/kubernetes/kubernetes/issues/119770
|
||||
returnStatus = s
|
||||
continue
|
||||
}
|
||||
return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name())
|
||||
}
|
||||
if !r.AllNodes() {
|
||||
@ -697,10 +707,12 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
|
||||
if len(pluginsWithNodes) == 1 {
|
||||
msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", pluginsWithNodes[0])
|
||||
}
|
||||
return nil, framework.NewStatus(framework.Unschedulable, msg)
|
||||
|
||||
// When PreFilterResult filters out Nodes, the framework considers Nodes that are filtered out as getting "UnschedulableAndUnresolvable".
|
||||
return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
return result, returnStatus
|
||||
}
|
||||
|
||||
func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
|
||||
|
@ -197,7 +197,7 @@ func (pl *TestPlugin) ScoreExtensions() framework.ScoreExtensions {
|
||||
}
|
||||
|
||||
func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
|
||||
return nil, framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), injectReason)
|
||||
return pl.inj.PreFilterResult, framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), injectReason)
|
||||
}
|
||||
|
||||
func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
|
||||
@ -1622,6 +1622,56 @@ func TestRunPreFilterPlugins(t *testing.T) {
|
||||
wantSkippedPlugins: sets.New("skip1", "skip2"),
|
||||
wantStatusCode: framework.Success,
|
||||
},
|
||||
{
|
||||
name: "one PreFilter plugin returned Unschedulable, but another PreFilter plugin should be executed",
|
||||
plugins: []*TestPlugin{
|
||||
{
|
||||
name: "unschedulable",
|
||||
inj: injectedResult{PreFilterStatus: int(framework.Unschedulable)},
|
||||
},
|
||||
{
|
||||
// To make sure this plugin is executed, it returns Skip and we confirm that via wantSkippedPlugins.
|
||||
name: "skip",
|
||||
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
|
||||
},
|
||||
},
|
||||
wantPreFilterResult: nil,
|
||||
wantSkippedPlugins: sets.New("skip"),
|
||||
wantStatusCode: framework.Unschedulable,
|
||||
},
|
||||
{
|
||||
name: "one PreFilter plugin returned UnschedulableAndUnresolvable, and all other plugins aren't executed",
|
||||
plugins: []*TestPlugin{
|
||||
{
|
||||
name: "unresolvable",
|
||||
inj: injectedResult{PreFilterStatus: int(framework.UnschedulableAndUnresolvable)},
|
||||
},
|
||||
{
|
||||
// To make sure this plugin is not executed, it returns Skip and we confirm that via wantSkippedPlugins.
|
||||
name: "skip",
|
||||
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
|
||||
},
|
||||
},
|
||||
wantPreFilterResult: nil,
|
||||
wantStatusCode: framework.UnschedulableAndUnresolvable,
|
||||
},
|
||||
{
|
||||
name: "all nodes are filtered out by prefilter result, but other plugins aren't executed because we consider all nodes are filtered out by UnschedulableAndUnresolvable",
|
||||
plugins: []*TestPlugin{
|
||||
{
|
||||
name: "reject-all-nodes",
|
||||
inj: injectedResult{PreFilterResult: &framework.PreFilterResult{NodeNames: sets.New[string]()}},
|
||||
},
|
||||
{
|
||||
// To make sure this plugin is not executed, it returns Skip and we confirm that via wantSkippedPlugins.
|
||||
name: "skip",
|
||||
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
|
||||
},
|
||||
},
|
||||
wantPreFilterResult: &framework.PreFilterResult{NodeNames: sets.New[string]()},
|
||||
wantSkippedPlugins: sets.New[string](), // "skip" plugin isn't executed.
|
||||
wantStatusCode: framework.UnschedulableAndUnresolvable,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
@ -3224,20 +3274,21 @@ func buildScoreConfigWithWeights(weights map[string]int32, ps ...string) *config
|
||||
}
|
||||
|
||||
type injectedResult struct {
|
||||
ScoreRes int64 `json:"scoreRes,omitempty"`
|
||||
NormalizeRes int64 `json:"normalizeRes,omitempty"`
|
||||
ScoreStatus int `json:"scoreStatus,omitempty"`
|
||||
NormalizeStatus int `json:"normalizeStatus,omitempty"`
|
||||
PreFilterStatus int `json:"preFilterStatus,omitempty"`
|
||||
PreFilterAddPodStatus int `json:"preFilterAddPodStatus,omitempty"`
|
||||
PreFilterRemovePodStatus int `json:"preFilterRemovePodStatus,omitempty"`
|
||||
FilterStatus int `json:"filterStatus,omitempty"`
|
||||
PostFilterStatus int `json:"postFilterStatus,omitempty"`
|
||||
PreScoreStatus int `json:"preScoreStatus,omitempty"`
|
||||
ReserveStatus int `json:"reserveStatus,omitempty"`
|
||||
PreBindStatus int `json:"preBindStatus,omitempty"`
|
||||
BindStatus int `json:"bindStatus,omitempty"`
|
||||
PermitStatus int `json:"permitStatus,omitempty"`
|
||||
ScoreRes int64 `json:"scoreRes,omitempty"`
|
||||
NormalizeRes int64 `json:"normalizeRes,omitempty"`
|
||||
ScoreStatus int `json:"scoreStatus,omitempty"`
|
||||
NormalizeStatus int `json:"normalizeStatus,omitempty"`
|
||||
PreFilterResult *framework.PreFilterResult `json:"preFilterResult,omitempty"`
|
||||
PreFilterStatus int `json:"preFilterStatus,omitempty"`
|
||||
PreFilterAddPodStatus int `json:"preFilterAddPodStatus,omitempty"`
|
||||
PreFilterRemovePodStatus int `json:"preFilterRemovePodStatus,omitempty"`
|
||||
FilterStatus int `json:"filterStatus,omitempty"`
|
||||
PostFilterStatus int `json:"postFilterStatus,omitempty"`
|
||||
PreScoreStatus int `json:"preScoreStatus,omitempty"`
|
||||
ReserveStatus int `json:"reserveStatus,omitempty"`
|
||||
PreBindStatus int `json:"preBindStatus,omitempty"`
|
||||
BindStatus int `json:"bindStatus,omitempty"`
|
||||
PermitStatus int `json:"permitStatus,omitempty"`
|
||||
}
|
||||
|
||||
func setScoreRes(inj injectedResult) (int64, *framework.Status) {
|
||||
|
@ -290,6 +290,9 @@ const ExtenderName = "Extender"
|
||||
|
||||
// Diagnosis records the details to diagnose a scheduling failure.
|
||||
type Diagnosis struct {
|
||||
// NodeToStatusMap records the status of each node
|
||||
// if they're rejected in PreFilter (via PreFilterResult) or Filter plugins.
|
||||
// Nodes that pass PreFilter/Filter plugins are not included in this map.
|
||||
NodeToStatusMap NodeToStatusMap
|
||||
// UnschedulablePlugins are plugins that return Unschedulable or UnschedulableAndUnresolvable.
|
||||
UnschedulablePlugins sets.Set[string]
|
||||
|
@ -485,12 +485,14 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
|
||||
nodes := allNodes
|
||||
if !preRes.AllNodes() {
|
||||
nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
|
||||
for n := range preRes.NodeNames {
|
||||
nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
|
||||
if err != nil {
|
||||
return nil, diagnosis, err
|
||||
for _, n := range allNodes {
|
||||
if !preRes.NodeNames.Has(n.Node().Name) {
|
||||
// We consider Nodes that are filtered out by PreFilterResult as rejected via UnschedulableAndUnresolvable.
|
||||
// We have to record them in NodeToStatusMap so that they won't be considered as candidates in the preemption.
|
||||
diagnosis.NodeToStatusMap[n.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result")
|
||||
continue
|
||||
}
|
||||
nodes = append(nodes, nInfo)
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
}
|
||||
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
|
||||
|
@ -2227,7 +2227,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
|
||||
nodes: []string{"node1", "node2", "node3"},
|
||||
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
||||
wantNodes: sets.New("node2"),
|
||||
wantEvaluatedNodes: ptr.To[int32](1),
|
||||
wantEvaluatedNodes: ptr.To[int32](3),
|
||||
},
|
||||
{
|
||||
name: "test prefilter plugin returning non-intersecting nodes",
|
||||
@ -2254,9 +2254,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
|
||||
NumAllNodes: 3,
|
||||
Diagnosis: framework.Diagnosis{
|
||||
NodeToStatusMap: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
|
||||
"node2": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
|
||||
"node3": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
|
||||
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
|
||||
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
|
||||
},
|
||||
UnschedulablePlugins: sets.Set[string]{},
|
||||
PreFilterMsg: "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously",
|
||||
@ -2284,13 +2284,42 @@ func TestSchedulerSchedulePod(t *testing.T) {
|
||||
NumAllNodes: 1,
|
||||
Diagnosis: framework.Diagnosis{
|
||||
NodeToStatusMap: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin FakePreFilter2"),
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin FakePreFilter2"),
|
||||
},
|
||||
UnschedulablePlugins: sets.Set[string]{},
|
||||
PreFilterMsg: "node(s) didn't satisfy plugin FakePreFilter2",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test some nodes are filtered out by prefilter plugin and other are filtered out by filter plugin",
|
||||
registerPlugins: []tf.RegisterPluginFunc{
|
||||
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
tf.RegisterPreFilterPlugin(
|
||||
"FakePreFilter",
|
||||
tf.NewFakePreFilterPlugin("FakePreFilter", &framework.PreFilterResult{NodeNames: sets.New[string]("node2")}, nil),
|
||||
),
|
||||
tf.RegisterFilterPlugin(
|
||||
"FakeFilter",
|
||||
tf.NewFakeFilterPlugin(map[string]framework.Code{"node2": framework.Unschedulable}),
|
||||
),
|
||||
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
},
|
||||
nodes: []string{"node1", "node2"},
|
||||
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
||||
wErr: &framework.FitError{
|
||||
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
||||
NumAllNodes: 2,
|
||||
Diagnosis: framework.Diagnosis{
|
||||
NodeToStatusMap: framework.NodeToStatusMap{
|
||||
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result"),
|
||||
"node2": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-prefilter").WithPlugin("FakeFilter"),
|
||||
},
|
||||
UnschedulablePlugins: sets.New("FakeFilter"),
|
||||
PreFilterMsg: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test prefilter plugin returning skip",
|
||||
registerPlugins: []tf.RegisterPluginFunc{
|
||||
@ -2356,7 +2385,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
|
||||
wantEvaluatedNodes: ptr.To[int32](1),
|
||||
},
|
||||
{
|
||||
name: "test no score plugin, prefilter plugin returning 2 nodes, only 1 node is evaluated",
|
||||
name: "test no score plugin, prefilter plugin returning 2 nodes, only 1 node is evaluated in Filter",
|
||||
registerPlugins: []tf.RegisterPluginFunc{
|
||||
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
tf.RegisterPreFilterPlugin(
|
||||
@ -2368,7 +2397,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
|
||||
nodes: []string{"node1", "node2", "node3"},
|
||||
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
||||
wantNodes: sets.New("node1", "node2"),
|
||||
wantEvaluatedNodes: ptr.To[int32](1),
|
||||
wantEvaluatedNodes: ptr.To[int32](2),
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
|
@ -139,6 +139,7 @@ type ScheduleResult struct {
|
||||
SuggestedHost string
|
||||
// The number of nodes the scheduler evaluated the pod against in the filtering
|
||||
// phase and beyond.
|
||||
// Note that this count includes nodes that were filtered out by PreFilterResult.
|
||||
EvaluatedNodes int
|
||||
// The number of nodes out of the evaluated ones that fit the pod.
|
||||
FeasibleNodes int
|
||||
|
Loading…
Reference in New Issue
Block a user