Merge pull request #126113 from googs1025/enqueueExtensions_refactor

scheduler: Add ctx param and error return to EnqueueExtensions.EventsToRegister()
This commit is contained in:
Kubernetes Prow Robot 2024-07-18 00:53:25 -07:00 committed by GitHub
commit 24fbb13eaf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 122 additions and 66 deletions

View File

@ -375,11 +375,13 @@ type EnqueueExtensions interface {
// filters out events to reduce useless retry of Pod's scheduling. // filters out events to reduce useless retry of Pod's scheduling.
// The events will be registered when instantiating the internal scheduling queue, // The events will be registered when instantiating the internal scheduling queue,
// and leveraged to build event handlers dynamically. // and leveraged to build event handlers dynamically.
// Note: the returned list needs to be static (not depend on configuration parameters); // When it returns an error, the scheduler fails to start.
// otherwise it would lead to undefined behavior. // Note: the returned list needs to be determined at a startup,
// and the scheduler only evaluates it once during start up.
// Do not change the result during runtime, for example, based on the cluster's state etc.
// //
// Appropriate implementation of this function will make Pod's re-scheduling accurate and performant. // Appropriate implementation of this function will make Pod's re-scheduling accurate and performant.
EventsToRegister() []ClusterEventWithHint EventsToRegister(context.Context) ([]ClusterEventWithHint, error)
} }
// PreFilterExtensions is an interface that is included in plugins that allow specifying // PreFilterExtensions is an interface that is included in plugins that allow specifying

View File

@ -429,9 +429,9 @@ func (pl *dynamicResources) Name() string {
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint { func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
if !pl.enabled { if !pl.enabled {
return nil return nil, nil
} }
events := []framework.ClusterEventWithHint{ events := []framework.ClusterEventWithHint{
@ -460,7 +460,7 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint
// A pod might be waiting for a class to get created or modified. // A pod might be waiting for a class to get created or modified.
{Event: framework.ClusterEvent{Resource: framework.ResourceClass, ActionType: framework.Add | framework.Update}}, {Event: framework.ClusterEvent{Resource: framework.ResourceClass, ActionType: framework.Add | framework.Update}},
} }
return events return events, nil
} }
// PreEnqueue checks if there are known reasons why a pod currently cannot be // PreEnqueue checks if there are known reasons why a pod currently cannot be

View File

@ -57,7 +57,7 @@ func (pl *InterPodAffinity) Name() string {
// EventsToRegister returns the possible events that may make a failed Pod // EventsToRegister returns the possible events that may make a failed Pod
// schedulable // schedulable
func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint { func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
// All ActionType includes the following events: // All ActionType includes the following events:
// - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints, // - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints,
@ -77,7 +77,7 @@ func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint
// See: https://github.com/kubernetes/kubernetes/issues/110175 // See: https://github.com/kubernetes/kubernetes/issues/110175
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
} }, nil
} }
// New initializes a new plugin and returns it. // New initializes a new plugin and returns it.

View File

@ -83,10 +83,10 @@ func (s *preFilterState) Clone() framework.StateData {
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEventWithHint { func (pl *NodeAffinity) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
} }, nil
} }
// isSchedulableAfterNodeChange is invoked whenever a node changed. It checks whether // isSchedulableAfterNodeChange is invoked whenever a node changed. It checks whether

View File

@ -41,10 +41,10 @@ const (
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *NodeName) EventsToRegister() []framework.ClusterEventWithHint { func (pl *NodeName) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
} }, nil
} }
// Name returns name of the plugin. It is used in logs, etc. // Name returns name of the plugin. It is used in logs, etc.

View File

@ -111,7 +111,7 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error)
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint { func (pl *NodePorts) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
// Due to immutable fields `spec.containers[*].ports`, pod update events are ignored. // Due to immutable fields `spec.containers[*].ports`, pod update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
@ -122,7 +122,7 @@ func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint {
// We don't need the QueueingHintFn here because the scheduling of Pods will be always retried with backoff when this Event happens. // We don't need the QueueingHintFn here because the scheduling of Pods will be always retried with backoff when this Event happens.
// (the same as Queue) // (the same as Queue)
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
} }, nil
} }
// isSchedulableAfterPodDeleted is invoked whenever a pod deleted. It checks whether // isSchedulableAfterPodDeleted is invoked whenever a pod deleted. It checks whether

View File

@ -247,7 +247,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint { func (f *Fit) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
podActionType := framework.Delete podActionType := framework.Delete
if f.enableInPlacePodVerticalScaling { if f.enableInPlacePodVerticalScaling {
// If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered // If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered
@ -257,7 +257,7 @@ func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: f.isSchedulableAfterNodeChange}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: f.isSchedulableAfterNodeChange},
} }, nil
} }
// isSchedulableAfterPodChange is invoked whenever a pod deleted or updated. It checks whether // isSchedulableAfterPodChange is invoked whenever a pod deleted or updated. It checks whether

View File

@ -1112,7 +1112,10 @@ func TestEventsToRegister(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
fp := &Fit{enableInPlacePodVerticalScaling: test.inPlacePodVerticalScalingEnabled} fp := &Fit{enableInPlacePodVerticalScaling: test.inPlacePodVerticalScalingEnabled}
actualClusterEvents := fp.EventsToRegister() actualClusterEvents, err := fp.EventsToRegister(context.TODO())
if err != nil {
t.Fatal(err)
}
for i := range actualClusterEvents { for i := range actualClusterEvents {
actualClusterEvents[i].QueueingHintFn = nil actualClusterEvents[i].QueueingHintFn = nil
} }

View File

@ -48,10 +48,10 @@ const (
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *NodeUnschedulable) EventsToRegister() []framework.ClusterEventWithHint { func (pl *NodeUnschedulable) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
} }, nil
} }
// isSchedulableAfterNodeChange is invoked for all node events reported by // isSchedulableAfterNodeChange is invoked for all node events reported by

View File

@ -76,14 +76,14 @@ func (pl *CSILimits) Name() string {
// EventsToRegister returns the possible events that may make a Pod. // EventsToRegister returns the possible events that may make a Pod.
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *CSILimits) EventsToRegister() []framework.ClusterEventWithHint { func (pl *CSILimits) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
// We don't register any `QueueingHintFn` intentionally // We don't register any `QueueingHintFn` intentionally
// because any new CSINode could make pods that were rejected by CSI volumes schedulable. // because any new CSINode could make pods that were rejected by CSI volumes schedulable.
{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}}, {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}}, {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
} }, nil
} }
func (pl *CSILimits) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { func (pl *CSILimits) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {

View File

@ -206,12 +206,12 @@ func (pl *nonCSILimits) Name() string {
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *nonCSILimits) EventsToRegister() []framework.ClusterEventWithHint { func (pl *nonCSILimits) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}}, {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
} }, nil
} }
// PreFilter invoked at the prefilter extension point // PreFilter invoked at the prefilter extension point

View File

@ -134,7 +134,7 @@ func (pl *PodTopologySpread) setListers(factory informers.SharedInformerFactory)
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint { func (pl *PodTopologySpread) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
// All ActionType includes the following events: // All ActionType includes the following events:
// - Add. An unschedulable Pod may fail due to violating topology spread constraints, // - Add. An unschedulable Pod may fail due to violating topology spread constraints,
@ -156,7 +156,7 @@ func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint
// We can remove UpdateNodeTaint when we remove the preCheck feature. // We can remove UpdateNodeTaint when we remove the preCheck feature.
// See: https://github.com/kubernetes/kubernetes/issues/110175 // See: https://github.com/kubernetes/kubernetes/issues/110175
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
} }, nil
} }
func involvedInTopologySpreading(incomingPod, podWithSpreading *v1.Pod) bool { func involvedInTopologySpreading(incomingPod, podWithSpreading *v1.Pod) bool {

View File

@ -58,9 +58,9 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint { func (pl *SchedulingGates) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
if !pl.enableSchedulingQueueHint { if !pl.enableSchedulingQueueHint {
return nil return nil, nil
} }
// When the QueueingHint feature is enabled, // When the QueueingHint feature is enabled,
// the scheduling queue uses Pod/Update Queueing Hint // the scheduling queue uses Pod/Update Queueing Hint
@ -69,7 +69,7 @@ func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
// Pods can be more schedulable once it's gates are removed // Pods can be more schedulable once it's gates are removed
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange},
} }, nil
} }
// New initializes a new plugin and returns it. // New initializes a new plugin and returns it.

View File

@ -58,12 +58,12 @@ func (pl *TaintToleration) Name() string {
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint { func (pl *TaintToleration) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
clusterEventWithHint := []framework.ClusterEventWithHint{ clusterEventWithHint := []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
} }
if !pl.enableSchedulingQueueHint { if !pl.enableSchedulingQueueHint {
return clusterEventWithHint return clusterEventWithHint, nil
} }
// When the QueueingHint feature is enabled, // When the QueueingHint feature is enabled,
// the scheduling queue uses Pod/Update Queueing Hint // the scheduling queue uses Pod/Update Queueing Hint
@ -71,7 +71,7 @@ func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint {
// https://github.com/kubernetes/kubernetes/pull/122234 // https://github.com/kubernetes/kubernetes/pull/122234
clusterEventWithHint = append(clusterEventWithHint, clusterEventWithHint = append(clusterEventWithHint,
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange}) framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange})
return clusterEventWithHint return clusterEventWithHint, nil
} }
// isSchedulableAfterNodeChange is invoked for all node events reported by // isSchedulableAfterNodeChange is invoked for all node events reported by

View File

@ -94,7 +94,7 @@ func (pl *VolumeBinding) Name() string {
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint { func (pl *VolumeBinding) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
events := []framework.ClusterEventWithHint{ events := []framework.ClusterEventWithHint{
// Pods may fail because of missing or mis-configured storage class // Pods may fail because of missing or mis-configured storage class
// (e.g., allowedTopologies, volumeBindingMode), and hence may become // (e.g., allowedTopologies, volumeBindingMode), and hence may become
@ -128,7 +128,7 @@ func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint {
{Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}}, {Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}}, {Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}},
} }
return events return events, nil
} }
func (pl *VolumeBinding) isSchedulableAfterCSINodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { func (pl *VolumeBinding) isSchedulableAfterCSINodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {

View File

@ -318,7 +318,7 @@ func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHint { func (pl *VolumeRestrictions) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
// Pods may fail to schedule because of volumes conflicting with other pods on same node. // Pods may fail to schedule because of volumes conflicting with other pods on same node.
// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable. // Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
@ -331,7 +331,7 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
// This PVC is required to exist to check its access modes. // This PVC is required to exist to check its access modes.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}, {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add},
QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimAdded}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimAdded},
} }, nil
} }
// isSchedulableAfterPersistentVolumeClaimAdded is invoked whenever a PersistentVolumeClaim added or changed, It checks whether // isSchedulableAfterPersistentVolumeClaimAdded is invoked whenever a PersistentVolumeClaim added or changed, It checks whether

View File

@ -259,7 +259,7 @@ func getErrorAsStatus(err error) *framework.Status {
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint { func (pl *VolumeZone) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
// New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable. // New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable.
// Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored. // Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored.
@ -280,7 +280,7 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange}, {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
// A new pv or updating a pv's volume zone labels may make a pod schedulable. // A new pv or updating a pv's volume zone labels may make a pod schedulable.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeChange}, {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeChange},
} }, nil
} }
// getPersistentVolumeClaimNameFromPod gets pvc names bound to a pod. // getPersistentVolumeClaimNameFromPod gets pvc names bound to a pod.

View File

@ -629,12 +629,12 @@ type defaultEnqueueExtension struct {
} }
func (p *defaultEnqueueExtension) Name() string { return p.pluginName } func (p *defaultEnqueueExtension) Name() string { return p.pluginName }
func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWithHint { func (p *defaultEnqueueExtension) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
// need to return all specific cluster events with framework.All action instead of wildcard event // need to return all specific cluster events with framework.All action instead of wildcard event
// because the returning values are used to register event handlers. // because the returning values are used to register event handlers.
// If we return the wildcard here, it won't affect the event handlers registered by the plugin // If we return the wildcard here, it won't affect the event handlers registered by the plugin
// and some events may not be registered in the event handlers. // and some events may not be registered in the event handlers.
return framework.UnrollWildCardResource() return framework.UnrollWildCardResource(), nil
} }
func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error { func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error {

View File

@ -324,9 +324,17 @@ func New(ctx context.Context,
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin) preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile) queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
var returnErr error
for profileName, profile := range profiles { for profileName, profile := range profiles {
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins() preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions()) queueingHintsPerProfile[profileName], err = buildQueueingHintMap(ctx, profile.EnqueueExtensions())
if err != nil {
returnErr = errors.Join(returnErr, err)
}
}
if returnErr != nil {
return nil, returnErr
} }
podQueue := internalqueue.NewSchedulingQueue( podQueue := internalqueue.NewSchedulingQueue(
@ -379,10 +387,14 @@ var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (fr
return framework.Queue, nil return framework.Queue, nil
} }
func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap { func buildQueueingHintMap(ctx context.Context, es []framework.EnqueueExtensions) (internalqueue.QueueingHintMap, error) {
queueingHintMap := make(internalqueue.QueueingHintMap) queueingHintMap := make(internalqueue.QueueingHintMap)
var returnErr error
for _, e := range es { for _, e := range es {
events := e.EventsToRegister() events, err := e.EventsToRegister(ctx)
if err != nil {
returnErr = errors.Join(returnErr, err)
}
// This will happen when plugin registers with empty events, it's usually the case a pod // This will happen when plugin registers with empty events, it's usually the case a pod
// will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod // will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod
@ -438,7 +450,10 @@ func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.Queuei
) )
} }
} }
return queueingHintMap if returnErr != nil {
return nil, returnErr
}
return queueingHintMap, nil
} }
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done. // Run begins watching and scheduling. It starts scheduling and blocked until the context is done.

