mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 13:31:52 +00:00
fix: requeue pods rejected by Extenders properly
This commit is contained in:
parent
1f07da7575
commit
468e2dac81
@ -260,6 +260,14 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
|
|||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(f.extenders) > 0 {
|
||||||
|
// Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
|
||||||
|
// We register a defaultEnqueueExtension to framework.ExtenderName here.
|
||||||
|
// And, in the scheduling cycle, when Extenders reject some Nodes and the pod ends up being unschedulable,
|
||||||
|
// we put framework.ExtenderName to pInfo.UnschedulablePlugins.
|
||||||
|
f.enqueueExtensions = []framework.EnqueueExtensions{&defaultEnqueueExtension{pluginName: framework.ExtenderName}}
|
||||||
|
}
|
||||||
|
|
||||||
if profile == nil {
|
if profile == nil {
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
@ -279,6 +279,9 @@ type WeightedAffinityTerm struct {
|
|||||||
Weight int32
|
Weight int32
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExtenderName is a fake plugin name put in UnschedulablePlugins when Extender rejected some Nodes.
|
||||||
|
const ExtenderName = "Extender"
|
||||||
|
|
||||||
// Diagnosis records the details to diagnose a scheduling failure.
|
// Diagnosis records the details to diagnose a scheduling failure.
|
||||||
type Diagnosis struct {
|
type Diagnosis struct {
|
||||||
NodeToStatusMap NodeToStatusMap
|
NodeToStatusMap NodeToStatusMap
|
||||||
|
@ -500,11 +500,26 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
|
|||||||
return nil, diagnosis, err
|
return nil, diagnosis, err
|
||||||
}
|
}
|
||||||
|
|
||||||
feasibleNodes, err = findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
|
feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, diagnosis, err
|
return nil, diagnosis, err
|
||||||
}
|
}
|
||||||
return feasibleNodes, diagnosis, nil
|
if len(feasibleNodesAfterExtender) != len(feasibleNodes) {
|
||||||
|
// Extenders filtered out some nodes.
|
||||||
|
//
|
||||||
|
// Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
|
||||||
|
// When Extenders reject some Nodes and the pod ends up being unschedulable,
|
||||||
|
// we put framework.ExtenderName to pInfo.UnschedulablePlugins.
|
||||||
|
// This Pod will be requeued from unschedulable pod pool to activeQ/backoffQ
|
||||||
|
// by any kind of cluster events.
|
||||||
|
// https://github.com/kubernetes/kubernetes/issues/122019
|
||||||
|
if diagnosis.UnschedulablePlugins == nil {
|
||||||
|
diagnosis.UnschedulablePlugins = sets.New[string]()
|
||||||
|
}
|
||||||
|
diagnosis.UnschedulablePlugins.Insert(framework.ExtenderName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return feasibleNodesAfterExtender, diagnosis, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
|
func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
|
||||||
|
@ -1771,6 +1771,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
registerPlugins []tf.RegisterPluginFunc
|
registerPlugins []tf.RegisterPluginFunc
|
||||||
|
extenders []tf.FakeExtender
|
||||||
nodes []string
|
nodes []string
|
||||||
pvcs []v1.PersistentVolumeClaim
|
pvcs []v1.PersistentVolumeClaim
|
||||||
pod *v1.Pod
|
pod *v1.Pod
|
||||||
@ -2078,6 +2079,39 @@ func TestSchedulerSchedulePod(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "test with extender which filters out some Nodes",
|
||||||
|
registerPlugins: []tf.RegisterPluginFunc{
|
||||||
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||||
|
tf.RegisterFilterPlugin(
|
||||||
|
"FakeFilter",
|
||||||
|
tf.NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}),
|
||||||
|
),
|
||||||
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
||||||
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||||
|
},
|
||||||
|
extenders: []tf.FakeExtender{
|
||||||
|
{
|
||||||
|
ExtenderName: "FakeExtender1",
|
||||||
|
Predicates: []tf.FitPredicate{tf.FalsePredicateExtender},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
nodes: []string{"1", "2", "3"},
|
||||||
|
pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
|
||||||
|
wantNodes: nil,
|
||||||
|
wErr: &framework.FitError{
|
||||||
|
Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
|
||||||
|
NumAllNodes: 3,
|
||||||
|
Diagnosis: framework.Diagnosis{
|
||||||
|
NodeToStatusMap: framework.NodeToStatusMap{
|
||||||
|
"1": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "1" failed`),
|
||||||
|
"2": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "2" failed`),
|
||||||
|
"3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
|
||||||
|
},
|
||||||
|
UnschedulablePlugins: sets.New("FakeFilter", framework.ExtenderName),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "test with filter plugin returning UnschedulableAndUnresolvable status",
|
name: "test with filter plugin returning UnschedulableAndUnresolvable status",
|
||||||
registerPlugins: []tf.RegisterPluginFunc{
|
registerPlugins: []tf.RegisterPluginFunc{
|
||||||
@ -2336,10 +2370,15 @@ func TestSchedulerSchedulePod(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var extenders []framework.Extender
|
||||||
|
for ii := range test.extenders {
|
||||||
|
extenders = append(extenders, &test.extenders[ii])
|
||||||
|
}
|
||||||
sched := &Scheduler{
|
sched := &Scheduler{
|
||||||
Cache: cache,
|
Cache: cache,
|
||||||
nodeInfoSnapshot: snapshot,
|
nodeInfoSnapshot: snapshot,
|
||||||
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
||||||
|
Extenders: extenders,
|
||||||
}
|
}
|
||||||
sched.applyDefaultHandlers()
|
sched.applyDefaultHandlers()
|
||||||
|
|
||||||
@ -2351,7 +2390,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
|
|||||||
gotFitErr, gotOK := err.(*framework.FitError)
|
gotFitErr, gotOK := err.(*framework.FitError)
|
||||||
wantFitErr, wantOK := test.wErr.(*framework.FitError)
|
wantFitErr, wantOK := test.wErr.(*framework.FitError)
|
||||||
if gotOK != wantOK {
|
if gotOK != wantOK {
|
||||||
t.Errorf("Expected err to be FitError: %v, but got %v", wantOK, gotOK)
|
t.Errorf("Expected err to be FitError: %v, but got %v (error: %v)", wantOK, gotOK, err)
|
||||||
} else if gotOK {
|
} else if gotOK {
|
||||||
if diff := cmp.Diff(gotFitErr, wantFitErr); diff != "" {
|
if diff := cmp.Diff(gotFitErr, wantFitErr); diff != "" {
|
||||||
t.Errorf("Unexpected fitErr: (-want, +got): %s", diff)
|
t.Errorf("Unexpected fitErr: (-want, +got): %s", diff)
|
||||||
|
Loading…
Reference in New Issue
Block a user