diff --git a/cmd/kube-scheduler/app/server_test.go b/cmd/kube-scheduler/app/server_test.go index c4ae4ea4a99..b72b0c30817 100644 --- a/cmd/kube-scheduler/app/server_test.go +++ b/cmd/kube-scheduler/app/server_test.go @@ -191,7 +191,11 @@ profiles: {Name: "TaintToleration", Weight: 1}, {Name: "PodTopologySpread", Weight: 1}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, } testcases := []struct { @@ -222,6 +226,10 @@ profiles: "PreScorePlugin": {{Name: "InterPodAffinity"}, {Name: "TaintToleration"}}, "QueueSortPlugin": {{Name: "PrioritySort"}}, "ScorePlugin": {{Name: "InterPodAffinity", Weight: 1}, {Name: "TaintToleration", Weight: 1}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, }, }, @@ -236,6 +244,10 @@ profiles: "profile-disable-all-filter-and-score-plugins": { "BindPlugin": {{Name: "DefaultBinder"}}, "QueueSortPlugin": {{Name: "PrioritySort"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, }, }, @@ -310,7 +322,11 @@ profiles: {Name: "TaintToleration", Weight: 1}, {Name: "PodTopologySpread", Weight: 1}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, }, }, diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 467be48db95..2830ef6b9d5 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -11,7 +11,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/api/v1/pod:go_default_library", - "//pkg/controller/volume/scheduling:go_default_library", "//pkg/features:go_default_library", "//pkg/scheduler/algorithmprovider:go_default_library", "//pkg/scheduler/apis/config:go_default_library", @@ -22,6 +21,7 @@ go_library( "//pkg/scheduler/framework/plugins/defaultbinder:go_default_library", "//pkg/scheduler/framework/plugins/noderesources:go_default_library", "//pkg/scheduler/framework/plugins/queuesort:go_default_library", + "//pkg/scheduler/framework/plugins/volumebinding:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/debugger:go_default_library", diff --git a/pkg/scheduler/algorithmprovider/registry.go b/pkg/scheduler/algorithmprovider/registry.go index a992d63ea21..53afe5925d5 100644 --- a/pkg/scheduler/algorithmprovider/registry.go +++ b/pkg/scheduler/algorithmprovider/registry.go @@ -125,11 +125,31 @@ func getDefaultConfig() *schedulerapi.Plugins { {Name: tainttoleration.Name, Weight: 1}, }, }, + Reserve: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, + Unreserve: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, + PreBind: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, Bind: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: defaultbinder.Name}, }, }, + PostBind: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, } } diff --git a/pkg/scheduler/algorithmprovider/registry_test.go b/pkg/scheduler/algorithmprovider/registry_test.go index 0d1da7d636a..4e7366925bb 100644 --- a/pkg/scheduler/algorithmprovider/registry_test.go +++ b/pkg/scheduler/algorithmprovider/registry_test.go @@ -99,11 +99,31 @@ func TestClusterAutoscalerProvider(t *testing.T) { {Name: podtopologyspread.Name, Weight: 1}, }, }, + Reserve: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, + Unreserve: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, + PreBind: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, Bind: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: defaultbinder.Name}, }, }, + PostBind: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, } r := NewRegistry() @@ -172,11 +192,31 @@ func TestApplyFeatureGates(t *testing.T) { {Name: tainttoleration.Name, Weight: 1}, }, }, + Reserve: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, + Unreserve: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, + PreBind: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, Bind: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: defaultbinder.Name}, }, }, + PostBind: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, }, }, { @@ -238,11 +278,31 @@ func TestApplyFeatureGates(t *testing.T) { {Name: noderesources.ResourceLimitsName, Weight: 1}, }, }, + Reserve: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, + Unreserve: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, + PreBind: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, Bind: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: defaultbinder.Name}, }, }, + PostBind: &schedulerapi.PluginSet{ + Enabled: []schedulerapi.Plugin{ + {Name: volumebinding.Name}, + }, + }, }, }, } diff --git a/pkg/scheduler/apis/config/register.go b/pkg/scheduler/apis/config/register.go index 9f41a57a4d4..c1f3c57d608 100644 --- a/pkg/scheduler/apis/config/register.go +++ b/pkg/scheduler/apis/config/register.go @@ -45,6 +45,7 @@ func addKnownTypes(scheme *runtime.Scheme) error { &PodTopologySpreadArgs{}, &RequestedToCapacityRatioArgs{}, &ServiceAffinityArgs{}, + &VolumeBindingArgs{}, ) scheme.AddKnownTypes(schema.GroupVersion{Group: "", Version: runtime.APIVersionInternal}, &Policy{}) return nil diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go index 501522c75cd..43f1e48da2d 100644 --- a/pkg/scheduler/apis/config/testing/compatibility_test.go +++ b/pkg/scheduler/apis/config/testing/compatibility_test.go @@ -702,7 +702,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "DefaultPodTopologySpread", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, wantExtenders: []config.Extender{{ URLPrefix: "/prefix", @@ -805,7 +809,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "DefaultPodTopologySpread", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, wantExtenders: []config.Extender{{ URLPrefix: "/prefix", @@ -921,7 +929,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "DefaultPodTopologySpread", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, wantExtenders: []config.Extender{{ URLPrefix: "/prefix", @@ -1039,7 +1051,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "DefaultPodTopologySpread", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, wantExtenders: []config.Extender{{ URLPrefix: "/prefix", @@ -1157,7 +1173,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "DefaultPodTopologySpread", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, wantExtenders: []config.Extender{{ URLPrefix: "/prefix", @@ -1279,7 +1299,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { {Name: "DefaultPodTopologySpread", Weight: 2}, {Name: "TaintToleration", Weight: 2}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, wantExtenders: []config.Extender{{ URLPrefix: "/prefix", @@ -1430,7 +1454,11 @@ func TestAlgorithmProviderCompatibility(t *testing.T) { {Name: "TaintToleration", Weight: 1}, {Name: "PodTopologySpread", Weight: 1}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, } testcases := []struct { @@ -1494,7 +1522,11 @@ func TestAlgorithmProviderCompatibility(t *testing.T) { {Name: "TaintToleration", Weight: 1}, {Name: "PodTopologySpread", Weight: 1}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, }, } @@ -1578,18 +1610,33 @@ func TestPluginsConfigurationCompatibility(t *testing.T) { {Name: "TaintToleration", Weight: 1}, {Name: "PodTopologySpread", Weight: 1}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, + } + + defaultPluginConfigs := []config.PluginConfig{ + { + Name: "VolumeBinding", + Args: &config.VolumeBindingArgs{ + BindTimeoutSeconds: 600, + }, + }, } testcases := []struct { - name string - plugins config.Plugins - wantPlugins map[string][]config.Plugin - pluginConfig []config.PluginConfig + name string + plugins config.Plugins + wantPlugins map[string][]config.Plugin + pluginConfig []config.PluginConfig + wantPluginConfig []config.PluginConfig }{ { - name: "default plugins", - wantPlugins: defaultPlugins, + name: "default plugins", + wantPlugins: defaultPlugins, + wantPluginConfig: defaultPluginConfigs, }, { name: "default plugins with customized plugin config", @@ -1651,6 +1698,76 @@ func TestPluginsConfigurationCompatibility(t *testing.T) { AntiAffinityLabelsPreference: []string{"disk", "flash"}, }, }, + { + Name: "VolumeBinding", + Args: &config.VolumeBindingArgs{ + BindTimeoutSeconds: 300, + }, + }, + }, + wantPluginConfig: []config.PluginConfig{ + { + Name: "NodeResourcesFit", + Args: &config.NodeResourcesFitArgs{ + IgnoredResources: []string{"foo", "bar"}, + }, + }, + { + Name: "PodTopologySpread", + Args: &config.PodTopologySpreadArgs{ + DefaultConstraints: []v1.TopologySpreadConstraint{ + { + MaxSkew: 1, + TopologyKey: "foo", + WhenUnsatisfiable: v1.DoNotSchedule, + }, + { + MaxSkew: 10, + TopologyKey: "bar", + WhenUnsatisfiable: v1.ScheduleAnyway, + }, + }, + }, + }, + { + Name: "RequestedToCapacityRatio", + Args: &config.RequestedToCapacityRatioArgs{ + Shape: []config.UtilizationShapePoint{ + {Utilization: 5, Score: 5}, + }, + Resources: []config.ResourceSpec{ + {Name: "cpu", Weight: 10}, + }, + }, + }, + { + Name: "InterPodAffinity", + Args: &config.InterPodAffinityArgs{ + HardPodAffinityWeight: 100, + }, + }, + { + Name: "NodeLabel", + Args: &config.NodeLabelArgs{ + PresentLabels: []string{"foo", "bar"}, + AbsentLabels: []string{"apple"}, + PresentLabelsPreference: []string{"dog"}, + AbsentLabelsPreference: []string{"cat"}, + }, + }, + { + Name: "ServiceAffinity", + Args: &config.ServiceAffinityArgs{ + AffinityLabels: []string{"foo", "bar"}, + AntiAffinityLabelsPreference: []string{"disk", "flash"}, + }, + }, + { + Name: "VolumeBinding", + Args: &config.VolumeBindingArgs{ + BindTimeoutSeconds: 300, + }, + }, }, }, { @@ -1704,6 +1821,26 @@ func TestPluginsConfigurationCompatibility(t *testing.T) { {Name: "PodTopologySpread"}, }, }, + PreBind: &config.PluginSet{ + Disabled: []config.Plugin{ + {Name: "VolumeBinding"}, + }, + }, + PostBind: &config.PluginSet{ + Disabled: []config.Plugin{ + {Name: "VolumeBinding"}, + }, + }, + Reserve: &config.PluginSet{ + Disabled: []config.Plugin{ + {Name: "VolumeBinding"}, + }, + }, + Unreserve: &config.PluginSet{ + Disabled: []config.Plugin{ + {Name: "VolumeBinding"}, + }, + }, }, wantPlugins: map[string][]config.Plugin{ "QueueSortPlugin": { @@ -1824,8 +1961,13 @@ func TestPluginsConfigurationCompatibility(t *testing.T) { {Name: "ImageLocality", Weight: 24}, {Name: "NodeResourcesBalancedAllocation", Weight: 24}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, + wantPluginConfig: defaultPluginConfigs, }, } for _, tc := range testcases { @@ -1850,7 +1992,7 @@ func TestPluginsConfigurationCompatibility(t *testing.T) { if p.SchedulerName != v1.DefaultSchedulerName { t.Errorf("unexpected scheduler name (want %q, got %q)", v1.DefaultSchedulerName, p.SchedulerName) } - if diff := cmp.Diff(tc.pluginConfig, p.PluginConfig); diff != "" { + if diff := cmp.Diff(tc.wantPluginConfig, p.PluginConfig); diff != "" { t.Errorf("unexpected plugins diff (-want, +got): %s", diff) } }), diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go index ea0161a015c..0b5f57f4255 100644 --- a/pkg/scheduler/apis/config/types.go +++ b/pkg/scheduler/apis/config/types.go @@ -93,6 +93,8 @@ type KubeSchedulerConfiguration struct { // Duration to wait for a binding operation to complete before timing out // Value must be non-negative integer. The value zero indicates no waiting. // If this value is nil, the default value will be used. + // DEPRECATED: BindTimeoutSeconds in deprecated. + // TODO(#90958) Remove this and the versioned counterparts in future API versions. BindTimeoutSeconds int64 // PodInitialBackoffSeconds is the initial backoff for unschedulable pods. diff --git a/pkg/scheduler/apis/config/types_pluginargs.go b/pkg/scheduler/apis/config/types_pluginargs.go index 7d5c18d4f87..3e193dd1cdd 100644 --- a/pkg/scheduler/apis/config/types_pluginargs.go +++ b/pkg/scheduler/apis/config/types_pluginargs.go @@ -115,3 +115,13 @@ type ServiceAffinityArgs struct { // AntiAffinityLabelsPreference are the labels to consider for service anti affinity scoring. AntiAffinityLabelsPreference []string } + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// VolumeBindingArgs holds arguments used to configure the VolumeBinding plugin. +type VolumeBindingArgs struct { + metav1.TypeMeta + + // BindTimeoutSeconds is the timeout in seconds in volume binding. + BindTimeoutSeconds int64 +} diff --git a/pkg/scheduler/apis/config/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/zz_generated.deepcopy.go index cf261eefbb4..98546873e87 100644 --- a/pkg/scheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/zz_generated.deepcopy.go @@ -863,3 +863,28 @@ func (in *UtilizationShapePoint) DeepCopy() *UtilizationShapePoint { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VolumeBindingArgs) DeepCopyInto(out *VolumeBindingArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolumeBindingArgs. +func (in *VolumeBindingArgs) DeepCopy() *VolumeBindingArgs { + if in == nil { + return nil + } + out := new(VolumeBindingArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *VolumeBindingArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 0b4f6d99011..543236b9ff6 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -206,10 +206,6 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { if err := sched.SchedulingQueue.Delete(pod); err != nil { utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) } - if sched.VolumeBinder != nil { - // Volume binder only wants to keep unassigned pods - sched.VolumeBinder.DeletePodBindings(pod) - } prof, err := sched.profileForPod(pod) if err != nil { // This shouldn't happen, because we only accept for scheduling the pods diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index a5fe324a8e1..ab1fb33b07f 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -40,7 +40,6 @@ import ( policylisters "k8s.io/client-go/listers/policy/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/klog" - "k8s.io/kubernetes/pkg/controller/volume/scheduling" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -50,6 +49,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" @@ -83,9 +83,6 @@ type Configurator struct { schedulerCache internalcache.Cache - // Handles volume binding decisions - volumeBinder scheduling.SchedulerVolumeBinder - // Disable pod preemption or not. disablePreemption bool @@ -120,7 +117,6 @@ func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (fram framework.WithInformerFactory(c.informerFactory), framework.WithSnapshotSharedLister(c.nodeInfoSnapshot), framework.WithRunAllFilters(c.alwaysCheckAllPredicates), - framework.WithVolumeBinder(c.volumeBinder), ) } @@ -211,11 +207,30 @@ func (c *Configurator) create() (*Scheduler, error) { NextPod: internalqueue.MakeNextPodFunc(podQueue), Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache), StopEverything: c.StopEverything, - VolumeBinder: c.volumeBinder, SchedulingQueue: podQueue, }, nil } +func maybeAppendVolumeBindingArgs(plugins *schedulerapi.Plugins, pcs []schedulerapi.PluginConfig, config schedulerapi.PluginConfig) []schedulerapi.PluginConfig { + enabled := false + for _, p := range plugins.PreBind.Enabled { + if p.Name == volumebinding.Name { + enabled = true + } + } + if !enabled { + // skip if VolumeBinding is not enabled + return pcs + } + // append if not exist + for _, pc := range pcs { + if pc.Name == config.Name { + return pcs + } + } + return append(pcs, config) +} + // createFromProvider creates a scheduler from the name of a registered algorithm provider. func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) { klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName) @@ -231,6 +246,12 @@ func (c *Configurator) createFromProvider(providerName string) (*Scheduler, erro plugins.Append(defaultPlugins) plugins.Apply(prof.Plugins) prof.Plugins = plugins + prof.PluginConfig = maybeAppendVolumeBindingArgs(prof.Plugins, prof.PluginConfig, schedulerapi.PluginConfig{ + Name: volumebinding.Name, + Args: &schedulerapi.VolumeBindingArgs{ + BindTimeoutSeconds: c.bindTimeoutSeconds, + }, + }) } return c.create() } @@ -326,6 +347,12 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, // PluginConfig is ignored when using Policy. prof.PluginConfig = defPluginConfig + prof.PluginConfig = maybeAppendVolumeBindingArgs(prof.Plugins, prof.PluginConfig, schedulerapi.PluginConfig{ + Name: volumebinding.Name, + Args: &schedulerapi.VolumeBindingArgs{ + BindTimeoutSeconds: c.bindTimeoutSeconds, + }, + }) } return c.create() diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 39aa52a4d25..1c5760631e7 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/tools/events" extenderv1 "k8s.io/kube-scheduler/extender/v1" apicore "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" @@ -45,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -241,8 +243,16 @@ func TestCreateFromEmptyConfig(t *testing.T) { t.Fatal(err) } prof := factory.profiles[0] - if len(prof.PluginConfig) != 0 { - t.Errorf("got plugin config %s, want none", prof.PluginConfig) + wantConfig := []schedulerapi.PluginConfig{ + { + Name: volumebinding.Name, + Args: &config.VolumeBindingArgs{ + BindTimeoutSeconds: bindTimeoutSeconds, + }, + }, + } + if diff := cmp.Diff(wantConfig, prof.PluginConfig); diff != "" { + t.Errorf("wrong plugin config (-want, +got): %s", diff) } } diff --git a/pkg/scheduler/framework/plugins/legacy_registry.go b/pkg/scheduler/framework/plugins/legacy_registry.go index 02818da6f2e..23739b96660 100644 --- a/pkg/scheduler/framework/plugins/legacy_registry.go +++ b/pkg/scheduler/framework/plugins/legacy_registry.go @@ -271,6 +271,10 @@ func NewLegacyRegistry() *LegacyRegistry { registry.registerPredicateConfigProducer(CheckVolumeBindingPred, func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil) + plugins.Reserve = appendToPluginSet(plugins.Reserve, volumebinding.Name, nil) + plugins.PreBind = appendToPluginSet(plugins.PreBind, volumebinding.Name, nil) + plugins.Unreserve = appendToPluginSet(plugins.Unreserve, volumebinding.Name, nil) + plugins.PostBind = appendToPluginSet(plugins.PostBind, volumebinding.Name, nil) return }) registry.registerPredicateConfigProducer(NoDiskConflictPred, diff --git a/pkg/scheduler/framework/plugins/volumebinding/BUILD b/pkg/scheduler/framework/plugins/volumebinding/BUILD index 33ed9d13323..32f95b7e4a6 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/BUILD +++ b/pkg/scheduler/framework/plugins/volumebinding/BUILD @@ -7,9 +7,13 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/controller/volume/scheduling:go_default_library", + "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index f1f1b105f4a..69745fe8765 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -18,19 +18,51 @@ package volumebinding import ( "context" + "fmt" + "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" "k8s.io/kubernetes/pkg/controller/volume/scheduling" + "k8s.io/kubernetes/pkg/scheduler/apis/config" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) +const ( + // DefaultBindTimeoutSeconds defines the default bind timeout in seconds + DefaultBindTimeoutSeconds = 600 + + allBoundStateKey framework.StateKey = "volumebinding:all-bound" +) + +type stateData struct { + allBound bool +} + +func (d stateData) Clone() framework.StateData { + return d +} + // VolumeBinding is a plugin that binds pod volumes in scheduling. +// In the Filter phase, pod binding cache is created for the pod and used in +// Reserve and PreBind phases. Pod binding cache will be cleared at +// Unreserve and PostBind extension points. However, if pod fails before +// the Reserve phase and is deleted from the apiserver later, its pod binding +// cache cannot be cleared at plugin extension points. We register an +// event handler to clear pod binding cache when the pod is deleted to +// prevent memory leaking. type VolumeBinding struct { - binder scheduling.SchedulerVolumeBinder + Binder scheduling.SchedulerVolumeBinder } var _ framework.FilterPlugin = &VolumeBinding{} +var _ framework.ReservePlugin = &VolumeBinding{} +var _ framework.PreBindPlugin = &VolumeBinding{} +var _ framework.UnreservePlugin = &VolumeBinding{} +var _ framework.PostBindPlugin = &VolumeBinding{} // Name is the name of the plugin used in Registry and configurations. const Name = "VolumeBinding" @@ -71,7 +103,7 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p return nil } - reasons, err := pl.binder.FindPodVolumes(pod, node) + reasons, err := pl.Binder.FindPodVolumes(pod, node) if err != nil { return framework.NewStatus(framework.Error, err.Error()) @@ -87,9 +119,105 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p return nil } -// New initializes a new plugin with volume binder and returns it. -func New(_ runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) { +// Reserve reserves volumes of pod and saves binding status in cycle state. +func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName) + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + cs.Write(allBoundStateKey, stateData{allBound: allBound}) + return nil +} + +// PreBind will make the API update with the assumed bindings and wait until +// the PV controller has completely finished the binding operation. +// +// If binding errors, times out or gets undone, then an error will be returned to +// retry scheduling. +func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + state, err := cs.Read(allBoundStateKey) + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + s, ok := state.(stateData) + if !ok { + return framework.NewStatus(framework.Error, "unable to convert state into stateData") + } + if s.allBound { + // no need to bind volumes + return nil + } + klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", pod.Namespace, pod.Name) + err = pl.Binder.BindPodVolumes(pod) + if err != nil { + klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", pod.Namespace, pod.Name, err) + return framework.NewStatus(framework.Error, err.Error()) + } + klog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", pod.Namespace, pod.Name) + return nil +} + +// Unreserve clears pod binding state. +// TODO(#90962) Revert assumed PV/PVC cache +func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) { + pl.Binder.DeletePodBindings(pod) + return +} + +// PostBind is called after a pod is successfully bound. +func (pl *VolumeBinding) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) { + pl.Binder.DeletePodBindings(pod) + return +} + +// New initializes a new plugin and returns it. +func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) { + args, ok := plArgs.(*config.VolumeBindingArgs) + if !ok { + return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs) + } + if err := validateArgs(args); err != nil { + return nil, err + } + nodeInformer := fh.SharedInformerFactory().Core().V1().Nodes() + pvcInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumeClaims() + pvInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumes() + storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses() + csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes() + binder := scheduling.NewVolumeBinder(fh.ClientSet(), nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second) + // TODO(#90962) Because pod volume binding cache in SchedulerVolumeBinder is + // used only in current scheduling cycle, we can share it via + // framework.CycleState, then we don't need to register this event handler + // and Unreserve/PostBind extension points to clear pod volume binding + // cache. + fh.SharedInformerFactory().Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + var pod *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pod = obj.(*v1.Pod) + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*v1.Pod) + if !ok { + utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj)) + return + } + default: + utilruntime.HandleError(fmt.Errorf("unable to handle object %T", obj)) + return + } + binder.DeletePodBindings(pod) + }, + }) return &VolumeBinding{ - binder: fh.VolumeBinder(), + Binder: binder, }, nil } + +func validateArgs(args *config.VolumeBindingArgs) error { + if args.BindTimeoutSeconds <= 0 { + return fmt.Errorf("invalid BindTimeoutSeconds: %d, must be positive integer", args.BindTimeoutSeconds) + } + return nil +} diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go index 35cdef2a9ad..5756b1e00fc 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding_test.go @@ -102,7 +102,7 @@ func TestVolumeBinding(t *testing.T) { nodeInfo.SetNode(item.node) fakeVolumeBinder := scheduling.NewFakeVolumeBinder(item.volumeBinderConfig) p := &VolumeBinding{ - binder: fakeVolumeBinder, + Binder: fakeVolumeBinder, } gotStatus := p.Filter(context.Background(), nil, item.pod, nodeInfo) if !reflect.DeepEqual(gotStatus, item.wantStatus) { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index efac50e1122..b78d0397c8b 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -34,11 +34,11 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/controller/volume/scheduling" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" "k8s.io/kubernetes/pkg/scheduler/core" frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -47,8 +47,6 @@ import ( ) const ( - // BindTimeoutSeconds defines the default bind timeout - BindTimeoutSeconds = 100 // SchedulerError is the reason recorded for events when an error occurs during scheduling a pod. SchedulerError = "SchedulerError" // Percentage of plugin metrics to be sampled. @@ -101,9 +99,6 @@ type Scheduler struct { // Close this to shut down the scheduler. StopEverything <-chan struct{} - // VolumeBinder handles PVC/PV binding for the pod. - VolumeBinder scheduling.SchedulerVolumeBinder - // Disable pod preemption or not. DisablePreemption bool @@ -223,7 +218,7 @@ var defaultSchedulerOptions = schedulerOptions{ }, disablePreemption: false, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, - bindTimeoutSeconds: BindTimeoutSeconds, + bindTimeoutSeconds: volumebinding.DefaultBindTimeoutSeconds, podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()), podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()), } @@ -247,15 +242,6 @@ func New(client clientset.Interface, } schedulerCache := internalcache.New(30*time.Second, stopEverything) - volumeBinder := scheduling.NewVolumeBinder( - client, - informerFactory.Core().V1().Nodes(), - informerFactory.Storage().V1().CSINodes(), - informerFactory.Core().V1().PersistentVolumeClaims(), - informerFactory.Core().V1().PersistentVolumes(), - informerFactory.Storage().V1().StorageClasses(), - time.Duration(options.bindTimeoutSeconds)*time.Second, - ) registry := frameworkplugins.NewInTreeRegistry() if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil { @@ -269,7 +255,6 @@ func New(client clientset.Interface, recorderFactory: recorderFactory, informerFactory: informerFactory, podInformer: podInformer, - volumeBinder: volumeBinder, schedulerCache: schedulerCache, StopEverything: stopEverything, disablePreemption: options.disablePreemption, @@ -455,29 +440,6 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat return nodeName, err } -// bindVolumes will make the API update with the assumed bindings and wait until -// the PV controller has completely finished the binding operation. -// -// If binding errors, times out or gets undone, then an error will be returned to -// retry scheduling. -func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error { - klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name) - err := sched.VolumeBinder.BindPodVolumes(assumed) - if err != nil { - klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err) - - // Unassume the Pod and retry scheduling - if forgetErr := sched.SchedulerCache.ForgetPod(assumed); forgetErr != nil { - klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) - } - - return err - } - - klog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name) - return nil -} - // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous. // assume modifies `assumed`. func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { @@ -617,21 +579,6 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { assumedPodInfo := podInfo.DeepCopy() assumedPod := assumedPodInfo.Pod - // Assume volumes first before assuming the pod. - // - // If all volumes are completely bound, then allBound is true and binding will be skipped. - // - // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding. - // - // This function modifies 'assumedPod' if volume binding is required. - allBound, err := sched.VolumeBinder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost) - if err != nil { - sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, - fmt.Sprintf("AssumePodVolumes failed: %v", err)) - metrics.PodScheduleErrors.Inc() - return - } - // Run "reserve" plugins. if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message()) @@ -700,18 +647,6 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { return } - // Bind volumes first before Pod - if !allBound { - err := sched.bindVolumes(assumedPod) - if err != nil { - sched.recordSchedulingFailure(prof, assumedPodInfo, err, "VolumeBindingFailed", err.Error()) - metrics.PodScheduleErrors.Inc() - // trigger un-reserve plugins to clean up state associated with the reserved Pod - prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - return - } - } - // Run "prebind" plugins. preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if !preBindStatus.IsSuccess() { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 9413e6819db..6f9cfef0ff3 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -41,12 +41,14 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" clienttesting "k8s.io/client-go/testing" clientcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" "k8s.io/kubernetes/pkg/controller/volume/scheduling" + "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" @@ -348,7 +350,6 @@ func TestSchedulerScheduleOne(t *testing.T) { Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName), }, }, - VolumeBinder: scheduling.NewFakeVolumeBinder(&scheduling.FakeVolumeBinderConfig{AllBound: true}), } called := make(chan struct{}) stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { @@ -671,7 +672,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { - scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...) + scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...) informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) @@ -756,7 +757,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterPluginAsExtensions(noderesources.FitName, noderesources.NewFit, "Filter", "PreFilter"), } - scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...) + scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...) informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) @@ -783,11 +784,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. -func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, volumeBinder scheduling.SchedulerVolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { - if volumeBinder == nil { - // Create default volume binder if it didn't set. - volumeBinder = scheduling.NewFakeVolumeBinder(&scheduling.FakeVolumeBinderConfig{AllBound: true}) - } +func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { bindingChan := make(chan *v1.Binding, 1) client := clientsetfake.NewSimpleClientset() client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { @@ -799,7 +796,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C return true, b, nil }) - fwk, _ := st.NewFramework(fns, framework.WithClientSet(client), framework.WithVolumeBinder(volumeBinder)) + fwk, _ := st.NewFramework(fns, framework.WithClientSet(client)) prof := &profile.Profile{ Framework: fwk, Recorder: &events.FakeRecorder{}, @@ -835,7 +832,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C Profiles: profiles, podConditionUpdater: fakePodConditionUpdater{}, podPreemptor: fakePodPreemptor{}, - VolumeBinder: volumeBinder, } return sched, bindingChan, errChan @@ -858,12 +854,13 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume fns := []st.RegisterPluginFunc{ st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - st.RegisterFilterPlugin(volumebinding.Name, volumebinding.New), + st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.FrameworkHandle) (framework.Plugin, error) { + return &volumebinding.VolumeBinding{Binder: volumeBinder}, nil + }, "Filter", "Reserve", "Unreserve", "PreBind", "PostBind"), } - s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, volumeBinder, fns...) + s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...) informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) - s.VolumeBinder = volumeBinder return s, bindingChan, errChan } @@ -952,7 +949,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { }, expectAssumeCalled: true, eventReason: "FailedScheduling", - expectError: assumeErr, + expectError: fmt.Errorf("error while running %q reserve plugin for pod %q: %v", volumebinding.Name, "foo", assumeErr), }, { name: "bind error", @@ -962,7 +959,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { expectAssumeCalled: true, expectBindCalled: true, eventReason: "FailedScheduling", - expectError: bindErr, + expectError: fmt.Errorf("error while running %q prebind plugin for pod %q: %v", volumebinding.Name, "foo", bindErr), }, } @@ -1200,3 +1197,122 @@ func TestSchedulerBinding(t *testing.T) { }) } } + +// TestInjectingPluginConfigForVolumeBinding tests injecting +// KubeSchedulerConfiguration.BindTimeoutSeconds as args for VolumeBinding if +// no plugin args is configured for it. +// TODO remove when KubeSchedulerConfiguration.BindTimeoutSeconds is eliminated +func TestInjectingPluginConfigForVolumeBinding(t *testing.T) { + defaultPluginConfigs := []config.PluginConfig{ + { + Name: "VolumeBinding", + Args: &config.VolumeBindingArgs{ + BindTimeoutSeconds: 600, + }, + }, + } + + tests := []struct { + name string + opts []Option + wantPluginConfig []config.PluginConfig + }{ + { + name: "default with provider", + wantPluginConfig: defaultPluginConfigs, + }, + { + name: "default with policy", + opts: []Option{ + WithAlgorithmSource(schedulerapi.SchedulerAlgorithmSource{ + Policy: &config.SchedulerPolicySource{}, + }), + }, + wantPluginConfig: defaultPluginConfigs, + }, + { + name: "customize BindTimeoutSeconds with provider", + opts: []Option{ + WithBindTimeoutSeconds(100), + }, + wantPluginConfig: []config.PluginConfig{ + { + Name: "VolumeBinding", + Args: &config.VolumeBindingArgs{ + BindTimeoutSeconds: 100, + }, + }, + }, + }, + { + name: "customize BindTimeoutSeconds with policy", + opts: []Option{ + WithAlgorithmSource(schedulerapi.SchedulerAlgorithmSource{ + Policy: &config.SchedulerPolicySource{}, + }), + WithBindTimeoutSeconds(100), + }, + wantPluginConfig: []config.PluginConfig{ + { + Name: "VolumeBinding", + Args: &config.VolumeBindingArgs{ + BindTimeoutSeconds: 100, + }, + }, + }, + }, + { + name: "PluginConfig is preferred", + opts: []Option{ + WithBindTimeoutSeconds(100), + WithProfiles(config.KubeSchedulerProfile{ + SchedulerName: v1.DefaultSchedulerName, + PluginConfig: []config.PluginConfig{ + { + Name: "VolumeBinding", + Args: &config.VolumeBindingArgs{ + BindTimeoutSeconds: 200, + }, + }, + }, + }), + }, + wantPluginConfig: []config.PluginConfig{ + { + Name: "VolumeBinding", + Args: &config.VolumeBindingArgs{ + BindTimeoutSeconds: 200, + }, + }, + }, + }, + } + + for _, tt := range tests { + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0) + recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})) + + opts := append(tt.opts, WithBuildFrameworkCapturer(func(p config.KubeSchedulerProfile) { + if p.SchedulerName != v1.DefaultSchedulerName { + t.Errorf("unexpected scheduler name (want %q, got %q)", v1.DefaultSchedulerName, p.SchedulerName) + } + if diff := cmp.Diff(tt.wantPluginConfig, p.PluginConfig); diff != "" { + t.Errorf("unexpected plugins diff (-want, +got): %s", diff) + } + })) + + _, err := New( + client, + informerFactory, + informerFactory.Core().V1().Pods(), + recorderFactory, + make(chan struct{}), + opts..., + ) + + if err != nil { + t.Fatalf("Error constructing: %v", err) + } + } +} diff --git a/pkg/scheduler/testing/framework_helpers.go b/pkg/scheduler/testing/framework_helpers.go index e5ee24df820..42ee5d39f46 100644 --- a/pkg/scheduler/testing/framework_helpers.go +++ b/pkg/scheduler/testing/framework_helpers.go @@ -99,8 +99,14 @@ func getPluginSetByExtension(plugins *schedulerapi.Plugins, extension string) *s return initializeIfNeeded(&plugins.Bind) case "Reserve": return initializeIfNeeded(&plugins.Reserve) + case "Unreserve": + return initializeIfNeeded(&plugins.Unreserve) case "Permit": return initializeIfNeeded(&plugins.Permit) + case "PreBind": + return initializeIfNeeded(&plugins.PreBind) + case "PostBind": + return initializeIfNeeded(&plugins.PostBind) default: return nil } diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index e5b9e087227..7f94b10fca3 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -143,7 +143,11 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) { {Name: "DefaultPodTopologySpread", Weight: 1}, {Name: "TaintToleration", Weight: 1}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, }, { @@ -233,7 +237,11 @@ kind: Policy {Name: "DefaultPodTopologySpread", Weight: 1}, {Name: "TaintToleration", Weight: 1}, }, - "BindPlugin": {{Name: "DefaultBinder"}}, + "ReservePlugin": {{Name: "VolumeBinding"}}, + "UnreservePlugin": {{Name: "VolumeBinding"}}, + "PreBindPlugin": {{Name: "VolumeBinding"}}, + "BindPlugin": {{Name: "DefaultBinder"}}, + "PostBindPlugin": {{Name: "VolumeBinding"}}, }, }, {