From 2de75d92bf94b8a7d38c74818bf20ee7b1c3bfd6 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Mon, 7 Nov 2022 14:01:48 -0800 Subject: [PATCH 1/2] API, Codegen, UT for PreEnqueue extension point --- pkg/generated/openapi/zz_generated.openapi.go | 21 +++++ pkg/scheduler/apis/config/types.go | 4 + .../apis/config/v1/default_plugins.go | 10 +++ .../apis/config/v1/default_plugins_test.go | 34 +++++++ pkg/scheduler/apis/config/v1/defaults.go | 1 + .../apis/config/v1/zz_generated.conversion.go | 6 ++ .../apis/config/v1beta2/conversion.go | 4 + .../apis/config/v1beta2/default_plugins.go | 3 + .../config/v1beta2/default_plugins_test.go | 88 +++++++++++++++++++ .../config/v1beta2/zz_generated.conversion.go | 21 ++--- .../apis/config/v1beta3/conversion.go | 5 ++ .../apis/config/v1beta3/default_plugins.go | 9 ++ .../config/v1beta3/default_plugins_test.go | 34 +++++++ .../config/v1beta3/zz_generated.conversion.go | 21 ++--- .../apis/config/validation/validation.go | 1 + .../apis/config/validation/validation_test.go | 6 ++ .../apis/config/zz_generated.deepcopy.go | 1 + pkg/scheduler/framework/types.go | 10 ++- .../k8s.io/kube-scheduler/config/v1/types.go | 3 + .../config/v1/zz_generated.deepcopy.go | 1 + .../kube-scheduler/config/v1beta2/types.go | 3 + .../config/v1beta2/zz_generated.deepcopy.go | 1 + .../kube-scheduler/config/v1beta3/types.go | 3 + .../config/v1beta3/zz_generated.deepcopy.go | 1 + 24 files changed, 267 insertions(+), 24 deletions(-) diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 27560d75213..185e927e869 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -52981,6 +52981,13 @@ func schema_k8sio_kube_scheduler_config_v1_Plugins(ref common.ReferenceCallback) Description: "Plugins include multiple extension points. When specified, the list of plugins for a particular extension point are the only ones enabled. 
If an extension point is omitted from the config, then the default set of plugins is used for that extension point. Enabled plugins are called in the order specified here, after default plugins. If they need to be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.", Type: []string{"object"}, Properties: map[string]spec.Schema{ + "preEnqueue": { + SchemaProps: spec.SchemaProps{ + Description: "PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.", + Default: map[string]interface{}{}, + Ref: ref("k8s.io/kube-scheduler/config/v1.PluginSet"), + }, + }, "queueSort": { SchemaProps: spec.SchemaProps{ Description: "QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.", @@ -54085,6 +54092,13 @@ func schema_k8sio_kube_scheduler_config_v1beta2_Plugins(ref common.ReferenceCall Description: "Plugins include multiple extension points. When specified, the list of plugins for a particular extension point are the only ones enabled. If an extension point is omitted from the config, then the default set of plugins is used for that extension point. Enabled plugins are called in the order specified here, after default plugins. 
If they need to be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.", Type: []string{"object"}, Properties: map[string]spec.Schema{ + "preEnqueue": { + SchemaProps: spec.SchemaProps{ + Description: "PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.", + Default: map[string]interface{}{}, + Ref: ref("k8s.io/kube-scheduler/config/v1beta2.PluginSet"), + }, + }, "queueSort": { SchemaProps: spec.SchemaProps{ Description: "QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.", @@ -55175,6 +55189,13 @@ func schema_k8sio_kube_scheduler_config_v1beta3_Plugins(ref common.ReferenceCall Description: "Plugins include multiple extension points. When specified, the list of plugins for a particular extension point are the only ones enabled. If an extension point is omitted from the config, then the default set of plugins is used for that extension point. Enabled plugins are called in the order specified here, after default plugins. 
If they need to be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.", Type: []string{"object"}, Properties: map[string]spec.Schema{ + "preEnqueue": { + SchemaProps: spec.SchemaProps{ + Description: "PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.", + Default: map[string]interface{}{}, + Ref: ref("k8s.io/kube-scheduler/config/v1beta3.PluginSet"), + }, + }, "queueSort": { SchemaProps: spec.SchemaProps{ Description: "QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.", diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go index 2c5567f2189..8db4e35c987 100644 --- a/pkg/scheduler/apis/config/types.go +++ b/pkg/scheduler/apis/config/types.go @@ -138,6 +138,9 @@ type KubeSchedulerProfile struct { // Enabled plugins are called in the order specified here, after default plugins. If they need to // be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order. type Plugins struct { + // PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue. + PreEnqueue PluginSet + // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue. 
QueueSort PluginSet @@ -231,6 +234,7 @@ func (p *Plugins) Names() []string { return nil } extensions := []PluginSet{ + p.PreEnqueue, p.PreFilter, p.Filter, p.PostFilter, diff --git a/pkg/scheduler/apis/config/v1/default_plugins.go b/pkg/scheduler/apis/config/v1/default_plugins.go index 73c3635b57e..daeb3370ed0 100644 --- a/pkg/scheduler/apis/config/v1/default_plugins.go +++ b/pkg/scheduler/apis/config/v1/default_plugins.go @@ -18,8 +18,10 @@ package v1 import ( "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" v1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/utils/pointer" ) @@ -52,10 +54,17 @@ func getDefaultPlugins() *v1.Plugins { }, }, } + applyFeatureGates(plugins) return plugins } +func applyFeatureGates(config *v1.Plugins) { + if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) { + config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1.Plugin{Name: names.SchedulingGates}) + } +} + // mergePlugins merges the custom set into the given default one, handling disabled sets. 
func mergePlugins(defaultPlugins, customPlugins *v1.Plugins) *v1.Plugins { if customPlugins == nil { @@ -63,6 +72,7 @@ func mergePlugins(defaultPlugins, customPlugins *v1.Plugins) *v1.Plugins { } defaultPlugins.MultiPoint = mergePluginSet(defaultPlugins.MultiPoint, customPlugins.MultiPoint) + defaultPlugins.PreEnqueue = mergePluginSet(defaultPlugins.PreEnqueue, customPlugins.PreEnqueue) defaultPlugins.QueueSort = mergePluginSet(defaultPlugins.QueueSort, customPlugins.QueueSort) defaultPlugins.PreFilter = mergePluginSet(defaultPlugins.PreFilter, customPlugins.PreFilter) defaultPlugins.Filter = mergePluginSet(defaultPlugins.Filter, customPlugins.Filter) diff --git a/pkg/scheduler/apis/config/v1/default_plugins_test.go b/pkg/scheduler/apis/config/v1/default_plugins_test.go index b459000ad3e..d9ceb2ae527 100644 --- a/pkg/scheduler/apis/config/v1/default_plugins_test.go +++ b/pkg/scheduler/apis/config/v1/default_plugins_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" v1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/utils/pointer" ) @@ -63,6 +64,39 @@ func TestApplyFeatureGates(t *testing.T) { }, }, }, + { + name: "Feature gate PodSchedulingReadiness enabled", + features: map[featuregate.Feature]bool{ + features.PodSchedulingReadiness: true, + }, + wantConfig: &v1.Plugins{ + MultiPoint: v1.PluginSet{ + Enabled: []v1.Plugin{ + {Name: names.PrioritySort}, + {Name: names.NodeUnschedulable}, + {Name: names.NodeName}, + {Name: names.TaintToleration, Weight: pointer.Int32(3)}, + {Name: names.NodeAffinity, Weight: pointer.Int32(2)}, + {Name: names.NodePorts}, + {Name: names.NodeResourcesFit, Weight: pointer.Int32(1)}, + {Name: names.VolumeRestrictions}, + {Name: names.EBSLimits}, + {Name: names.GCEPDLimits}, + {Name: names.NodeVolumeLimits}, + {Name: names.AzureDiskLimits}, + {Name: names.VolumeBinding}, + 
{Name: names.VolumeZone}, + {Name: names.PodTopologySpread, Weight: pointer.Int32(2)}, + {Name: names.InterPodAffinity, Weight: pointer.Int32(2)}, + {Name: names.DefaultPreemption}, + {Name: names.NodeResourcesBalancedAllocation, Weight: pointer.Int32(1)}, + {Name: names.ImageLocality, Weight: pointer.Int32(1)}, + {Name: names.DefaultBinder}, + {Name: names.SchedulingGates}, + }, + }, + }, + }, } for _, test := range tests { diff --git a/pkg/scheduler/apis/config/v1/defaults.go b/pkg/scheduler/apis/config/v1/defaults.go index d85e78222c2..8e86712fa25 100644 --- a/pkg/scheduler/apis/config/v1/defaults.go +++ b/pkg/scheduler/apis/config/v1/defaults.go @@ -53,6 +53,7 @@ func pluginsNames(p *configv1.Plugins) []string { p.Bind, p.PostBind, p.Permit, + p.PreEnqueue, p.QueueSort, } n := sets.NewString() diff --git a/pkg/scheduler/apis/config/v1/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1/zz_generated.conversion.go index 3fca82d3a1f..d9dbc31adbc 100644 --- a/pkg/scheduler/apis/config/v1/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1/zz_generated.conversion.go @@ -712,6 +712,9 @@ func Convert_config_PluginSet_To_v1_PluginSet(in *config.PluginSet, out *v1.Plug } func autoConvert_v1_Plugins_To_config_Plugins(in *v1.Plugins, out *config.Plugins, s conversion.Scope) error { + if err := Convert_v1_PluginSet_To_config_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil { + return err + } if err := Convert_v1_PluginSet_To_config_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil { return err } @@ -757,6 +760,9 @@ func Convert_v1_Plugins_To_config_Plugins(in *v1.Plugins, out *config.Plugins, s } func autoConvert_config_Plugins_To_v1_Plugins(in *config.Plugins, out *v1.Plugins, s conversion.Scope) error { + if err := Convert_config_PluginSet_To_v1_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil { + return err + } if err := Convert_config_PluginSet_To_v1_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil { return err } diff 
--git a/pkg/scheduler/apis/config/v1beta2/conversion.go b/pkg/scheduler/apis/config/v1beta2/conversion.go index 0a6b6e779f2..c0d89d75ee9 100644 --- a/pkg/scheduler/apis/config/v1beta2/conversion.go +++ b/pkg/scheduler/apis/config/v1beta2/conversion.go @@ -111,3 +111,7 @@ func convertToExternalPluginConfigArgs(out *v1beta2.KubeSchedulerConfiguration) func Convert_config_KubeSchedulerProfile_To_v1beta2_KubeSchedulerProfile(in *config.KubeSchedulerProfile, out *v1beta2.KubeSchedulerProfile, s conversion.Scope) error { return autoConvert_config_KubeSchedulerProfile_To_v1beta2_KubeSchedulerProfile(in, out, s) } + +func Convert_config_Plugins_To_v1beta2_Plugins(in *config.Plugins, out *v1beta2.Plugins, s conversion.Scope) error { + return autoConvert_config_Plugins_To_v1beta2_Plugins(in, out, s) +} diff --git a/pkg/scheduler/apis/config/v1beta2/default_plugins.go b/pkg/scheduler/apis/config/v1beta2/default_plugins.go index 5b2a2017bb6..37341a4233e 100644 --- a/pkg/scheduler/apis/config/v1beta2/default_plugins.go +++ b/pkg/scheduler/apis/config/v1beta2/default_plugins.go @@ -116,6 +116,9 @@ func applyFeatureGates(config *v1beta2.Plugins) { if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) { config.Score.Enabled = append(config.Score.Enabled, v1beta2.Plugin{Name: names.VolumeBinding, Weight: pointer.Int32(1)}) } + if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) { + config.PreEnqueue.Enabled = append(config.PreEnqueue.Enabled, v1beta2.Plugin{Name: names.SchedulingGates}) + } } // mergePlugins merges the custom set into the given default one, handling disabled sets. 
diff --git a/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go b/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go index 27f6e53db44..aaa5d44d15a 100644 --- a/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go +++ b/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kube-scheduler/config/v1beta2" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/utils/pointer" ) @@ -113,6 +114,93 @@ func TestApplyFeatureGates(t *testing.T) { }, }, }, + { + name: "Feature gate PodSchedulingReadiness enabled", + features: map[featuregate.Feature]bool{ + features.PodSchedulingReadiness: true, + }, + wantConfig: &v1beta2.Plugins{ + PreEnqueue: v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.SchedulingGates}, + }, + }, + QueueSort: v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.PrioritySort}, + }, + }, + PreFilter: v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.NodeResourcesFit}, + {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, + {Name: names.PodTopologySpread}, + {Name: names.InterPodAffinity}, + {Name: names.VolumeBinding}, + {Name: names.NodeAffinity}, + }, + }, + Filter: v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.NodeUnschedulable}, + {Name: names.NodeName}, + {Name: names.TaintToleration}, + {Name: names.NodeAffinity}, + {Name: names.NodePorts}, + {Name: names.NodeResourcesFit}, + {Name: names.VolumeRestrictions}, + {Name: names.EBSLimits}, + {Name: names.GCEPDLimits}, + {Name: names.NodeVolumeLimits}, + {Name: names.AzureDiskLimits}, + {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, + {Name: names.PodTopologySpread}, + {Name: names.InterPodAffinity}, + }, + }, + PostFilter: v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.DefaultPreemption}, + }, + }, + PreScore: 
v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.InterPodAffinity}, + {Name: names.PodTopologySpread}, + {Name: names.TaintToleration}, + {Name: names.NodeAffinity}, + }, + }, + Score: v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.NodeResourcesBalancedAllocation, Weight: pointer.Int32(1)}, + {Name: names.ImageLocality, Weight: pointer.Int32(1)}, + {Name: names.InterPodAffinity, Weight: pointer.Int32(1)}, + {Name: names.NodeResourcesFit, Weight: pointer.Int32(1)}, + {Name: names.NodeAffinity, Weight: pointer.Int32(1)}, + {Name: names.PodTopologySpread, Weight: pointer.Int32(2)}, + {Name: names.TaintToleration, Weight: pointer.Int32(1)}, + }, + }, + Reserve: v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.VolumeBinding}, + }, + }, + PreBind: v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.VolumeBinding}, + }, + }, + Bind: v1beta2.PluginSet{ + Enabled: []v1beta2.Plugin{ + {Name: names.DefaultBinder}, + }, + }, + }, + }, } for _, test := range tests { diff --git a/pkg/scheduler/apis/config/v1beta2/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1beta2/zz_generated.conversion.go index 953d61b19f9..642729e388f 100644 --- a/pkg/scheduler/apis/config/v1beta2/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1beta2/zz_generated.conversion.go @@ -160,11 +160,6 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } - if err := s.AddGeneratedConversionFunc((*config.Plugins)(nil), (*v1beta2.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error { - return Convert_config_Plugins_To_v1beta2_Plugins(a.(*config.Plugins), b.(*v1beta2.Plugins), scope) - }); err != nil { - return err - } if err := s.AddGeneratedConversionFunc((*v1beta2.PodTopologySpreadArgs)(nil), (*config.PodTopologySpreadArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { return 
Convert_v1beta2_PodTopologySpreadArgs_To_config_PodTopologySpreadArgs(a.(*v1beta2.PodTopologySpreadArgs), b.(*config.PodTopologySpreadArgs), scope) }); err != nil { @@ -235,6 +230,11 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*config.Plugins)(nil), (*v1beta2.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_Plugins_To_v1beta2_Plugins(a.(*config.Plugins), b.(*v1beta2.Plugins), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*v1beta2.KubeSchedulerConfiguration)(nil), (*config.KubeSchedulerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta2_KubeSchedulerConfiguration_To_config_KubeSchedulerConfiguration(a.(*v1beta2.KubeSchedulerConfiguration), b.(*config.KubeSchedulerConfiguration), scope) }); err != nil { @@ -716,6 +716,9 @@ func Convert_config_PluginSet_To_v1beta2_PluginSet(in *config.PluginSet, out *v1 } func autoConvert_v1beta2_Plugins_To_config_Plugins(in *v1beta2.Plugins, out *config.Plugins, s conversion.Scope) error { + if err := Convert_v1beta2_PluginSet_To_config_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil { + return err + } if err := Convert_v1beta2_PluginSet_To_config_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil { return err } @@ -761,6 +764,9 @@ func Convert_v1beta2_Plugins_To_config_Plugins(in *v1beta2.Plugins, out *config. 
} func autoConvert_config_Plugins_To_v1beta2_Plugins(in *config.Plugins, out *v1beta2.Plugins, s conversion.Scope) error { + if err := Convert_config_PluginSet_To_v1beta2_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil { + return err + } if err := Convert_config_PluginSet_To_v1beta2_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil { return err } @@ -800,11 +806,6 @@ func autoConvert_config_Plugins_To_v1beta2_Plugins(in *config.Plugins, out *v1be return nil } -// Convert_config_Plugins_To_v1beta2_Plugins is an autogenerated conversion function. -func Convert_config_Plugins_To_v1beta2_Plugins(in *config.Plugins, out *v1beta2.Plugins, s conversion.Scope) error { - return autoConvert_config_Plugins_To_v1beta2_Plugins(in, out, s) -} - func autoConvert_v1beta2_PodTopologySpreadArgs_To_config_PodTopologySpreadArgs(in *v1beta2.PodTopologySpreadArgs, out *config.PodTopologySpreadArgs, s conversion.Scope) error { out.DefaultConstraints = *(*[]corev1.TopologySpreadConstraint)(unsafe.Pointer(&in.DefaultConstraints)) out.DefaultingType = config.PodTopologySpreadConstraintsDefaulting(in.DefaultingType) diff --git a/pkg/scheduler/apis/config/v1beta3/conversion.go b/pkg/scheduler/apis/config/v1beta3/conversion.go index 262f1e6ef2e..bbb8ff4f264 100644 --- a/pkg/scheduler/apis/config/v1beta3/conversion.go +++ b/pkg/scheduler/apis/config/v1beta3/conversion.go @@ -111,3 +111,8 @@ func convertToExternalPluginConfigArgs(out *v1beta3.KubeSchedulerConfiguration) func Convert_config_KubeSchedulerProfile_To_v1beta3_KubeSchedulerProfile(in *config.KubeSchedulerProfile, out *v1beta3.KubeSchedulerProfile, s conversion.Scope) error { return autoConvert_config_KubeSchedulerProfile_To_v1beta3_KubeSchedulerProfile(in, out, s) } + +// Convert_config_Plugins_To_v1beta3_Plugins is a hand-written conversion function that delegates to the generated autoConvert_config_Plugins_To_v1beta3_Plugins. 
+func Convert_config_Plugins_To_v1beta3_Plugins(in *config.Plugins, out *v1beta3.Plugins, s conversion.Scope) error { + return autoConvert_config_Plugins_To_v1beta3_Plugins(in, out, s) +} diff --git a/pkg/scheduler/apis/config/v1beta3/default_plugins.go b/pkg/scheduler/apis/config/v1beta3/default_plugins.go index ca360b78260..13d479fece5 100644 --- a/pkg/scheduler/apis/config/v1beta3/default_plugins.go +++ b/pkg/scheduler/apis/config/v1beta3/default_plugins.go @@ -18,8 +18,10 @@ package v1beta3 import ( "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" "k8s.io/kube-scheduler/config/v1beta3" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/utils/pointer" ) @@ -52,10 +54,17 @@ func getDefaultPlugins() *v1beta3.Plugins { }, }, } + applyFeatureGates(plugins) return plugins } +func applyFeatureGates(config *v1beta3.Plugins) { + if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) { + config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1beta3.Plugin{Name: names.SchedulingGates}) + } +} + // mergePlugins merges the custom set into the given default one, handling disabled sets. 
func mergePlugins(defaultPlugins, customPlugins *v1beta3.Plugins) *v1beta3.Plugins { if customPlugins == nil { diff --git a/pkg/scheduler/apis/config/v1beta3/default_plugins_test.go b/pkg/scheduler/apis/config/v1beta3/default_plugins_test.go index 66e5f81235a..e1ab9d84a02 100644 --- a/pkg/scheduler/apis/config/v1beta3/default_plugins_test.go +++ b/pkg/scheduler/apis/config/v1beta3/default_plugins_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kube-scheduler/config/v1beta3" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/utils/pointer" ) @@ -63,6 +64,39 @@ func TestApplyFeatureGates(t *testing.T) { }, }, }, + { + name: "Feature gate PodSchedulingReadiness enabled", + features: map[featuregate.Feature]bool{ + features.PodSchedulingReadiness: true, + }, + wantConfig: &v1beta3.Plugins{ + MultiPoint: v1beta3.PluginSet{ + Enabled: []v1beta3.Plugin{ + {Name: names.PrioritySort}, + {Name: names.NodeUnschedulable}, + {Name: names.NodeName}, + {Name: names.TaintToleration, Weight: pointer.Int32(3)}, + {Name: names.NodeAffinity, Weight: pointer.Int32(2)}, + {Name: names.NodePorts}, + {Name: names.NodeResourcesFit, Weight: pointer.Int32(1)}, + {Name: names.VolumeRestrictions}, + {Name: names.EBSLimits}, + {Name: names.GCEPDLimits}, + {Name: names.NodeVolumeLimits}, + {Name: names.AzureDiskLimits}, + {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, + {Name: names.PodTopologySpread, Weight: pointer.Int32(2)}, + {Name: names.InterPodAffinity, Weight: pointer.Int32(2)}, + {Name: names.DefaultPreemption}, + {Name: names.NodeResourcesBalancedAllocation, Weight: pointer.Int32(1)}, + {Name: names.ImageLocality, Weight: pointer.Int32(1)}, + {Name: names.DefaultBinder}, + {Name: names.SchedulingGates}, + }, + }, + }, + }, } for _, test := range tests { diff --git a/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go 
b/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go index 48800c0ec75..1e8fd12ba64 100644 --- a/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go @@ -160,11 +160,6 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } - if err := s.AddGeneratedConversionFunc((*config.Plugins)(nil), (*v1beta3.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error { - return Convert_config_Plugins_To_v1beta3_Plugins(a.(*config.Plugins), b.(*v1beta3.Plugins), scope) - }); err != nil { - return err - } if err := s.AddGeneratedConversionFunc((*v1beta3.PodTopologySpreadArgs)(nil), (*config.PodTopologySpreadArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta3_PodTopologySpreadArgs_To_config_PodTopologySpreadArgs(a.(*v1beta3.PodTopologySpreadArgs), b.(*config.PodTopologySpreadArgs), scope) }); err != nil { @@ -235,6 +230,11 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*config.Plugins)(nil), (*v1beta3.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_Plugins_To_v1beta3_Plugins(a.(*config.Plugins), b.(*v1beta3.Plugins), scope) + }); err != nil { + return err + } if err := s.AddConversionFunc((*v1beta3.KubeSchedulerConfiguration)(nil), (*config.KubeSchedulerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta3_KubeSchedulerConfiguration_To_config_KubeSchedulerConfiguration(a.(*v1beta3.KubeSchedulerConfiguration), b.(*config.KubeSchedulerConfiguration), scope) }); err != nil { @@ -706,6 +706,9 @@ func Convert_config_PluginSet_To_v1beta3_PluginSet(in *config.PluginSet, out *v1 } func autoConvert_v1beta3_Plugins_To_config_Plugins(in *v1beta3.Plugins, out *config.Plugins, s conversion.Scope) error { + if err := 
Convert_v1beta3_PluginSet_To_config_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil { + return err + } if err := Convert_v1beta3_PluginSet_To_config_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil { return err } @@ -751,6 +754,9 @@ func Convert_v1beta3_Plugins_To_config_Plugins(in *v1beta3.Plugins, out *config. } func autoConvert_config_Plugins_To_v1beta3_Plugins(in *config.Plugins, out *v1beta3.Plugins, s conversion.Scope) error { + if err := Convert_config_PluginSet_To_v1beta3_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil { + return err + } if err := Convert_config_PluginSet_To_v1beta3_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil { return err } @@ -790,11 +796,6 @@ func autoConvert_config_Plugins_To_v1beta3_Plugins(in *config.Plugins, out *v1be return nil } -// Convert_config_Plugins_To_v1beta3_Plugins is an autogenerated conversion function. -func Convert_config_Plugins_To_v1beta3_Plugins(in *config.Plugins, out *v1beta3.Plugins, s conversion.Scope) error { - return autoConvert_config_Plugins_To_v1beta3_Plugins(in, out, s) -} - func autoConvert_v1beta3_PodTopologySpreadArgs_To_config_PodTopologySpreadArgs(in *v1beta3.PodTopologySpreadArgs, out *config.PodTopologySpreadArgs, s conversion.Scope) error { out.DefaultConstraints = *(*[]corev1.TopologySpreadConstraint)(unsafe.Pointer(&in.DefaultConstraints)) out.DefaultingType = config.PodTopologySpreadConstraintsDefaulting(in.DefaultingType) diff --git a/pkg/scheduler/apis/config/validation/validation.go b/pkg/scheduler/apis/config/validation/validation.go index 47d85717be2..1dcfc699e80 100644 --- a/pkg/scheduler/apis/config/validation/validation.go +++ b/pkg/scheduler/apis/config/validation/validation.go @@ -198,6 +198,7 @@ func validatePluginConfig(path *field.Path, apiVersion string, profile *config.K if profile.Plugins != nil { stagesToPluginSet := map[string]config.PluginSet{ + "preEnqueue": profile.Plugins.PreEnqueue, "queueSort": profile.Plugins.QueueSort, "preFilter": 
profile.Plugins.PreFilter, "filter": profile.Plugins.Filter, diff --git a/pkg/scheduler/apis/config/validation/validation_test.go b/pkg/scheduler/apis/config/validation/validation_test.go index 679bb87484f..6dce867b3c6 100644 --- a/pkg/scheduler/apis/config/validation/validation_test.go +++ b/pkg/scheduler/apis/config/validation/validation_test.go @@ -543,6 +543,12 @@ func TestValidateKubeSchedulerConfigurationV1beta3(t *testing.T) { }, } + duplicatedPlugins := validConfig.DeepCopy() + duplicatedPlugins.Profiles[0].Plugins.PreEnqueue.Enabled = []config.Plugin{ + {Name: "CustomPreEnqueue"}, + {Name: "CustomPreEnqueue"}, + } + duplicatedPluginConfig := validConfig.DeepCopy() duplicatedPluginConfig.Profiles[0].PluginConfig = []config.PluginConfig{ { diff --git a/pkg/scheduler/apis/config/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/zz_generated.deepcopy.go index b139c909333..f5baa62218c 100644 --- a/pkg/scheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/zz_generated.deepcopy.go @@ -394,6 +394,7 @@ func (in *PluginSet) DeepCopy() *PluginSet { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Plugins) DeepCopyInto(out *Plugins) { *out = *in + in.PreEnqueue.DeepCopyInto(&out.PreEnqueue) in.QueueSort.DeepCopyInto(&out.QueueSort) in.PreFilter.DeepCopyInto(&out.PreFilter) in.Filter.DeepCopyInto(&out.Filter) diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 3b0e9514516..420aa8a73d5 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -102,6 +102,8 @@ type QueuedPodInfo struct { InitialAttemptTimestamp time.Time // If a Pod failed in a scheduling cycle, record the plugin names it failed by. UnschedulablePlugins sets.String + // Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not. + Gated bool } // DeepCopy returns a deep copy of the QueuedPodInfo object. 
@@ -331,9 +333,9 @@ func getPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution } // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. - //if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { + // if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { // terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) - //} + // } } return terms } @@ -344,9 +346,9 @@ func getPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution } // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. - //if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { + // if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { // terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...) - //} + // } } return terms } diff --git a/staging/src/k8s.io/kube-scheduler/config/v1/types.go b/staging/src/k8s.io/kube-scheduler/config/v1/types.go index 9fdec69d1ee..703516fb78c 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1/types.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1/types.go @@ -170,6 +170,9 @@ type KubeSchedulerProfile struct { // Enabled plugins are called in the order specified here, after default plugins. If they need to // be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order. type Plugins struct { + // PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue. + PreEnqueue PluginSet `json:"preEnqueue,omitempty"` + // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue. 
QueueSort PluginSet `json:"queueSort,omitempty"` diff --git a/staging/src/k8s.io/kube-scheduler/config/v1/zz_generated.deepcopy.go b/staging/src/k8s.io/kube-scheduler/config/v1/zz_generated.deepcopy.go index 48d2dffaef4..1203cdd3b93 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1/zz_generated.deepcopy.go @@ -436,6 +436,7 @@ func (in *PluginSet) DeepCopy() *PluginSet { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Plugins) DeepCopyInto(out *Plugins) { *out = *in + in.PreEnqueue.DeepCopyInto(&out.PreEnqueue) in.QueueSort.DeepCopyInto(&out.QueueSort) in.PreFilter.DeepCopyInto(&out.PreFilter) in.Filter.DeepCopyInto(&out.Filter) diff --git a/staging/src/k8s.io/kube-scheduler/config/v1beta2/types.go b/staging/src/k8s.io/kube-scheduler/config/v1beta2/types.go index f561260e974..0e47967adb4 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1beta2/types.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1beta2/types.go @@ -166,6 +166,9 @@ type KubeSchedulerProfile struct { // Enabled plugins are called in the order specified here, after default plugins. If they need to // be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order. type Plugins struct { + // PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue. + PreEnqueue PluginSet `json:"preEnqueue,omitempty"` + // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue. 
QueueSort PluginSet `json:"queueSort,omitempty"` diff --git a/staging/src/k8s.io/kube-scheduler/config/v1beta2/zz_generated.deepcopy.go b/staging/src/k8s.io/kube-scheduler/config/v1beta2/zz_generated.deepcopy.go index 3b787773bf6..7ffacf0f3da 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1beta2/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1beta2/zz_generated.deepcopy.go @@ -441,6 +441,7 @@ func (in *PluginSet) DeepCopy() *PluginSet { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Plugins) DeepCopyInto(out *Plugins) { *out = *in + in.PreEnqueue.DeepCopyInto(&out.PreEnqueue) in.QueueSort.DeepCopyInto(&out.QueueSort) in.PreFilter.DeepCopyInto(&out.PreFilter) in.Filter.DeepCopyInto(&out.Filter) diff --git a/staging/src/k8s.io/kube-scheduler/config/v1beta3/types.go b/staging/src/k8s.io/kube-scheduler/config/v1beta3/types.go index 385109f16e8..45371c9d960 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1beta3/types.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1beta3/types.go @@ -159,6 +159,9 @@ type KubeSchedulerProfile struct { // Enabled plugins are called in the order specified here, after default plugins. If they need to // be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order. type Plugins struct { + // PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue. + PreEnqueue PluginSet `json:"preEnqueue,omitempty"` + // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue. 
QueueSort PluginSet `json:"queueSort,omitempty"` diff --git a/staging/src/k8s.io/kube-scheduler/config/v1beta3/zz_generated.deepcopy.go b/staging/src/k8s.io/kube-scheduler/config/v1beta3/zz_generated.deepcopy.go index f1a3e7b2991..2b549d4990a 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1beta3/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1beta3/zz_generated.deepcopy.go @@ -431,6 +431,7 @@ func (in *PluginSet) DeepCopy() *PluginSet { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Plugins) DeepCopyInto(out *Plugins) { *out = *in + in.PreEnqueue.DeepCopyInto(&out.PreEnqueue) in.QueueSort.DeepCopyInto(&out.QueueSort) in.PreFilter.DeepCopyInto(&out.PreFilter) in.Filter.DeepCopyInto(&out.Filter) From 0b27f25252d827afe2cac2d9ab67595275315608 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Mon, 7 Nov 2022 14:02:22 -0800 Subject: [PATCH 2/2] PreEnqueue implementation - Add PreEnqueuePlugin to Scheduler Framework - Implement PreEnqueuePlugin in scheduler queue - Implementation of SchedulingGates plugin - Metrics --- .../app/options/options_test.go | 11 ++ cmd/kube-scheduler/app/server_test.go | 23 ++- pkg/scheduler/framework/interface.go | 15 ++ .../framework/plugins/feature/feature.go | 1 + .../framework/plugins/names/names.go | 1 + pkg/scheduler/framework/plugins/registry.go | 3 + .../schedulinggates/scheduling_gates.go | 67 ++++++++ .../schedulinggates/scheduling_gates_test.go | 77 +++++++++ pkg/scheduler/framework/runtime/framework.go | 7 + .../framework/runtime/framework_test.go | 68 ++++++++ .../internal/queue/scheduling_queue.go | 153 +++++++++++++----- .../internal/queue/scheduling_queue_test.go | 143 +++++++++++++--- pkg/scheduler/metrics/metric_recorder.go | 7 + pkg/scheduler/metrics/metrics.go | 7 +- pkg/scheduler/scheduler.go | 5 + pkg/scheduler/testing/wrappers.go | 8 + .../testdata/stable-metrics-list.yaml | 4 +- 17 files changed, 530 
insertions(+), 70 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go create mode 100644 pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go diff --git a/cmd/kube-scheduler/app/options/options_test.go b/cmd/kube-scheduler/app/options/options_test.go index 772f017293f..cdb8fb271a5 100644 --- a/cmd/kube-scheduler/app/options/options_test.go +++ b/cmd/kube-scheduler/app/options/options_test.go @@ -221,6 +221,9 @@ clientConnection: kubeconfig: '%s' profiles: - plugins: + preEnqueue: + enabled: + - name: foo reserve: enabled: - name: foo @@ -830,6 +833,11 @@ profiles: { SchedulerName: "default-scheduler", Plugins: &kubeschedulerconfig.Plugins{ + PreEnqueue: kubeschedulerconfig.PluginSet{ + Enabled: []kubeschedulerconfig.Plugin{ + {Name: "foo"}, + }, + }, Reserve: kubeschedulerconfig.PluginSet{ Enabled: []kubeschedulerconfig.Plugin{ {Name: "foo"}, @@ -944,6 +952,7 @@ profiles: { SchedulerName: "default-scheduler", Plugins: &kubeschedulerconfig.Plugins{ + PreEnqueue: defaults.PluginsV1beta3.PreEnqueue, QueueSort: defaults.PluginsV1beta3.QueueSort, PreFilter: defaults.PluginsV1beta3.PreFilter, Filter: defaults.PluginsV1beta3.Filter, @@ -1065,6 +1074,7 @@ profiles: { SchedulerName: "default-scheduler", Plugins: &kubeschedulerconfig.Plugins{ + PreEnqueue: defaults.PluginsV1beta2.PreEnqueue, QueueSort: defaults.PluginsV1beta2.QueueSort, PreFilter: defaults.PluginsV1beta2.PreFilter, Filter: defaults.PluginsV1beta2.Filter, @@ -1427,6 +1437,7 @@ profiles: { SchedulerName: "bar-profile", Plugins: &kubeschedulerconfig.Plugins{ + PreEnqueue: defaults.PluginsV1beta2.PreEnqueue, QueueSort: defaults.PluginsV1beta2.QueueSort, PreFilter: defaults.PluginsV1beta2.PreFilter, Filter: defaults.PluginsV1beta2.Filter, diff --git a/cmd/kube-scheduler/app/server_test.go b/cmd/kube-scheduler/app/server_test.go index 2c568be096a..ec8733b3cc9 100644 --- a/cmd/kube-scheduler/app/server_test.go +++ 
b/cmd/kube-scheduler/app/server_test.go @@ -42,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" ) func TestSetup(t *testing.T) { @@ -93,6 +94,7 @@ profiles: - plugins: multiPoint: enabled: + - name: SchedulingGates - name: DefaultBinder - name: PrioritySort - name: DefaultPreemption @@ -131,6 +133,7 @@ profiles: - plugins: multiPoint: enabled: + - name: SchedulingGates - name: DefaultBinder - name: PrioritySort - name: DefaultPreemption @@ -315,16 +318,21 @@ leaderElection: wantLeaderElection *componentbaseconfig.LeaderElectionConfiguration }{ { - name: "default config with an alpha feature enabled", + name: "default config with two alpha features enabled", flags: []string{ "--kubeconfig", configKubeconfig, - "--feature-gates=VolumeCapacityPriority=true", + "--feature-gates=VolumeCapacityPriority=true,PodSchedulingReadiness=true", }, wantPlugins: map[string]*config.Plugins{ - "default-scheduler": defaults.ExpandedPluginsV1, + "default-scheduler": func() *config.Plugins { + plugins := defaults.ExpandedPluginsV1.DeepCopy() + plugins.PreEnqueue.Enabled = append(plugins.PreEnqueue.Enabled, config.Plugin{Name: names.SchedulingGates}) + return plugins + }(), }, restoreFeatures: map[featuregate.Feature]bool{ features.VolumeCapacityPriority: false, + features.PodSchedulingReadiness: false, }, }, { @@ -384,7 +392,8 @@ leaderElection: }, wantPlugins: map[string]*config.Plugins{ "default-scheduler": { - Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}}, + PreEnqueue: config.PluginSet{Enabled: []config.Plugin{{Name: "SchedulingGates"}}}, + Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}}, Filter: config.PluginSet{ Enabled: []config.Plugin{ {Name: "NodeResourcesFit"}, @@ -424,7 +433,8 @@ leaderElection: }, wantPlugins: map[string]*config.Plugins{
"default-scheduler": { - Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}}, + PreEnqueue: config.PluginSet{Enabled: []config.Plugin{{Name: "SchedulingGates"}}}, + Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}}, Filter: config.PluginSet{ Enabled: []config.Plugin{ {Name: "NodeResourcesFit"}, @@ -515,7 +525,8 @@ leaderElection: registryOptions: []Option{WithPlugin("Foo", newFoo)}, wantPlugins: map[string]*config.Plugins{ "default-scheduler": { - Bind: defaults.ExpandedPluginsV1.Bind, + PreEnqueue: defaults.ExpandedPluginsV1.PreEnqueue, + Bind: defaults.ExpandedPluginsV1.Bind, Filter: config.PluginSet{ Enabled: append(defaults.ExpandedPluginsV1.Filter.Enabled, config.Plugin{Name: "Foo"}), }, diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 67b12bd7baf..0049e7acb1b 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -323,6 +323,17 @@ type Plugin interface { Name() string } +// PreEnqueuePlugin is an interface that must be implemented by "PreEnqueue" plugins. +// These plugins are called prior to adding Pods to activeQ. +// Note: a preEnqueue plugin is expected to be lightweight and efficient, so it's not expected to +// involve expensive calls like accessing external endpoints; otherwise it'd block other +// Pods' enqueuing in event handlers. +type PreEnqueuePlugin interface { + Plugin + // PreEnqueue is called prior to adding Pods to activeQ. + PreEnqueue(ctx context.Context, p *v1.Pod) *Status +} + // LessFunc is the function to sort pod info type LessFunc func(podInfo1, podInfo2 *QueuedPodInfo) bool @@ -521,6 +532,10 @@ type BindPlugin interface { // Configured plugins are called at specified points in a scheduling context. type Framework interface { Handle + + // PreEnqueuePlugins returns the registered preEnqueue plugins.
+ PreEnqueuePlugins() []PreEnqueuePlugin + // QueueSortFunc returns the function to sort pods in scheduling queue QueueSortFunc() LessFunc diff --git a/pkg/scheduler/framework/plugins/feature/feature.go b/pkg/scheduler/framework/plugins/feature/feature.go index 55bd2991518..48cd00b7abe 100644 --- a/pkg/scheduler/framework/plugins/feature/feature.go +++ b/pkg/scheduler/framework/plugins/feature/feature.go @@ -25,4 +25,5 @@ type Features struct { EnableMinDomainsInPodTopologySpread bool EnableNodeInclusionPolicyInPodTopologySpread bool EnableMatchLabelKeysInPodTopologySpread bool + EnablePodSchedulingReadiness bool } diff --git a/pkg/scheduler/framework/plugins/names/names.go b/pkg/scheduler/framework/plugins/names/names.go index 1fd9d72a668..659a5ab4073 100644 --- a/pkg/scheduler/framework/plugins/names/names.go +++ b/pkg/scheduler/framework/plugins/names/names.go @@ -33,6 +33,7 @@ const ( EBSLimits = "EBSLimits" GCEPDLimits = "GCEPDLimits" PodTopologySpread = "PodTopologySpread" + SchedulingGates = "SchedulingGates" SelectorSpread = "SelectorSpread" TaintToleration = "TaintToleration" VolumeBinding = "VolumeBinding" diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 3c822536a5e..fdd1334aaae 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" @@ -50,6 +51,7 @@ func NewInTreeRegistry() runtime.Registry { EnableMinDomainsInPodTopologySpread: 
feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread), EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread), EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread), + EnablePodSchedulingReadiness: feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness), } return runtime.Registry{ @@ -74,5 +76,6 @@ func NewInTreeRegistry() runtime.Registry { queuesort.Name: queuesort.New, defaultbinder.Name: defaultbinder.New, defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New), + schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New), } } diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go new file mode 100644 index 00000000000..249e31a3b57 --- /dev/null +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates.go @@ -0,0 +1,67 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulinggates + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" +) + +// Name of the plugin used in the plugin registry and configurations. 
+const Name = names.SchedulingGates + +// SchedulingGates checks if a Pod carries .spec.schedulingGates. +type SchedulingGates struct { + enablePodSchedulingReadiness bool +} + +var _ framework.PreEnqueuePlugin = &SchedulingGates{} +var _ framework.EnqueueExtensions = &SchedulingGates{} + +func (pl *SchedulingGates) Name() string { + return Name +} + +func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + if !pl.enablePodSchedulingReadiness || len(p.Spec.SchedulingGates) == 0 { + return nil + } + var gates []string + for _, gate := range p.Spec.SchedulingGates { + gates = append(gates, gate.Name) + } + return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates)) +} + +// EventsToRegister returns the possible events that may make a Pod +// failed by this plugin schedulable. +func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEvent { + return []framework.ClusterEvent{ + {Resource: framework.Pod, ActionType: framework.Update}, + } +} + +// New initializes a new plugin and returns it. +func New(_ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) { + return &SchedulingGates{enablePodSchedulingReadiness: fts.EnablePodSchedulingReadiness}, nil +} diff --git a/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go new file mode 100644 index 00000000000..670989a1eb9 --- /dev/null +++ b/pkg/scheduler/framework/plugins/schedulinggates/scheduling_gates_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulinggates + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + + v1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + st "k8s.io/kubernetes/pkg/scheduler/testing" +) + +func TestPreEnqueue(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + enablePodSchedulingReadiness bool + want *framework.Status + }{ + { + name: "pod does not carry scheduling gates, feature disabled", + pod: st.MakePod().Name("p").Obj(), + enablePodSchedulingReadiness: false, + want: nil, + }, + { + name: "pod does not carry scheduling gates, feature enabled", + pod: st.MakePod().Name("p").Obj(), + enablePodSchedulingReadiness: true, + want: nil, + }, + { + name: "pod carries scheduling gates, feature disabled", + pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), + enablePodSchedulingReadiness: false, + want: nil, + }, + { + name: "pod carries scheduling gates, feature enabled", + pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(), + enablePodSchedulingReadiness: true, + want: framework.NewStatus(framework.UnschedulableAndUnresolvable, "waiting for scheduling gates: [foo bar]"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, err := New(nil, nil, feature.Features{EnablePodSchedulingReadiness: tt.enablePodSchedulingReadiness}) + if err != nil { + t.Fatalf("Creating plugin: %v", err) + } + + got := p.(framework.PreEnqueuePlugin).PreEnqueue(context.Background(), tt.pod) + if diff := 
cmp.Diff(tt.want, got); diff != "" { + t.Errorf("unexpected status (-want, +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 0001cc19b1e..53e270c2ab7 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -75,6 +75,7 @@ type frameworkImpl struct { snapshotSharedLister framework.SharedLister waitingPods *waitingPodsMap scorePluginWeight map[string]int + preEnqueuePlugins []framework.PreEnqueuePlugin queueSortPlugins []framework.QueueSortPlugin preFilterPlugins []framework.PreFilterPlugin filterPlugins []framework.FilterPlugin @@ -125,6 +126,7 @@ func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionP {&plugins.Bind, &f.bindPlugins}, {&plugins.PostBind, &f.postBindPlugins}, {&plugins.Permit, &f.permitPlugins}, + {&plugins.PreEnqueue, &f.preEnqueuePlugins}, {&plugins.QueueSort, &f.queueSortPlugins}, } } @@ -574,6 +576,11 @@ func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, plugin return nil } +// PreEnqueuePlugins returns the registered preEnqueue plugins.
+func (f *frameworkImpl) PreEnqueuePlugins() []framework.PreEnqueuePlugin { + return f.preEnqueuePlugins +} + // QueueSortFunc returns the function to sort pods in scheduling queue func (f *frameworkImpl) QueueSortFunc() framework.LessFunc { if f == nil { diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index 472ee0deb80..ddb7a251a5f 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -41,6 +41,7 @@ import ( ) const ( + preEnqueuePlugin = "preEnqueue-plugin" queueSortPlugin = "no-op-queue-sort-plugin" scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1" scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2" @@ -302,6 +303,18 @@ func (pp *TestPermitPlugin) Permit(ctx context.Context, state *framework.CycleSt return framework.NewStatus(framework.Wait), 10 * time.Second } +var _ framework.PreEnqueuePlugin = &TestPreEnqueuePlugin{} + +type TestPreEnqueuePlugin struct{} + +func (pl *TestPreEnqueuePlugin) Name() string { + return preEnqueuePlugin +} + +func (pl *TestPreEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + return nil +} + var _ framework.QueueSortPlugin = &TestQueueSortPlugin{} func newQueueSortPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { @@ -984,6 +997,61 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) { } } +func TestPreEnqueuePlugins(t *testing.T) { + tests := []struct { + name string + plugins []framework.Plugin + want []framework.PreEnqueuePlugin + }{ + { + name: "no PreEnqueuePlugin registered", + }, + { + name: "one PreEnqueuePlugin registered", + plugins: []framework.Plugin{ + &TestPreEnqueuePlugin{}, + }, + want: []framework.PreEnqueuePlugin{ + &TestPreEnqueuePlugin{}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registry := Registry{} + cfgPls := &config.Plugins{} + for _, pl := range tt.plugins { 
+ // register all plugins + tmpPl := pl + if err := registry.Register(pl.Name(), + func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return tmpPl, nil + }); err != nil { + t.Fatalf("fail to register preEnqueue plugin (%s)", pl.Name()) + } + // append plugins to preEnqueue pluginset + cfgPls.PreEnqueue.Enabled = append( + cfgPls.PreEnqueue.Enabled, + config.Plugin{Name: pl.Name()}, + ) + } + profile := config.KubeSchedulerProfile{Plugins: cfgPls} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done()) + if err != nil { + t.Fatalf("fail to create framework: %s", err) + } + + got := f.PreEnqueuePlugins() + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("PreEnqueuePlugins(): want %v, but got %v", tt.want, got) + } + }) + } +} + func TestRunScorePlugins(t *testing.T) { tests := []struct { name string diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index ffb30d5611e..5332e00d661 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -27,6 +27,7 @@ limitations under the License. package queue import ( + "context" "fmt" "reflect" "sync" @@ -63,6 +64,8 @@ const ( activeQName = "Active" backoffQName = "Backoff" unschedulablePods = "Unschedulable" + + preEnqueue = "PreEnqueue" ) const ( @@ -172,6 +175,8 @@ type PriorityQueue struct { moveRequestCycle int64 clusterEventMap map[framework.ClusterEvent]sets.String + // preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins. + preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin // closed indicates that the queue is closed. // It is mainly used to let Pop() exit its control loop while waiting for an item.
@@ -187,6 +192,7 @@ type priorityQueueOptions struct { podMaxInUnschedulablePodsDuration time.Duration podNominator framework.PodNominator clusterEventMap map[framework.ClusterEvent]sets.String + preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin } // Option configures a PriorityQueue @@ -234,6 +240,13 @@ func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option { } } +// WithPreEnqueuePluginMap sets preEnqueuePluginMap for PriorityQueue. +func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option { + return func(o *priorityQueueOptions) { + o.preEnqueuePluginMap = m + } +} + var defaultPriorityQueueOptions = priorityQueueOptions{ clock: clock.RealClock{}, podInitialBackoffDuration: DefaultPodInitialBackoffDuration, @@ -283,9 +296,10 @@ func NewPriorityQueue( podMaxBackoffDuration: options.podMaxBackoffDuration, podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), - unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()), + unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()), moveRequestCycle: -1, clusterEventMap: options.clusterEventMap, + preEnqueuePluginMap: options.preEnqueuePluginMap, } pq.cond.L = &pq.lock pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) @@ -300,19 +314,66 @@ func (p *PriorityQueue) Run() { go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop) } +// runPreEnqueuePlugins iterates PreEnqueue function in each registered PreEnqueuePlugin. +// It returns true if all PreEnqueue function run successfully; otherwise returns false +// upon the first failure. +// Note: we need to associate the failed plugin to `pInfo`, so that the pod can be moved back +// to activeQ by related cluster event. 
+func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framework.QueuedPodInfo) bool { + var s *framework.Status + pod := pInfo.Pod + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(preEnqueue, s.Code().String(), pod.Spec.SchedulerName).Observe(metrics.SinceInSeconds(startTime)) + }() + + for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] { + s = pl.PreEnqueue(ctx, pod) + if s.IsSuccess() { + continue + } + pInfo.UnschedulablePlugins.Insert(pl.Name()) + metrics.UnschedulableReason(pl.Name(), pod.Spec.SchedulerName).Inc() + if s.Code() == framework.Error { + klog.ErrorS(s.AsError(), "Unexpected error running PreEnqueue plugin", "pod", klog.KObj(pod), "plugin", pl.Name()) + } else { + klog.V(5).InfoS("Status after running PreEnqueue plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", s) + } + return false + } + return true +} + +// addToActiveQ tries to add pod to active queue. It returns 2 parameters: +// 1. a boolean flag to indicate whether the pod is added successfully. +// 2. an error for the caller to act on. +func (p *PriorityQueue) addToActiveQ(pInfo *framework.QueuedPodInfo) (bool, error) { + pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo) + if pInfo.Gated { + // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins. + p.unschedulablePods.addOrUpdate(pInfo) + return false, nil + } + if err := p.activeQ.Add(pInfo); err != nil { + klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod)) + return false, err + } + return true, nil +} + // Add adds a pod to the active queue. 
It should be called only when a new pod // is added so there is no chance the pod is already in active/unschedulable/backoff queues func (p *PriorityQueue) Add(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() + pInfo := p.newQueuedPodInfo(pod) - if err := p.activeQ.Add(pInfo); err != nil { - klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod)) + if added, err := p.addToActiveQ(pInfo); !added { return err } if p.unschedulablePods.get(pod) != nil { klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod)) - p.unschedulablePods.delete(pod) + p.unschedulablePods.delete(pInfo) } // Delete pod from backoffQ if it is backing off if err := p.podBackoffQ.Delete(pInfo); err == nil { @@ -367,11 +428,10 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool { return false } - if err := p.activeQ.Add(pInfo); err != nil { - klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod)) + if added, _ := p.addToActiveQ(pInfo); !added { return false } - p.unschedulablePods.delete(pod) + p.unschedulablePods.delete(pInfo) p.podBackoffQ.Delete(pInfo) metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc() p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) @@ -446,8 +506,9 @@ func (p *PriorityQueue) flushBackoffQCompleted() { if rawPodInfo == nil { break } - pod := rawPodInfo.(*framework.QueuedPodInfo).Pod - if p.isPodBackingoff(rawPodInfo.(*framework.QueuedPodInfo)) { + pInfo := rawPodInfo.(*framework.QueuedPodInfo) + pod := pInfo.Pod + if p.isPodBackingoff(pInfo) { break } _, err := p.podBackoffQ.Pop() @@ -455,10 +516,11 @@ func (p *PriorityQueue) flushBackoffQCompleted() { klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod)) break } - p.activeQ.Add(rawPodInfo) - klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName) - 
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() - activated = true + if added, _ := p.addToActiveQ(pInfo); added { + klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName) + metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc() + activated = true + } } if activated { @@ -560,13 +622,13 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { if err := p.podBackoffQ.Add(pInfo); err != nil { return err } - p.unschedulablePods.delete(usPodInfo.Pod) + p.unschedulablePods.delete(usPodInfo) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQName) } else { - if err := p.activeQ.Add(pInfo); err != nil { + if added, err := p.addToActiveQ(pInfo); !added { return err } - p.unschedulablePods.delete(usPodInfo.Pod) + p.unschedulablePods.delete(usPodInfo) klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQName) p.cond.Broadcast() } @@ -579,7 +641,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error { } // If pod is not in any of the queues, we put it in the active queue. pInfo := p.newQueuedPodInfo(newPod) - if err := p.activeQ.Add(pInfo); err != nil { + if added, err := p.addToActiveQ(pInfo); !added { return err } p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil) @@ -594,10 +656,11 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() p.PodNominator.DeleteNominatedPodIfExists(pod) - if err := p.activeQ.Delete(newQueuedPodInfoForLookup(pod)); err != nil { + pInfo := newQueuedPodInfoForLookup(pod) + if err := p.activeQ.Delete(pInfo); err != nil { // The item was probably not found in the activeQ. 
- p.podBackoffQ.Delete(newQueuedPodInfoForLookup(pod)) - p.unschedulablePods.delete(pod) + p.podBackoffQ.Delete(pInfo) + p.unschedulablePods.delete(pInfo) } return nil } @@ -652,16 +715,14 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework. } else { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName) metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc() - p.unschedulablePods.delete(pod) + p.unschedulablePods.delete(pInfo) } } else { - if err := p.activeQ.Add(pInfo); err != nil { - klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod)) - } else { + if added, _ := p.addToActiveQ(pInfo); added { klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName) activated = true metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc() - p.unschedulablePods.delete(pod) + p.unschedulablePods.delete(pInfo) } } } @@ -806,25 +867,33 @@ type UnschedulablePods struct { // podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo. podInfoMap map[string]*framework.QueuedPodInfo keyFunc func(*v1.Pod) string - // metricRecorder updates the counter when elements of an unschedulablePodsMap - // get added or removed, and it does nothing if it's nil - metricRecorder metrics.MetricRecorder + // unschedulableRecorder/gatedRecorder updates the counter when elements of an unschedulablePodsMap + // get added or removed, and it does nothing if it's nil. + unschedulableRecorder, gatedRecorder metrics.MetricRecorder } // Add adds a pod to the unschedulable podInfoMap. 
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) { podID := u.keyFunc(pInfo.Pod) - if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil { - u.metricRecorder.Inc() + if _, exists := u.podInfoMap[podID]; !exists { + if pInfo.Gated && u.gatedRecorder != nil { + u.gatedRecorder.Inc() + } else if !pInfo.Gated && u.unschedulableRecorder != nil { + u.unschedulableRecorder.Inc() + } } u.podInfoMap[podID] = pInfo } // Delete deletes a pod from the unschedulable podInfoMap. -func (u *UnschedulablePods) delete(pod *v1.Pod) { - podID := u.keyFunc(pod) - if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil { - u.metricRecorder.Dec() +func (u *UnschedulablePods) delete(pInfo *framework.QueuedPodInfo) { + podID := u.keyFunc(pInfo.Pod) + if _, exists := u.podInfoMap[podID]; exists { + if pInfo.Gated && u.gatedRecorder != nil { + u.gatedRecorder.Dec() + } else if !pInfo.Gated && u.unschedulableRecorder != nil { + u.unschedulableRecorder.Dec() + } } delete(u.podInfoMap, podID) } @@ -842,17 +911,21 @@ func (u *UnschedulablePods) get(pod *v1.Pod) *framework.QueuedPodInfo { // Clear removes all the entries from the unschedulable podInfoMap. func (u *UnschedulablePods) clear() { u.podInfoMap = make(map[string]*framework.QueuedPodInfo) - if u.metricRecorder != nil { - u.metricRecorder.Clear() + if u.unschedulableRecorder != nil { + u.unschedulableRecorder.Clear() + } + if u.gatedRecorder != nil { + u.gatedRecorder.Clear() } } // newUnschedulablePods initializes a new object of UnschedulablePods. 
-func newUnschedulablePods(metricRecorder metrics.MetricRecorder) *UnschedulablePods { +func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRecorder) *UnschedulablePods { return &UnschedulablePods{ - podInfoMap: make(map[string]*framework.QueuedPodInfo), - keyFunc: util.GetPodFullName, - metricRecorder: metricRecorder, + podInfoMap: make(map[string]*framework.QueuedPodInfo), + keyFunc: util.GetPodFullName, + unschedulableRecorder: unschedulableRecorder, + gatedRecorder: gatedRecorder, } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 5b0a1832cbe..95062a43274 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -446,6 +446,84 @@ func TestPriorityQueue_Activate(t *testing.T) { } } +type preEnqueuePlugin struct { + allowlists []string +} + +func (pl *preEnqueuePlugin) Name() string { + return "preEnqueuePlugin" +} + +func (pl *preEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status { + for _, allowed := range pl.allowlists { + if strings.Contains(p.Name, allowed) { + return nil + } + } + return framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod name not in allowlists") +} + +func TestPriorityQueue_addToActiveQ(t *testing.T) { + tests := []struct { + name string + plugins []framework.PreEnqueuePlugin + pod *v1.Pod + wantUnschedulablePods int + wantSuccess bool + }{ + { + name: "no plugins registered", + pod: st.MakePod().Name("p").Obj(), + wantUnschedulablePods: 0, + wantSuccess: true, + }, + { + name: "preEnqueue plugin registered, pod name not in allowlists", + plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}}, + pod: st.MakePod().Name("p").Obj(), + wantUnschedulablePods: 1, + wantSuccess: false, + }, + { + name: "preEnqueue plugin registered, pod failed one preEnqueue plugin", + plugins: []framework.PreEnqueuePlugin{ + 
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, + &preEnqueuePlugin{allowlists: []string{"foo"}}, + }, + pod: st.MakePod().Name("bar").Obj(), + wantUnschedulablePods: 1, + wantSuccess: false, + }, + { + name: "preEnqueue plugin registered, pod passed all preEnqueue plugins", + plugins: []framework.PreEnqueuePlugin{ + &preEnqueuePlugin{allowlists: []string{"foo", "bar"}}, + &preEnqueuePlugin{allowlists: []string{"bar"}}, + }, + pod: st.MakePod().Name("bar").Obj(), + wantUnschedulablePods: 0, + wantSuccess: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins} + q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m)) + got, _ := q.addToActiveQ(newQueuedPodInfoForLookup(tt.pod)) + if got != tt.wantSuccess { + t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got) + } + if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) { + t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap)) + } + }) + } +} + func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) { tests := []struct { name string @@ -949,7 +1027,7 @@ func TestUnschedulablePodsMap(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - upm := newUnschedulablePods(nil) + upm := newUnschedulablePods(nil, nil) for _, p := range test.podsToAdd { upm.addOrUpdate(newQueuedPodInfoForLookup(p)) } @@ -968,7 +1046,7 @@ func TestUnschedulablePodsMap(t *testing.T) { } } for _, p := range test.podsToDelete { - upm.delete(p) + upm.delete(newQueuedPodInfoForLookup(p)) } if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterDelete) { t.Errorf("Unexpected map after deleting pods. 
Expected: %v, got: %v", @@ -1307,13 +1385,15 @@ var ( queue.activeQ.Update(pInfo) } addPodUnschedulablePods = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { - // Update pod condition to unschedulable. - podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{ - Type: v1.PodScheduled, - Status: v1.ConditionFalse, - Reason: v1.PodReasonUnschedulable, - Message: "fake scheduling failure", - }) + if !pInfo.Gated { + // Update pod condition to unschedulable. + podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{ + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + Reason: v1.PodReasonUnschedulable, + Message: "fake scheduling failure", + }) + } queue.unschedulablePods.addOrUpdate(pInfo) } addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) { @@ -1434,10 +1514,20 @@ func TestPodTimestamp(t *testing.T) { func TestPendingPodsMetric(t *testing.T) { timestamp := time.Now() metrics.Register() - total := 50 - pInfos := makeQueuedPodInfos(total, timestamp) + total := 60 + queueableNum := 50 + queueable := "queueable" + // First 50 Pods are queueable. + pInfos := makeQueuedPodInfos(queueableNum, queueable, timestamp) + // The last 10 Pods are not queueable. + gated := makeQueuedPodInfos(total-queueableNum, "fail-me", timestamp) + // Manually mark them as gated=true. + for _, pInfo := range gated { + pInfo.Gated = true + } + pInfos = append(pInfos, gated...) totalWithDelay := 20 - pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, timestamp.Add(2*time.Second)) + pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, queueable, timestamp.Add(2*time.Second)) tests := []struct { name string @@ -1458,10 +1548,11 @@ func TestPendingPodsMetric(t *testing.T) { }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 
'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. # TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 30 scheduler_pending_pods{queue="backoff"} 0 +scheduler_pending_pods{queue="gated"} 10 scheduler_pending_pods{queue="unschedulable"} 20 `, }, @@ -1479,10 +1570,11 @@ scheduler_pending_pods{queue="unschedulable"} 20 }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. # TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 15 scheduler_pending_pods{queue="backoff"} 25 +scheduler_pending_pods{queue="gated"} 10 scheduler_pending_pods{queue="unschedulable"} 10 `, }, @@ -1500,10 +1592,11 @@ scheduler_pending_pods{queue="unschedulable"} 10 }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 
'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. # TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 50 scheduler_pending_pods{queue="backoff"} 0 +scheduler_pending_pods{queue="gated"} 10 scheduler_pending_pods{queue="unschedulable"} 0 `, }, @@ -1523,10 +1616,11 @@ scheduler_pending_pods{queue="unschedulable"} 0 }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. 
# TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 30 scheduler_pending_pods{queue="backoff"} 20 +scheduler_pending_pods{queue="gated"} 10 scheduler_pending_pods{queue="unschedulable"} 0 `, }, @@ -1540,16 +1634,17 @@ scheduler_pending_pods{queue="unschedulable"} 0 }, operands: [][]*framework.QueuedPodInfo{ pInfos[:40], - pInfos[40:], + pInfos[40:50], {nil}, {nil}, }, metricsName: "scheduler_pending_pods", wants: ` -# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods. +# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated. 
# TYPE scheduler_pending_pods gauge scheduler_pending_pods{queue="active"} 50 scheduler_pending_pods{queue="backoff"} 0 +scheduler_pending_pods{queue="gated"} 0 scheduler_pending_pods{queue="unschedulable"} 0 `, }, @@ -1559,6 +1654,7 @@ scheduler_pending_pods{queue="unschedulable"} 0 metrics.ActivePods().Set(0) metrics.BackoffPods().Set(0) metrics.UnschedulablePods().Set(0) + metrics.GatedPods().Set(0) } for _, test := range tests { @@ -1566,7 +1662,9 @@ scheduler_pending_pods{queue="unschedulable"} 0 resetMetrics() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp))) + + m := map[string][]framework.PreEnqueuePlugin{"": {&preEnqueuePlugin{allowlists: []string{queueable}}}} + queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m)) for i, op := range test.operations { for _, pInfo := range test.operands[i] { op(queue, pInfo) @@ -1986,12 +2084,13 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) { } } -func makeQueuedPodInfos(num int, timestamp time.Time) []*framework.QueuedPodInfo { +func makeQueuedPodInfos(num int, namePrefix string, timestamp time.Time) []*framework.QueuedPodInfo { var pInfos = make([]*framework.QueuedPodInfo, 0, num) for i := 1; i <= num; i++ { p := &framework.QueuedPodInfo{ - PodInfo: mustNewPodInfo(st.MakePod().Name(fmt.Sprintf("test-pod-%d", i)).Namespace(fmt.Sprintf("ns%d", i)).UID(fmt.Sprintf("tp-%d", i)).Obj()), - Timestamp: timestamp, + PodInfo: mustNewPodInfo(st.MakePod().Name(fmt.Sprintf("%v-%d", namePrefix, i)).Namespace(fmt.Sprintf("ns%d", i)).UID(fmt.Sprintf("tp-%d", i)).Obj()), + Timestamp: timestamp, + UnschedulablePlugins: sets.NewString(), } pInfos = append(pInfos, p) } diff --git a/pkg/scheduler/metrics/metric_recorder.go b/pkg/scheduler/metrics/metric_recorder.go index 5534923fa15..b52b6a5455b 100644 --- 
a/pkg/scheduler/metrics/metric_recorder.go +++ b/pkg/scheduler/metrics/metric_recorder.go @@ -56,6 +56,13 @@ func NewBackoffPodsRecorder() *PendingPodsRecorder { } } +// NewGatedPodsRecorder returns GatedPods in a Prometheus metric fashion +func NewGatedPodsRecorder() *PendingPodsRecorder { + return &PendingPodsRecorder{ + recorder: GatedPods(), + } +} + // Inc increases a metric counter by 1, in an atomic way func (r *PendingPodsRecorder) Inc() { r.recorder.Inc() diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index abe97d700c4..335cba29396 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -92,7 +92,7 @@ var ( &metrics.GaugeOpts{ Subsystem: SchedulerSubsystem, Name: "pending_pods", - Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.", + Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.", StabilityLevel: metrics.STABLE, }, []string{"queue"}) // SchedulerGoroutines isn't called in some parts where goroutines start. @@ -249,6 +249,11 @@ func UnschedulablePods() metrics.GaugeMetric { return pendingPods.With(metrics.Labels{"queue": "unschedulable"}) } +// GatedPods returns the pending pods metrics with the label gated +func GatedPods() metrics.GaugeMetric { + return pendingPods.With(metrics.Labels{"queue": "gated"}) +} + // SinceInSeconds gets the time since the specified start in seconds. 
func SinceInSeconds(start time.Time) float64 { return time.Since(start).Seconds() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 573cd9142ab..581c792a7cf 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -309,6 +309,10 @@ func New(client clientset.Interface, return nil, errors.New("at least one profile is required") } + preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin) + for profileName, profile := range profiles { + preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins() + } podQueue := internalqueue.NewSchedulingQueue( profiles[options.profiles[0].SchedulerName].QueueSortFunc(), informerFactory, @@ -317,6 +321,7 @@ func New(client clientset.Interface, internalqueue.WithPodNominator(nominator), internalqueue.WithClusterEventMap(clusterEventMap), internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration), + internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap), ) schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything) diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index 3114f1c52aa..991ba9f1250 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -385,6 +385,14 @@ func (p *PodWrapper) Volume(volume v1.Volume) *PodWrapper { return p } +// SchedulingGates sets `gates` as additional SchedulingGates of the inner pod. +func (p *PodWrapper) SchedulingGates(gates []string) *PodWrapper { + for _, gate := range gates { + p.Spec.SchedulingGates = append(p.Spec.SchedulingGates, v1.PodSchedulingGate{Name: gate}) + } + return p +} + // PodAffinityKind represents different kinds of PodAffinity. 
type PodAffinityKind int diff --git a/test/instrumentation/testdata/stable-metrics-list.yaml b/test/instrumentation/testdata/stable-metrics-list.yaml index 590b81ce802..bafa5cb3eaa 100644 --- a/test/instrumentation/testdata/stable-metrics-list.yaml +++ b/test/instrumentation/testdata/stable-metrics-list.yaml @@ -49,7 +49,9 @@ subsystem: scheduler help: Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number - of pods in unschedulablePods. + of pods in unschedulablePods that the scheduler attempted to schedule and failed; + 'gated' is the number of unschedulable pods that the scheduler never attempted + to schedule because they are gated. type: Gauge stabilityLevel: STABLE labels: