From b478621596295689d9944b9d168bc42a49ead705 Mon Sep 17 00:00:00 2001
From: olderTaoist
Date: Tue, 2 Jul 2024 09:26:58 +0800
Subject: [PATCH] register unschedulable plugins when PreFilter filters Nodes with NodeNames

---
 .../framework_contract_test.go                |  3 +-
 pkg/scheduler/framework/interface.go          |  5 +-
 .../default_preemption_test.go                |  8 +--
 .../framework/preemption/preemption_test.go   |  2 +-
 pkg/scheduler/framework/runtime/framework.go  | 18 ++---
 .../framework/runtime/framework_test.go       |  3 +-
 pkg/scheduler/schedule_one.go                 |  3 +-
 pkg/scheduler/schedule_one_test.go            |  9 +--
 test/integration/scheduler/queue_test.go      | 67 ++++++++++++++-----
 9 files changed, 80 insertions(+), 38 deletions(-)

diff --git a/pkg/scheduler/framework/autoscaler_contract/framework_contract_test.go b/pkg/scheduler/framework/autoscaler_contract/framework_contract_test.go
index 13feb969cad..25c3f87e37a 100644
--- a/pkg/scheduler/framework/autoscaler_contract/framework_contract_test.go
+++ b/pkg/scheduler/framework/autoscaler_contract/framework_contract_test.go
@@ -26,13 +26,14 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2/ktesting"
 
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 )
 
 type frameworkContract interface {
-	RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status)
+	RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status, sets.Set[string])
 	RunFilterPlugins(context.Context, *framework.CycleState, *v1.Pod, *framework.NodeInfo) *framework.Status
 }
 
diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index c5b7d89ceed..b32cd6f2abf 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -588,7 +588,10 @@ type Framework interface {
 	// cycle is aborted.
 	// It also returns a PreFilterResult, which may influence what or how many nodes to
 	// evaluate downstream.
-	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
+	// The third returned value contains the names of PreFilter plugins that rejected some or all
+	// Nodes via PreFilterResult. Note that it is empty when a plugin rejects the Pod with a
+	// non-success status rather than with a PreFilterResult.
+	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status, sets.Set[string])
 
 	// RunPostFilterPlugins runs the set of configured PostFilter plugins.
 	// PostFilter plugins can either be informational, in which case should be configured
diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
index e04a946b261..af8965e0c53 100644
--- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
+++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go
@@ -383,7 +383,7 @@ func TestPostFilter(t *testing.T) {
 			state := framework.NewCycleState()
 			// Ensure <state> is populated.
- if _, status := f.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() { + if _, status, _ := f.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() { t.Errorf("Unexpected PreFilter Status: %v", status) } @@ -1141,7 +1141,7 @@ func TestDryRunPreemption(t *testing.T) { for cycle, pod := range tt.testPods { state := framework.NewCycleState() // Some tests rely on PreFilter plugin to compute its CycleState. - if _, status := fwk.RunPreFilterPlugins(ctx, state, pod); !status.IsSuccess() { + if _, status, _ := fwk.RunPreFilterPlugins(ctx, state, pod); !status.IsSuccess() { t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status) } pe := preemption.Evaluator{ @@ -1371,7 +1371,7 @@ func TestSelectBestCandidate(t *testing.T) { state := framework.NewCycleState() // Some tests rely on PreFilter plugin to compute its CycleState. - if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() { + if _, status, _ := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() { t.Errorf("Unexpected PreFilter Status: %v", status) } nodeInfos, err := snapshot.NodeInfos().List() @@ -1784,7 +1784,7 @@ func TestPreempt(t *testing.T) { state := framework.NewCycleState() // Some tests rely on PreFilter plugin to compute its CycleState. - if _, s := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() { + if _, s, _ := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() { t.Errorf("Unexpected preFilterStatus: %v", s) } // Call preempt and check the expected results. diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index 0bf384239ac..79e71b49057 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -456,7 +456,7 @@ func TestSelectCandidate(t *testing.T) { state := framework.NewCycleState() // Some tests rely on PreFilter plugin to compute its CycleState. - if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() { + if _, status, _ := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() { t.Errorf("Unexpected PreFilter Status: %v", status) } nodeInfos, err := snapshot.NodeInfos().List() diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index ed1b76a585a..132a099d0ef 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -684,7 +684,7 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc { // When it returns Skip status, returned PreFilterResult and other fields in status are just ignored, // and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle. // If a non-success status is returned, then the scheduling cycle is aborted. 
-func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) { +func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status, _ sets.Set[string]) { startTime := time.Now() skipPlugins := sets.New[string]() defer func() { @@ -692,7 +692,7 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) }() var result *framework.PreFilterResult - var pluginsWithNodes []string + pluginsWithNodes := sets.New[string]() logger := klog.FromContext(ctx) verboseLogs := logger.V(4).Enabled() if verboseLogs { @@ -715,7 +715,7 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor if s.Code() == framework.UnschedulableAndUnresolvable { // In this case, the preemption shouldn't happen in this scheduling cycle. // So, no need to execute all PreFilter. - return nil, s + return nil, s, nil } if s.Code() == framework.Unschedulable { // In this case, the preemption should happen later in this scheduling cycle. @@ -724,23 +724,23 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor returnStatus = s continue } - return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name()) + return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name()), nil } if !r.AllNodes() { - pluginsWithNodes = append(pluginsWithNodes, pl.Name()) + pluginsWithNodes.Insert(pl.Name()) } result = result.Merge(r) if !result.AllNodes() && len(result.NodeNames) == 0 { - msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", pluginsWithNodes) + msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", sets.List(pluginsWithNodes)) if len(pluginsWithNodes) == 1 { - msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", pluginsWithNodes[0]) + msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", sets.List(pluginsWithNodes)[0]) } // When PreFilterResult filters out Nodes, the framework considers Nodes that are filtered out as getting "UnschedulableAndUnresolvable". 
- return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg) + return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg), pluginsWithNodes } } - return result, returnStatus + return result, returnStatus, pluginsWithNodes } func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 8e9638ee4d6..269907074f4 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -1531,7 +1531,6 @@ func TestPreFilterPlugins(t *testing.T) { t.Fatalf("Failed to create framework for testing: %v", err) } state := framework.NewCycleState() - f.RunPreFilterPlugins(ctx, state, nil) f.RunPreFilterExtensionAddPod(ctx, state, nil, nil, nil) f.RunPreFilterExtensionRemovePod(ctx, state, nil, nil, nil) @@ -1721,7 +1720,7 @@ func TestRunPreFilterPlugins(t *testing.T) { } state := framework.NewCycleState() - result, status := f.RunPreFilterPlugins(ctx, state, nil) + result, status, _ := f.RunPreFilterPlugins(ctx, state, nil) if d := cmp.Diff(result, tt.wantPreFilterResult); d != "" { t.Errorf("wrong status. got: %v, want: %v, diff: %s", result, tt.wantPreFilterResult, d) } diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 9324a7f1e12..2ac55fc6b60 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -461,7 +461,8 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F return nil, diagnosis, err } // Run "prefilter" plugins. - preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod) + preRes, s, unscheduledPlugins := fwk.RunPreFilterPlugins(ctx, state, pod) + diagnosis.UnschedulablePlugins = unscheduledPlugins if !s.IsSuccess() { if !s.IsRejected() { return nil, diagnosis, s.AsError() diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index fdb4519a46c..2af9ab6e32d 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -2283,7 +2283,7 @@ func TestSchedulerSchedulePod(t *testing.T) { "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"), "node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"), }, - UnschedulablePlugins: sets.Set[string]{}, + UnschedulablePlugins: sets.New("FakePreFilter2", "FakePreFilter3"), PreFilterMsg: "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously", }, }, @@ -2311,7 +2311,7 @@ func TestSchedulerSchedulePod(t *testing.T) { NodeToStatusMap: framework.NodeToStatusMap{ "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin FakePreFilter2"), }, - UnschedulablePlugins: sets.Set[string]{}, + UnschedulablePlugins: sets.New("FakePreFilter2"), PreFilterMsg: "node(s) didn't satisfy plugin FakePreFilter2", }, }, @@ -2339,7 +2339,7 @@ func TestSchedulerSchedulePod(t *testing.T) { NodeToStatusMap: framework.NodeToStatusMap{ "node2": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-prefilter").WithPlugin("FakeFilter"), }, - UnschedulablePlugins: sets.New("FakeFilter"), + UnschedulablePlugins: sets.New("FakePreFilter", 
"FakeFilter"), PreFilterMsg: "", }, }, @@ -2444,7 +2444,8 @@ func TestSchedulerSchedulePod(t *testing.T) { Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(), NumAllNodes: 2, Diagnosis: framework.Diagnosis{ - NodeToStatusMap: framework.NodeToStatusMap{}, + NodeToStatusMap: framework.NodeToStatusMap{}, + UnschedulablePlugins: sets.New("FakePreFilter"), }, }, }, diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index b32f12384ca..c0c9f258202 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -188,7 +188,7 @@ func TestCoreResourceEnqueue(t *testing.T) { tests := []struct { name string // initialNode is the Node to be created at first. - initialNode *v1.Node + initialNodes []*v1.Node // initialPod is the Pod to be created at first if it's not empty. initialPod *v1.Pod // pods are the list of Pods to be created. @@ -202,8 +202,8 @@ func TestCoreResourceEnqueue(t *testing.T) { enableSchedulingQueueHint []bool }{ { - name: "Pod without a required toleration to a node isn't requeued to activeQ", - initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(), + name: "Pod without a required toleration to a node isn't requeued to activeQ", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()}, pods: []*v1.Pod{ // - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin. // (TaintToleration plugin is evaluated before NodeResourcesFit plugin.) @@ -224,9 +224,9 @@ func TestCoreResourceEnqueue(t *testing.T) { enableSchedulingQueueHint: []bool{false, true}, }, { - name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready", - initialNode: st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), - initialPod: st.MakePod().Label("anti", "anti").Name("pod1").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Node("fake-node").Obj(), + name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()}, + initialPod: st.MakePod().Label("anti", "anti").Name("pod1").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Node("fake-node").Obj(), pods: []*v1.Pod{ // - Pod2 will be rejected by the PodAffinity plugin. 
st.MakePod().Label("anti", "anti").Name("pod2").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Obj(), @@ -254,8 +254,8 @@ func TestCoreResourceEnqueue(t *testing.T) { enableSchedulingQueueHint: []bool{false, true}, }, { - name: "Pod updated with toleration requeued to activeQ", - initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "taint-key", Effect: v1.TaintEffectNoSchedule}}).Obj(), + name: "Pod updated with toleration requeued to activeQ", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "taint-key", Effect: v1.TaintEffectNoSchedule}}).Obj()}, pods: []*v1.Pod{ // - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin. st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(), @@ -272,8 +272,8 @@ func TestCoreResourceEnqueue(t *testing.T) { enableSchedulingQueueHint: []bool{false, true}, }, { - name: "Pod got resource scaled down requeued to activeQ", - initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), + name: "Pod got resource scaled down requeued to activeQ", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()}, pods: []*v1.Pod{ // - Pod1 requests a large amount of CPU and will be rejected by the NodeResourcesFit plugin. st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(), @@ -290,8 +290,8 @@ func TestCoreResourceEnqueue(t *testing.T) { enableSchedulingQueueHint: []bool{false, true}, }, { - name: "Updating pod condition doesn't retry scheduling if the Pod was rejected by TaintToleration", - initialNode: st.MakeNode().Name("fake-node").Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(), + name: "Updating pod condition doesn't retry scheduling if the Pod was rejected by TaintToleration", + initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()}, pods: []*v1.Pod{ // - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin. st.MakePod().Name("pod1").Container("image").Obj(), @@ -327,6 +327,41 @@ func TestCoreResourceEnqueue(t *testing.T) { // because QHint of TaintToleration would decide to ignore a Pod update. enableSchedulingQueueHint: []bool{true}, }, + { + // The test case makes sure that PreFilter plugins returning PreFilterResult are also inserted into pInfo.UnschedulablePlugins + // meaning, they're taken into consideration during requeuing flow of the queue. + // https://github.com/kubernetes/kubernetes/issues/122018 + name: "Pod rejected by the PreFilter of NodeAffinity plugin and Filter of NodeResourcesFit is requeued based on both plugins", + initialNodes: []*v1.Node{ + st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), + st.MakeNode().Name("fake-node2").Label("node", "fake-node2").Label("zone", "zone1").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), + }, + pods: []*v1.Pod{ + // - Pod1 will be rejected by the NodeAffinity plugin and NodeResourcesFit plugin. 
+ st.MakePod().Label("unscheduled", "plugins").Name("pod1").NodeAffinityIn("metadata.name", []string{"fake-node"}).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + // Trigger a NodeDeleted event. + // It will not requeue pod1 to activeQ, + // because both NodeAffinity and NodeResourceFit don't register Node/delete event. + err := testCtx.ClientSet.CoreV1().Nodes().Delete(testCtx.Ctx, "fake-node", metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("failed to delete fake-node node") + } + + // Trigger a NodeCreated event. + // It will requeue pod1 to activeQ, because QHint of NodeAffinity return Queue. + // It makes sure PreFilter plugins returned PreFilterResult takes an effect for sure, + // because NodeResourceFit QHint returns QueueSkip for this event actually. + node := st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj() + if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create a new node: %w", err) + } + + return nil + }, + wantRequeuedPods: sets.New("pod1"), + }, } for _, tt := range tests { @@ -350,9 +385,11 @@ func TestCoreResourceEnqueue(t *testing.T) { defer testCtx.Scheduler.SchedulingQueue.Close() cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx - // Create initialNode. - if _, err := cs.CoreV1().Nodes().Create(ctx, tt.initialNode, metav1.CreateOptions{}); err != nil { - t.Fatalf("Failed to create an initial Node %q: %v", tt.initialNode.Name, err) + // Create one Node with a taint. + for _, node := range tt.initialNodes { + if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create an initial Node %q: %v", node.Name, err) + } } if tt.initialPod != nil {