From f322019d7a1a549e93058641ea4b1ca18c788021 Mon Sep 17 00:00:00 2001
From: Wei Huang
Date: Thu, 28 Jan 2021 22:29:10 -0800
Subject: [PATCH] Store a cluster event to plugin map in SchedulerQueue

---
 pkg/scheduler/core/generic_scheduler.go       |   1 +
 pkg/scheduler/factory.go                      |   8 +-
 pkg/scheduler/framework/interface.go          |  10 +
 pkg/scheduler/framework/runtime/framework.go  |  53 +++++
 .../framework/runtime/framework_test.go       | 161 +++++++++++++
 pkg/scheduler/framework/types.go              |  48 ++++
 pkg/scheduler/internal/queue/events.go        |  29 +++
 .../internal/queue/scheduling_queue.go        |  76 ++++++-
 .../internal/queue/scheduling_queue_test.go   | 213 ++++++++++++++----
 9 files changed, 546 insertions(+), 53 deletions(-)

diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index cdc3884d35c..ec22ca28e24 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -241,6 +241,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framewor
 			for _, n := range allNodes {
 				diagnosis.NodeToStatusMap[n.Node().Name] = s
 			}
+			// For any status satisfying IsUnschedulable(), the failed plugin's name gets recorded in diagnosis.UnschedulablePlugins.
 			diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
 			return nil, diagnosis, nil
 		}
diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go
index 5475f1c8bca..648abd45c67 100644
--- a/pkg/scheduler/factory.go
+++ b/pkg/scheduler/factory.go
@@ -132,6 +132,8 @@ func (c *Configurator) create() (*Scheduler, error) {
 	// The nominator will be passed all the way to framework instantiation.
 	nominator := internalqueue.NewPodNominator()
+	// It's a "cluster event" -> "plugin names" map.
+	clusterEventMap := make(map[framework.ClusterEvent]sets.String)
 	profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
 		frameworkruntime.WithClientSet(c.client),
 		frameworkruntime.WithInformerFactory(c.informerFactory),
@@ -139,6 +141,7 @@
 		frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates),
 		frameworkruntime.WithPodNominator(nominator),
 		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(c.frameworkCapturer)),
+		frameworkruntime.WithClusterEventMap(clusterEventMap),
 	)
 	if err != nil {
 		return nil, fmt.Errorf("initializing profiles: %v", err)
@@ -153,6 +156,7 @@
 		internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
 		internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
 		internalqueue.WithPodNominator(nominator),
+		internalqueue.WithClusterEventMap(clusterEventMap),
 	)

 	// Setup cache debugger.
@@ -317,7 +321,9 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL
 		pod := podInfo.Pod
 		if err == core.ErrNoNodesAvailable {
 			klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
-		} else if _, ok := err.(*framework.FitError); ok {
+		} else if fitError, ok := err.(*framework.FitError); ok {
+			// Inject UnschedulablePlugins into PodInfo; it will be used later to move Pods between queues efficiently.
+			podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
 			klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
 		} else if apierrors.IsNotFound(err) {
 			klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go
index 022aa557955..8ae1c50fa02 100644
--- a/pkg/scheduler/framework/interface.go
+++ b/pkg/scheduler/framework/interface.go
@@ -279,6 +279,16 @@ type QueueSortPlugin interface {
 	Less(*QueuedPodInfo, *QueuedPodInfo) bool
 }

+// EnqueueExtensions is an optional interface that plugins can implement to efficiently
+// move unschedulable Pods in internal scheduling queues.
+type EnqueueExtensions interface {
+	// EventsToRegister returns the list of events this plugin is interested in;
+	// the events are registered when the internal scheduling queue is instantiated.
+	// Note: the returned list needs to be static (i.e., not depend on configuration
+	// parameters); otherwise it would lead to undefined behavior.
+	EventsToRegister() []ClusterEvent
+}
+
 // PreFilterExtensions is an interface that is included in plugins that allow specifying
 // callbacks to make incremental updates to its supposedly pre-calculated
 // state.
diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index 2c9f70654be..cab97369933 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -60,6 +60,16 @@ const (
 	permit = "Permit"
 )

+var allClusterEvents = []framework.ClusterEvent{
+	{Resource: framework.Pod, ActionType: framework.All},
+	{Resource: framework.Node, ActionType: framework.All},
+	{Resource: framework.CSINode, ActionType: framework.All},
+	{Resource: framework.PersistentVolume, ActionType: framework.All},
+	{Resource: framework.PersistentVolumeClaim, ActionType: framework.All},
+	{Resource: framework.Service, ActionType: framework.All},
+	{Resource: framework.StorageClass, ActionType: framework.All},
+}
+
 var configDecoder = scheme.Codecs.UniversalDecoder()

 // frameworkImpl is the component responsible for initializing and running scheduler
@@ -139,6 +149,7 @@ type frameworkOptions struct {
 	extenders       []framework.Extender
 	runAllFilters   bool
 	captureProfile  CaptureProfile
+	clusterEventMap map[framework.ClusterEvent]sets.String
 }

 // Option for the frameworkImpl.
@@ -221,6 +232,14 @@ func WithCaptureProfile(c CaptureProfile) Option {
 func defaultFrameworkOptions() frameworkOptions {
 	return frameworkOptions{
 		metricsRecorder: newMetricsRecorder(1000, time.Second),
+		clusterEventMap: make(map[framework.ClusterEvent]sets.String),
+	}
+}
+
+// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl.
+func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
+	return func(o *frameworkOptions) {
+		o.clusterEventMap = m
 	}
 }

@@ -292,6 +311,9 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
 		}
 		pluginsMap[name] = p

+		// Update clusterEventMap in place.
+		fillEventToPluginMap(p, options.clusterEventMap)
+
 		// a weight of zero is not permitted, plugins can be disabled explicitly
 		// when configured.
 		f.pluginNameToWeightMap[name] = int(pg[name].Weight)
@@ -343,6 +365,37 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
 	return f, nil
 }

+func fillEventToPluginMap(p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.String) {
+	ext, ok := p.(framework.EnqueueExtensions)
+	if !ok {
+		// If the EnqueueExtensions interface is not implemented, register the default
+		// events for the plugin. This is to ensure backward compatibility.
+		registerClusterEvents(p.Name(), eventToPlugins, allClusterEvents)
+		return
+	}
+
+	events := ext.EventsToRegister()
+	// It's rare that a plugin implements EnqueueExtensions but returns nil.
+	// We treat it as: the plugin is not interested in any event, and hence a Pod failed
+	// by that plugin cannot be moved by any regular cluster event.
+	if len(events) == 0 {
+		klog.InfoS("Plugin's EventsToRegister() returned nil", "plugin", p.Name())
+		return
+	}
+	// The most common case: a plugin implements EnqueueExtensions and returns a non-nil result.
+	registerClusterEvents(p.Name(), eventToPlugins, events)
+}
+
+func registerClusterEvents(name string, eventToPlugins map[framework.ClusterEvent]sets.String, evts []framework.ClusterEvent) {
+	for _, evt := range evts {
+		if eventToPlugins[evt] == nil {
+			eventToPlugins[evt] = sets.NewString(name)
+		} else {
+			eventToPlugins[evt].Insert(name)
+		}
+	}
+}
+
 // getPluginArgsOrDefault returns a configuration provided by the user or builds
 // a default from the scheme. Returns `nil, nil` if the plugin does not have a
 // defined arg types, such as in-tree plugins that don't require configuration
diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go
index 8ef9d1c3d16..c2f5c514949 100644
--- a/pkg/scheduler/framework/runtime/framework_test.go
+++ b/pkg/scheduler/framework/runtime/framework_test.go
@@ -30,6 +30,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
@@ -639,6 +640,166 @@ func TestNewFrameworkPluginDefaults(t *testing.T) {
 	}
 }

+// fakeNoopPlugin doesn't implement the framework.EnqueueExtensions interface.
+type fakeNoopPlugin struct{}
+
+func (*fakeNoopPlugin) Name() string { return "fakeNoop" }
+
+func (*fakeNoopPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
+	return nil
+}
+
+type fakeNodePlugin struct{}
+
+func (*fakeNodePlugin) Name() string { return "fakeNode" }
+
+func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
+	return nil
+}
+
+func (*fakeNodePlugin) EventsToRegister() []framework.ClusterEvent {
+	return []framework.ClusterEvent{
+		{Resource: framework.Pod, ActionType: framework.All},
+		{Resource: framework.Node, ActionType: framework.Delete},
+		{Resource: framework.CSINode, ActionType: framework.Update | framework.Delete},
+	}
+}
+
+type fakePodPlugin struct{}
+
+func (*fakePodPlugin) Name() string { return "fakePod" }
+
+func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
+	return nil
+}
+
+func (*fakePodPlugin) EventsToRegister() []framework.ClusterEvent {
+	return []framework.ClusterEvent{
+		{Resource: framework.Pod, ActionType: framework.All},
+		{Resource: framework.Node, ActionType: framework.Add | framework.Delete},
+		{Resource: framework.Service, ActionType: framework.Delete},
+	}
+}
+
+// fakeNoopRuntimePlugin implements the framework.EnqueueExtensions interface, but returns
+// nil at runtime. This simulates a plugin that is registered at scheduler setup but does
+// nothing at runtime, e.g., due to a disabled feature gate.
+type fakeNoopRuntimePlugin struct{}
+
+func (*fakeNoopRuntimePlugin) Name() string { return "fakeNoopRuntime" }
+
+func (*fakeNoopRuntimePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
+	return nil
+}
+
+func (*fakeNoopRuntimePlugin) EventsToRegister() []framework.ClusterEvent { return nil }
+
+func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
+	tests := []struct {
+		name    string
+		plugins []framework.Plugin
+		want    map[framework.ClusterEvent]sets.String
+	}{
+		{
+			name:    "no-op plugin",
+			plugins: []framework.Plugin{&fakeNoopPlugin{}},
+			want: map[framework.ClusterEvent]sets.String{
+				{Resource: framework.Pod, ActionType: framework.All}:                   sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
+				{Resource: framework.Node, ActionType: framework.All}:                  sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
+				{Resource: framework.CSINode, ActionType: framework.All}:               sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
+				{Resource: framework.PersistentVolume, ActionType: framework.All}:      sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
+				{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
+				{Resource: framework.Service, ActionType: framework.All}:               sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
+				{Resource: framework.StorageClass, ActionType: framework.All}:          sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
+			},
+		},
+		{
+			name:    "node plugin",
+			plugins: []framework.Plugin{&fakeNodePlugin{}},
+			want: map[framework.ClusterEvent]sets.String{
+				{Resource: framework.Pod, ActionType: framework.All}:     sets.NewString("fakeNode", bindPlugin, queueSortPlugin),
+				{Resource: framework.Node, ActionType: framework.Delete}: sets.NewString("fakeNode"),
+				{Resource: framework.Node, ActionType: framework.All}:    sets.NewString(bindPlugin, queueSortPlugin),
+				{Resource: framework.CSINode, ActionType: 
framework.Update | framework.Delete}: sets.NewString("fakeNode"), + {Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + }, + }, + { + name: "pod plugin", + plugins: []framework.Plugin{&fakePodPlugin{}}, + want: map[framework.ClusterEvent]sets.String{ + {Resource: framework.Pod, ActionType: framework.All}: sets.NewString("fakePod", bindPlugin, queueSortPlugin), + {Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.NewString("fakePod"), + {Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.Service, ActionType: framework.Delete}: sets.NewString("fakePod"), + {Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + }, + }, + { + name: "node and pod plugin", + plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}}, + want: map[framework.ClusterEvent]sets.String{ + {Resource: framework.Node, ActionType: framework.Delete}: sets.NewString("fakeNode"), + {Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.NewString("fakePod"), + {Resource: framework.Pod, ActionType: framework.All}: sets.NewString("fakeNode", "fakePod", bindPlugin, queueSortPlugin), + {Resource: framework.CSINode, ActionType: framework.Update | framework.Delete}: sets.NewString("fakeNode"), + {Resource: framework.Service, ActionType: framework.Delete}: sets.NewString("fakePod"), + {Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + }, + }, + { + name: "no-op runtime plugin", + plugins: []framework.Plugin{&fakeNoopRuntimePlugin{}}, + want: map[framework.ClusterEvent]sets.String{ + {Resource: framework.Pod, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.CSINode, ActionType: 
framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
+				{Resource: framework.PersistentVolume, ActionType: framework.All}:      sets.NewString(bindPlugin, queueSortPlugin),
+				{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
+				{Resource: framework.StorageClass, ActionType: framework.All}:          sets.NewString(bindPlugin, queueSortPlugin),
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			registry := Registry{}
+			cfgPls := &config.Plugins{}
+			for _, pl := range tt.plugins {
+				tmpPl := pl
+				if err := registry.Register(pl.Name(), func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
+					return tmpPl, nil
+				}); err != nil {
+					t.Fatalf("failed to register filter plugin (%s)", pl.Name())
+				}
+				cfgPls.Filter.Enabled = append(cfgPls.Filter.Enabled, config.Plugin{Name: pl.Name()})
+			}
+
+			got := make(map[framework.ClusterEvent]sets.String)
+			_, err := newFrameworkWithQueueSortAndBind(registry, cfgPls, emptyArgs, WithClusterEventMap(got))
+			if err != nil {
+				t.Fatal(err)
+			}
+			if diff := cmp.Diff(tt.want, got); diff != "" {
+				t.Errorf("Unexpected eventToPlugin map (-want,+got):%s", diff)
+			}
+		})
+	}
+}
+
 func TestRunScorePlugins(t *testing.T) {
 	tests := []struct {
 		name string
diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go
index 109d7d350d5..92afd098c98 100644
--- a/pkg/scheduler/framework/types.go
+++ b/pkg/scheduler/framework/types.go
@@ -40,6 +40,52 @@ import (

 var generation int64

+// ActionType is an integer to represent one type of resource change.
+// Different ActionTypes can be combined bitwise to compose new semantics.
+type ActionType int64
+
+// Constants for ActionTypes.
+const (
+	Add    ActionType = 1 << iota // 1
+	Delete                        // 10
+	// UpdateNodeXYZ is only applicable for Node events.
+	UpdateNodeAllocatable // 100
+	UpdateNodeLabel       // 1000
+	UpdateNodeTaint       // 10000
+	UpdateNodeCondition   // 100000
+
+	All ActionType = 1<<iota - 1 // 111111
+
+	// Update is the general update type, covering all the UpdateNodeXYZ sub-types.
+	Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition
+)

[Hunks garbled in extraction: the remainder of the types.go diff (the GVK constants, the ClusterEvent struct, WildCardEvent, and the new QueuedPodInfo.UnschedulablePlugins field), the new file pkg/scheduler/internal/queue/events.go, and most of the scheduling_queue.go changes (the clusterEventMap field, the WithClusterEventMap option, and podMatchesEvent). The diff resumes at the tail of scheduling_queue.go's new intersect helper:]

+func intersect(x, y sets.String) bool {
+	if len(x) > len(y) {
+		x, y = y, x
+	}
+	for v := range x {
+		if y.Has(v) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go
index af09279660e..7d715dd4c95 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue_test.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go
@@ -29,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	ktypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/clock"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/component-base/metrics/testutil"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
@@ -361,40 +362,56 @@ func TestPriorityQueue_Delete(t *testing.T) {

 func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
 	c := clock.NewFakeClock(time.Now())
-	q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c))
+	m := map[framework.ClusterEvent]sets.String{
+		{Resource: framework.Node, ActionType: framework.Add}: sets.NewString("fooPlugin"),
+	}
+	q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m))
 	q.Add(medPriorityPodInfo.Pod)
-	q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle())
-	q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle())
+	q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
+	q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle())
+	// Construct a Pod, but don't associate its scheduling failure with any plugin.
+	hpp1 := highPriorityPodInfo.Pod.DeepCopy()
+	hpp1.Name = "hpp1"
+	q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(hpp1), q.SchedulingCycle())
+	// Construct another Pod, and associate its scheduling failure with plugin "barPlugin".
+	hpp2 := highPriorityPodInfo.Pod.DeepCopy()
+	hpp2.Name = "hpp2"
+	q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(hpp2, "barPlugin"), q.SchedulingCycle())
 	// Pods is still backing off, move the pod into backoffQ.
-	q.MoveAllToActiveOrBackoffQueue("test")
+	q.MoveAllToActiveOrBackoffQueue(NodeAdd)
 	if q.activeQ.Len() != 1 {
 		t.Errorf("Expected 1 item to be in activeQ, but got: %v", q.activeQ.Len())
 	}
-	if q.podBackoffQ.Len() != 2 {
-		t.Errorf("Expected 2 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
+	// hpp2 won't be moved.
+	if q.podBackoffQ.Len() != 3 {
+		t.Fatalf("Expected 3 items to be in podBackoffQ, but got: %v", q.podBackoffQ.Len())
 	}

 	// pop out the pods in the backoffQ.
- q.podBackoffQ.Pop() - q.podBackoffQ.Pop() + for q.podBackoffQ.Len() != 0 { + q.podBackoffQ.Pop() + } q.schedulingCycle++ - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle()) - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod), q.SchedulingCycle()) - if q.unschedulableQ.get(unschedulablePodInfo.Pod) == nil || q.unschedulableQ.get(highPriorityPodInfo.Pod) == nil { - t.Errorf("Expected %v and %v in the unschedulableQ", unschedulablePodInfo.Pod.Name, highPriorityPodInfo.Pod.Name) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(highPriorityPodInfo.Pod, "fooPlugin"), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(hpp1), q.SchedulingCycle()) + for _, pod := range []*v1.Pod{unschedulablePodInfo.Pod, highPriorityPodInfo.Pod, hpp1, hpp2} { + if q.unschedulableQ.get(pod) == nil { + t.Errorf("Expected %v in the unschedulableQ", pod.Name) + } } // Move clock by podInitialBackoffDuration, so that pods in the unschedulableQ would pass the backing off, // and the pods will be moved into activeQ. c.Step(q.podInitialBackoffDuration) - q.MoveAllToActiveOrBackoffQueue("test") - if q.activeQ.Len() != 3 { - t.Errorf("Expected 3 items to be in activeQ, but got: %v", q.activeQ.Len()) + q.MoveAllToActiveOrBackoffQueue(NodeAdd) + // hpp2 won't be moved regardless of its backoff timer. + if q.activeQ.Len() != 4 { + t.Errorf("Expected 4 items to be in activeQ, but got: %v", q.activeQ.Len()) } if q.podBackoffQ.Len() != 0 { t.Errorf("Expected 0 item to be in podBackoffQ, but got: %v", q.podBackoffQ.Len()) } - } // TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that @@ -434,11 +451,14 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) { } c := clock.NewFakeClock(time.Now()) - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) + m := map[framework.ClusterEvent]sets.String{ + {Resource: framework.Pod, ActionType: framework.Add}: sets.NewString("fakePlugin"), + } + q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) q.Add(medPriorityPodInfo.Pod) // Add a couple of pods to the unschedulableQ. - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod), q.SchedulingCycle()) - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(affinityPod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(unschedulablePodInfo.Pod, "fakePlugin"), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(affinityPod, "fakePlugin"), q.SchedulingCycle()) // Move clock to make the unschedulable pods complete backoff. 
c.Step(DefaultPodInitialBackoffDuration + time.Second) @@ -647,35 +667,35 @@ func TestUnschedulablePodsMap(t *testing.T) { name: "create, update, delete subset of pods", podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]}, expectedMapAfterAdd: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(pods[0])}, - util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(pods[1])}, - util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, - util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3])}, + util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(pods[0]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(pods[1]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3]), UnschedulablePlugins: sets.NewString()}, }, podsToUpdate: []*v1.Pod{updatedPods[0]}, expectedMapAfterUpdate: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(updatedPods[0])}, - util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(pods[1])}, - util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, - util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3])}, + util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(updatedPods[0]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(pods[1]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3]), UnschedulablePlugins: sets.NewString()}, }, podsToDelete: []*v1.Pod{pods[0], pods[1]}, expectedMapAfterDelete: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, - util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3])}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3]), UnschedulablePlugins: sets.NewString()}, }, }, { name: "create, update, delete all", podsToAdd: []*v1.Pod{pods[0], pods[3]}, expectedMapAfterAdd: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(pods[0])}, - util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3])}, + util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(pods[0]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(pods[3]), UnschedulablePlugins: sets.NewString()}, }, podsToUpdate: []*v1.Pod{updatedPods[3]}, expectedMapAfterUpdate: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(pods[0])}, - util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(updatedPods[3])}, + util.GetPodFullName(pods[0]): {PodInfo: framework.NewPodInfo(pods[0]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[3]): {PodInfo: framework.NewPodInfo(updatedPods[3]), UnschedulablePlugins: sets.NewString()}, }, podsToDelete: []*v1.Pod{pods[0], pods[3]}, expectedMapAfterDelete: map[string]*framework.QueuedPodInfo{}, @@ -684,17 +704,17 @@ func 
TestUnschedulablePodsMap(t *testing.T) { name: "delete non-existing and existing pods", podsToAdd: []*v1.Pod{pods[1], pods[2]}, expectedMapAfterAdd: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(pods[1])}, - util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(pods[1]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2]), UnschedulablePlugins: sets.NewString()}, }, podsToUpdate: []*v1.Pod{updatedPods[1]}, expectedMapAfterUpdate: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(updatedPods[1])}, - util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2])}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(updatedPods[1]), UnschedulablePlugins: sets.NewString()}, + util.GetPodFullName(pods[2]): {PodInfo: framework.NewPodInfo(pods[2]), UnschedulablePlugins: sets.NewString()}, }, podsToDelete: []*v1.Pod{pods[2], pods[3]}, expectedMapAfterDelete: map[string]*framework.QueuedPodInfo{ - util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(updatedPods[1])}, + util.GetPodFullName(pods[1]): {PodInfo: framework.NewPodInfo(updatedPods[1]), UnschedulablePlugins: sets.NewString()}, }, }, } @@ -796,7 +816,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) { q.AddUnschedulableIfNotPresent(p1, q.SchedulingCycle()) c.Step(DefaultPodInitialBackoffDuration) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue("test") + q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) // Simulation is over. Now let's pop all pods. The pod popped first should be // the last one we pop here. for i := 0; i < 5; i++ { @@ -848,7 +868,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue("test") + q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) // Simulate a pod being popped by the scheduler, // At this time, unschedulable pod should be popped. @@ -891,7 +911,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { // Move clock to make the unschedulable pods complete backoff. c.Step(DefaultPodInitialBackoffDuration + time.Second) // Move all unschedulable pods to the active queue. - q.MoveAllToActiveOrBackoffQueue("test") + q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) // At this time, newerPod should be popped // because it is the oldest tried pod. 
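
(Aside: the tests above and below all hinge on one decision — given a cluster event and the plugins a Pod was rejected by, should the Pod be requeued? The patch's real entry point is PriorityQueue.podMatchesEvent, whose hunk was garbled earlier. Below is a minimal, self-contained sketch of that check; the helper name eventMayUnblockPod and the plain-map types are invented for illustration, and only the ActionType/ClusterEvent definitions come from the types.go diff above.)

package main

import "fmt"

type ActionType int64

const (
	Add ActionType = 1 << iota // 1
	Delete                     // 10
	UpdateNodeAllocatable      // 100
	UpdateNodeLabel            // 1000
	UpdateNodeTaint            // 10000
	UpdateNodeCondition        // 100000

	All ActionType = 1<<iota - 1 // 111111
)

type GVK string

type ClusterEvent struct {
	Resource   GVK
	ActionType ActionType
}

// eventMayUnblockPod reports whether the event that happened is interesting to
// any plugin that previously rejected the Pod. This mirrors (simplified, without
// the wildcard special case) the matching semantics exercised by TestPodMatchesEvent.
func eventMayUnblockPod(happened ClusterEvent, failedPlugins map[string]bool,
	clusterEventMap map[ClusterEvent]map[string]bool) bool {
	// A Pod whose failure was not attributed to any plugin stays movable by any event.
	if len(failedPlugins) == 0 {
		return true
	}
	for registered, plugins := range clusterEventMap {
		// The resources must match, and the registered ActionType bitmask must
		// overlap the action that actually happened.
		if registered.Resource != happened.Resource || registered.ActionType&happened.ActionType == 0 {
			continue
		}
		// The event is relevant if any plugin registered for it also rejected the Pod.
		for name := range plugins {
			if failedPlugins[name] {
				return true
			}
		}
	}
	return false
}

func main() {
	m := map[ClusterEvent]map[string]bool{
		{Resource: "Node", ActionType: UpdateNodeTaint}: {"fakeNode": true},
	}
	evt := ClusterEvent{Resource: "Node", ActionType: UpdateNodeTaint}
	fmt.Println(eventMayUnblockPod(evt, map[string]bool{"fakeNode": true}, m)) // true
	fmt.Println(eventMayUnblockPod(evt, map[string]bool{"fakePod": true}, m))  // false
}
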
@@ -971,7 +991,10 @@ func TestHighPriorityBackoff(t *testing.T) { // activeQ after one minutes if it is in unschedulableQ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { c := clock.NewFakeClock(time.Now()) - q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c)) + m := map[framework.ClusterEvent]sets.String{ + {Resource: framework.Node, ActionType: framework.Add}: sets.NewString("fakePlugin"), + } + q := NewPriorityQueue(newDefaultQueueSort(), WithClock(c), WithClusterEventMap(m)) midPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-midpod", @@ -1015,8 +1038,8 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { Message: "fake scheduling failure", }) - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPod), q.SchedulingCycle()) - q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&midPod), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&highPod, "fakePlugin"), q.SchedulingCycle()) + q.AddUnschedulableIfNotPresent(q.newQueuedPodInfo(&midPod, "fakePlugin"), q.SchedulingCycle()) c.Step(unschedulableQTimeInterval + time.Second) q.flushUnschedulableQLeftover() @@ -1060,7 +1083,7 @@ var ( queue.podBackoffQ.Add(pInfo) } moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) { - queue.MoveAllToActiveOrBackoffQueue("test") + queue.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) } flushBackoffQ = func(queue *PriorityQueue, _ *framework.QueuedPodInfo) { queue.clock.(*clock.FakeClock).Step(2 * time.Second) @@ -1440,7 +1463,7 @@ func TestIncomingPodsMetrics(t *testing.T) { moveAllToActiveOrBackoffQ, }, want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 - scheduler_queue_incoming_pods_total{event="test",queue="backoff"} 3 + scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="backoff"} 3 `, }, { @@ -1451,7 +1474,7 @@ func TestIncomingPodsMetrics(t *testing.T) { moveAllToActiveOrBackoffQ, }, want: ` scheduler_queue_incoming_pods_total{event="ScheduleAttemptFailure",queue="unschedulable"} 3 - scheduler_queue_incoming_pods_total{event="test",queue="active"} 3 + scheduler_queue_incoming_pods_total{event="UnschedulableTimeout",queue="active"} 3 `, }, { @@ -1540,7 +1563,7 @@ func TestBackOffFlow(t *testing.T) { } // An event happens. 
- q.MoveAllToActiveOrBackoffQueue("deleted pod") + q.MoveAllToActiveOrBackoffQueue(UnschedulableTimeout) if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok { t.Errorf("pod %v is not in the backoff queue", podID) @@ -1570,6 +1593,100 @@ func TestBackOffFlow(t *testing.T) { } } +func TestPodMatchesEvent(t *testing.T) { + tests := []struct { + name string + podInfo *framework.QueuedPodInfo + event string + clusterEventMap map[framework.ClusterEvent]sets.String + want bool + }{ + { + name: "event not registered", + podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}), + event: "ClusterTearDown", + clusterEventMap: map[framework.ClusterEvent]sets.String{ + {Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo"), + }, + want: false, + }, + { + name: "pod's failed plugin matches but event does not match", + podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"), + event: PodAdd, + clusterEventMap: map[framework.ClusterEvent]sets.String{ + {Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo", "bar"), + }, + want: false, + }, + { + name: "wildcard event wins regardless of event matching", + podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"), + event: UnschedulableTimeout, + clusterEventMap: map[framework.ClusterEvent]sets.String{ + {Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo"), + }, + want: true, + }, + { + name: "pod's failed plugin and event both match", + podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"), + event: NodeTaintChange, + clusterEventMap: map[framework.ClusterEvent]sets.String{ + {Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo", "bar"), + }, + want: true, + }, + { + name: "pod's failed plugin registers fine-grained event", + podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "bar"), + event: NodeTaintChange, + clusterEventMap: map[framework.ClusterEvent]sets.String{ + {Resource: framework.Node, ActionType: framework.All}: sets.NewString("foo"), + {Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: sets.NewString("bar"), + }, + want: true, + }, + { + name: "if pod failed by multiple plugins, a single match gets a final match", + podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "foo", "bar"), + event: NodeAdd, + clusterEventMap: map[framework.ClusterEvent]sets.String{ + {Resource: framework.Node, ActionType: framework.All}: sets.NewString("bar"), + }, + want: true, + }, + { + name: "plugin returns WildCardEvent and plugin name matches", + podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "foo"), + event: PvAdd, + clusterEventMap: map[framework.ClusterEvent]sets.String{ + framework.WildCardEvent: sets.NewString("foo"), + }, + want: true, + }, + { + name: "plugin returns WildCardEvent but plugin name not match", + podInfo: newQueuedPodInfoForLookup(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, "foo"), + event: PvAdd, + clusterEventMap: map[framework.ClusterEvent]sets.String{ + framework.WildCardEvent: sets.NewString("bar"), + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := NewPriorityQueue(newDefaultQueueSort()) + q.clusterEventMap = tt.clusterEventMap + if got := q.podMatchesEvent(tt.podInfo, tt.event); got != tt.want { + t.Errorf("Want %v, but got %v", 
tt.want, got) + } + }) + } +} + func makeQueuedPodInfos(num int, timestamp time.Time) []*framework.QueuedPodInfo { var pInfos = make([]*framework.QueuedPodInfo, 0, num) for i := 1; i <= num; i++ {
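
(Aside: from a plugin author's perspective, the contract this patch introduces is the EnqueueExtensions interface in interface.go. Below is a minimal sketch of a plugin opting in; the plugin, its name, and its event list are hypothetical — only the interface shape and the framework identifiers come from the patch.)

package zonecapacity

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// ZoneCapacity is a hypothetical Filter plugin used only to illustrate the
// EnqueueExtensions contract introduced by this patch.
type ZoneCapacity struct{}

var _ framework.FilterPlugin = &ZoneCapacity{}
var _ framework.EnqueueExtensions = &ZoneCapacity{}

func (*ZoneCapacity) Name() string { return "ZoneCapacity" }

func (*ZoneCapacity) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
	// A real implementation would reject the Pod with an unschedulable status
	// when its zone is out of capacity; the scheduler then records "ZoneCapacity"
	// in the Pod's QueuedPodInfo.UnschedulablePlugins.
	return nil
}

// EventsToRegister narrows requeueing: a Pod rejected by this plugin is retried
// only when a Node is added or a Node's allocatable resources change. The list
// must be static; returning nil would leave rejected Pods parked until the
// periodic unschedulableQ flush, which the patch treats as a wildcard event.
func (*ZoneCapacity) EventsToRegister() []framework.ClusterEvent {
	return []framework.ClusterEvent{
		{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeAllocatable},
	}
}
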