chore(scheduler): use framework.Features in scheduler plugins

This commit is contained in:
googs1025 2025-02-25 21:28:46 +08:00
parent 566f939b19
commit 239aad8e4b
9 changed files with 53 additions and 41 deletions

View File

@ -23,6 +23,8 @@ type Features struct {
EnableDRAAdminAccess bool
EnableDynamicResourceAllocation bool
EnableVolumeCapacityPriority bool
EnableVolumeAttributesClass bool
EnableCSIMigrationPortworx bool
EnableNodeInclusionPolicyInPodTopologySpread bool
EnableMatchLabelKeysInPodTopologySpread bool
EnableInPlacePodVerticalScaling bool

View File

@ -128,6 +128,8 @@ func NewBalancedAllocation(_ context.Context, baArgs runtime.Object, h framework
handle: h,
resourceAllocationScorer: resourceAllocationScorer{
Name: BalancedAllocationName,
enableInPlacePodVerticalScaling: fts.EnableInPlacePodVerticalScaling,
enablePodLevelResources: fts.EnablePodLevelResources,
scorer: balancedResourceScorer,
useRequested: true,
resources: args.Resources,

View File

@ -21,11 +21,9 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
resourcehelper "k8s.io/component-helpers/resource"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
@ -37,6 +35,8 @@ type scorer func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer
// resourceAllocationScorer contains information to calculate resource allocation score.
type resourceAllocationScorer struct {
Name string
enableInPlacePodVerticalScaling bool
enablePodLevelResources bool
// used to decide whether to use Requested or NonZeroRequested for
// cpu and memory.
useRequested bool
@ -118,9 +118,9 @@ func (r *resourceAllocationScorer) calculateResourceAllocatableRequest(logger kl
func (r *resourceAllocationScorer) calculatePodResourceRequest(pod *v1.Pod, resourceName v1.ResourceName) int64 {
opts := resourcehelper.PodResourcesOptions{
UseStatusResources: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
UseStatusResources: r.enableInPlacePodVerticalScaling,
// SkipPodLevelResources is set to false when PodLevelResources feature is enabled.
SkipPodLevelResources: !utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources),
SkipPodLevelResources: !r.enablePodLevelResources,
}
if !r.useRequested {

View File

@ -63,6 +63,7 @@ type CSILimits struct {
scLister storagelisters.StorageClassLister
vaLister storagelisters.VolumeAttachmentLister
enableCSIMigrationPortworx bool
randomVolumeIDPrefix string
translator InTreeToCSITranslator
@ -377,7 +378,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Vol
if err != nil {
return fmt.Errorf("looking up provisioner name for volume %s: %w", vol.Name, err)
}
if !isCSIMigrationOn(csiNode, inTreeProvisionerName) {
if !isCSIMigrationOn(csiNode, inTreeProvisionerName, pl.enableCSIMigrationPortworx) {
csiNodeName := ""
if csiNode != nil {
csiNodeName = csiNode.Name
@ -438,7 +439,7 @@ func (pl *CSILimits) getCSIDriverInfo(logger klog.Logger, csiNode *storagev1.CSI
return "", ""
}
if !isCSIMigrationOn(csiNode, pluginName) {
if !isCSIMigrationOn(csiNode, pluginName, pl.enableCSIMigrationPortworx) {
logger.V(5).Info("CSI Migration of plugin is not enabled", "plugin", pluginName)
return "", ""
}
@ -486,7 +487,7 @@ func (pl *CSILimits) getCSIDriverInfoFromSC(logger klog.Logger, csiNode *storage
provisioner := storageClass.Provisioner
if pl.translator.IsMigratableIntreePluginByName(provisioner) {
if !isCSIMigrationOn(csiNode, provisioner) {
if !isCSIMigrationOn(csiNode, provisioner, pl.enableCSIMigrationPortworx) {
logger.V(5).Info("CSI Migration of provisioner is not enabled", "provisioner", provisioner)
return "", ""
}
@ -518,6 +519,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
pvcLister: pvcLister,
scLister: scLister,
vaLister: vaLister,
enableCSIMigrationPortworx: fts.EnableCSIMigrationPortworx,
randomVolumeIDPrefix: rand.String(32),
translator: csiTranslator,
}, nil

View File

@ -22,14 +22,12 @@ import (
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
csilibplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/features"
)
// isCSIMigrationOn returns a boolean value indicating whether
// the CSI migration has been enabled for a particular storage plugin.
func isCSIMigrationOn(csiNode *storagev1.CSINode, pluginName string) bool {
func isCSIMigrationOn(csiNode *storagev1.CSINode, pluginName string, enableCSIMigrationPortworx bool) bool {
if csiNode == nil || len(pluginName) == 0 {
return false
}
@ -40,7 +38,7 @@ func isCSIMigrationOn(csiNode *storagev1.CSINode, pluginName string) bool {
case csilibplugins.AWSEBSInTreePluginName:
return true
case csilibplugins.PortworxVolumePluginName:
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationPortworx) {
if !enableCSIMigrationPortworx {
return false
}
case csilibplugins.GCEPDInTreePluginName:

View File

@ -49,6 +49,8 @@ func NewInTreeRegistry() runtime.Registry {
EnableDRAAdminAccess: feature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
EnableVolumeAttributesClass: feature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass),
EnableCSIMigrationPortworx: feature.DefaultFeatureGate.Enabled(features.CSIMigrationPortworx),
EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread),
EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),

View File

@ -33,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
@ -45,7 +44,7 @@ import (
csiplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/klog/v2"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/metrics"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
@ -204,6 +203,8 @@ type PodVolumeClaims struct {
type volumeBinder struct {
kubeClient clientset.Interface
enableVolumeAttributesClass bool
enableCSIMigrationPortworx bool
classLister storagelisters.StorageClassLister
podLister corelisters.PodLister
@ -238,6 +239,7 @@ type CapacityCheck struct {
func NewVolumeBinder(
logger klog.Logger,
kubeClient clientset.Interface,
fts feature.Features,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
csiNodeInformer storageinformers.CSINodeInformer,
@ -248,6 +250,8 @@ func NewVolumeBinder(
bindTimeout time.Duration) SchedulerVolumeBinder {
b := &volumeBinder{
kubeClient: kubeClient,
enableVolumeAttributesClass: fts.EnableVolumeAttributesClass,
enableCSIMigrationPortworx: fts.EnableCSIMigrationPortworx,
podLister: podInformer.Lister(),
classLister: storageClassInformer.Lister(),
nodeLister: nodeInformer.Lister(),
@ -855,7 +859,7 @@ func (b *volumeBinder) findMatchingVolumes(logger klog.Logger, pod *v1.Pod, clai
pvs := unboundVolumesDelayBinding[storageClassName]
// Find a matching PV
pv, err := volume.FindMatchingVolume(pvc, pvs, node, chosenPVs, true, utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass))
pv, err := volume.FindMatchingVolume(pvc, pvs, node, chosenPVs, true, b.enableVolumeAttributesClass)
if err != nil {
return false, nil, nil, err
}
@ -1033,7 +1037,7 @@ func (a byPVCSize) Less(i, j int) bool {
}
// isCSIMigrationOnForPlugin checks if CSI migration is enabled for a given plugin.
func isCSIMigrationOnForPlugin(pluginName string) bool {
func isCSIMigrationOnForPlugin(pluginName string, enableCSIMigrationPortworx bool) bool {
switch pluginName {
case csiplugins.AWSEBSInTreePluginName:
return true
@ -1044,7 +1048,7 @@ func isCSIMigrationOnForPlugin(pluginName string) bool {
case csiplugins.CinderInTreePluginName:
return true
case csiplugins.PortworxVolumePluginName:
return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationPortworx)
return enableCSIMigrationPortworx
}
return false
}
@ -1083,7 +1087,7 @@ func (b *volumeBinder) tryTranslatePVToCSI(logger klog.Logger, pv *v1.Persistent
return nil, fmt.Errorf("could not get plugin name from pv: %v", err)
}
if !isCSIMigrationOnForPlugin(pluginName) {
if !isCSIMigrationOnForPlugin(pluginName, b.enableCSIMigrationPortworx) {
return pv, nil
}

View File

@ -45,6 +45,7 @@ import (
_ "k8s.io/klog/v2/ktesting/init"
"k8s.io/kubernetes/pkg/controller"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)
@ -167,6 +168,7 @@ func newTestBinder(t *testing.T, ctx context.Context) *testEnv {
binder := NewVolumeBinder(
logger,
client,
feature.Features{},
podInformer,
nodeInformer,
csiNodeInformer,

View File

@ -579,7 +579,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(),
CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1().CSIStorageCapacities(),
}
binder := NewVolumeBinder(klog.FromContext(ctx), fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
binder := NewVolumeBinder(klog.FromContext(ctx), fh.ClientSet(), fts, podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
// build score function
var scorer volumeCapacityScorer