mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 04:06:03 +00:00
scheduler: Add ctx param and error return to EnqueueExtensions.EventsToRegister()
This commit is contained in:
parent
ff78057631
commit
a3978e8315
@ -375,11 +375,13 @@ type EnqueueExtensions interface {
|
||||
// filters out events to reduce useless retry of Pod's scheduling.
|
||||
// The events will be registered when instantiating the internal scheduling queue,
|
||||
// and leveraged to build event handlers dynamically.
|
||||
// Note: the returned list needs to be static (not depend on configuration parameters);
|
||||
// otherwise it would lead to undefined behavior.
|
||||
// When it returns an error, the scheduler fails to start.
|
||||
// Note: the returned list needs to be determined at a startup,
|
||||
// and the scheduler only evaluates it once during start up.
|
||||
// Do not change the result during runtime, for example, based on the cluster's state etc.
|
||||
//
|
||||
// Appropriate implementation of this function will make Pod's re-scheduling accurate and performant.
|
||||
EventsToRegister() []ClusterEventWithHint
|
||||
EventsToRegister(context.Context) ([]ClusterEventWithHint, error)
|
||||
}
|
||||
|
||||
// PreFilterExtensions is an interface that is included in plugins that allow specifying
|
||||
|
@ -429,9 +429,9 @@ func (pl *dynamicResources) Name() string {
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
if !pl.enabled {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
events := []framework.ClusterEventWithHint{
|
||||
@ -460,7 +460,7 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint
|
||||
// A pod might be waiting for a class to get created or modified.
|
||||
{Event: framework.ClusterEvent{Resource: framework.ResourceClass, ActionType: framework.Add | framework.Update}},
|
||||
}
|
||||
return events
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// PreEnqueue checks if there are known reasons why a pod currently cannot be
|
||||
|
@ -57,7 +57,7 @@ func (pl *InterPodAffinity) Name() string {
|
||||
|
||||
// EventsToRegister returns the possible events that may make a failed Pod
|
||||
// schedulable
|
||||
func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *InterPodAffinity) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
// All ActionType includes the following events:
|
||||
// - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints,
|
||||
@ -77,7 +77,7 @@ func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint
|
||||
// See: https://github.com/kubernetes/kubernetes/issues/110175
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange},
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// New initializes a new plugin and returns it.
|
||||
|
@ -83,10 +83,10 @@ func (s *preFilterState) Clone() framework.StateData {
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *NodeAffinity) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isSchedulableAfterNodeChange is invoked whenever a node changed. It checks whether
|
||||
|
@ -41,10 +41,10 @@ const (
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *NodeName) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *NodeName) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Name returns name of the plugin. It is used in logs, etc.
|
||||
|
@ -111,7 +111,7 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error)
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *NodePorts) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
// Due to immutable fields `spec.containers[*].ports`, pod update events are ignored.
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
|
||||
@ -122,7 +122,7 @@ func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
// We don't need the QueueingHintFn here because the scheduling of Pods will be always retried with backoff when this Event happens.
|
||||
// (the same as Queue)
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isSchedulableAfterPodDeleted is invoked whenever a pod deleted. It checks whether
|
||||
|
@ -247,7 +247,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (f *Fit) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
podActionType := framework.Delete
|
||||
if f.enableInPlacePodVerticalScaling {
|
||||
// If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered
|
||||
@ -257,7 +257,7 @@ func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange},
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: f.isSchedulableAfterNodeChange},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isSchedulableAfterPodChange is invoked whenever a pod deleted or updated. It checks whether
|
||||
|
@ -1112,7 +1112,10 @@ func TestEventsToRegister(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fp := &Fit{enableInPlacePodVerticalScaling: test.inPlacePodVerticalScalingEnabled}
|
||||
actualClusterEvents := fp.EventsToRegister()
|
||||
actualClusterEvents, err := fp.EventsToRegister(context.TODO())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := range actualClusterEvents {
|
||||
actualClusterEvents[i].QueueingHintFn = nil
|
||||
}
|
||||
|
@ -48,10 +48,10 @@ const (
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *NodeUnschedulable) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *NodeUnschedulable) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isSchedulableAfterNodeChange is invoked for all node events reported by
|
||||
|
@ -76,14 +76,14 @@ func (pl *CSILimits) Name() string {
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod.
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *CSILimits) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *CSILimits) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
// We don't register any `QueueingHintFn` intentionally
|
||||
// because any new CSINode could make pods that were rejected by CSI volumes schedulable.
|
||||
{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}},
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
|
||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (pl *CSILimits) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
|
||||
|
@ -206,12 +206,12 @@ func (pl *nonCSILimits) Name() string {
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *nonCSILimits) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *nonCSILimits) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
|
||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PreFilter invoked at the prefilter extension point
|
||||
|
@ -134,7 +134,7 @@ func (pl *PodTopologySpread) setListers(factory informers.SharedInformerFactory)
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *PodTopologySpread) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
// All ActionType includes the following events:
|
||||
// - Add. An unschedulable Pod may fail due to violating topology spread constraints,
|
||||
@ -156,7 +156,7 @@ func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint
|
||||
// We can remove UpdateNodeTaint when we remove the preCheck feature.
|
||||
// See: https://github.com/kubernetes/kubernetes/issues/110175
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func involvedInTopologySpreading(incomingPod, podWithSpreading *v1.Pod) bool {
|
||||
|
@ -58,9 +58,9 @@ func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *SchedulingGates) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
if !pl.enableSchedulingQueueHint {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
// When the QueueingHint feature is enabled,
|
||||
// the scheduling queue uses Pod/Update Queueing Hint
|
||||
@ -69,7 +69,7 @@ func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
return []framework.ClusterEventWithHint{
|
||||
// Pods can be more schedulable once it's gates are removed
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// New initializes a new plugin and returns it.
|
||||
|
@ -58,12 +58,12 @@ func (pl *TaintToleration) Name() string {
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *TaintToleration) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
clusterEventWithHint := []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
|
||||
}
|
||||
if !pl.enableSchedulingQueueHint {
|
||||
return clusterEventWithHint
|
||||
return clusterEventWithHint, nil
|
||||
}
|
||||
// When the QueueingHint feature is enabled,
|
||||
// the scheduling queue uses Pod/Update Queueing Hint
|
||||
@ -71,7 +71,7 @@ func (pl *TaintToleration) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
// https://github.com/kubernetes/kubernetes/pull/122234
|
||||
clusterEventWithHint = append(clusterEventWithHint,
|
||||
framework.ClusterEventWithHint{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodChange})
|
||||
return clusterEventWithHint
|
||||
return clusterEventWithHint, nil
|
||||
}
|
||||
|
||||
// isSchedulableAfterNodeChange is invoked for all node events reported by
|
||||
|
@ -94,7 +94,7 @@ func (pl *VolumeBinding) Name() string {
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *VolumeBinding) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
events := []framework.ClusterEventWithHint{
|
||||
// Pods may fail because of missing or mis-configured storage class
|
||||
// (e.g., allowedTopologies, volumeBindingMode), and hence may become
|
||||
@ -128,7 +128,7 @@ func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
{Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}},
|
||||
{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}},
|
||||
}
|
||||
return events
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (pl *VolumeBinding) isSchedulableAfterCSINodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
|
||||
|
@ -318,7 +318,7 @@ func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *VolumeRestrictions) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
// Pods may fail to schedule because of volumes conflicting with other pods on same node.
|
||||
// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
|
||||
@ -331,7 +331,7 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
|
||||
// This PVC is required to exist to check its access modes.
|
||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add},
|
||||
QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimAdded},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isSchedulableAfterPersistentVolumeClaimAdded is invoked whenever a PersistentVolumeClaim added or changed, It checks whether
|
||||
|
@ -259,7 +259,7 @@ func getErrorAsStatus(err error) *framework.Status {
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *VolumeZone) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
// New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable.
|
||||
// Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored.
|
||||
@ -280,7 +280,7 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
|
||||
// A new pv or updating a pv's volume zone labels may make a pod schedulable.
|
||||
{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeChange},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// getPersistentVolumeClaimNameFromPod gets pvc names bound to a pod.
|
||||
|
@ -629,12 +629,12 @@ type defaultEnqueueExtension struct {
|
||||
}
|
||||
|
||||
func (p *defaultEnqueueExtension) Name() string { return p.pluginName }
|
||||
func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (p *defaultEnqueueExtension) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
// need to return all specific cluster events with framework.All action instead of wildcard event
|
||||
// because the returning values are used to register event handlers.
|
||||
// If we return the wildcard here, it won't affect the event handlers registered by the plugin
|
||||
// and some events may not be registered in the event handlers.
|
||||
return framework.UnrollWildCardResource()
|
||||
return framework.UnrollWildCardResource(), nil
|
||||
}
|
||||
|
||||
func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error {
|
||||
|
@ -324,9 +324,17 @@ func New(ctx context.Context,
|
||||
|
||||
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
|
||||
queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
|
||||
var returnErr error
|
||||
for profileName, profile := range profiles {
|
||||
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
|
||||
queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions())
|
||||
queueingHintsPerProfile[profileName], err = buildQueueingHintMap(ctx, profile.EnqueueExtensions())
|
||||
if err != nil {
|
||||
returnErr = errors.Join(returnErr, err)
|
||||
}
|
||||
}
|
||||
|
||||
if returnErr != nil {
|
||||
return nil, returnErr
|
||||
}
|
||||
|
||||
podQueue := internalqueue.NewSchedulingQueue(
|
||||
@ -379,10 +387,14 @@ var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (fr
|
||||
return framework.Queue, nil
|
||||
}
|
||||
|
||||
func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
|
||||
func buildQueueingHintMap(ctx context.Context, es []framework.EnqueueExtensions) (internalqueue.QueueingHintMap, error) {
|
||||
queueingHintMap := make(internalqueue.QueueingHintMap)
|
||||
var returnErr error
|
||||
for _, e := range es {
|
||||
events := e.EventsToRegister()
|
||||
events, err := e.EventsToRegister(ctx)
|
||||
if err != nil {
|
||||
returnErr = errors.Join(returnErr, err)
|
||||
}
|
||||
|
||||
// This will happen when plugin registers with empty events, it's usually the case a pod
|
||||
// will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod
|
||||
@ -438,7 +450,10 @@ func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.Queuei
|
||||
)
|
||||
}
|
||||
}
|
||||
return queueingHintMap
|
||||
if returnErr != nil {
|
||||
return nil, returnErr
|
||||
}
|
||||
return queueingHintMap, nil
|
||||
}
|
||||
|
||||
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
|
||||
|
@ -18,6 +18,7 @@ package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
@ -596,6 +597,7 @@ const (
|
||||
fakeNode = "fakeNode"
|
||||
fakePod = "fakePod"
|
||||
emptyEventsToRegister = "emptyEventsToRegister"
|
||||
errorEventsToRegister = "errorEventsToRegister"
|
||||
queueSort = "no-op-queue-sort-plugin"
|
||||
fakeBind = "bind-plugin"
|
||||
emptyEventExtensions = "emptyEventExtensions"
|
||||
@ -608,6 +610,7 @@ func Test_buildQueueingHintMap(t *testing.T) {
|
||||
plugins []framework.Plugin
|
||||
want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
|
||||
featuregateDisabled bool
|
||||
wantErr error
|
||||
}{
|
||||
{
|
||||
name: "filter without EnqueueExtensions plugin",
|
||||
@ -705,6 +708,12 @@ func Test_buildQueueingHintMap(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one EventsToRegister returns an error",
|
||||
plugins: []framework.Plugin{&errorEventsToRegisterPlugin{}},
|
||||
want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{},
|
||||
wantErr: errors.New("mock error"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
@ -738,7 +747,16 @@ func Test_buildQueueingHintMap(t *testing.T) {
|
||||
return exts[i].Name() < exts[j].Name()
|
||||
})
|
||||
|
||||
got := buildQueueingHintMap(exts)
|
||||
got, err := buildQueueingHintMap(ctx, exts)
|
||||
if err != nil {
|
||||
if tt.wantErr != nil && tt.wantErr.Error() != err.Error() {
|
||||
t.Fatalf("unexpected error from buildQueueingHintMap: expected: %v, actual: %v", tt.wantErr, err)
|
||||
}
|
||||
|
||||
if tt.wantErr == nil {
|
||||
t.Fatalf("unexpected error from buildQueueingHintMap: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for e, fns := range got {
|
||||
wantfns, ok := tt.want[e]
|
||||
@ -894,9 +912,12 @@ func Test_UnionedGVKs(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
queueingHintMap, err := buildQueueingHintMap(ctx, fwk.EnqueueExtensions())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{
|
||||
"default": buildQueueingHintMap(fwk.EnqueueExtensions()),
|
||||
"default": queueingHintMap,
|
||||
}
|
||||
got := unionedGVKs(queueingHintsPerProfile)
|
||||
|
||||
@ -1115,10 +1136,10 @@ func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *fakeNodePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
var hintFromFakePod = framework.QueueingHint(101)
|
||||
@ -1135,10 +1156,10 @@ func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.P
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *fakePodPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
type emptyEventPlugin struct{}
|
||||
@ -1149,10 +1170,23 @@ func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *emptyEventPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// errorEventsToRegisterPlugin is a mock plugin that returns an error for EventsToRegister method
|
||||
type errorEventsToRegisterPlugin struct{}
|
||||
|
||||
func (*errorEventsToRegisterPlugin) Name() string { return errorEventsToRegister }
|
||||
|
||||
func (*errorEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*errorEventsToRegisterPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return nil, errors.New("mock error")
|
||||
}
|
||||
|
||||
// emptyEventsToRegisterPlugin implement interface framework.EnqueueExtensions, but returns nil from EventsToRegister.
|
||||
// This can simulate a plugin registered at scheduler setup, but does nothing
|
||||
// due to some disabled feature gate.
|
||||
@ -1164,7 +1198,9 @@ func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.Cycle
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }
|
||||
func (*emptyEventsToRegisterPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// fakePermitPlugin only implements PermitPlugin interface.
|
||||
type fakePermitPlugin struct {
|
||||
|
@ -56,10 +56,10 @@ func (pl *fooPlugin) Filter(ctx context.Context, state *framework.CycleState, po
|
||||
return framework.NewStatus(framework.Unschedulable)
|
||||
}
|
||||
|
||||
func (pl *fooPlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *fooPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// newPlugin returns a plugin factory with specified Plugin.
|
||||
|
@ -2627,10 +2627,10 @@ func (pl *SchedulingGatesPluginWithEvents) PreEnqueue(ctx context.Context, p *v1
|
||||
return pl.SchedulingGates.PreEnqueue(ctx, p)
|
||||
}
|
||||
|
||||
func (pl *SchedulingGatesPluginWithEvents) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pl *SchedulingGatesPluginWithEvents) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Update}},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
type SchedulingGatesPluginWOEvents struct {
|
||||
@ -2647,8 +2647,8 @@ func (pl *SchedulingGatesPluginWOEvents) PreEnqueue(ctx context.Context, p *v1.P
|
||||
return pl.SchedulingGates.PreEnqueue(ctx, p)
|
||||
}
|
||||
|
||||
func (pl *SchedulingGatesPluginWOEvents) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
return nil
|
||||
func (pl *SchedulingGatesPluginWOEvents) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// This test helps to verify registering nil events for PreEnqueue plugin works as expected.
|
||||
|
@ -487,10 +487,10 @@ func (f *fakeCRPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (f *fakeCRPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: "foos.v1.example.com", ActionType: framework.All}},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TestCustomResourceEnqueue constructs a fake plugin that registers custom resources
|
||||
@ -867,8 +867,8 @@ func (p *fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleSta
|
||||
return framework.NewStatus(framework.Wait), wait.ForeverTestTimeout
|
||||
}
|
||||
|
||||
func (p *fakePermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (p *fakePermitPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
@ -66,7 +66,7 @@ func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleSt
|
||||
rp.numUnreserveCalled += 1
|
||||
}
|
||||
|
||||
func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (rp *ReservePlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{
|
||||
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
|
||||
@ -74,7 +74,7 @@ func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
return framework.Queue, nil
|
||||
},
|
||||
},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
type PermitPlugin struct {
|
||||
@ -103,7 +103,7 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
func (pp *PermitPlugin) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
|
||||
return []framework.ClusterEventWithHint{
|
||||
{
|
||||
Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add},
|
||||
@ -111,7 +111,7 @@ func (pp *PermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
return framework.Queue, nil
|
||||
},
|
||||
},
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func TestReScheduling(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user