From 0b27f25252d827afe2cac2d9ab67595275315608 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Mon, 7 Nov 2022 14:02:22 -0800 Subject: [PATCH] PreEnqueue implementation - Add PreEnqueuePlugin to Scheduler Framework - Implement PreEnqueuePlugin in scheduler queue - Implementation of SchedulingGates plugin - Metrics --- .../app/options/options_test.go | 11 ++ cmd/kube-scheduler/app/server_test.go | 23 ++- pkg/scheduler/framework/interface.go | 15 ++ .../framework/plugins/feature/feature.go | 1 + .../framework/plugins/names/names.go | 1 + pkg/scheduler/framework/plugins/registry.go | 3 + .../schedulinggates/scheduling_gates.go | 67 ++++++++ .../schedulinggates/scheduling_gates_test.go | 77 +++++++++ pkg/scheduler/framework/runtime/framework.go | 7 + .../framework/runtime/framework_test.go | 68 ++++++++ .../internal/queue/scheduling_queue.go | 153 +++++++++++++----- .../internal/queue/scheduling_queue_test.go | 143 +++++++++++++--- pkg/scheduler/metrics/metric_recorder.go | 7 + pkg/scheduler/metrics/metrics.go | 7 +- pkg/scheduler/scheduler.go | 5 + pkg/scheduler/testing/wrappers.go | 8 + .../testdata/stable-metrics-list.yaml | 4 +- 17 files changed, 530 insertions(+), 70 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go create mode 100644 pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go diff --git a/cmd/kube-scheduler/app/options/options_test.go b/cmd/kube-scheduler/app/options/options_test.go index 772f017293f..cdb8fb271a5 100644 --- a/cmd/kube-scheduler/app/options/options_test.go +++ b/cmd/kube-scheduler/app/options/options_test.go @@ -221,6 +221,9 @@ clientConnection: kubeconfig: '%s' profiles: - plugins: + preEnqueue: + enabled: + - name: foo reserve: enabled: - name: foo @@ -830,6 +833,11 @@ profiles: { SchedulerName: "default-scheduler", Plugins: &kubeschedulerconfig.Plugins{ + PreEnqueue: kubeschedulerconfig.PluginSet{ + Enabled: []kubeschedulerconfig.Plugin{ + {Name: "foo"}, + }, + }, 
Reserve: kubeschedulerconfig.PluginSet{ Enabled: []kubeschedulerconfig.Plugin{ {Name: "foo"}, @@ -944,6 +952,7 @@ profiles: { SchedulerName: "default-scheduler", Plugins: &kubeschedulerconfig.Plugins{ + PreEnqueue: defaults.PluginsV1beta3.PreEnqueue, QueueSort: defaults.PluginsV1beta3.QueueSort, PreFilter: defaults.PluginsV1beta3.PreFilter, Filter: defaults.PluginsV1beta3.Filter, @@ -1065,6 +1074,7 @@ profiles: { SchedulerName: "default-scheduler", Plugins: &kubeschedulerconfig.Plugins{ + PreEnqueue: defaults.PluginsV1beta2.PreEnqueue, QueueSort: defaults.PluginsV1beta2.QueueSort, PreFilter: defaults.PluginsV1beta2.PreFilter, Filter: defaults.PluginsV1beta2.Filter, @@ -1427,6 +1437,7 @@ profiles: { SchedulerName: "bar-profile", Plugins: &kubeschedulerconfig.Plugins{ + PreEnqueue: defaults.PluginsV1beta2.PreEnqueue, QueueSort: defaults.PluginsV1beta2.QueueSort, PreFilter: defaults.PluginsV1beta2.PreFilter, Filter: defaults.PluginsV1beta2.Filter, diff --git a/cmd/kube-scheduler/app/server_test.go b/cmd/kube-scheduler/app/server_test.go index 2c568be096a..ec8733b3cc9 100644 --- a/cmd/kube-scheduler/app/server_test.go +++ b/cmd/kube-scheduler/app/server_test.go @@ -42,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" ) func TestSetup(t *testing.T) { @@ -93,6 +94,7 @@ profiles: - plugins: multiPoint: enabled: + - name: SchedulingGates - name: DefaultBinder - name: PrioritySort - name: DefaultPreemption @@ -131,6 +133,7 @@ profiles: - plugins: multiPoint: enabled: + - name: SchedulingGates - name: DefaultBinder - name: PrioritySort - name: DefaultPreemption @@ -315,16 +318,21 @@ leaderElection: wantLeaderElection *componentbaseconfig.LeaderElectionConfiguration }{ { - name: "default config with an alpha feature enabled", + name: "default config with two alpha features enabled", flags: 
[]string{ "--kubeconfig", configKubeconfig, - "--feature-gates=VolumeCapacityPriority=true", + "--feature-gates=VolumeCapacityPriority=true,PodSchedulingReadiness=true", }, wantPlugins: map[string]*config.Plugins{ - "default-scheduler": defaults.ExpandedPluginsV1, + "default-scheduler": func() *config.Plugins { + plugins := defaults.ExpandedPluginsV1.DeepCopy() + plugins.PreEnqueue.Enabled = append(plugins.PreEnqueue.Enabled, config.Plugin{Name: names.SchedulingGates}) + return plugins + }(), }, restoreFeatures: map[featuregate.Feature]bool{ features.VolumeCapacityPriority: false, + features.PodSchedulingReadiness: false, }, }, { @@ -384,7 +392,8 @@ leaderElection: }, wantPlugins: map[string]*config.Plugins{ "default-scheduler": { - Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}}, + PreEnqueue: config.PluginSet{Enabled: []config.Plugin{{Name: "SchedulingGates"}}}, + Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}}, Filter: config.PluginSet{ Enabled: []config.Plugin{ {Name: "NodeResourcesFit"}, @@ -424,7 +433,8 @@ leaderElection: }, wantPlugins: map[string]*config.Plugins{ "default-scheduler": { - Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}}, + PreEnqueue: config.PluginSet{Enabled: []config.Plugin{{Name: "SchedulingGates"}}}, + Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}}, Filter: config.PluginSet{ Enabled: []config.Plugin{ {Name: "NodeResourcesFit"}, @@ -515,7 +525,8 @@ leaderElection: registryOptions: []Option{WithPlugin("Foo", newFoo)}, wantPlugins: map[string]*config.Plugins{ "default-scheduler": { - Bind: defaults.ExpandedPluginsV1.Bind, + PreEnqueue: defaults.ExpandedPluginsV1.PreEnqueue, + Bind: defaults.ExpandedPluginsV1.Bind, Filter: config.PluginSet{ Enabled: append(defaults.ExpandedPluginsV1.Filter.Enabled, config.Plugin{Name: "Foo"}), }, diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 
67b12bd7baf..0049e7acb1b 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -323,6 +323,17 @@ type Plugin interface { Name() string } +// PreEnqueuePlugin is an interface that must be implemented by "PreEnqueue" plugins. +// These plugins are called prior to adding Pods to activeQ. +// Note: a preEnqueue plugin is expected to be lightweight and efficient, so it's not expected to +// involve expensive calls like accessing external endpoints; otherwise it'd block other +// Pods' enqueuing in event handlers. +type PreEnqueuePlugin interface { + Plugin + // PreEnqueue is called prior to adding Pods to activeQ. + PreEnqueue(ctx context.Context, p *v1.Pod) *Status +} + // LessFunc is the function to sort pod info type LessFunc func(podInfo1, podInfo2 *QueuedPodInfo) bool @@ -521,6 +532,10 @@ type BindPlugin interface { // Configured plugins are called at specified points in a scheduling context. type Framework interface { Handle + + // PreEnqueuePlugins returns the registered preEnqueue plugins. 
+ PreEnqueuePlugins() []PreEnqueuePlugin + // QueueSortFunc returns the function to sort pods in scheduling queue QueueSortFunc() LessFunc diff --git a/pkg/scheduler/framework/plugins/feature/feature.go b/pkg/scheduler/framework/plugins/feature/feature.go index 55bd2991518..48cd00b7abe 100644 --- a/pkg/scheduler/framework/plugins/feature/feature.go +++ b/pkg/scheduler/framework/plugins/feature/feature.go @@ -25,4 +25,5 @@ type Features struct { EnableMinDomainsInPodTopologySpread bool EnableNodeInclusionPolicyInPodTopologySpread bool EnableMatchLabelKeysInPodTopologySpread bool + EnablePodSchedulingReadiness bool } diff --git a/pkg/scheduler/framework/plugins/names/names.go b/pkg/scheduler/framework/plugins/names/names.go index 1fd9d72a668..659a5ab4073 100644 --- a/pkg/scheduler/framework/plugins/names/names.go +++ b/pkg/scheduler/framework/plugins/names/names.go @@ -33,6 +33,7 @@ const ( EBSLimits = "EBSLimits" GCEPDLimits = "GCEPDLimits" PodTopologySpread = "PodTopologySpread" + SchedulingGates = "SchedulingGates" SelectorSpread = "SelectorSpread" TaintToleration = "TaintToleration" VolumeBinding = "VolumeBinding" diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 3c822536a5e..fdd1334aaae 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" @@ -50,6 +51,7 @@ func NewInTreeRegistry() runtime.Registry { EnableMinDomainsInPodTopologySpread: 
feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread), EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread), EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread), + EnablePodSchedulingReadiness: feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness), } return runtime.Registry{ @@ -74,5 +76,6 @@ func NewInTreeRegistry() runtime.Registry { queuesort.Name: queuesort.New, defaultbinder.Name: defaultbinder.New, defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New), + schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New), } } diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go new file mode 100644 index 00000000000..249e31a3b57 --- /dev/null +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go @@ -0,0 +1,67 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulinggates + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" +) + +// Name of the plugin used in the plugin registry and configurations. 
+const Name = names.SchedulingGates + +// SchedulingGates checks if a Pod carries .spec.schedulingGates. +type SchedulingGates struct { + enablePodSchedulingReadiness bool +} + +var _ framework.PreEnqueuePlugin = &SchedulingGates{} +var _ framework.EnqueueExtensions = &SchedulingGates{} + +func (pl *SchedulingGates) Name() string { + return Name +} + +func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + if !pl.enablePodSchedulingReadiness || len(p.Spec.SchedulingGates) == 0 { + return nil + } + var gates []string + for _, gate := range p.Spec.SchedulingGates { + gates = append(gates, gate.Name) + } + return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates)) +} + +// EventsToRegister returns the possible events that may make a Pod +// failed by this plugin schedulable. +func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEvent { + return []framework.ClusterEvent{ + {Resource: framework.Pod, ActionType: framework.Update}, + } +} + +// New initializes a new plugin and returns it. +func New(_ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) { + return &SchedulingGates{enablePodSchedulingReadiness: fts.EnablePodSchedulingReadiness}, nil +} diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go new file mode 100644 index 00000000000..670989a1eb9 --- /dev/null +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulinggates + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + + v1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + st "k8s.io/kubernetes/pkg/scheduler/testing" +) + +func TestPreEnqueue(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + enablePodSchedulingReadiness bool + want *framework.Status + }{ + { + name: "pod does not carry scheduling gates, feature disabled", + pod: st.MakePod().Name("p").Obj(), + enablePodSchedulingReadiness: false, + want: nil, + }, + { + name: "pod does not carry scheduling gates, feature enabled", + pod: st.MakePod().Name("p").Obj(), + enablePodSchedulingReadiness: true, + want: nil, + }, + { + name: "pod carries scheduling gates, feature disabled", + pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), + enablePodSchedulingReadiness: false, + want: nil, + }, + { + name: "pod carries scheduling gates, feature enabled", + pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), + enablePodSchedulingReadiness: true, + want: framework.NewStatus(framework.UnschedulableAndUnresolvable, "waiting for scheduling gates: [foo bar]"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, err := New(nil, nil, feature.Features{EnablePodSchedulingReadiness: tt.enablePodSchedulingReadiness}) + if err != nil { + t.Fatalf("Creating plugin: %v", err) + } + + got := p.(framework.PreEnqueuePlugin).PreEnqueue(context.Background(), tt.pod) + if diff := 
cmp.Diff(tt.want, got); diff != "" { + t.Errorf("unexpected status (-want, +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 0001cc19b1e..53e270c2ab7 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -75,6 +75,7 @@ type frameworkImpl struct { snapshotSharedLister framework.SharedLister waitingPods *waitingPodsMap scorePluginWeight map[string]int + preEnqueuePlugins []framework.PreEnqueuePlugin queueSortPlugins []framework.QueueSortPlugin preFilterPlugins []framework.PreFilterPlugin filterPlugins []framework.FilterPlugin @@ -125,6 +126,7 @@ func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionP {&plugins.Bind, &f.bindPlugins}, {&plugins.PostBind, &f.postBindPlugins}, {&plugins.Permit, &f.permitPlugins}, + {&plugins.PreEnqueue, &f.preEnqueuePlugins}, {&plugins.QueueSort, &f.queueSortPlugins}, } } @@ -574,6 +576,11 @@ func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, plugin return nil } +// PreEnqueuePlugins returns the registered preEnqueue plugins. 
+func (f *frameworkImpl) PreEnqueuePlugins() []framework.PreEnqueuePlugin { + return f.preEnqueuePlugins +} + // QueueSortFunc returns the function to sort pods in scheduling queue func (f *frameworkImpl) QueueSortFunc() framework.LessFunc { if f == nil { diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 472ee0deb80..ddb7a251a5f 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -41,6 +41,7 @@ import ( ) const ( + preEnqueuePlugin = "preEnqueue-plugin" queueSortPlugin = "no-op-queue-sort-plugin" scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1" scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2" @@ -302,6 +303,18 @@ func (pp *TestPermitPlugin) Permit(ctx context.Context, state *framework.CycleSt return framework.NewStatus(framework.Wait), 10 * time.Second } +var _ framework.PreEnqueuePlugin = &TestPreEnqueuePlugin{} + +type TestPreEnqueuePlugin struct{} + +func (pl *TestPreEnqueuePlugin) Name() string { + return preEnqueuePlugin +} + +func (pl *TestPreEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + return nil +} + var _ framework.QueueSortPlugin = &TestQueueSortPlugin{} func newQueueSortPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { @@ -984,6 +997,61 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) { } } +func TestPreEnqueuePlugins(t *testing.T) { + tests := []struct { + name string + plugins []framework.Plugin + want []framework.PreEnqueuePlugin + }{ + { + name: "no PreEnqueuePlugin registered", + }, + { + name: "one PreEnqueuePlugin registered", + plugins: []framework.Plugin{ + &TestPreEnqueuePlugin{}, + }, + want: []framework.PreEnqueuePlugin{ + &TestPreEnqueuePlugin{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registry := Registry{} + cfgPls := &config.Plugins{} + for _, pl := range tt.plugins { 
+ // register all plugins + tmpPl := pl + if err := registry.Register(pl.Name(), + func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return tmpPl, nil + }); err != nil { + t.Fatalf("fail to register preEnqueue plugin (%s)", pl.Name()) + } + // append plugins to preEnqueue pluginset + cfgPls.PreEnqueue.Enabled = append( + cfgPls.PreEnqueue.Enabled, + config.Plugin{Name: pl.Name()}, + ) + } + profile := config.KubeSchedulerProfile{Plugins: cfgPls} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) + if err != nil { + t.Fatalf("fail to create framework: %s", err) + } + + got := f.PreEnqueuePlugins() + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("PreEnqueuePlugins(): want %v, but got %v", tt.want, got) + } + }) + } +} + func TestRunScorePlugins(t *testing.T) { tests := []struct { name string diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index ffb30d5611e..5332e00d661 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -27,6 +27,7 @@ limitations under the License. package queue import ( + "context" "fmt" "reflect" "sync" @@ -63,6 +64,8 @@ const ( activeQName = "Active" backoffQName = "Backoff" unschedulablePods = "Unschedulable" + + preEnqueue = "PreEnqueue" ) const ( @@ -172,6 +175,8 @@ type PriorityQueue struct { moveRequestCycle int64 clusterEventMap map[framework.ClusterEvent]sets.String + // preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins. + preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin // closed indicates that the queue is closed. // It is mainly used to let Pop() exit its control loop while waiting for an item. 
@@ -187,6 +192,7 @@ type priorityQueueOptions struct { podMaxInUnschedulablePodsDuration time.Duration podNominator framework.PodNominator clusterEventMap map[framework.ClusterEvent]sets.String + preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin } // Option configures a PriorityQueue @@ -234,6 +240,13 @@ func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option { } } +// WithPreEnqueuePluginMap sets preEnqueuePluginMap for PriorityQueue. +func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option { + return func(o *priorityQueueOptions) { + o.preEnqueuePluginMap = m + } +} + var defaultPriorityQueueOptions = priorityQueueOptions{ clock: clock.RealClock{}, podInitialBackoffDuration: DefaultPodInitialBackoffDuration, @@ -283,9 +296,10 @@ func NewPriorityQueue( podMaxBackoffDuration: options.podMaxBackoffDuration, podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), - unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()), + unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()), moveRequestCycle: -1, clusterEventMap: options.clusterEventMap, + preEnqueuePluginMap: options.preEnqueuePluginMap, } pq.cond.L = &pq.lock pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) @@ -300,19 +314,66 @@ func (p *PriorityQueue) Run() { go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop) } +// runPreEnqueuePlugins iterates PreEnqueue function in each registered PreEnqueuePlugin. +// It returns true if all PreEnqueue function run successfully; otherwise returns false +// upon the first failure. +// Note: we need to associate the failed plugin to `pInfo`, so that the pod can be moved back +// to activeQ by related cluster event. 
+func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framework.QueuedPodInfo) bool { + var s *framework.Status + pod := pInfo.Pod + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preEnqueue, s.Code().String(), pod.Spec.SchedulerName).Observe(metrics.SinceInSeconds(startTime)) + }() + + for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] { + s = pl.PreEnqueue(ctx, pod) + if s.IsSuccess() { + continue + } + pInfo.UnschedulablePlugins.Insert(pl.Name()) + metrics.UnschedulableReason(pl.Name(), pod.Spec.SchedulerName).Inc() + if s.Code() == framework.Error { + klog.ErrorS(s.AsError(), "Unexpected error running PreEnqueue plugin", "pod", klog.KObj(pod), "plugin", pl.Name()) + } else { + klog.V(5).InfoS("Status after running PreEnqueue plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", s) + } + return false + } + return true +} + +// addToActiveQ tries to add pod to active queue. It returns 2 parameters: +// 1. a boolean flag to indicate whether the pod is added successfully. +// 2. an error for the caller to act on. +func (p *PriorityQueue) addToActiveQ(pInfo *framework.QueuedPodInfo) (bool, error) { + pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) + if pInfo.Gated { + // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. + p.unschedulablePods.addOrUpdate(pInfo) + return false, nil + } + if err := p.activeQ.Add(pInfo); err != nil { + klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod)) + return false, err + } + return true, nil +} + // Add adds a pod to the active queue. 
It should be called only when a new pod // is added so there is no chance the pod is already in active/unschedulable/backoff queues func (p *PriorityQueue) Add(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() + pInfo := p.newQueuedPodInfo(pod) - if err := p.activeQ.Add(pInfo); err != nil { - klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod)) + if added, err := p.addToActiveQ(pInfo); !added { return err } if p.unschedulablePods.get(pod) != nil { klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod)) - p.unschedulablePods.delete(pod) + p.unschedulablePods.delete(pInfo) } // Delete pod from backoffQ if it is backing off if err := p.podBackoffQ.Delete(pInfo); err == nil { @@ -367,11 +428,10 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool { return false } - if err := p.activeQ.Add(pInfo); err != nil { - klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod)) + if added, _ := p.addToActiveQ(pInfo); !added { return false } - p.unschedulablePods.delete(pod) + p.unschedulablePods.delete(pInfo) p.podBackoffQ.Delete(pInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc() p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) @@ -446,8 +506,9 @@ func (p *PriorityQueue) flushBackoffQCompleted() { if rawPodInfo == nil { break } - pod := rawPodInfo.(*framework.QueuedPodInfo).Pod - if p.isPodBackingoff(rawPodInfo.(*framework.QueuedPodInfo)) { + pInfo := rawPodInfo.(*framework.QueuedPodInfo) + pod := pInfo.Pod + if p.isPodBackingoff(pInfo) { break } _, err := p.podBackoffQ.Pop() @@ -455,10 +516,11 @@ func (p *PriorityQueue) flushBackoffQCompleted() { klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod)) break } - p.activeQ.Add(rawPodInfo) - klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName) - 
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() - activated = true + if added, _ := p.addToActiveQ(pInfo); added { + klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName) + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() + activated = true + } } if activated { @@ -560,13 +622,13 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { if err := p.podBackoffQ.Add(pInfo); err != nil { return err } - p.unschedulablePods.delete(usPodInfo.Pod) + p.unschedulablePods.delete(usPodInfo) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQName) } else { - if err := p.activeQ.Add(pInfo); err != nil { + if added, err := p.addToActiveQ(pInfo); !added { return err } - p.unschedulablePods.delete(usPodInfo.Pod) + p.unschedulablePods.delete(usPodInfo) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQName) p.cond.Broadcast() } @@ -579,7 +641,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { } // If pod is not in any of the queues, we put it in the active queue. pInfo := p.newQueuedPodInfo(newPod) - if err := p.activeQ.Add(pInfo); err != nil { + if added, err := p.addToActiveQ(pInfo); !added { return err } p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) @@ -594,10 +656,11 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() p.PodNominator.DeleteNominatedPodIfExists(pod) - if err := p.activeQ.Delete(newQueuedPodInfoForLookup(pod)); err != nil { + pInfo := newQueuedPodInfoForLookup(pod) + if err := p.activeQ.Delete(pInfo); err != nil { // The item was probably not found in the activeQ. 
- p.podBackoffQ.Delete(newQueuedPodInfoForLookup(pod)) - p.unschedulablePods.delete(pod) + p.podBackoffQ.Delete(pInfo) + p.unschedulablePods.delete(pInfo) } return nil } @@ -652,16 +715,14 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework. } else { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName) metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc() - p.unschedulablePods.delete(pod) + p.unschedulablePods.delete(pInfo) } } else { - if err := p.activeQ.Add(pInfo); err != nil { - klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod)) - } else { + if added, _ := p.addToActiveQ(pInfo); added { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName) activated = true metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc() - p.unschedulablePods.delete(pod) + p.unschedulablePods.delete(pInfo) } } } @@ -806,25 +867,33 @@ type UnschedulablePods struct { // podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo. podInfoMap map[string]*framework.QueuedPodInfo keyFunc func(*v1.Pod) string - // metricRecorder updates the counter when elements of an unschedulablePodsMap - // get added or removed, and it does nothing if it's nil - metricRecorder metrics.MetricRecorder + // unschedulableRecorder/gatedRecorder updates the counter when elements of an unschedulablePodsMap + // get added or removed, and it does nothing if it's nil. + unschedulableRecorder, gatedRecorder metrics.MetricRecorder } // Add adds a pod to the unschedulable podInfoMap. 
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) { podID := u.keyFunc(pInfo.Pod) - if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil { - u.metricRecorder.Inc() + if _, exists := u.podInfoMap[podID]; !exists { + if pInfo.Gated && u.gatedRecorder != nil { + u.gatedRecorder.Inc() + } else if !pInfo.Gated && u.unschedulableRecorder != nil { + u.unschedulableRecorder.Inc() + } } u.podInfoMap[podID] = pInfo } // Delete deletes a pod from the unschedulable podInfoMap. -func (u *UnschedulablePods) delete(pod *v1.Pod) { - podID := u.keyFunc(pod) - if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil { - u.metricRecorder.Dec() +func (u *UnschedulablePods) delete(pInfo *framework.QueuedPodInfo) { + podID := u.keyFunc(pInfo.Pod) + if _, exists := u.podInfoMap[podID]; exists { + if pInfo.Gated && u.gatedRecorder != nil { + u.gatedRecorder.Dec() + } else if !pInfo.Gated && u.unschedulableRecorder != nil { + u.unschedulableRecorder.Dec() + } } delete(u.podInfoMap, podID) } @@ -842,17 +911,21 @@ func (u *UnschedulablePods) get(pod *v1.Pod) *framework.QueuedPodInfo { // Clear removes all the entries from the unschedulable podInfoMap. func (u *UnschedulablePods) clear() { u.podInfoMap = make(map[string]*framework.QueuedPodInfo) - if u.metricRecorder != nil { - u.metricRecorder.Clear() + if u.unschedulableRecorder != nil { + u.unschedulableRecorder.Clear() + } + if u.gatedRecorder != nil { + u.gatedRecorder.Clear() } } // newUnschedulablePods initializes a new object of UnschedulablePods. 
-func newUnschedulablePods(metricRecorder metrics.MetricRecorder) *UnschedulablePods { +func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRecorder) *UnschedulablePods { return &UnschedulablePods{ - podInfoMap: make(map[string]*framework.QueuedPodInfo), - keyFunc: util.GetPodFullName, - metricRecorder: metricRecorder, + podInfoMap: make(map[string]*framework.QueuedPodInfo), + keyFunc: util.GetPodFullName, + unschedulableRecorder: unschedulableRecorder, + gatedRecorder: gatedRecorder, } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 5b0a1832cbe..95062a43274 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -446,6 +446,84 @@ func TestPriorityQueue_Activate(t *testing.T) { } } +type preEnqueuePlugin struct { + allowlists []string +} + +func (pl *preEnqueuePlugin) Name() string { + return "preEnqueuePlugin" +} + +func (pl *preEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + for _, allowed := range pl.allowlists { + if strings.Contains(p.Name, allowed) { + return nil + } + } + return framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod name not in allowlists") +} + +func TestPriorityQueue_addToActiveQ(t *testing.T) { + tests := []struct { + name string + plugins []framework.PreEnqueuePlugin + pod *v1.Pod + wantUnschedulablePods int + wantSuccess bool + }{ + { + name: "no plugins registered", + pod: st.MakePod().Name("p").Obj(), + wantUnschedulablePods: 0, + wantSuccess: true, + }, + { + name: "preEnqueue plugin registered, pod name not in allowlists", + plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}}, + pod: st.MakePod().Name("p").Obj(), + wantUnschedulablePods: 1, + wantSuccess: false, + }, + { + name: "preEnqueue plugin registered, pod failed one preEnqueue plugin", + plugins: []framework.PreEnqueuePlugin{ + 
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, + &preEnqueuePlugin{allowlists: []string{"foo"}}, + }, + pod: st.MakePod().Name("bar").Obj(), + wantUnschedulablePods: 1, + wantSuccess: false, + }, + { + name: "preEnqueue plugin registered, pod passed all preEnqueue plugins", + plugins: []framework.PreEnqueuePlugin{ + &preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, + &preEnqueuePlugin{allowlists: []string{"bar"}}, + }, + pod: st.MakePod().Name("bar").Obj(), + wantUnschedulablePods: 0, + wantSuccess: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins} + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m)) + got, _ := q.addToActiveQ(newQueuedPodInfoForLookup(tt.pod)) + if got != tt.wantSuccess { + t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got) + } + if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { + t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) + } + }) + } +} + func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { tests := []struct { name string @@ -949,7 +1027,7 @@ func TestUnschedulablePodsMap(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - upm := newUnschedulablePods(nil) + upm := newUnschedulablePods(nil, nil) for _, p := range test.podsToAdd { upm.addOrUpdate(newQueuedPodInfoForLookup(p)) } @@ -968,7 +1046,7 @@ func TestUnschedulablePodsMap(t *testing.T) { } } for _, p := range test.podsToDelete { - upm.delete(p) + upm.delete(newQueuedPodInfoForLookup(p)) } if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterDelete) { t.Errorf("Unexpected map after deleting pods. 
Expected: %v, got: %v", @@ -1307,13 +1385,15 @@ var ( queue.activeQ.Update(pInfo) } addPodUnschedulablePods = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - // Update pod condition to unschedulable. - podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: v1.PodReasonUnschedulable, - Message: "fake scheduling failure", - }) + if !pInfo.Gated { + // Update pod condition to unschedulable. + podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{ + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: v1.PodReasonUnschedulable, + Message: "fake scheduling failure", + }) + } queue.unschedulablePods.addOrUpdate(pInfo) } addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { @@ -1434,10 +1514,20 @@ func TestPodTimestamp(t *testing.T) { func TestPendingPodsMetric(t *testing.T) { timestamp := time.Now() metrics.Register() - total := 50 - pInfos := makeQueuedPodInfos(total, timestamp) + total := 60 + queueableNum := 50 + queueable := "queueable" + // First 50 Pods are queueable. + pInfos := makeQueuedPodInfos(queueableNum, queueable, timestamp) + // The last 10 Pods are not queueable. + gated := makeQueuedPodInfos(total-queueableNum, "fail-me", timestamp) + // Manually mark them as gated=true. + for _, pInfo := range gated { + pInfo.Gated = true + } + pInfos = append(pInfos, gated...) totalWithDelay := 20 - pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, timestamp.Add(2*time.Second)) + pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, queueable, timestamp.Add(2*time.Second)) tests := []struct { name string @@ -1458,10 +1548,11 @@ func TestPendingPodsMetric(t *testing.T) { }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 
'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. # TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 30 scheduler_pending_pods{queue="backoff"} 0 +scheduler_pending_pods{queue="gated"} 10 scheduler_pending_pods{queue="unschedulable"} 20 `, }, @@ -1479,10 +1570,11 @@ scheduler_pending_pods{queue="unschedulable"} 20 }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. # TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 15 scheduler_pending_pods{queue="backoff"} 25 +scheduler_pending_pods{queue="gated"} 10 scheduler_pending_pods{queue="unschedulable"} 10 `, }, @@ -1500,10 +1592,11 @@ scheduler_pending_pods{queue="unschedulable"} 10 }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 
'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. # TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 50 scheduler_pending_pods{queue="backoff"} 0 +scheduler_pending_pods{queue="gated"} 10 scheduler_pending_pods{queue="unschedulable"} 0 `, }, @@ -1523,10 +1616,11 @@ scheduler_pending_pods{queue="unschedulable"} 0 }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. 
# TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 30 scheduler_pending_pods{queue="backoff"} 20 +scheduler_pending_pods{queue="gated"} 10 scheduler_pending_pods{queue="unschedulable"} 0 `, }, @@ -1540,16 +1634,17 @@ scheduler_pending_pods{queue="unschedulable"} 0 }, operands: [][]*framework.QueuedPodInfo{ pInfos[:40], - pInfos[40:], + pInfos[40:50], {nil}, {nil}, }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. 
# TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 50 scheduler_pending_pods{queue="backoff"} 0 +scheduler_pending_pods{queue="gated"} 0 scheduler_pending_pods{queue="unschedulable"} 0 `, }, @@ -1559,6 +1654,7 @@ scheduler_pending_pods{queue="unschedulable"} 0 metrics.ActivePods().Set(0) metrics.BackoffPods().Set(0) metrics.UnschedulablePods().Set(0) + metrics.GatedPods().Set(0) } for _, test := range tests { @@ -1566,7 +1662,9 @@ scheduler_pending_pods{queue="unschedulable"} 0 resetMetrics() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp))) + + m := map[string][]framework.PreEnqueuePlugin{"": {&preEnqueuePlugin{allowlists: []string{queueable}}}} + queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m)) for i, op := range test.operations { for _, pInfo := range test.operands[i] { op(queue, pInfo) @@ -1986,12 +2084,13 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { } } -func makeQueuedPodInfos(num int, timestamp time.Time) []*framework.QueuedPodInfo { +func makeQueuedPodInfos(num int, namePrefix string, timestamp time.Time) []*framework.QueuedPodInfo { var pInfos = make([]*framework.QueuedPodInfo, 0, num) for i := 1; i <= num; i++ { p := &framework.QueuedPodInfo{ - PodInfo: mustNewPodInfo(st.MakePod().Name(fmt.Sprintf("test-pod-%d", i)).Namespace(fmt.Sprintf("ns%d", i)).UID(fmt.Sprintf("tp-%d", i)).Obj()), - Timestamp: timestamp, + PodInfo: mustNewPodInfo(st.MakePod().Name(fmt.Sprintf("%v-%d", namePrefix, i)).Namespace(fmt.Sprintf("ns%d", i)).UID(fmt.Sprintf("tp-%d", i)).Obj()), + Timestamp: timestamp, + UnschedulablePlugins: sets.NewString(), } pInfos = append(pInfos, p) } diff --git a/pkg/scheduler/metrics/metric_recorder.go b/pkg/scheduler/metrics/metric_recorder.go index 5534923fa15..b52b6a5455b 100644 --- 
a/pkg/scheduler/metrics/metric_recorder.go +++ b/pkg/scheduler/metrics/metric_recorder.go @@ -56,6 +56,13 @@ func NewBackoffPodsRecorder() *PendingPodsRecorder { } } +// NewGatedPodsRecorder returns GatedPods in a Prometheus metric fashion +func NewGatedPodsRecorder() *PendingPodsRecorder { + return &PendingPodsRecorder{ + recorder: GatedPods(), + } +} + // Inc increases a metric counter by 1, in an atomic way func (r *PendingPodsRecorder) Inc() { r.recorder.Inc() diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index abe97d700c4..335cba29396 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -92,7 +92,7 @@ var ( &metrics.GaugeOpts{ Subsystem: SchedulerSubsystem, Name: "pending_pods", - Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.", + Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.", StabilityLevel: metrics.STABLE, }, []string{"queue"}) // SchedulerGoroutines isn't called in some parts where goroutines start. @@ -249,6 +249,11 @@ func UnschedulablePods() metrics.GaugeMetric { return pendingPods.With(metrics.Labels{"queue": "unschedulable"}) } +// GatedPods returns the pending pods metrics with the label gated +func GatedPods() metrics.GaugeMetric { + return pendingPods.With(metrics.Labels{"queue": "gated"}) +} + // SinceInSeconds gets the time since the specified start in seconds. 
func SinceInSeconds(start time.Time) float64 { return time.Since(start).Seconds() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 573cd9142ab..581c792a7cf 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -309,6 +309,10 @@ func New(client clientset.Interface, return nil, errors.New("at least one profile is required") } + preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin) + for profileName, profile := range profiles { + preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins() + } podQueue := internalqueue.NewSchedulingQueue( profiles[options.profiles[0].SchedulerName].QueueSortFunc(), informerFactory, @@ -317,6 +321,7 @@ func New(client clientset.Interface, internalqueue.WithPodNominator(nominator), internalqueue.WithClusterEventMap(clusterEventMap), internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration), + internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap), ) schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything) diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index 3114f1c52aa..991ba9f1250 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -385,6 +385,14 @@ func (p *PodWrapper) Volume(volume v1.Volume) *PodWrapper { return p } +// SchedulingGates sets `gates` as additional SchedulingGates of the inner pod. +func (p *PodWrapper) SchedulingGates(gates []string) *PodWrapper { + for _, gate := range gates { + p.Spec.SchedulingGates = append(p.Spec.SchedulingGates, v1.PodSchedulingGate{Name: gate}) + } + return p +} + // PodAffinityKind represents different kinds of PodAffinity.
type PodAffinityKind int diff --git a/test/instrumentation/testdata/stable-metrics-list.yaml b/test/instrumentation/testdata/stable-metrics-list.yaml index 590b81ce802..bafa5cb3eaa 100644 --- a/test/instrumentation/testdata/stable-metrics-list.yaml +++ b/test/instrumentation/testdata/stable-metrics-list.yaml @@ -49,7 +49,9 @@ subsystem: scheduler help: Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number - of pods in unschedulablePods. + of pods in unschedulablePods that the scheduler attempted to schedule and failed; + 'gated' is the number of unschedulable pods that the scheduler never attempted + to schedule because they are gated. type: Gauge stabilityLevel: STABLE labels: