diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go index 933b24683ee..cb0de8416f7 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go @@ -638,7 +638,7 @@ func TestDryRunPreemption(t *testing.T) { name: "pod with anti-affinity is preempted", registerPlugins: []tf.RegisterPluginFunc{ tf.RegisterPluginAsExtensions(noderesources.Name, nodeResourcesFitFunc, "Filter", "PreFilter"), - tf.RegisterPluginAsExtensions(interpodaffinity.Name, interpodaffinity.New, "Filter", "PreFilter"), + tf.RegisterPluginAsExtensions(interpodaffinity.Name, frameworkruntime.FactoryAdapter(feature.Features{}, interpodaffinity.New), "Filter", "PreFilter"), }, nodeNames: []string{"node1", "node2"}, testPods: []*v1.Pod{ diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index ccbdb16b76a..272d373d625 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -271,6 +271,7 @@ func statusForClaim(schedulingCtx *resourceapi.PodSchedulingContext, podClaimNam // dynamicResources is a plugin that ensures that ResourceClaims are allocated. type dynamicResources struct { enabled bool + enableSchedulingQueueHint bool controlPlaneControllerEnabled bool fh framework.Handle @@ -345,6 +346,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe pl := &dynamicResources{ enabled: true, controlPlaneControllerEnabled: fts.EnableDRAControlPlaneController, + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, fh: fh, clientset: fh.ClientSet(), @@ -380,22 +382,23 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu if !pl.enabled { return nil, nil } + // A resource might depend on node labels for topology filtering. + // A new or updated node may make pods schedulable. + // + // A note about UpdateNodeTaint event: + // Ideally, it's supposed to register only Add | UpdateNodeLabel because UpdateNodeTaint will never change the result from this plugin. + // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + nodeActionType := framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint + if pl.enableSchedulingQueueHint { + // When QHint is enabled, the problematic preCheck is already removed, and we can remove UpdateNodeTaint. + nodeActionType = framework.Add | framework.UpdateNodeLabel + } events := []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}}, // Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable. {Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange}, - // A resource might depend on node labels for topology filtering. - // A new or updated node may make pods schedulable. - // - // A note about UpdateNodeTaint event: - // NodeAdd QueueingHint isn't always called because of the internal feature called preCheck. 
- // As a common problematic scenario, - // when a node is added but not ready, NodeAdd event is filtered out by preCheck and doesn't arrive. - // In such cases, this plugin may miss some events that actually make pods schedulable. - // As a workaround, we add UpdateNodeTaint event to catch the case. - // We can remove UpdateNodeTaint when we remove the preCheck feature. - // See: https://github.com/kubernetes/kubernetes/issues/110175 - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}}, // A pod might be waiting for a class to get created or modified. {Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}}, // Adding or updating a ResourceSlice might make a pod schedulable because new resources became available. diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go index e6c272856b1..739c6a87dc4 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go @@ -30,7 +30,9 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/backend/cache" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" + schedruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/metrics" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -544,7 +546,7 @@ func TestRequiredAffinitySingleNode(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() snapshot := cache.NewSnapshot(test.pods, []*v1.Node{test.node}) - p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, namespaces) + p := plugintesting.SetupPluginWithInformers(ctx, t, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, snapshot, namespaces) state := framework.NewCycleState() _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, state, test.pod) if diff := cmp.Diff(preFilterStatus, test.wantPreFilterStatus); diff != "" { @@ -956,7 +958,7 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() snapshot := cache.NewSnapshot(test.pods, test.nodes) - p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, + p := plugintesting.SetupPluginWithInformers(ctx, t, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, snapshot, []runtime.Object{ &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "NS1"}}, }) @@ -987,7 +989,7 @@ func TestPreFilterDisabled(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, cache.NewEmptySnapshot(), nil) + p := plugintesting.SetupPluginWithInformers(ctx, t, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, cache.NewEmptySnapshot(), nil) cycleState := framework.NewCycleState() gotStatus := p.(framework.FilterPlugin).Filter(ctx, cycleState, pod, nodeInfo) wantStatus := framework.AsStatus(fmt.Errorf(`error reading "PreFilterInterPodAffinity" from cycleState: %w`, framework.ErrNotFound)) @@ -1232,7 +1234,7 @@ 
func TestPreFilterStateAddRemovePod(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, nil) + p := plugintesting.SetupPluginWithInformers(ctx, t, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, snapshot, nil) cycleState := framework.NewCycleState() _, preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(ctx, cycleState, test.pendingPod) if !preFilterStatus.IsSuccess() { @@ -1435,7 +1437,7 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) defer cancel() - p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, nil) + p := plugintesting.SetupPluginWithInformers(ctx, t, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, snapshot, nil) gotAffinityPodsMap, gotAntiAffinityPodsMap := p.(*InterPodAffinity).getIncomingAffinityAntiAffinityCounts(ctx, mustNewPodInfo(t, tt.pod), l) if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) { t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap) diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index 77aacab2cf6..f29994d3dd3 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -44,10 +45,11 @@ var _ framework.EnqueueExtensions = &InterPodAffinity{} // InterPodAffinity is a plugin that checks inter pod affinity type InterPodAffinity struct { - parallelizer parallelize.Parallelizer - args config.InterPodAffinityArgs - sharedLister framework.SharedLister - nsLister listersv1.NamespaceLister + parallelizer parallelize.Parallelizer + args config.InterPodAffinityArgs + sharedLister framework.SharedLister + nsLister listersv1.NamespaceLister + enableSchedulingQueueHint bool } // Name returns name of the plugin. It is used in logs, etc. @@ -58,6 +60,15 @@ func (pl *InterPodAffinity) Name() string { // EventsToRegister returns the possible events that may make a failed Pod // schedulable func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + // A note about UpdateNodeTaint event: + // Ideally, it's supposed to register only Add | UpdateNodeLabel because UpdateNodeTaint will never change the result from this plugin. + // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + nodeActionType := framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint + if pl.enableSchedulingQueueHint { + // When QueueingHint is enabled, we don't use preCheck and we don't need to register UpdateNodeTaint event. 
+ nodeActionType = framework.Add | framework.UpdateNodeLabel + } return []framework.ClusterEventWithHint{ // All ActionType includes the following events: // - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints, @@ -66,22 +77,13 @@ func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.Clu // an unschedulable Pod schedulable. // - Add. An unschedulable Pod may fail due to violating pod-affinity constraints, // adding an assigned Pod may make it schedulable. - // - // A note about UpdateNodeTaint event: - // NodeAdd QueueingHint isn't always called because of the internal feature called preCheck. - // As a common problematic scenario, - // when a node is added but not ready, NodeAdd event is filtered out by preCheck and doesn't arrive. - // In such cases, this plugin may miss some events that actually make pods schedulable. - // As a workaround, we add UpdateNodeTaint event to catch the case. - // We can remove UpdateNodeTaint when we remove the preCheck feature. - // See: https://github.com/kubernetes/kubernetes/issues/110175 {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.UpdatePodLabel | framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodChange}, - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, }, nil } // New initializes a new plugin and returns it. -func New(_ context.Context, plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { if h.SnapshotSharedLister() == nil { return nil, fmt.Errorf("SnapshotSharedlister is nil") } @@ -93,10 +95,11 @@ func New(_ context.Context, plArgs runtime.Object, h framework.Handle) (framewor return nil, err } pl := &InterPodAffinity{ - parallelizer: h.Parallelizer(), - args: args, - sharedLister: h.SnapshotSharedLister(), - nsLister: h.SharedInformerFactory().Core().V1().Namespaces().Lister(), + parallelizer: h.Parallelizer(), + args: args, + sharedLister: h.SnapshotSharedLister(), + nsLister: h.SharedInformerFactory().Core().V1().Namespaces().Lister(), + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, } return pl, nil diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin_test.go index 74a364286a7..3e9d9af20c5 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin_test.go @@ -27,7 +27,9 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/backend/cache" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" + schedruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -134,7 +136,7 @@ func Test_isSchedulableAfterPodChange(t *testing.T) { defer cancel() snapshot := cache.NewSnapshot(nil, nil) - pl := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, namespaces) + pl := 
plugintesting.SetupPluginWithInformers(ctx, t, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, snapshot, namespaces) p := pl.(*InterPodAffinity) actualHint, err := p.isSchedulableAfterPodChange(logger, tc.pod, tc.oldPod, tc.newPod) if err != nil { @@ -194,7 +196,7 @@ func Test_isSchedulableAfterNodeChange(t *testing.T) { defer cancel() snapshot := cache.NewSnapshot(nil, nil) - pl := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, namespaces) + pl := plugintesting.SetupPluginWithInformers(ctx, t, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, snapshot, namespaces) p := pl.(*InterPodAffinity) actualHint, err := p.isSchedulableAfterNodeChange(logger, tc.pod, tc.oldNode, tc.newNode) if err != nil { diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go index 97e874edc23..6eb0595114c 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go @@ -30,7 +30,9 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/backend/cache" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" + schedruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" tf "k8s.io/kubernetes/pkg/scheduler/testing/framework" ) @@ -783,7 +785,7 @@ func TestPreferredAffinity(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() state := framework.NewCycleState() - p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{HardPodAffinityWeight: 1, IgnorePreferredTermsOfExistingPods: test.ignorePreferredTermsOfExistingPods}, cache.NewSnapshot(test.pods, test.nodes), namespaces) + p := plugintesting.SetupPluginWithInformers(ctx, t, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{HardPodAffinityWeight: 1, IgnorePreferredTermsOfExistingPods: test.ignorePreferredTermsOfExistingPods}, cache.NewSnapshot(test.pods, test.nodes), namespaces) status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, tf.BuildNodeInfos(test.nodes)) if !status.IsSuccess() { @@ -951,7 +953,7 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() state := framework.NewCycleState() - p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, cache.NewSnapshot(test.pods, test.nodes), namespaces) + p := plugintesting.SetupPluginWithInformers(ctx, t, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, cache.NewSnapshot(test.pods, test.nodes), namespaces) status := p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, tf.BuildNodeInfos(test.nodes)) if !test.wantStatus.Equal(status) { t.Errorf("InterPodAffinity#PreScore() returned unexpected status.Code got: %v, want: %v", status.Code(), test.wantStatus.Code()) diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go index 4b6a13473b2..d785372dd04 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go +++ 
b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/util" @@ -36,9 +37,10 @@ import ( // NodeAffinity is a plugin that checks if a pod node selector matches the node label. type NodeAffinity struct { - handle framework.Handle - addedNodeSelector *nodeaffinity.NodeSelector - addedPrefSchedTerms *nodeaffinity.PreferredSchedulingTerms + handle framework.Handle + addedNodeSelector *nodeaffinity.NodeSelector + addedPrefSchedTerms *nodeaffinity.PreferredSchedulingTerms + enableSchedulingQueueHint bool } var _ framework.PreFilterPlugin = &NodeAffinity{} @@ -84,8 +86,18 @@ func (s *preFilterState) Clone() framework.StateData { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. func (pl *NodeAffinity) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + // A note about UpdateNodeTaint event: + // Ideally, it's supposed to register only Add | UpdateNodeLabel because UpdateNodeTaint will never change the result from this plugin. + // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + nodeActionType := framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint + if pl.enableSchedulingQueueHint { + // preCheck is not used when QHint is enabled, and hence we can use UpdateNodeLabel instead of Update. + nodeActionType = framework.Add | framework.UpdateNodeLabel + } + return []framework.ClusterEventWithHint{ - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, }, nil } @@ -279,13 +291,14 @@ func (pl *NodeAffinity) ScoreExtensions() framework.ScoreExtensions { } // New initializes a new plugin and returns it. 
-func New(_ context.Context, plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) { args, err := getArgs(plArgs) if err != nil { return nil, err } pl := &NodeAffinity{ - handle: h, + handle: h, + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, } if args.AddedAffinity != nil { if ns := args.AddedAffinity.RequiredDuringSchedulingIgnoredDuringExecution; ns != nil { diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go index 395cfbfc2d7..5f668e7ef04 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/backend/cache" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" tf "k8s.io/kubernetes/pkg/scheduler/testing/framework" @@ -907,7 +908,7 @@ func TestNodeAffinity(t *testing.T) { nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(&node) - p, err := New(ctx, &test.args, nil) + p, err := New(ctx, &test.args, nil, feature.Features{}) if err != nil { t.Fatalf("Creating plugin: %v", err) } @@ -1192,7 +1193,7 @@ func TestNodeAffinityPriority(t *testing.T) { state := framework.NewCycleState() fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes))) - p, err := New(ctx, &test.args, fh) + p, err := New(ctx, &test.args, fh, feature.Features{}) if err != nil { t.Fatalf("Creating plugin: %v", err) } @@ -1345,7 +1346,7 @@ func Test_isSchedulableAfterNodeChange(t *testing.T) { for name, tc := range testcases { t.Run(name, func(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) - p, err := New(ctx, tc.args, nil) + p, err := New(ctx, tc.args, nil, feature.Features{}) if err != nil { t.Fatalf("Creating plugin: %v", err) } diff --git a/pkg/scheduler/framework/plugins/nodename/node_name.go b/pkg/scheduler/framework/plugins/nodename/node_name.go index 2e48164765a..29605a464c3 100644 --- a/pkg/scheduler/framework/plugins/nodename/node_name.go +++ b/pkg/scheduler/framework/plugins/nodename/node_name.go @@ -22,11 +22,14 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" ) // NodeName is a plugin that checks if a pod spec node name matches the current node. -type NodeName struct{} +type NodeName struct { + enableSchedulingQueueHint bool +} var _ framework.FilterPlugin = &NodeName{} var _ framework.EnqueueExtensions = &NodeName{} @@ -42,8 +45,20 @@ const ( // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. func (pl *NodeName) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + // A note about UpdateNodeTaint/UpdateNodeLabel event: + // Ideally, it's supposed to register only Add because any Node update event will never change the result from this plugin. 
+ // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + nodeActionType := framework.Add | framework.UpdateNodeTaint | framework.UpdateNodeLabel + if pl.enableSchedulingQueueHint { + // preCheck is not used when QHint is enabled, and hence Update event isn't necessary. + nodeActionType = framework.Add + } + return []framework.ClusterEventWithHint{ - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, + // We don't need the QueueingHintFn here because the scheduling of Pods will be always retried with backoff when this Event happens. + // (the same as Queue) + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}}, }, nil } @@ -67,6 +82,8 @@ func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool { } // New initializes a new plugin and returns it. -func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { - return &NodeName{}, nil +func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) { + return &NodeName{ + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, + }, nil } diff --git a/pkg/scheduler/framework/plugins/nodename/node_name_test.go b/pkg/scheduler/framework/plugins/nodename/node_name_test.go index 0b92da92131..a4ffb914e95 100644 --- a/pkg/scheduler/framework/plugins/nodename/node_name_test.go +++ b/pkg/scheduler/framework/plugins/nodename/node_name_test.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" st "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/test/utils/ktesting" ) @@ -56,7 +57,7 @@ func TestNodeName(t *testing.T) { nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(test.node) _, ctx := ktesting.NewTestContext(t) - p, err := New(ctx, nil, nil) + p, err := New(ctx, nil, nil, feature.Features{}) if err != nil { t.Fatalf("creating plugin: %v", err) } diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports.go b/pkg/scheduler/framework/plugins/nodeports/node_ports.go index a4ba1474ee2..74350150c3c 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports.go @@ -24,12 +24,15 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/util" ) // NodePorts is a plugin that checks if a node has free ports for the requested pod ports. -type NodePorts struct{} +type NodePorts struct { + enableSchedulingQueueHint bool +} var _ framework.PreFilterPlugin = &NodePorts{} var _ framework.FilterPlugin = &NodePorts{} @@ -112,16 +115,22 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. func (pl *NodePorts) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + // A note about UpdateNodeTaint/UpdateNodeLabel event: + // Ideally, it's supposed to register only Add because NodeUpdated event never means to have any free ports for the Pod. 
+ // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + nodeActionType := framework.Add | framework.UpdateNodeTaint | framework.UpdateNodeLabel + if pl.enableSchedulingQueueHint { + // preCheck is not used when QHint is enabled, and hence Update event isn't necessary. + nodeActionType = framework.Add + } + return []framework.ClusterEventWithHint{ // Due to immutable fields `spec.containers[*].ports`, pod update events are ignored. {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted}, - // TODO(#110175): Ideally, it's supposed to register only NodeCreated, because NodeUpdated event never means to have any free ports for the Pod. - // But, we may miss NodeCreated event due to preCheck. - // See: https://github.com/kubernetes/kubernetes/issues/109437 - // And, we can remove NodeUpdated event once https://github.com/kubernetes/kubernetes/issues/110175 is solved. // We don't need the QueueingHintFn here because the scheduling of Pods will be always retried with backoff when this Event happens. // (the same as Queue) - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}}, }, nil } @@ -199,6 +208,8 @@ func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool } // New initializes a new plugin and returns it. -func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { - return &NodePorts{}, nil +func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) { + return &NodePorts{ + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, + }, nil } diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go b/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go index e3649de039b..f126ff006e1 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/klog/v2/ktesting" _ "k8s.io/klog/v2/ktesting/init" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -146,7 +147,7 @@ func TestNodePorts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { _, ctx := ktesting.NewTestContext(t) - p, err := New(ctx, nil, nil) + p, err := New(ctx, nil, nil, feature.Features{}) if err != nil { t.Fatalf("creating plugin: %v", err) } @@ -175,7 +176,7 @@ func TestPreFilterDisabled(t *testing.T) { nodeInfo := framework.NewNodeInfo() node := v1.Node{} nodeInfo.SetNode(&node) - p, err := New(ctx, nil, nil) + p, err := New(ctx, nil, nil, feature.Features{}) if err != nil { t.Fatalf("creating plugin: %v", err) } @@ -341,7 +342,7 @@ func Test_isSchedulableAfterPodDeleted(t *testing.T) { for name, tc := range testcases { t.Run(name, func(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) - p, err := New(ctx, nil, nil) + p, err := New(ctx, nil, nil, feature.Features{}) if err != nil { t.Fatalf("Creating plugin: %v", err) } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index 
d62fab1937b..a6905387539 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -88,6 +88,7 @@ type Fit struct { ignoredResourceGroups sets.Set[string] enableInPlacePodVerticalScaling bool enableSidecarContainers bool + enableSchedulingQueueHint bool handle framework.Handle resourceAllocationScorer } @@ -172,6 +173,7 @@ func NewFit(_ context.Context, plArgs runtime.Object, h framework.Handle, fts fe ignoredResourceGroups: sets.New(args.IgnoredResourceGroups...), enableInPlacePodVerticalScaling: fts.EnableInPlacePodVerticalScaling, enableSidecarContainers: fts.EnableSidecarContainers, + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, handle: h, resourceAllocationScorer: *scorePlugin(args), }, nil @@ -254,9 +256,20 @@ func (f *Fit) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithH // for this plugin since a Pod update may free up resources that make other Pods schedulable. podActionType |= framework.UpdatePodScaleDown } + + // A note about UpdateNodeTaint/UpdateNodeLabel event: + // Ideally, it's supposed to register only Add | UpdateNodeAllocatable because the only resource update could change the node resource fit plugin's result. + // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + nodeActionType := framework.Add | framework.UpdateNodeAllocatable | framework.UpdateNodeTaint | framework.UpdateNodeLabel + if f.enableSchedulingQueueHint { + // preCheck is not used when QHint is enabled, and hence Update event isn't necessary. + nodeActionType = framework.Add | framework.UpdateNodeAllocatable + } + return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange}, - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: f.isSchedulableAfterNodeChange}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}, QueueingHintFn: f.isSchedulableAfterNodeChange}, }, nil } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index b5c71eb4f50..0f5ed1f097a 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -1087,31 +1087,40 @@ func BenchmarkTestFitScore(b *testing.B) { func TestEventsToRegister(t *testing.T) { tests := []struct { - name string - inPlacePodVerticalScalingEnabled bool - expectedClusterEvents []framework.ClusterEventWithHint + name string + enableInPlacePodVerticalScaling bool + enableSchedulingQueueHint bool + expectedClusterEvents []framework.ClusterEventWithHint }{ { - "Register events with InPlacePodVerticalScaling feature enabled", - true, - []framework.ClusterEventWithHint{ + name: "Register events with InPlacePodVerticalScaling feature enabled", + enableInPlacePodVerticalScaling: true, + expectedClusterEvents: []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.UpdatePodScaleDown | framework.Delete}}, - {Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.UpdateNodeAllocatable 
| framework.UpdateNodeTaint | framework.UpdateNodeLabel}}, }, }, { - "Register events with InPlacePodVerticalScaling feature disabled", - false, - []framework.ClusterEventWithHint{ + name: "Register events with SchedulingQueueHint feature enabled", + enableSchedulingQueueHint: true, + expectedClusterEvents: []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.Delete}}, - {Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.Update}}, + {Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.UpdateNodeAllocatable}}, + }, + }, + { + name: "Register events with InPlacePodVerticalScaling feature disabled", + enableInPlacePodVerticalScaling: false, + expectedClusterEvents: []framework.ClusterEventWithHint{ + {Event: framework.ClusterEvent{Resource: "Pod", ActionType: framework.Delete}}, + {Event: framework.ClusterEvent{Resource: "Node", ActionType: framework.Add | framework.UpdateNodeAllocatable | framework.UpdateNodeTaint | framework.UpdateNodeLabel}}, }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - fp := &Fit{enableInPlacePodVerticalScaling: test.inPlacePodVerticalScalingEnabled} + fp := &Fit{enableInPlacePodVerticalScaling: test.enableInPlacePodVerticalScaling, enableSchedulingQueueHint: test.enableSchedulingQueueHint} _, ctx := ktesting.NewTestContext(t) actualClusterEvents, err := fp.EventsToRegister(ctx) if err != nil { diff --git a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go index a2c4aae372f..7cc7988061b 100644 --- a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go +++ b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go @@ -24,6 +24,7 @@ import ( v1helper "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -31,6 +32,7 @@ import ( // NodeUnschedulable plugin filters nodes that set node.Spec.Unschedulable=true unless // the pod tolerates {key=node.kubernetes.io/unschedulable, effect:NoSchedule} taint. type NodeUnschedulable struct { + enableSchedulingQueueHint bool } var _ framework.FilterPlugin = &NodeUnschedulable{} @@ -49,8 +51,9 @@ const ( // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. func (pl *NodeUnschedulable) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + // preCheck is not used when QHint is enabled. return []framework.ClusterEventWithHint{ - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, }, nil } @@ -101,6 +104,6 @@ func (pl *NodeUnschedulable) Filter(ctx context.Context, _ *framework.CycleState } // New initializes a new plugin and returns it. 
-func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { - return &NodeUnschedulable{}, nil +func New(_ context.Context, _ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) { + return &NodeUnschedulable{enableSchedulingQueueHint: fts.EnableSchedulingQueueHint}, nil } diff --git a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go index f66bb5611d6..d9659d6c66c 100644 --- a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go +++ b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable_test.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/test/utils/ktesting" ) @@ -75,7 +76,7 @@ func TestNodeUnschedulable(t *testing.T) { nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(test.node) _, ctx := ktesting.NewTestContext(t) - p, err := New(ctx, nil, nil) + p, err := New(ctx, nil, nil, feature.Features{}) if err != nil { t.Fatalf("creating plugin: %v", err) } diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go index 7bcfbd7f3b0..361c3362e9b 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go @@ -146,15 +146,6 @@ func (pl *PodTopologySpread) EventsToRegister(_ context.Context) ([]framework.Cl {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.UpdatePodLabel | framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodChange}, // Node add|delete|update maybe lead an topology key changed, // and make these pod in scheduling schedulable or unschedulable. - // - // A note about UpdateNodeTaint event: - // NodeAdd QueueingHint isn't always called because of the internal feature called preCheck. - // As a common problematic scenario, - // when a node is added but not ready, NodeAdd event is filtered out by preCheck and doesn't arrive. - // In such cases, this plugin may miss some events that actually make pods schedulable. - // As a workaround, we add UpdateNodeTaint event to catch the case. - // We can remove UpdateNodeTaint when we remove the preCheck feature. - // See: https://github.com/kubernetes/kubernetes/issues/110175 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, }, nil } @@ -251,6 +242,10 @@ func (pl *PodTopologySpread) getConstraints(pod *v1.Pod) ([]topologySpreadConstr return constraints, nil } +// isSchedulableAfterNodeChange returns Queue when node has topologyKey in its labels, else return QueueSkip. +// +// TODO: we can filter out node update events in a more fine-grained way once preCheck is completely removed. 
+// See: https://github.com/kubernetes/kubernetes/issues/110175 func (pl *PodTopologySpread) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj) if err != nil { @@ -262,10 +257,6 @@ func (pl *PodTopologySpread) isSchedulableAfterNodeChange(logger klog.Logger, po return framework.Queue, err } - // framework.Add/framework.Update: return Queue when node has topologyKey in its labels, else return QueueSkip. - // - // TODO: we can filter out node update events in a more fine-grained way once preCheck is completely removed. - // See: https://github.com/kubernetes/kubernetes/issues/110175 if modifiedNode != nil { if !nodeLabelsMatchSpreadConstraints(modifiedNode.Labels, constraints) { logger.V(5).Info("the created/updated node doesn't match pod topology spread constraints", diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 68f4e3e9a9d..2903a65c87c 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -60,18 +60,18 @@ func NewInTreeRegistry() runtime.Registry { dynamicresources.Name: runtime.FactoryAdapter(fts, dynamicresources.New), imagelocality.Name: imagelocality.New, tainttoleration.Name: runtime.FactoryAdapter(fts, tainttoleration.New), - nodename.Name: nodename.New, - nodeports.Name: nodeports.New, - nodeaffinity.Name: nodeaffinity.New, + nodename.Name: runtime.FactoryAdapter(fts, nodename.New), + nodeports.Name: runtime.FactoryAdapter(fts, nodeports.New), + nodeaffinity.Name: runtime.FactoryAdapter(fts, nodeaffinity.New), podtopologyspread.Name: runtime.FactoryAdapter(fts, podtopologyspread.New), - nodeunschedulable.Name: nodeunschedulable.New, + nodeunschedulable.Name: runtime.FactoryAdapter(fts, nodeunschedulable.New), noderesources.Name: runtime.FactoryAdapter(fts, noderesources.NewFit), noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation), volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New), volumerestrictions.Name: runtime.FactoryAdapter(fts, volumerestrictions.New), - volumezone.Name: volumezone.New, + volumezone.Name: runtime.FactoryAdapter(fts, volumezone.New), nodevolumelimits.CSIName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI), - interpodaffinity.Name: interpodaffinity.New, + interpodaffinity.Name: runtime.FactoryAdapter(fts, interpodaffinity.New), queuesort.Name: queuesort.New, defaultbinder.Name: defaultbinder.New, defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New), diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index 4773c0fdb35..78bb73f1ee8 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -59,19 +59,26 @@ func (pl *TaintToleration) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. 
func (pl *TaintToleration) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { - clusterEventWithHint := []framework.ClusterEventWithHint{ - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, + if pl.enableSchedulingQueueHint { + return []framework.ClusterEventWithHint{ + // When the QueueingHint feature is enabled, preCheck is eliminated and we don't need additional UpdateNodeLabel. + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, + // When the QueueingHint feature is enabled, + // the scheduling queue uses Pod/Update Queueing Hint + // to determine whether a Pod's update makes the Pod schedulable or not. + // https://github.com/kubernetes/kubernetes/pull/122234 + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodTolerations}, QueueingHintFn: pl.isSchedulableAfterPodTolerationChange}, + }, nil } - if !pl.enableSchedulingQueueHint { - return clusterEventWithHint, nil - } - // When the QueueingHint feature is enabled, - // the scheduling queue uses Pod/Update Queueing Hint - // to determine whether a Pod's update makes the Pod schedulable or not. - // https://github.com/kubernetes/kubernetes/pull/122234 - clusterEventWithHint = append(clusterEventWithHint, - framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodTolerations}, QueueingHintFn: pl.isSchedulableAfterPodTolerationChange}) - return clusterEventWithHint, nil + + return []framework.ClusterEventWithHint{ + // A note about UpdateNodeLabel event: + // Ideally, it's supposed to register only Add | UpdateNodeTaint because UpdateNodeLabel will never change the result from this plugin. + // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeTaint | framework.UpdateNodeLabel}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, + // No need to register the Pod event; the update to the unschedulable Pods already triggers the scheduling retry when QHint is disabled. + }, nil } // isSchedulableAfterNodeChange is invoked for all node events reported by diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index 2dcb72fdccb..8945b4faac3 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -95,6 +95,19 @@ func (pl *VolumeBinding) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. func (pl *VolumeBinding) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + // Pods may fail to find available PVs because the node labels do not + // match the storage class's allowed topologies or PV's node affinity. + // A new or updated node may make pods schedulable. + // + // A note about UpdateNodeTaint event: + // Ideally, it's supposed to register only Add | UpdateNodeLabel because UpdateNodeTaint will never change the result from this plugin. 
+ // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + nodeActionType := framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint + if pl.fts.EnableSchedulingQueueHint { + // When scheduling queue hint is enabled, we don't use the problematic preCheck and don't need to register UpdateNodeTaint event. + nodeActionType = framework.Add | framework.UpdateNodeLabel + } events := []framework.ClusterEventWithHint{ // Pods may fail because of missing or mis-configured storage class // (e.g., allowedTopologies, volumeBindingMode), and hence may become @@ -105,19 +118,7 @@ func (pl *VolumeBinding) EventsToRegister(_ context.Context) ([]framework.Cluste {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange}, {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}}, - // Pods may fail to find available PVs because the node labels do not - // match the storage class's allowed topologies or PV's node affinity. - // A new or updated node may make pods schedulable. - // - // A note about UpdateNodeTaint event: - // NodeAdd QueueingHint isn't always called because of the internal feature called preCheck. - // As a common problematic scenario, - // when a node is added but not ready, NodeAdd event is filtered out by preCheck and doesn't arrive. - // In such cases, this plugin may miss some events that actually make pods schedulable. - // As a workaround, we add UpdateNodeTaint event to catch the case. - // We can remove UpdateNodeTaint when we remove the preCheck feature. - // See: https://github.com/kubernetes/kubernetes/issues/110175 - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}}, // We rely on CSI node to translate in-tree PV to CSI. // TODO: kube-schduler will unregister the CSINode events once all the volume plugins has completed their CSI migration. diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index 192c65e8921..6b22340b4ab 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -35,8 +35,9 @@ import ( // VolumeRestrictions is a plugin that checks volume restrictions. type VolumeRestrictions struct { - pvcLister corelisters.PersistentVolumeClaimLister - sharedLister framework.SharedLister + pvcLister corelisters.PersistentVolumeClaimLister + sharedLister framework.SharedLister + enableSchedulingQueueHint bool } var _ framework.PreFilterPlugin = &VolumeRestrictions{} @@ -319,6 +320,16 @@ func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework. // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. 
func (pl *VolumeRestrictions) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + // A note about UpdateNodeTaint/UpdateNodeLabel event: + // Ideally, it's supposed to register only Add because any Node update event will never change the result from this plugin. + // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + nodeActionType := framework.Add | framework.UpdateNodeTaint | framework.UpdateNodeLabel + if pl.enableSchedulingQueueHint { + // preCheck is not used when QHint is enabled, and hence Update event isn't necessary. + nodeActionType = framework.Add + } + return []framework.ClusterEventWithHint{ // Pods may fail to schedule because of volumes conflicting with other pods on same node. // Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable. @@ -326,7 +337,7 @@ func (pl *VolumeRestrictions) EventsToRegister(_ context.Context) ([]framework.C {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted}, // A new Node may make a pod schedulable. // We intentionally don't set QueueingHint since all Node/Add events could make Pods schedulable. - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}}, // Pods may fail to schedule because the PVC it uses has not yet been created. // This PVC is required to exist to check its access modes. {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}, @@ -408,7 +419,8 @@ func New(_ context.Context, _ runtime.Object, handle framework.Handle, fts featu sharedLister := handle.SnapshotSharedLister() return &VolumeRestrictions{ - pvcLister: pvcLister, - sharedLister: sharedLister, + pvcLister: pvcLister, + sharedLister: sharedLister, + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, }, nil } diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go index b590858a175..ec39f2ef7ca 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go @@ -33,15 +33,17 @@ import ( storagehelpers "k8s.io/component-helpers/storage/volume" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/util" ) // VolumeZone is a plugin that checks volume zone. type VolumeZone struct { - pvLister corelisters.PersistentVolumeLister - pvcLister corelisters.PersistentVolumeClaimLister - scLister storagelisters.StorageClassLister + pvLister corelisters.PersistentVolumeLister + pvcLister corelisters.PersistentVolumeClaimLister + scLister storagelisters.StorageClassLister + enableSchedulingQueueHint bool } var _ framework.FilterPlugin = &VolumeZone{} @@ -260,21 +262,22 @@ func getErrorAsStatus(err error) *framework.Status { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. 
func (pl *VolumeZone) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + // A new node or updating a node's volume zone labels may make a pod schedulable. + // A note about UpdateNodeTaint event: + // Ideally, it's supposed to register only Add | UpdateNodeLabel because UpdateNodeTaint will never change the result from this plugin. + // But, we may miss Node/Add event due to preCheck, and we decided to register UpdateNodeTaint | UpdateNodeLabel for all plugins registering Node/Add. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + nodeActionType := framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint + if pl.enableSchedulingQueueHint { + // preCheck is not used when QHint is enabled. + nodeActionType = framework.Add | framework.UpdateNodeLabel + } + return []framework.ClusterEventWithHint{ // New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable. // Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored. {Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add}, QueueingHintFn: pl.isSchedulableAfterStorageClassAdded}, - // A new node or updating a node's volume zone labels may make a pod schedulable. - // - // A note about UpdateNodeTaint event: - // NodeAdd QueueingHint isn't always called because of the internal feature called preCheck. - // As a common problematic scenario, - // when a node is added but not ready, NodeAdd event is filtered out by preCheck and doesn't arrive. - // In such cases, this plugin may miss some events that actually make pods schedulable. - // As a workaround, we add UpdateNodeTaint event to catch the case. - // We can remove UpdateNodeTaint when we remove the preCheck feature. - // See: https://github.com/kubernetes/kubernetes/issues/110175 - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}}, // A new pvc may make a pod schedulable. // Also, if pvc's VolumeName is filled, that also could make a pod schedulable. {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange}, @@ -393,14 +396,15 @@ func (pl *VolumeZone) getPVTopologies(logger klog.Logger, pv *v1.PersistentVolum } // New initializes a new plugin and returns it. 
-func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func New(_ context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() pvLister := informerFactory.Core().V1().PersistentVolumes().Lister() pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() scLister := informerFactory.Storage().V1().StorageClasses().Lister() return &VolumeZone{ - pvLister, - pvcLister, - scLister, + pvLister: pvLister, + pvcLister: pvcLister, + scLister: scLister, + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, }, nil } diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go index 774fb96cbc5..232517bb00a 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone_test.go @@ -31,7 +31,9 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/backend/cache" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" + schedruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" st "k8s.io/kubernetes/pkg/scheduler/testing" tf "k8s.io/kubernetes/pkg/scheduler/testing/framework" ) @@ -267,9 +269,10 @@ func TestSingleZone(t *testing.T) { node := &framework.NodeInfo{} node.SetNode(test.Node) p := &VolumeZone{ - pvLister, - pvcLister, - nil, + pvLister: pvLister, + pvcLister: pvcLister, + scLister: nil, + enableSchedulingQueueHint: false, } _, preFilterStatus := p.PreFilter(ctx, state, test.Pod) if diff := cmp.Diff(preFilterStatus, test.wantPreFilterStatus); diff != "" { @@ -399,9 +402,10 @@ func TestMultiZone(t *testing.T) { node := &framework.NodeInfo{} node.SetNode(test.Node) p := &VolumeZone{ - pvLister, - pvcLister, - nil, + pvLister: pvLister, + pvcLister: pvcLister, + scLister: nil, + enableSchedulingQueueHint: false, } _, preFilterStatus := p.PreFilter(ctx, state, test.Pod) if diff := cmp.Diff(preFilterStatus, test.wantPreFilterStatus); diff != "" { @@ -524,9 +528,10 @@ func TestWithBinding(t *testing.T) { node := &framework.NodeInfo{} node.SetNode(test.Node) p := &VolumeZone{ - pvLister, - pvcLister, - scLister, + pvLister: pvLister, + pvcLister: pvcLister, + scLister: scLister, + enableSchedulingQueueHint: false, } _, preFilterStatus := p.PreFilter(ctx, state, test.Pod) if diff := cmp.Diff(preFilterStatus, test.wantPreFilterStatus); diff != "" { @@ -841,7 +846,7 @@ func newPluginWithListers(ctx context.Context, tb testing.TB, pods []*v1.Pod, no for _, pv := range pvs { objects = append(objects, pv) } - return plugintesting.SetupPluginWithInformers(ctx, tb, New, &config.InterPodAffinityArgs{}, snapshot, objects) + return plugintesting.SetupPluginWithInformers(ctx, tb, schedruntime.FactoryAdapter(feature.Features{}, New), &config.InterPodAffinityArgs{}, snapshot, objects) } func makePVsWithZoneLabel(num int) []*v1.PersistentVolume { diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index d12bf8eb4e6..90427b73237 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -56,6 +56,7 @@ const ( // for better performance in requeueing. UpdateNodeAllocatable UpdateNodeLabel + // UpdateNodeTaint is an update for node's taints or node.Spec.Unschedulable. 
UpdateNodeTaint UpdateNodeCondition UpdateNodeAnnotation diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index f1b2dbdc04a..78b4748bb4c 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -873,7 +873,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { fns := []tf.RegisterPluginFunc{ tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - tf.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"), + tf.RegisterPluginAsExtensions(nodeports.Name, frameworkruntime.FactoryAdapter(feature.Features{}, nodeports.New), "Filter", "PreFilter"), } scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, pod, &node, fns...) @@ -938,7 +938,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { fns := []tf.RegisterPluginFunc{ tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - tf.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"), + tf.RegisterPluginAsExtensions(nodeports.Name, frameworkruntime.FactoryAdapter(feature.Features{}, nodeports.New), "Filter", "PreFilter"), } scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, firstPod, &node, fns...) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index a75d3b476c1..573602cf5a7 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -867,11 +867,11 @@ func Test_UnionedGVKs(t *testing.T) { want: map[framework.GVK]framework.ActionType{}, }, { - name: "plugins with default profile (InPlacePodVerticalScaling: disabled)", + name: "plugins with default profile (No feature gate enabled)", plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled}, want: map[framework.GVK]framework.ActionType{ framework.Pod: framework.Add | framework.UpdatePodLabel | framework.Delete, - framework.Node: framework.All, + framework.Node: framework.Add | framework.UpdateNodeAllocatable | framework.UpdateNodeLabel | framework.UpdateNodeTaint | framework.Delete, framework.CSINode: framework.All - framework.Delete, framework.CSIDriver: framework.All - framework.Delete, framework.CSIStorageCapacity: framework.All - framework.Delete, @@ -885,7 +885,7 @@ func Test_UnionedGVKs(t *testing.T) { plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled}, want: map[framework.GVK]framework.ActionType{ framework.Pod: framework.Add | framework.UpdatePodLabel | framework.UpdatePodScaleDown | framework.Delete, - framework.Node: framework.All, + framework.Node: framework.Add | framework.UpdateNodeAllocatable | framework.UpdateNodeLabel | framework.UpdateNodeTaint | framework.Delete, framework.CSINode: framework.All - framework.Delete, framework.CSIDriver: framework.All - framework.Delete, framework.CSIStorageCapacity: framework.All - framework.Delete, @@ -896,11 +896,11 @@ func Test_UnionedGVKs(t *testing.T) { enableInPlacePodVerticalScaling: true, }, { - name: "plugins with default profile (queueingHint: enabled)", + name: "plugins with default profile (queueingHint/InPlacePodVerticalScaling: enabled)", plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled}, want: map[framework.GVK]framework.ActionType{ framework.Pod: framework.Add | framework.UpdatePodLabel | framework.UpdatePodScaleDown | 
framework.UpdatePodTolerations | framework.UpdatePodSchedulingGatesEliminated | framework.Delete, - framework.Node: framework.All, + framework.Node: framework.Add | framework.UpdateNodeAllocatable | framework.UpdateNodeLabel | framework.UpdateNodeTaint | framework.Delete, framework.CSINode: framework.All - framework.Delete, framework.CSIDriver: framework.All - framework.Delete, framework.CSIStorageCapacity: framework.All - framework.Delete,
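
The same shape repeats across every plugin this patch touches: EventsToRegister keeps the broad Node mask (Add | UpdateNodeLabel | UpdateNodeTaint) as a workaround for Node/Add events dropped by preCheck, and narrows it when the SchedulingQueueHint feature gate is on. A minimal sketch of that pattern, using the framework types shown in the diff (the examplePlugin name is illustrative, not part of the patch):

package example // illustrative package, not part of the patch

import (
	"context"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// examplePlugin stands in for any of the plugins changed in this patch.
type examplePlugin struct {
	enableSchedulingQueueHint bool
}

// EventsToRegister narrows the Node event mask when queueing hints are enabled,
// mirroring the per-plugin changes in this diff.
func (pl *examplePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
	// Broad mask: UpdateNodeTaint/UpdateNodeLabel are kept as a workaround for
	// Node/Add events that preCheck can filter out (see kubernetes#109437).
	nodeActionType := framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint
	if pl.enableSchedulingQueueHint {
		// With queueing hints enabled, preCheck is not used, so the workaround events can be dropped.
		nodeActionType = framework.Add | framework.UpdateNodeLabel
	}
	return []framework.ClusterEventWithHint{
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}},
	}, nil
}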
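
Because the constructors now take a feature.Features argument, callers that used to pass New directly wrap it with runtime.FactoryAdapter(fts, New), as the registry and test changes above show. A sketch of how a test might exercise the gated behaviour directly, assuming the patch applies as written (the test name and assertion are illustrative, not part of the patch):

package nodename_test // illustrative external test, not part of the patch

import (
	"testing"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
	"k8s.io/kubernetes/test/utils/ktesting"
)

// With EnableSchedulingQueueHint set, NodeName is expected (per this patch) to
// register only Node/Add; with the zero value it keeps the broader mask.
func TestNodeNameEventsWithQueueHint(t *testing.T) {
	_, ctx := ktesting.NewTestContext(t)
	p, err := nodename.New(ctx, nil, nil, feature.Features{EnableSchedulingQueueHint: true})
	if err != nil {
		t.Fatalf("creating plugin: %v", err)
	}
	events, err := p.(framework.EnqueueExtensions).EventsToRegister(ctx)
	if err != nil {
		t.Fatalf("registering events: %v", err)
	}
	if len(events) != 1 || events[0].Event.ActionType != framework.Add {
		t.Errorf("unexpected events: %+v", events)
	}
}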
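
Where a plugin also supplies a QueueingHintFn (isSchedulableAfterNodeChange and friends), the narrowed mask only controls which events reach the hint; the hint still decides Queue versus QueueSkip. For reference, a hypothetical hint with the signature used throughout the patch might look like the following (the example.com/zone label and plugin are invented for illustration):

package example // hypothetical, not part of the patch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

// isSchedulableAfterNodeChangeSketch shows the shape of a QueueingHintFn:
// convert the event objects, then return Queue or QueueSkip.
func isSchedulableAfterNodeChangeSketch(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	_, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
	if err != nil {
		// Requeue on conversion errors rather than risk missing an event.
		return framework.Queue, err
	}
	// Hypothetical check: requeue only if the node now carries a label this
	// imaginary plugin cares about; otherwise skip to avoid useless retries.
	if modifiedNode != nil && modifiedNode.Labels["example.com/zone"] != "" {
		logger.V(5).Info("node change may make the pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
		return framework.Queue, nil
	}
	return framework.QueueSkip, nil
}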