View File

@ -18,6 +18,7 @@ package scheduler
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
@ -596,6 +597,7 @@ const (
fakeNode = "fakeNode" fakeNode = "fakeNode"
fakePod = "fakePod" fakePod = "fakePod"
emptyEventsToRegister = "emptyEventsToRegister" emptyEventsToRegister = "emptyEventsToRegister"
errorEventsToRegister = "errorEventsToRegister"
queueSort = "no-op-queue-sort-plugin" queueSort = "no-op-queue-sort-plugin"
fakeBind = "bind-plugin" fakeBind = "bind-plugin"
emptyEventExtensions = "emptyEventExtensions" emptyEventExtensions = "emptyEventExtensions"
@ -608,6 +610,7 @@ func Test_buildQueueingHintMap(t *testing.T) {
plugins []framework.Plugin plugins []framework.Plugin
want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
featuregateDisabled bool featuregateDisabled bool
wantErr error
}{ }{
{ {
name: "filter without EnqueueExtensions plugin", name: "filter without EnqueueExtensions plugin",
@ -705,6 +708,12 @@ func Test_buildQueueingHintMap(t *testing.T) {
}, },
}, },
}, },
{
name: "one EventsToRegister returns an error",
plugins: []framework.Plugin{&errorEventsToRegisterPlugin{}},
want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{},
wantErr: errors.New("mock error"),
},
} }
for _, tt := range tests { for _, tt := range tests {
@ -738,7 +747,16 @@ func Test_buildQueueingHintMap(t *testing.T) {
return exts[i].Name() < exts[j].Name() return exts[i].Name() < exts[j].Name()
}) })
got := buildQueueingHintMap(exts) got, err := buildQueueingHintMap(ctx, exts)
if err != nil {
if tt.wantErr != nil && tt.wantErr.Error() != err.Error() {
t.Fatalf("unexpected error from buildQueueingHintMap: expected: %v, actual: %v", tt.wantErr, err)
}
if tt.wantErr == nil {
t.Fatalf("unexpected error from buildQueueingHintMap: %v", err)
}
}
for e, fns := range got { for e, fns := range got {
wantfns, ok := tt.want[e] wantfns, ok := tt.want[e]
@ -894,9 +912,12 @@ func Test_UnionedGVKs(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
queueingHintMap, err := buildQueueingHintMap(ctx, fwk.EnqueueExtensions())
if err != nil {
t.Fatal(err)
}
queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{ queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{
"default": buildQueueingHintMap(fwk.EnqueueExtensions()), "default": queueingHintMap,
} }
got := unionedGVKs(queueingHintsPerProfile) got := unionedGVKs(queueingHintsPerProfile)
@ -1115,10 +1136,10 @@ func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.
return nil return nil
} }
func (pl *fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint { func (pl *fakeNodePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn},
} }, nil
} }
var hintFromFakePod = framework.QueueingHint(101) var hintFromFakePod = framework.QueueingHint(101)
@ -1135,10 +1156,10 @@ func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.P
return nil return nil
} }
func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint { func (pl *fakePodPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn},
} }, nil
} }
type emptyEventPlugin struct{} type emptyEventPlugin struct{}
@ -1149,10 +1170,23 @@ func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v
return nil return nil
} }
func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint { func (pl *emptyEventPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return nil, nil
}
// errorEventsToRegisterPlugin is a mock plugin that returns an error for EventsToRegister method
type errorEventsToRegisterPlugin struct{}
func (*errorEventsToRegisterPlugin) Name() string { return errorEventsToRegister }
func (*errorEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
return nil return nil
} }
func (*errorEventsToRegisterPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return nil, errors.New("mock error")
}
// emptyEventsToRegisterPlugin implement interface framework.EnqueueExtensions, but returns nil from EventsToRegister. // emptyEventsToRegisterPlugin implement interface framework.EnqueueExtensions, but returns nil from EventsToRegister.
// This can simulate a plugin registered at scheduler setup, but does nothing // This can simulate a plugin registered at scheduler setup, but does nothing
// due to some disabled feature gate. // due to some disabled feature gate.
@ -1164,7 +1198,9 @@ func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.Cycle
return nil return nil
} }
func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil } func (*emptyEventsToRegisterPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return nil, nil
}
// fakePermitPlugin only implements PermitPlugin interface. // fakePermitPlugin only implements PermitPlugin interface.
type fakePermitPlugin struct { type fakePermitPlugin struct {

View File

@ -56,10 +56,10 @@ func (pl *fooPlugin) Filter(ctx context.Context, state *framework.CycleState, po
return framework.NewStatus(framework.Unschedulable) return framework.NewStatus(framework.Unschedulable)
} }
func (pl *fooPlugin) EventsToRegister() []framework.ClusterEventWithHint { func (pl *fooPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}},
} }, nil
} }
// newPlugin returns a plugin factory with specified Plugin. // newPlugin returns a plugin factory with specified Plugin.

View File

@ -2627,10 +2627,10 @@ func (pl *SchedulingGatesPluginWithEvents) PreEnqueue(ctx context.Context, p *v1
return pl.SchedulingGates.PreEnqueue(ctx, p) return pl.SchedulingGates.PreEnqueue(ctx, p)
} }
func (pl *SchedulingGatesPluginWithEvents) EventsToRegister() []framework.ClusterEventWithHint { func (pl *SchedulingGatesPluginWithEvents) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}}, {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}},
} }, nil
} }
type SchedulingGatesPluginWOEvents struct { type SchedulingGatesPluginWOEvents struct {
@ -2647,8 +2647,8 @@ func (pl *SchedulingGatesPluginWOEvents) PreEnqueue(ctx context.Context, p *v1.P
return pl.SchedulingGates.PreEnqueue(ctx, p) return pl.SchedulingGates.PreEnqueue(ctx, p)
} }
func (pl *SchedulingGatesPluginWOEvents) EventsToRegister() []framework.ClusterEventWithHint { func (pl *SchedulingGatesPluginWOEvents) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return nil return nil, nil
} }
// This test helps to verify registering nil events for PreEnqueue plugin works as expected. // This test helps to verify registering nil events for PreEnqueue plugin works as expected.

View File

@ -487,10 +487,10 @@ func (f *fakeCRPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.
// EventsToRegister returns the possible events that may make a Pod // EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable. // failed by this plugin schedulable.
func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEventWithHint { func (f *fakeCRPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: "foos.v1.example.com", ActionType: framework.All}}, {Event: framework.ClusterEvent{Resource: "foos.v1.example.com", ActionType: framework.All}},
} }, nil
} }
// TestCustomResourceEnqueue constructs a fake plugin that registers custom resources // TestCustomResourceEnqueue constructs a fake plugin that registers custom resources
@ -867,8 +867,8 @@ func (p *fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleSta
return framework.NewStatus(framework.Wait), wait.ForeverTestTimeout return framework.NewStatus(framework.Wait), wait.ForeverTestTimeout
} }
func (p *fakePermitPlugin) EventsToRegister() []framework.ClusterEventWithHint { func (p *fakePermitPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint}, {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint},
} }, nil
} }

View File

@ -66,7 +66,7 @@ func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleSt
rp.numUnreserveCalled += 1 rp.numUnreserveCalled += 1
} }
func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint { func (rp *ReservePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{ {
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
@ -74,7 +74,7 @@ func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint {
return framework.Queue, nil return framework.Queue, nil
}, },
}, },
} }, nil
} }
type PermitPlugin struct { type PermitPlugin struct {
@ -103,7 +103,7 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
return nil, 0 return nil, 0
} }
func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint { func (pp *PermitPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
return []framework.ClusterEventWithHint{ return []framework.ClusterEventWithHint{
{ {
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
@ -111,7 +111,7 @@ func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
return framework.Queue, nil return framework.Queue, nil
}, },
}, },
} }, nil
} }
func TestReScheduling(t *testing.T) { func TestReScheduling(t *testing.T) {