From a3978e8315556b7b1d024ddc0ba8c94ccc46d316 Mon Sep 17 00:00:00 2001 From: googs1025 Date: Tue, 16 Jul 2024 13:47:31 +0800 Subject: [PATCH] scheduler: Add ctx param and error return to EnqueueExtensions.EventsToRegister() --- pkg/scheduler/framework/interface.go | 8 +-- .../dynamicresources/dynamicresources.go | 6 +-- .../plugins/interpodaffinity/plugin.go | 4 +- .../plugins/nodeaffinity/node_affinity.go | 4 +- .../framework/plugins/nodename/node_name.go | 4 +- .../framework/plugins/nodeports/node_ports.go | 4 +- .../framework/plugins/noderesources/fit.go | 4 +- .../plugins/noderesources/fit_test.go | 5 +- .../nodeunschedulable/node_unschedulable.go | 4 +- .../framework/plugins/nodevolumelimits/csi.go | 4 +- .../plugins/nodevolumelimits/non_csi.go | 4 +- .../plugins/podtopologyspread/plugin.go | 4 +- .../schedulinggates/scheduling_gates.go | 6 +-- .../tainttoleration/taint_toleration.go | 6 +-- .../plugins/volumebinding/volume_binding.go | 4 +- .../volumerestrictions/volume_restrictions.go | 4 +- .../plugins/volumezone/volume_zone.go | 4 +- pkg/scheduler/framework/runtime/framework.go | 4 +- pkg/scheduler/scheduler.go | 23 ++++++-- pkg/scheduler/scheduler_test.go | 54 +++++++++++++++---- .../eventhandler/eventhandler_test.go | 4 +- .../scheduler/plugins/plugins_test.go | 8 +-- test/integration/scheduler/queue_test.go | 8 +-- .../scheduler/rescheduling_test.go | 8 +-- 24 files changed, 122 insertions(+), 66 deletions(-) diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 8b16688c3a5..d8ac59d47e7 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -375,11 +375,13 @@ type EnqueueExtensions interface { // filters out events to reduce useless retry of Pod's scheduling. // The events will be registered when instantiating the internal scheduling queue, // and leveraged to build event handlers dynamically. - // Note: the returned list needs to be static (not depend on configuration parameters); - // otherwise it would lead to undefined behavior. + // When it returns an error, the scheduler fails to start. + // Note: the returned list needs to be determined at a startup, + // and the scheduler only evaluates it once during start up. + // Do not change the result during runtime, for example, based on the cluster's state etc. // // Appropriate implementation of this function will make Pod's re-scheduling accurate and performant. - EventsToRegister() []ClusterEventWithHint + EventsToRegister(context.Context) ([]ClusterEventWithHint, error) } // PreFilterExtensions is an interface that is included in plugins that allow specifying diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 90039854a14..dd54fee5611 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -429,9 +429,9 @@ func (pl *dynamicResources) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { if !pl.enabled { - return nil + return nil, nil } events := []framework.ClusterEventWithHint{ @@ -460,7 +460,7 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint // A pod might be waiting for a class to get created or modified. {Event: framework.ClusterEvent{Resource: framework.ResourceClass, ActionType: framework.Add | framework.Update}}, } - return events + return events, nil } // PreEnqueue checks if there are known reasons why a pod currently cannot be diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index 738ba70aed4..76ff0bca5f1 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -57,7 +57,7 @@ func (pl *InterPodAffinity) Name() string { // EventsToRegister returns the possible events that may make a failed Pod // schedulable -func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ // All ActionType includes the following events: // - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints, @@ -77,7 +77,7 @@ func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint // See: https://github.com/kubernetes/kubernetes/issues/110175 {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, - } + }, nil } // New initializes a new plugin and returns it. diff --git a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go index 2cca3a4a932..4b6a13473b2 100644 --- a/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go +++ b/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go @@ -83,10 +83,10 @@ func (s *preFilterState) Clone() framework.StateData { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *NodeAffinity) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, - } + }, nil } // isSchedulableAfterNodeChange is invoked whenever a node changed. It checks whether diff --git a/pkg/scheduler/framework/plugins/nodename/node_name.go b/pkg/scheduler/framework/plugins/nodename/node_name.go index ad222e4cf04..2e48164765a 100644 --- a/pkg/scheduler/framework/plugins/nodename/node_name.go +++ b/pkg/scheduler/framework/plugins/nodename/node_name.go @@ -41,10 +41,10 @@ const ( // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *NodeName) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *NodeName) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, - } + }, nil } // Name returns name of the plugin. It is used in logs, etc. diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports.go b/pkg/scheduler/framework/plugins/nodeports/node_ports.go index 791cf35549a..a4ba1474ee2 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports.go @@ -111,7 +111,7 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *NodePorts) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ // Due to immutable fields `spec.containers[*].ports`, pod update events are ignored. {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted}, @@ -122,7 +122,7 @@ func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint { // We don't need the QueueingHintFn here because the scheduling of Pods will be always retried with backoff when this Event happens. // (the same as Queue) {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, - } + }, nil } // isSchedulableAfterPodDeleted is invoked whenever a pod deleted. It checks whether diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index 8e4c3c34c99..7b8c8ec174f 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -247,7 +247,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint { +func (f *Fit) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { podActionType := framework.Delete if f.enableInPlacePodVerticalScaling { // If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered @@ -257,7 +257,7 @@ func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: f.isSchedulableAfterNodeChange}, - } + }, nil } // isSchedulableAfterPodChange is invoked whenever a pod deleted or updated. It checks whether diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index c3b5a332988..d0cdddfbb6a 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -1112,7 +1112,10 @@ func TestEventsToRegister(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { fp := &Fit{enableInPlacePodVerticalScaling: test.inPlacePodVerticalScalingEnabled} - actualClusterEvents := fp.EventsToRegister() + actualClusterEvents, err := fp.EventsToRegister(context.TODO()) + if err != nil { + t.Fatal(err) + } for i := range actualClusterEvents { actualClusterEvents[i].QueueingHintFn = nil } diff --git a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go index 28e917f4199..a2c4aae372f 100644 --- a/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go +++ b/pkg/scheduler/framework/plugins/nodeunschedulable/node_unschedulable.go @@ -48,10 +48,10 @@ const ( // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *NodeUnschedulable) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *NodeUnschedulable) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, - } + }, nil } // isSchedulableAfterNodeChange is invoked for all node events reported by diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index cc9b7a49d26..9c2e037fd7f 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -76,14 +76,14 @@ func (pl *CSILimits) Name() string { // EventsToRegister returns the possible events that may make a Pod. // failed by this plugin schedulable. -func (pl *CSILimits) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *CSILimits) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ // We don't register any `QueueingHintFn` intentionally // because any new CSINode could make pods that were rejected by CSI volumes schedulable. {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted}, {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}}, - } + }, nil } func (pl *CSILimits) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go index cab293058c9..1dcc6afd741 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go @@ -206,12 +206,12 @@ func (pl *nonCSILimits) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *nonCSILimits) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *nonCSILimits) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}}, {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}}, - } + }, nil } // PreFilter invoked at the prefilter extension point diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go index fd037dc558f..cafec02b0aa 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go @@ -134,7 +134,7 @@ func (pl *PodTopologySpread) setListers(factory informers.SharedInformerFactory) // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *PodTopologySpread) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ // All ActionType includes the following events: // - Add. An unschedulable Pod may fail due to violating topology spread constraints, @@ -156,7 +156,7 @@ func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint // We can remove UpdateNodeTaint when we remove the preCheck feature. // See: https://github.com/kubernetes/kubernetes/issues/110175 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, - } + }, nil } func involvedInTopologySpreading(incomingPod, podWithSpreading *v1.Pod) bool { diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go index 2b8c4564f0e..28c81cf402f 100644 --- a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go @@ -58,9 +58,9 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *SchedulingGates) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { if !pl.enableSchedulingQueueHint { - return nil + return nil, nil } // When the QueueingHint feature is enabled, // the scheduling queue uses Pod/Update Queueing Hint @@ -69,7 +69,7 @@ func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint { return []framework.ClusterEventWithHint{ // Pods can be more schedulable once it's gates are removed {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}, - } + }, nil } // New initializes a new plugin and returns it. diff --git a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go index ca7a1d88565..2e6b3f88e52 100644 --- a/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go +++ b/pkg/scheduler/framework/plugins/tainttoleration/taint_toleration.go @@ -58,12 +58,12 @@ func (pl *TaintToleration) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *TaintToleration) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { clusterEventWithHint := []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, } if !pl.enableSchedulingQueueHint { - return clusterEventWithHint + return clusterEventWithHint, nil } // When the QueueingHint feature is enabled, // the scheduling queue uses Pod/Update Queueing Hint @@ -71,7 +71,7 @@ func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint { // https://github.com/kubernetes/kubernetes/pull/122234 clusterEventWithHint = append(clusterEventWithHint, framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}) - return clusterEventWithHint + return clusterEventWithHint, nil } // isSchedulableAfterNodeChange is invoked for all node events reported by diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index 0feb002bf75..c143c01b1d9 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -94,7 +94,7 @@ func (pl *VolumeBinding) Name() string { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *VolumeBinding) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { events := []framework.ClusterEventWithHint{ // Pods may fail because of missing or mis-configured storage class // (e.g., allowedTopologies, volumeBindingMode), and hence may become @@ -128,7 +128,7 @@ func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint { {Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}}, {Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}}, } - return events + return events, nil } func (pl *VolumeBinding) isSchedulableAfterCSINodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index b3b05333240..fd578b84240 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -318,7 +318,7 @@ func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework. // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *VolumeRestrictions) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ // Pods may fail to schedule because of volumes conflicting with other pods on same node. // Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable. @@ -331,7 +331,7 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin // This PVC is required to exist to check its access modes. {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimAdded}, - } + }, nil } // isSchedulableAfterPersistentVolumeClaimAdded is invoked whenever a PersistentVolumeClaim added or changed, It checks whether diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go index 82210703307..b590858a175 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go @@ -259,7 +259,7 @@ func getErrorAsStatus(err error) *framework.Status { // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *VolumeZone) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ // New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable. // Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored. @@ -280,7 +280,7 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint { {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange}, // A new pv or updating a pv's volume zone labels may make a pod schedulable. {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeChange}, - } + }, nil } // getPersistentVolumeClaimNameFromPod gets pvc names bound to a pod. diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 7805985fa3c..47d72d470f9 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -629,12 +629,12 @@ type defaultEnqueueExtension struct { } func (p *defaultEnqueueExtension) Name() string { return p.pluginName } -func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWithHint { +func (p *defaultEnqueueExtension) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { // need to return all specific cluster events with framework.All action instead of wildcard event // because the returning values are used to register event handlers. // If we return the wildcard here, it won't affect the event handlers registered by the plugin // and some events may not be registered in the event handlers. - return framework.UnrollWildCardResource() + return framework.UnrollWildCardResource(), nil } func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 22ac5054d9c..bdd2246f01e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -324,9 +324,17 @@ func New(ctx context.Context, preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin) queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile) + var returnErr error for profileName, profile := range profiles { preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins() - queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions()) + queueingHintsPerProfile[profileName], err = buildQueueingHintMap(ctx, profile.EnqueueExtensions()) + if err != nil { + returnErr = errors.Join(returnErr, err) + } + } + + if returnErr != nil { + return nil, returnErr } podQueue := internalqueue.NewSchedulingQueue( @@ -379,10 +387,14 @@ var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (fr return framework.Queue, nil } -func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap { +func buildQueueingHintMap(ctx context.Context, es []framework.EnqueueExtensions) (internalqueue.QueueingHintMap, error) { queueingHintMap := make(internalqueue.QueueingHintMap) + var returnErr error for _, e := range es { - events := e.EventsToRegister() + events, err := e.EventsToRegister(ctx) + if err != nil { + returnErr = errors.Join(returnErr, err) + } // This will happen when plugin registers with empty events, it's usually the case a pod // will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod @@ -438,7 +450,10 @@ func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.Queuei ) } } - return queueingHintMap + if returnErr != nil { + return nil, returnErr + } + return queueingHintMap, nil } // Run begins watching and scheduling. It starts scheduling and blocked until the context is done. diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 6ce22577fc3..3c9c07d960f 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -18,6 +18,7 @@ package scheduler import ( "context" + "errors" "fmt" "sort" "strings" @@ -596,6 +597,7 @@ const ( fakeNode = "fakeNode" fakePod = "fakePod" emptyEventsToRegister = "emptyEventsToRegister" + errorEventsToRegister = "errorEventsToRegister" queueSort = "no-op-queue-sort-plugin" fakeBind = "bind-plugin" emptyEventExtensions = "emptyEventExtensions" @@ -608,6 +610,7 @@ func Test_buildQueueingHintMap(t *testing.T) { plugins []framework.Plugin want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction featuregateDisabled bool + wantErr error }{ { name: "filter without EnqueueExtensions plugin", @@ -705,6 +708,12 @@ func Test_buildQueueingHintMap(t *testing.T) { }, }, }, + { + name: "one EventsToRegister returns an error", + plugins: []framework.Plugin{&errorEventsToRegisterPlugin{}}, + want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{}, + wantErr: errors.New("mock error"), + }, } for _, tt := range tests { @@ -738,7 +747,16 @@ func Test_buildQueueingHintMap(t *testing.T) { return exts[i].Name() < exts[j].Name() }) - got := buildQueueingHintMap(exts) + got, err := buildQueueingHintMap(ctx, exts) + if err != nil { + if tt.wantErr != nil && tt.wantErr.Error() != err.Error() { + t.Fatalf("unexpected error from buildQueueingHintMap: expected: %v, actual: %v", tt.wantErr, err) + } + + if tt.wantErr == nil { + t.Fatalf("unexpected error from buildQueueingHintMap: %v", err) + } + } for e, fns := range got { wantfns, ok := tt.want[e] @@ -894,9 +912,12 @@ func Test_UnionedGVKs(t *testing.T) { if err != nil { t.Fatal(err) } - + queueingHintMap, err := buildQueueingHintMap(ctx, fwk.EnqueueExtensions()) + if err != nil { + t.Fatal(err) + } queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{ - "default": buildQueueingHintMap(fwk.EnqueueExtensions()), + "default": queueingHintMap, } got := unionedGVKs(queueingHintsPerProfile) @@ -1115,10 +1136,10 @@ func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1. return nil } -func (pl *fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *fakeNodePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn}, - } + }, nil } var hintFromFakePod = framework.QueueingHint(101) @@ -1135,10 +1156,10 @@ func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.P return nil } -func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *fakePodPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn}, - } + }, nil } type emptyEventPlugin struct{} @@ -1149,10 +1170,23 @@ func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v return nil } -func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *emptyEventPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + return nil, nil +} + +// errorEventsToRegisterPlugin is a mock plugin that returns an error for EventsToRegister method +type errorEventsToRegisterPlugin struct{} + +func (*errorEventsToRegisterPlugin) Name() string { return errorEventsToRegister } + +func (*errorEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status { return nil } +func (*errorEventsToRegisterPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + return nil, errors.New("mock error") +} + // emptyEventsToRegisterPlugin implement interface framework.EnqueueExtensions, but returns nil from EventsToRegister. // This can simulate a plugin registered at scheduler setup, but does nothing // due to some disabled feature gate. @@ -1164,7 +1198,9 @@ func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.Cycle return nil } -func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil } +func (*emptyEventsToRegisterPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + return nil, nil +} // fakePermitPlugin only implements PermitPlugin interface. type fakePermitPlugin struct { diff --git a/test/integration/scheduler/eventhandler/eventhandler_test.go b/test/integration/scheduler/eventhandler/eventhandler_test.go index 987e99767c5..495d357d6f9 100644 --- a/test/integration/scheduler/eventhandler/eventhandler_test.go +++ b/test/integration/scheduler/eventhandler/eventhandler_test.go @@ -56,10 +56,10 @@ func (pl *fooPlugin) Filter(ctx context.Context, state *framework.CycleState, po return framework.NewStatus(framework.Unschedulable) } -func (pl *fooPlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *fooPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}}, - } + }, nil } // newPlugin returns a plugin factory with specified Plugin. diff --git a/test/integration/scheduler/plugins/plugins_test.go b/test/integration/scheduler/plugins/plugins_test.go index 944ed0f1fe2..a147bba66ec 100644 --- a/test/integration/scheduler/plugins/plugins_test.go +++ b/test/integration/scheduler/plugins/plugins_test.go @@ -2627,10 +2627,10 @@ func (pl *SchedulingGatesPluginWithEvents) PreEnqueue(ctx context.Context, p *v1 return pl.SchedulingGates.PreEnqueue(ctx, p) } -func (pl *SchedulingGatesPluginWithEvents) EventsToRegister() []framework.ClusterEventWithHint { +func (pl *SchedulingGatesPluginWithEvents) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}}, - } + }, nil } type SchedulingGatesPluginWOEvents struct { @@ -2647,8 +2647,8 @@ func (pl *SchedulingGatesPluginWOEvents) PreEnqueue(ctx context.Context, p *v1.P return pl.SchedulingGates.PreEnqueue(ctx, p) } -func (pl *SchedulingGatesPluginWOEvents) EventsToRegister() []framework.ClusterEventWithHint { - return nil +func (pl *SchedulingGatesPluginWOEvents) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { + return nil, nil } // This test helps to verify registering nil events for PreEnqueue plugin works as expected. diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 0ff6b0a4352..a483f2ab0ad 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -487,10 +487,10 @@ func (f *fakeCRPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1. // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. -func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (f *fakeCRPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: "foos.v1.example.com", ActionType: framework.All}}, - } + }, nil } // TestCustomResourceEnqueue constructs a fake plugin that registers custom resources @@ -867,8 +867,8 @@ func (p *fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleSta return framework.NewStatus(framework.Wait), wait.ForeverTestTimeout } -func (p *fakePermitPlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (p *fakePermitPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint}, - } + }, nil } diff --git a/test/integration/scheduler/rescheduling_test.go b/test/integration/scheduler/rescheduling_test.go index 8a547ca4609..be09574293e 100644 --- a/test/integration/scheduler/rescheduling_test.go +++ b/test/integration/scheduler/rescheduling_test.go @@ -66,7 +66,7 @@ func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleSt rp.numUnreserveCalled += 1 } -func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (rp *ReservePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ { Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, @@ -74,7 +74,7 @@ func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint { return framework.Queue, nil }, }, - } + }, nil } type PermitPlugin struct { @@ -103,7 +103,7 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, return nil, 0 } -func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint { +func (pp *PermitPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) { return []framework.ClusterEventWithHint{ { Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, @@ -111,7 +111,7 @@ func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint { return framework.Queue, nil }, }, - } + }, nil } func TestReScheduling(t *testing.T) {