mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #103493 from cofyc/fix103431
scheduler/volumebinding: migrate to use pkg/scheduler/framework/plugins/feature
This commit is contained in:
commit
c6dfe7343e
@ -311,15 +311,27 @@ func ValidateNodeAffinityArgs(path *field.Path, args *config.NodeAffinityArgs) e
|
||||
return errors.Flatten(errors.NewAggregate(errs))
|
||||
}
|
||||
|
||||
// VolumeBindingArgsValidationOptions contains the different settings for validation.
|
||||
type VolumeBindingArgsValidationOptions struct {
|
||||
AllowVolumeCapacityPriority bool
|
||||
}
|
||||
|
||||
// ValidateVolumeBindingArgs validates that VolumeBindingArgs are set correctly.
|
||||
func ValidateVolumeBindingArgs(path *field.Path, args *config.VolumeBindingArgs) error {
|
||||
return ValidateVolumeBindingArgsWithOptions(path, args, VolumeBindingArgsValidationOptions{
|
||||
AllowVolumeCapacityPriority: utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
|
||||
})
|
||||
}
|
||||
|
||||
// ValidateVolumeBindingArgs validates that VolumeBindingArgs with scheduler features.
|
||||
func ValidateVolumeBindingArgsWithOptions(path *field.Path, args *config.VolumeBindingArgs, opts VolumeBindingArgsValidationOptions) error {
|
||||
var allErrs field.ErrorList
|
||||
|
||||
if args.BindTimeoutSeconds < 0 {
|
||||
allErrs = append(allErrs, field.Invalid(path.Child("bindTimeoutSeconds"), args.BindTimeoutSeconds, "invalid BindTimeoutSeconds, should not be a negative value"))
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
|
||||
if opts.AllowVolumeCapacityPriority {
|
||||
allErrs = append(allErrs, validateFunctionShape(args.Shape, path.Child("shape"))...)
|
||||
} else if args.Shape != nil {
|
||||
// When the feature is off, return an error if the config is not nil.
|
||||
|
@ -24,4 +24,7 @@ type Features struct {
|
||||
EnablePodDisruptionBudget bool
|
||||
EnablePodOverhead bool
|
||||
EnableReadWriteOncePod bool
|
||||
EnableVolumeCapacityPriority bool
|
||||
EnableCSIStorageCapacity bool
|
||||
EnableGenericEphemeralVolume bool
|
||||
}
|
||||
|
@ -54,6 +54,9 @@ func NewInTreeRegistry() runtime.Registry {
|
||||
EnablePodDisruptionBudget: feature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),
|
||||
EnablePodOverhead: feature.DefaultFeatureGate.Enabled(features.PodOverhead),
|
||||
EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
|
||||
EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
|
||||
EnableCSIStorageCapacity: feature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity),
|
||||
EnableGenericEphemeralVolume: feature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
|
||||
}
|
||||
|
||||
return runtime.Registry{
|
||||
@ -81,7 +84,9 @@ func NewInTreeRegistry() runtime.Registry {
|
||||
noderesources.RequestedToCapacityRatioName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
return noderesources.NewRequestedToCapacityRatio(plArgs, fh, fts)
|
||||
},
|
||||
volumebinding.Name: volumebinding.New,
|
||||
volumebinding.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
return volumebinding.New(plArgs, fh, fts)
|
||||
},
|
||||
volumerestrictions.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
return volumerestrictions.New(plArgs, fh, fts)
|
||||
},
|
||||
|
@ -27,13 +27,12 @@ import (
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
|
||||
)
|
||||
@ -74,6 +73,7 @@ type VolumeBinding struct {
|
||||
PVCLister corelisters.PersistentVolumeClaimLister
|
||||
GenericEphemeralVolumeFeatureEnabled bool
|
||||
scorer volumeCapacityScorer
|
||||
fts feature.Features
|
||||
}
|
||||
|
||||
var _ framework.PreFilterPlugin = &VolumeBinding{}
|
||||
@ -109,7 +109,7 @@ func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEvent {
|
||||
// We rely on CSI node to translate in-tree PV to CSI.
|
||||
{Resource: framework.CSINode, ActionType: framework.Add | framework.Update},
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity) {
|
||||
if pl.fts.EnableCSIStorageCapacity {
|
||||
// When CSIStorageCapacity is enabled, pods may become schedulable
|
||||
// on CSI driver & storage capacity changes.
|
||||
events = append(events, []framework.ClusterEvent{
|
||||
@ -358,12 +358,14 @@ func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState
|
||||
}
|
||||
|
||||
// New initializes a new plugin and returns it.
|
||||
func New(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
|
||||
args, ok := plArgs.(*config.VolumeBindingArgs)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs)
|
||||
}
|
||||
if err := validation.ValidateVolumeBindingArgs(nil, args); err != nil {
|
||||
if err := validation.ValidateVolumeBindingArgsWithOptions(nil, args, validation.VolumeBindingArgsValidationOptions{
|
||||
AllowVolumeCapacityPriority: fts.EnableVolumeCapacityPriority,
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
podInformer := fh.SharedInformerFactory().Core().V1().Pods()
|
||||
@ -373,7 +375,7 @@ func New(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses()
|
||||
csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes()
|
||||
var capacityCheck *CapacityCheck
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity) {
|
||||
if fts.EnableCSIStorageCapacity {
|
||||
capacityCheck = &CapacityCheck{
|
||||
CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(),
|
||||
CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1beta1().CSIStorageCapacities(),
|
||||
@ -383,7 +385,7 @@ func New(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
|
||||
// build score function
|
||||
var scorer volumeCapacityScorer
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
|
||||
if fts.EnableVolumeCapacityPriority {
|
||||
shape := make(helper.FunctionShape, 0, len(args.Shape))
|
||||
for _, point := range args.Shape {
|
||||
shape = append(shape, helper.FunctionShapePoint{
|
||||
@ -396,7 +398,8 @@ func New(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
return &VolumeBinding{
|
||||
Binder: binder,
|
||||
PVCLister: pvcInformer.Lister(),
|
||||
GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
|
||||
GenericEphemeralVolumeFeatureEnabled: fts.EnableGenericEphemeralVolume,
|
||||
scorer: scorer,
|
||||
fts: fts,
|
||||
}, nil
|
||||
}
|
||||
|
@ -27,14 +27,11 @@ import (
|
||||
storagev1 "k8s.io/api/storage/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/component-base/featuregate"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
||||
)
|
||||
|
||||
@ -79,7 +76,7 @@ func TestVolumeBinding(t *testing.T) {
|
||||
nodes []*v1.Node
|
||||
pvcs []*v1.PersistentVolumeClaim
|
||||
pvs []*v1.PersistentVolume
|
||||
feature featuregate.Feature
|
||||
fts feature.Features
|
||||
args *config.VolumeBindingArgs
|
||||
wantPreFilterStatus *framework.Status
|
||||
wantStateAfterPreFilter *stateData
|
||||
@ -298,7 +295,9 @@ func TestVolumeBinding(t *testing.T) {
|
||||
withCapacity(resource.MustParse("100Gi")).
|
||||
withNodeAffinity(map[string][]string{v1.LabelHostname: {"node-b"}}).PersistentVolume,
|
||||
},
|
||||
feature: features.VolumeCapacityPriority,
|
||||
fts: feature.Features{
|
||||
EnableVolumeCapacityPriority: true,
|
||||
},
|
||||
wantPreFilterStatus: nil,
|
||||
wantStateAfterPreFilter: &stateData{
|
||||
boundClaims: []*v1.PersistentVolumeClaim{},
|
||||
@ -364,7 +363,9 @@ func TestVolumeBinding(t *testing.T) {
|
||||
withCapacity(resource.MustParse("100Gi")).
|
||||
withNodeAffinity(map[string][]string{v1.LabelHostname: {"node-b"}}).PersistentVolume,
|
||||
},
|
||||
feature: features.VolumeCapacityPriority,
|
||||
fts: feature.Features{
|
||||
EnableVolumeCapacityPriority: true,
|
||||
},
|
||||
wantPreFilterStatus: nil,
|
||||
wantStateAfterPreFilter: &stateData{
|
||||
boundClaims: []*v1.PersistentVolumeClaim{},
|
||||
@ -441,7 +442,9 @@ func TestVolumeBinding(t *testing.T) {
|
||||
"topology.kubernetes.io/zone": {"zone-b"},
|
||||
}).PersistentVolume,
|
||||
},
|
||||
feature: features.VolumeCapacityPriority,
|
||||
fts: feature.Features{
|
||||
EnableVolumeCapacityPriority: true,
|
||||
},
|
||||
wantPreFilterStatus: nil,
|
||||
wantStateAfterPreFilter: &stateData{
|
||||
boundClaims: []*v1.PersistentVolumeClaim{},
|
||||
@ -523,7 +526,9 @@ func TestVolumeBinding(t *testing.T) {
|
||||
"topology.kubernetes.io/zone": {"zone-b"},
|
||||
}).PersistentVolume,
|
||||
},
|
||||
feature: features.VolumeCapacityPriority,
|
||||
fts: feature.Features{
|
||||
EnableVolumeCapacityPriority: true,
|
||||
},
|
||||
args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 300,
|
||||
Shape: []config.UtilizationShapePoint{
|
||||
@ -570,9 +575,6 @@ func TestVolumeBinding(t *testing.T) {
|
||||
|
||||
for _, item := range table {
|
||||
t.Run(item.name, func(t *testing.T) {
|
||||
if item.feature != "" {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, item.feature, true)()
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
client := fake.NewSimpleClientset()
|
||||
@ -592,12 +594,12 @@ func TestVolumeBinding(t *testing.T) {
|
||||
args = &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 300,
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
|
||||
if item.fts.EnableVolumeCapacityPriority {
|
||||
args.Shape = defaultShapePoint
|
||||
}
|
||||
}
|
||||
|
||||
pl, err := New(args, fh)
|
||||
pl, err := New(args, fh, item.fts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -606,7 +606,9 @@ func TestGenericScheduler(t *testing.T) {
|
||||
// Pod with existing PVC
|
||||
registerPlugins: []st.RegisterPluginFunc{
|
||||
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New),
|
||||
st.RegisterPreFilterPlugin(volumebinding.Name, func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
return volumebinding.New(plArgs, fh, feature.Features{})
|
||||
}),
|
||||
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
|
||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
},
|
||||
@ -639,7 +641,9 @@ func TestGenericScheduler(t *testing.T) {
|
||||
// Pod with non existing PVC
|
||||
registerPlugins: []st.RegisterPluginFunc{
|
||||
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New),
|
||||
st.RegisterPreFilterPlugin(volumebinding.Name, func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
return volumebinding.New(plArgs, fh, feature.Features{})
|
||||
}),
|
||||
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
|
||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
},
|
||||
@ -688,7 +692,9 @@ func TestGenericScheduler(t *testing.T) {
|
||||
// Pod with deleting PVC
|
||||
registerPlugins: []st.RegisterPluginFunc{
|
||||
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New),
|
||||
st.RegisterPreFilterPlugin(volumebinding.Name, func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
|
||||
return volumebinding.New(plArgs, fh, feature.Features{})
|
||||
}),
|
||||
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
|
||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user