mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #83726 from cofyc/fix56180
scheduler: Move all volume binding code into VolumeBinding plugin
This commit is contained in:
commit
5bda0c1b3b
@ -191,7 +191,11 @@ profiles:
|
||||
{Name: "TaintToleration", Weight: 1},
|
||||
{Name: "PodTopologySpread", Weight: 1},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
}
|
||||
|
||||
testcases := []struct {
|
||||
@ -222,6 +226,10 @@ profiles:
|
||||
"PreScorePlugin": {{Name: "InterPodAffinity"}, {Name: "TaintToleration"}},
|
||||
"QueueSortPlugin": {{Name: "PrioritySort"}},
|
||||
"ScorePlugin": {{Name: "InterPodAffinity", Weight: 1}, {Name: "TaintToleration", Weight: 1}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -236,6 +244,10 @@ profiles:
|
||||
"profile-disable-all-filter-and-score-plugins": {
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"QueueSortPlugin": {{Name: "PrioritySort"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -310,7 +322,11 @@ profiles:
|
||||
{Name: "TaintToleration", Weight: 1},
|
||||
{Name: "PodTopologySpread", Weight: 1},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -11,7 +11,6 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/controller/volume/scheduling:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/scheduler/algorithmprovider:go_default_library",
|
||||
"//pkg/scheduler/apis/config:go_default_library",
|
||||
@ -22,6 +21,7 @@ go_library(
|
||||
"//pkg/scheduler/framework/plugins/defaultbinder:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//pkg/scheduler/internal/cache:go_default_library",
|
||||
"//pkg/scheduler/internal/cache/debugger:go_default_library",
|
||||
|
@ -125,11 +125,31 @@ func getDefaultConfig() *schedulerapi.Plugins {
|
||||
{Name: tainttoleration.Name, Weight: 1},
|
||||
},
|
||||
},
|
||||
Reserve: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
Unreserve: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
PreBind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
Bind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: defaultbinder.Name},
|
||||
},
|
||||
},
|
||||
PostBind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,11 +99,31 @@ func TestClusterAutoscalerProvider(t *testing.T) {
|
||||
{Name: podtopologyspread.Name, Weight: 1},
|
||||
},
|
||||
},
|
||||
Reserve: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
Unreserve: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
PreBind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
Bind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: defaultbinder.Name},
|
||||
},
|
||||
},
|
||||
PostBind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
r := NewRegistry()
|
||||
@ -172,11 +192,31 @@ func TestApplyFeatureGates(t *testing.T) {
|
||||
{Name: tainttoleration.Name, Weight: 1},
|
||||
},
|
||||
},
|
||||
Reserve: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
Unreserve: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
PreBind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
Bind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: defaultbinder.Name},
|
||||
},
|
||||
},
|
||||
PostBind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -238,11 +278,31 @@ func TestApplyFeatureGates(t *testing.T) {
|
||||
{Name: noderesources.ResourceLimitsName, Weight: 1},
|
||||
},
|
||||
},
|
||||
Reserve: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
Unreserve: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
PreBind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
Bind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: defaultbinder.Name},
|
||||
},
|
||||
},
|
||||
PostBind: &schedulerapi.PluginSet{
|
||||
Enabled: []schedulerapi.Plugin{
|
||||
{Name: volumebinding.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -45,6 +45,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
|
||||
&PodTopologySpreadArgs{},
|
||||
&RequestedToCapacityRatioArgs{},
|
||||
&ServiceAffinityArgs{},
|
||||
&VolumeBindingArgs{},
|
||||
)
|
||||
scheme.AddKnownTypes(schema.GroupVersion{Group: "", Version: runtime.APIVersionInternal}, &Policy{})
|
||||
return nil
|
||||
|
@ -702,7 +702,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
||||
{Name: "DefaultPodTopologySpread", Weight: 2},
|
||||
{Name: "TaintToleration", Weight: 2},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
wantExtenders: []config.Extender{{
|
||||
URLPrefix: "/prefix",
|
||||
@ -805,7 +809,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
||||
{Name: "DefaultPodTopologySpread", Weight: 2},
|
||||
{Name: "TaintToleration", Weight: 2},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
wantExtenders: []config.Extender{{
|
||||
URLPrefix: "/prefix",
|
||||
@ -921,7 +929,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
||||
{Name: "DefaultPodTopologySpread", Weight: 2},
|
||||
{Name: "TaintToleration", Weight: 2},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
wantExtenders: []config.Extender{{
|
||||
URLPrefix: "/prefix",
|
||||
@ -1039,7 +1051,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
||||
{Name: "DefaultPodTopologySpread", Weight: 2},
|
||||
{Name: "TaintToleration", Weight: 2},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
wantExtenders: []config.Extender{{
|
||||
URLPrefix: "/prefix",
|
||||
@ -1157,7 +1173,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
||||
{Name: "DefaultPodTopologySpread", Weight: 2},
|
||||
{Name: "TaintToleration", Weight: 2},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
wantExtenders: []config.Extender{{
|
||||
URLPrefix: "/prefix",
|
||||
@ -1279,7 +1299,11 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
||||
{Name: "DefaultPodTopologySpread", Weight: 2},
|
||||
{Name: "TaintToleration", Weight: 2},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
wantExtenders: []config.Extender{{
|
||||
URLPrefix: "/prefix",
|
||||
@ -1430,7 +1454,11 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
|
||||
{Name: "TaintToleration", Weight: 1},
|
||||
{Name: "PodTopologySpread", Weight: 1},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
}
|
||||
|
||||
testcases := []struct {
|
||||
@ -1494,7 +1522,11 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
|
||||
{Name: "TaintToleration", Weight: 1},
|
||||
{Name: "PodTopologySpread", Weight: 1},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -1578,18 +1610,33 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
|
||||
{Name: "TaintToleration", Weight: 1},
|
||||
{Name: "PodTopologySpread", Weight: 1},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
}
|
||||
|
||||
defaultPluginConfigs := []config.PluginConfig{
|
||||
{
|
||||
Name: "VolumeBinding",
|
||||
Args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 600,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testcases := []struct {
|
||||
name string
|
||||
plugins config.Plugins
|
||||
wantPlugins map[string][]config.Plugin
|
||||
pluginConfig []config.PluginConfig
|
||||
name string
|
||||
plugins config.Plugins
|
||||
wantPlugins map[string][]config.Plugin
|
||||
pluginConfig []config.PluginConfig
|
||||
wantPluginConfig []config.PluginConfig
|
||||
}{
|
||||
{
|
||||
name: "default plugins",
|
||||
wantPlugins: defaultPlugins,
|
||||
name: "default plugins",
|
||||
wantPlugins: defaultPlugins,
|
||||
wantPluginConfig: defaultPluginConfigs,
|
||||
},
|
||||
{
|
||||
name: "default plugins with customized plugin config",
|
||||
@ -1651,6 +1698,76 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
|
||||
AntiAffinityLabelsPreference: []string{"disk", "flash"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "VolumeBinding",
|
||||
Args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 300,
|
||||
},
|
||||
},
|
||||
},
|
||||
wantPluginConfig: []config.PluginConfig{
|
||||
{
|
||||
Name: "NodeResourcesFit",
|
||||
Args: &config.NodeResourcesFitArgs{
|
||||
IgnoredResources: []string{"foo", "bar"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "PodTopologySpread",
|
||||
Args: &config.PodTopologySpreadArgs{
|
||||
DefaultConstraints: []v1.TopologySpreadConstraint{
|
||||
{
|
||||
MaxSkew: 1,
|
||||
TopologyKey: "foo",
|
||||
WhenUnsatisfiable: v1.DoNotSchedule,
|
||||
},
|
||||
{
|
||||
MaxSkew: 10,
|
||||
TopologyKey: "bar",
|
||||
WhenUnsatisfiable: v1.ScheduleAnyway,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "RequestedToCapacityRatio",
|
||||
Args: &config.RequestedToCapacityRatioArgs{
|
||||
Shape: []config.UtilizationShapePoint{
|
||||
{Utilization: 5, Score: 5},
|
||||
},
|
||||
Resources: []config.ResourceSpec{
|
||||
{Name: "cpu", Weight: 10},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "InterPodAffinity",
|
||||
Args: &config.InterPodAffinityArgs{
|
||||
HardPodAffinityWeight: 100,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "NodeLabel",
|
||||
Args: &config.NodeLabelArgs{
|
||||
PresentLabels: []string{"foo", "bar"},
|
||||
AbsentLabels: []string{"apple"},
|
||||
PresentLabelsPreference: []string{"dog"},
|
||||
AbsentLabelsPreference: []string{"cat"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "ServiceAffinity",
|
||||
Args: &config.ServiceAffinityArgs{
|
||||
AffinityLabels: []string{"foo", "bar"},
|
||||
AntiAffinityLabelsPreference: []string{"disk", "flash"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "VolumeBinding",
|
||||
Args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 300,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -1704,6 +1821,26 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
|
||||
{Name: "PodTopologySpread"},
|
||||
},
|
||||
},
|
||||
PreBind: &config.PluginSet{
|
||||
Disabled: []config.Plugin{
|
||||
{Name: "VolumeBinding"},
|
||||
},
|
||||
},
|
||||
PostBind: &config.PluginSet{
|
||||
Disabled: []config.Plugin{
|
||||
{Name: "VolumeBinding"},
|
||||
},
|
||||
},
|
||||
Reserve: &config.PluginSet{
|
||||
Disabled: []config.Plugin{
|
||||
{Name: "VolumeBinding"},
|
||||
},
|
||||
},
|
||||
Unreserve: &config.PluginSet{
|
||||
Disabled: []config.Plugin{
|
||||
{Name: "VolumeBinding"},
|
||||
},
|
||||
},
|
||||
},
|
||||
wantPlugins: map[string][]config.Plugin{
|
||||
"QueueSortPlugin": {
|
||||
@ -1824,8 +1961,13 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
|
||||
{Name: "ImageLocality", Weight: 24},
|
||||
{Name: "NodeResourcesBalancedAllocation", Weight: 24},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
wantPluginConfig: defaultPluginConfigs,
|
||||
},
|
||||
}
|
||||
for _, tc := range testcases {
|
||||
@ -1850,7 +1992,7 @@ func TestPluginsConfigurationCompatibility(t *testing.T) {
|
||||
if p.SchedulerName != v1.DefaultSchedulerName {
|
||||
t.Errorf("unexpected scheduler name (want %q, got %q)", v1.DefaultSchedulerName, p.SchedulerName)
|
||||
}
|
||||
if diff := cmp.Diff(tc.pluginConfig, p.PluginConfig); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantPluginConfig, p.PluginConfig); diff != "" {
|
||||
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
|
||||
}
|
||||
}),
|
||||
|
@ -93,6 +93,8 @@ type KubeSchedulerConfiguration struct {
|
||||
// Duration to wait for a binding operation to complete before timing out
|
||||
// Value must be non-negative integer. The value zero indicates no waiting.
|
||||
// If this value is nil, the default value will be used.
|
||||
// DEPRECATED: BindTimeoutSeconds in deprecated.
|
||||
// TODO(#90958) Remove this and the versioned counterparts in future API versions.
|
||||
BindTimeoutSeconds int64
|
||||
|
||||
// PodInitialBackoffSeconds is the initial backoff for unschedulable pods.
|
||||
|
@ -115,3 +115,13 @@ type ServiceAffinityArgs struct {
|
||||
// AntiAffinityLabelsPreference are the labels to consider for service anti affinity scoring.
|
||||
AntiAffinityLabelsPreference []string
|
||||
}
|
||||
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
|
||||
// VolumeBindingArgs holds arguments used to configure the VolumeBinding plugin.
|
||||
type VolumeBindingArgs struct {
|
||||
metav1.TypeMeta
|
||||
|
||||
// BindTimeoutSeconds is the timeout in seconds in volume binding.
|
||||
BindTimeoutSeconds int64
|
||||
}
|
||||
|
25
pkg/scheduler/apis/config/zz_generated.deepcopy.go
generated
25
pkg/scheduler/apis/config/zz_generated.deepcopy.go
generated
@ -863,3 +863,28 @@ func (in *UtilizationShapePoint) DeepCopy() *UtilizationShapePoint {
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *VolumeBindingArgs) DeepCopyInto(out *VolumeBindingArgs) {
|
||||
*out = *in
|
||||
out.TypeMeta = in.TypeMeta
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolumeBindingArgs.
|
||||
func (in *VolumeBindingArgs) DeepCopy() *VolumeBindingArgs {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(VolumeBindingArgs)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
|
||||
func (in *VolumeBindingArgs) DeepCopyObject() runtime.Object {
|
||||
if c := in.DeepCopy(); c != nil {
|
||||
return c
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -206,10 +206,6 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
|
||||
if err := sched.SchedulingQueue.Delete(pod); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
|
||||
}
|
||||
if sched.VolumeBinder != nil {
|
||||
// Volume binder only wants to keep unassigned pods
|
||||
sched.VolumeBinder.DeletePodBindings(pod)
|
||||
}
|
||||
prof, err := sched.profileForPod(pod)
|
||||
if err != nil {
|
||||
// This shouldn't happen, because we only accept for scheduling the pods
|
||||
|
@ -40,7 +40,6 @@ import (
|
||||
policylisters "k8s.io/client-go/listers/policy/v1beta1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
||||
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
@ -50,6 +49,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
|
||||
@ -83,9 +83,6 @@ type Configurator struct {
|
||||
|
||||
schedulerCache internalcache.Cache
|
||||
|
||||
// Handles volume binding decisions
|
||||
volumeBinder scheduling.SchedulerVolumeBinder
|
||||
|
||||
// Disable pod preemption or not.
|
||||
disablePreemption bool
|
||||
|
||||
@ -120,7 +117,6 @@ func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (fram
|
||||
framework.WithInformerFactory(c.informerFactory),
|
||||
framework.WithSnapshotSharedLister(c.nodeInfoSnapshot),
|
||||
framework.WithRunAllFilters(c.alwaysCheckAllPredicates),
|
||||
framework.WithVolumeBinder(c.volumeBinder),
|
||||
)
|
||||
}
|
||||
|
||||
@ -211,11 +207,30 @@ func (c *Configurator) create() (*Scheduler, error) {
|
||||
NextPod: internalqueue.MakeNextPodFunc(podQueue),
|
||||
Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
|
||||
StopEverything: c.StopEverything,
|
||||
VolumeBinder: c.volumeBinder,
|
||||
SchedulingQueue: podQueue,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func maybeAppendVolumeBindingArgs(plugins *schedulerapi.Plugins, pcs []schedulerapi.PluginConfig, config schedulerapi.PluginConfig) []schedulerapi.PluginConfig {
|
||||
enabled := false
|
||||
for _, p := range plugins.PreBind.Enabled {
|
||||
if p.Name == volumebinding.Name {
|
||||
enabled = true
|
||||
}
|
||||
}
|
||||
if !enabled {
|
||||
// skip if VolumeBinding is not enabled
|
||||
return pcs
|
||||
}
|
||||
// append if not exist
|
||||
for _, pc := range pcs {
|
||||
if pc.Name == config.Name {
|
||||
return pcs
|
||||
}
|
||||
}
|
||||
return append(pcs, config)
|
||||
}
|
||||
|
||||
// createFromProvider creates a scheduler from the name of a registered algorithm provider.
|
||||
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
|
||||
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
|
||||
@ -231,6 +246,12 @@ func (c *Configurator) createFromProvider(providerName string) (*Scheduler, erro
|
||||
plugins.Append(defaultPlugins)
|
||||
plugins.Apply(prof.Plugins)
|
||||
prof.Plugins = plugins
|
||||
prof.PluginConfig = maybeAppendVolumeBindingArgs(prof.Plugins, prof.PluginConfig, schedulerapi.PluginConfig{
|
||||
Name: volumebinding.Name,
|
||||
Args: &schedulerapi.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: c.bindTimeoutSeconds,
|
||||
},
|
||||
})
|
||||
}
|
||||
return c.create()
|
||||
}
|
||||
@ -326,6 +347,12 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
|
||||
|
||||
// PluginConfig is ignored when using Policy.
|
||||
prof.PluginConfig = defPluginConfig
|
||||
prof.PluginConfig = maybeAppendVolumeBindingArgs(prof.Plugins, prof.PluginConfig, schedulerapi.PluginConfig{
|
||||
Name: volumebinding.Name,
|
||||
Args: &schedulerapi.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: c.bindTimeoutSeconds,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return c.create()
|
||||
|
@ -37,6 +37,7 @@ import (
|
||||
"k8s.io/client-go/tools/events"
|
||||
extenderv1 "k8s.io/kube-scheduler/extender/v1"
|
||||
apicore "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
|
||||
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
|
||||
@ -45,6 +46,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||
@ -241,8 +243,16 @@ func TestCreateFromEmptyConfig(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
prof := factory.profiles[0]
|
||||
if len(prof.PluginConfig) != 0 {
|
||||
t.Errorf("got plugin config %s, want none", prof.PluginConfig)
|
||||
wantConfig := []schedulerapi.PluginConfig{
|
||||
{
|
||||
Name: volumebinding.Name,
|
||||
Args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: bindTimeoutSeconds,
|
||||
},
|
||||
},
|
||||
}
|
||||
if diff := cmp.Diff(wantConfig, prof.PluginConfig); diff != "" {
|
||||
t.Errorf("wrong plugin config (-want, +got): %s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -271,6 +271,10 @@ func NewLegacyRegistry() *LegacyRegistry {
|
||||
registry.registerPredicateConfigProducer(CheckVolumeBindingPred,
|
||||
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
|
||||
plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
|
||||
plugins.Reserve = appendToPluginSet(plugins.Reserve, volumebinding.Name, nil)
|
||||
plugins.PreBind = appendToPluginSet(plugins.PreBind, volumebinding.Name, nil)
|
||||
plugins.Unreserve = appendToPluginSet(plugins.Unreserve, volumebinding.Name, nil)
|
||||
plugins.PostBind = appendToPluginSet(plugins.PostBind, volumebinding.Name, nil)
|
||||
return
|
||||
})
|
||||
registry.registerPredicateConfigProducer(NoDiskConflictPred,
|
||||
|
@ -7,9 +7,13 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/controller/volume/scheduling:go_default_library",
|
||||
"//pkg/scheduler/apis/config:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -18,19 +18,51 @@ package volumebinding
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultBindTimeoutSeconds defines the default bind timeout in seconds
|
||||
DefaultBindTimeoutSeconds = 600
|
||||
|
||||
allBoundStateKey framework.StateKey = "volumebinding:all-bound"
|
||||
)
|
||||
|
||||
type stateData struct {
|
||||
allBound bool
|
||||
}
|
||||
|
||||
func (d stateData) Clone() framework.StateData {
|
||||
return d
|
||||
}
|
||||
|
||||
// VolumeBinding is a plugin that binds pod volumes in scheduling.
|
||||
// In the Filter phase, pod binding cache is created for the pod and used in
|
||||
// Reserve and PreBind phases. Pod binding cache will be cleared at
|
||||
// Unreserve and PostBind extension points. However, if pod fails before
|
||||
// the Reserve phase and is deleted from the apiserver later, its pod binding
|
||||
// cache cannot be cleared at plugin extension points. We register an
|
||||
// event handler to clear pod binding cache when the pod is deleted to
|
||||
// prevent memory leaking.
|
||||
type VolumeBinding struct {
|
||||
binder scheduling.SchedulerVolumeBinder
|
||||
Binder scheduling.SchedulerVolumeBinder
|
||||
}
|
||||
|
||||
var _ framework.FilterPlugin = &VolumeBinding{}
|
||||
var _ framework.ReservePlugin = &VolumeBinding{}
|
||||
var _ framework.PreBindPlugin = &VolumeBinding{}
|
||||
var _ framework.UnreservePlugin = &VolumeBinding{}
|
||||
var _ framework.PostBindPlugin = &VolumeBinding{}
|
||||
|
||||
// Name is the name of the plugin used in Registry and configurations.
|
||||
const Name = "VolumeBinding"
|
||||
@ -71,7 +103,7 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
|
||||
return nil
|
||||
}
|
||||
|
||||
reasons, err := pl.binder.FindPodVolumes(pod, node)
|
||||
reasons, err := pl.Binder.FindPodVolumes(pod, node)
|
||||
|
||||
if err != nil {
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
@ -87,9 +119,105 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
|
||||
return nil
|
||||
}
|
||||
|
||||
// New initializes a new plugin with volume binder and returns it.
|
||||
func New(_ runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
// Reserve reserves volumes of pod and saves binding status in cycle state.
|
||||
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
allBound, err := pl.Binder.AssumePodVolumes(pod, nodeName)
|
||||
if err != nil {
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
}
|
||||
cs.Write(allBoundStateKey, stateData{allBound: allBound})
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreBind will make the API update with the assumed bindings and wait until
|
||||
// the PV controller has completely finished the binding operation.
|
||||
//
|
||||
// If binding errors, times out or gets undone, then an error will be returned to
|
||||
// retry scheduling.
|
||||
func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
|
||||
state, err := cs.Read(allBoundStateKey)
|
||||
if err != nil {
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
}
|
||||
s, ok := state.(stateData)
|
||||
if !ok {
|
||||
return framework.NewStatus(framework.Error, "unable to convert state into stateData")
|
||||
}
|
||||
if s.allBound {
|
||||
// no need to bind volumes
|
||||
return nil
|
||||
}
|
||||
klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", pod.Namespace, pod.Name)
|
||||
err = pl.Binder.BindPodVolumes(pod)
|
||||
if err != nil {
|
||||
klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", pod.Namespace, pod.Name, err)
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
}
|
||||
klog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", pod.Namespace, pod.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unreserve clears pod binding state.
|
||||
// TODO(#90962) Revert assumed PV/PVC cache
|
||||
func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
|
||||
pl.Binder.DeletePodBindings(pod)
|
||||
return
|
||||
}
|
||||
|
||||
// PostBind is called after a pod is successfully bound.
|
||||
func (pl *VolumeBinding) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
|
||||
pl.Binder.DeletePodBindings(pod)
|
||||
return
|
||||
}
|
||||
|
||||
// New initializes a new plugin and returns it.
|
||||
func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
args, ok := plArgs.(*config.VolumeBindingArgs)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs)
|
||||
}
|
||||
if err := validateArgs(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodeInformer := fh.SharedInformerFactory().Core().V1().Nodes()
|
||||
pvcInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumeClaims()
|
||||
pvInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumes()
|
||||
storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses()
|
||||
csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes()
|
||||
binder := scheduling.NewVolumeBinder(fh.ClientSet(), nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
|
||||
// TODO(#90962) Because pod volume binding cache in SchedulerVolumeBinder is
|
||||
// used only in current scheduling cycle, we can share it via
|
||||
// framework.CycleState, then we don't need to register this event handler
|
||||
// and Unreserve/PostBind extension points to clear pod volume binding
|
||||
// cache.
|
||||
fh.SharedInformerFactory().Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
var pod *v1.Pod
|
||||
switch t := obj.(type) {
|
||||
case *v1.Pod:
|
||||
pod = obj.(*v1.Pod)
|
||||
case cache.DeletedFinalStateUnknown:
|
||||
var ok bool
|
||||
pod, ok = t.Obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj))
|
||||
return
|
||||
}
|
||||
default:
|
||||
utilruntime.HandleError(fmt.Errorf("unable to handle object %T", obj))
|
||||
return
|
||||
}
|
||||
binder.DeletePodBindings(pod)
|
||||
},
|
||||
})
|
||||
return &VolumeBinding{
|
||||
binder: fh.VolumeBinder(),
|
||||
Binder: binder,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func validateArgs(args *config.VolumeBindingArgs) error {
|
||||
if args.BindTimeoutSeconds <= 0 {
|
||||
return fmt.Errorf("invalid BindTimeoutSeconds: %d, must be positive integer", args.BindTimeoutSeconds)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ func TestVolumeBinding(t *testing.T) {
|
||||
nodeInfo.SetNode(item.node)
|
||||
fakeVolumeBinder := scheduling.NewFakeVolumeBinder(item.volumeBinderConfig)
|
||||
p := &VolumeBinding{
|
||||
binder: fakeVolumeBinder,
|
||||
Binder: fakeVolumeBinder,
|
||||
}
|
||||
gotStatus := p.Filter(context.Background(), nil, item.pod, nodeInfo)
|
||||
if !reflect.DeepEqual(gotStatus, item.wantStatus) {
|
||||
|
@ -34,11 +34,11 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
|
||||
"k8s.io/kubernetes/pkg/scheduler/core"
|
||||
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||
@ -47,8 +47,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// BindTimeoutSeconds defines the default bind timeout
|
||||
BindTimeoutSeconds = 100
|
||||
// SchedulerError is the reason recorded for events when an error occurs during scheduling a pod.
|
||||
SchedulerError = "SchedulerError"
|
||||
// Percentage of plugin metrics to be sampled.
|
||||
@ -101,9 +99,6 @@ type Scheduler struct {
|
||||
// Close this to shut down the scheduler.
|
||||
StopEverything <-chan struct{}
|
||||
|
||||
// VolumeBinder handles PVC/PV binding for the pod.
|
||||
VolumeBinder scheduling.SchedulerVolumeBinder
|
||||
|
||||
// Disable pod preemption or not.
|
||||
DisablePreemption bool
|
||||
|
||||
@ -223,7 +218,7 @@ var defaultSchedulerOptions = schedulerOptions{
|
||||
},
|
||||
disablePreemption: false,
|
||||
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
||||
bindTimeoutSeconds: BindTimeoutSeconds,
|
||||
bindTimeoutSeconds: volumebinding.DefaultBindTimeoutSeconds,
|
||||
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
|
||||
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
|
||||
}
|
||||
@ -247,15 +242,6 @@ func New(client clientset.Interface,
|
||||
}
|
||||
|
||||
schedulerCache := internalcache.New(30*time.Second, stopEverything)
|
||||
volumeBinder := scheduling.NewVolumeBinder(
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Storage().V1().CSINodes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Storage().V1().StorageClasses(),
|
||||
time.Duration(options.bindTimeoutSeconds)*time.Second,
|
||||
)
|
||||
|
||||
registry := frameworkplugins.NewInTreeRegistry()
|
||||
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
|
||||
@ -269,7 +255,6 @@ func New(client clientset.Interface,
|
||||
recorderFactory: recorderFactory,
|
||||
informerFactory: informerFactory,
|
||||
podInformer: podInformer,
|
||||
volumeBinder: volumeBinder,
|
||||
schedulerCache: schedulerCache,
|
||||
StopEverything: stopEverything,
|
||||
disablePreemption: options.disablePreemption,
|
||||
@ -455,29 +440,6 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat
|
||||
return nodeName, err
|
||||
}
|
||||
|
||||
// bindVolumes will make the API update with the assumed bindings and wait until
|
||||
// the PV controller has completely finished the binding operation.
|
||||
//
|
||||
// If binding errors, times out or gets undone, then an error will be returned to
|
||||
// retry scheduling.
|
||||
func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
|
||||
klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
|
||||
err := sched.VolumeBinder.BindPodVolumes(assumed)
|
||||
if err != nil {
|
||||
klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
|
||||
|
||||
// Unassume the Pod and retry scheduling
|
||||
if forgetErr := sched.SchedulerCache.ForgetPod(assumed); forgetErr != nil {
|
||||
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
klog.V(5).Infof("Success binding volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
|
||||
// assume modifies `assumed`.
|
||||
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
|
||||
@ -617,21 +579,6 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
||||
assumedPodInfo := podInfo.DeepCopy()
|
||||
assumedPod := assumedPodInfo.Pod
|
||||
|
||||
// Assume volumes first before assuming the pod.
|
||||
//
|
||||
// If all volumes are completely bound, then allBound is true and binding will be skipped.
|
||||
//
|
||||
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
|
||||
//
|
||||
// This function modifies 'assumedPod' if volume binding is required.
|
||||
allBound, err := sched.VolumeBinder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
|
||||
if err != nil {
|
||||
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError,
|
||||
fmt.Sprintf("AssumePodVolumes failed: %v", err))
|
||||
metrics.PodScheduleErrors.Inc()
|
||||
return
|
||||
}
|
||||
|
||||
// Run "reserve" plugins.
|
||||
if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
|
||||
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
|
||||
@ -700,18 +647,6 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Bind volumes first before Pod
|
||||
if !allBound {
|
||||
err := sched.bindVolumes(assumedPod)
|
||||
if err != nil {
|
||||
sched.recordSchedulingFailure(prof, assumedPodInfo, err, "VolumeBindingFailed", err.Error())
|
||||
metrics.PodScheduleErrors.Inc()
|
||||
// trigger un-reserve plugins to clean up state associated with the reserved Pod
|
||||
prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Run "prebind" plugins.
|
||||
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
|
||||
if !preBindStatus.IsSuccess() {
|
||||
|
@ -41,12 +41,14 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
clientcache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/events"
|
||||
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/core"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
|
||||
@ -348,7 +350,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
|
||||
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName),
|
||||
},
|
||||
},
|
||||
VolumeBinder: scheduling.NewFakeVolumeBinder(&scheduling.FakeVolumeBinderConfig{AllBound: true}),
|
||||
}
|
||||
called := make(chan struct{})
|
||||
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
|
||||
@ -671,7 +672,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
|
||||
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
|
||||
informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
|
||||
|
||||
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...)
|
||||
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)
|
||||
|
||||
informerFactory.Start(stop)
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
@ -756,7 +757,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
|
||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
st.RegisterPluginAsExtensions(noderesources.FitName, noderesources.NewFit, "Filter", "PreFilter"),
|
||||
}
|
||||
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, nil, fns...)
|
||||
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)
|
||||
|
||||
informerFactory.Start(stop)
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
@ -783,11 +784,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
|
||||
|
||||
// queuedPodStore: pods queued before processing.
|
||||
// scache: scheduler cache that might contain assumed pods.
|
||||
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, volumeBinder scheduling.SchedulerVolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
|
||||
if volumeBinder == nil {
|
||||
// Create default volume binder if it didn't set.
|
||||
volumeBinder = scheduling.NewFakeVolumeBinder(&scheduling.FakeVolumeBinderConfig{AllBound: true})
|
||||
}
|
||||
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
|
||||
bindingChan := make(chan *v1.Binding, 1)
|
||||
client := clientsetfake.NewSimpleClientset()
|
||||
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||
@ -799,7 +796,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
||||
return true, b, nil
|
||||
})
|
||||
|
||||
fwk, _ := st.NewFramework(fns, framework.WithClientSet(client), framework.WithVolumeBinder(volumeBinder))
|
||||
fwk, _ := st.NewFramework(fns, framework.WithClientSet(client))
|
||||
prof := &profile.Profile{
|
||||
Framework: fwk,
|
||||
Recorder: &events.FakeRecorder{},
|
||||
@ -835,7 +832,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
||||
Profiles: profiles,
|
||||
podConditionUpdater: fakePodConditionUpdater{},
|
||||
podPreemptor: fakePodPreemptor{},
|
||||
VolumeBinder: volumeBinder,
|
||||
}
|
||||
|
||||
return sched, bindingChan, errChan
|
||||
@ -858,12 +854,13 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume
|
||||
fns := []st.RegisterPluginFunc{
|
||||
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
||||
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
||||
st.RegisterFilterPlugin(volumebinding.Name, volumebinding.New),
|
||||
st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.FrameworkHandle) (framework.Plugin, error) {
|
||||
return &volumebinding.VolumeBinding{Binder: volumeBinder}, nil
|
||||
}, "Filter", "Reserve", "Unreserve", "PreBind", "PostBind"),
|
||||
}
|
||||
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, volumeBinder, fns...)
|
||||
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)
|
||||
informerFactory.Start(stop)
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
s.VolumeBinder = volumeBinder
|
||||
return s, bindingChan, errChan
|
||||
}
|
||||
|
||||
@ -952,7 +949,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
|
||||
},
|
||||
expectAssumeCalled: true,
|
||||
eventReason: "FailedScheduling",
|
||||
expectError: assumeErr,
|
||||
expectError: fmt.Errorf("error while running %q reserve plugin for pod %q: %v", volumebinding.Name, "foo", assumeErr),
|
||||
},
|
||||
{
|
||||
name: "bind error",
|
||||
@ -962,7 +959,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
|
||||
expectAssumeCalled: true,
|
||||
expectBindCalled: true,
|
||||
eventReason: "FailedScheduling",
|
||||
expectError: bindErr,
|
||||
expectError: fmt.Errorf("error while running %q prebind plugin for pod %q: %v", volumebinding.Name, "foo", bindErr),
|
||||
},
|
||||
}
|
||||
|
||||
@ -1200,3 +1197,122 @@ func TestSchedulerBinding(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestInjectingPluginConfigForVolumeBinding tests injecting
|
||||
// KubeSchedulerConfiguration.BindTimeoutSeconds as args for VolumeBinding if
|
||||
// no plugin args is configured for it.
|
||||
// TODO remove when KubeSchedulerConfiguration.BindTimeoutSeconds is eliminated
|
||||
func TestInjectingPluginConfigForVolumeBinding(t *testing.T) {
|
||||
defaultPluginConfigs := []config.PluginConfig{
|
||||
{
|
||||
Name: "VolumeBinding",
|
||||
Args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 600,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
opts []Option
|
||||
wantPluginConfig []config.PluginConfig
|
||||
}{
|
||||
{
|
||||
name: "default with provider",
|
||||
wantPluginConfig: defaultPluginConfigs,
|
||||
},
|
||||
{
|
||||
name: "default with policy",
|
||||
opts: []Option{
|
||||
WithAlgorithmSource(schedulerapi.SchedulerAlgorithmSource{
|
||||
Policy: &config.SchedulerPolicySource{},
|
||||
}),
|
||||
},
|
||||
wantPluginConfig: defaultPluginConfigs,
|
||||
},
|
||||
{
|
||||
name: "customize BindTimeoutSeconds with provider",
|
||||
opts: []Option{
|
||||
WithBindTimeoutSeconds(100),
|
||||
},
|
||||
wantPluginConfig: []config.PluginConfig{
|
||||
{
|
||||
Name: "VolumeBinding",
|
||||
Args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 100,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "customize BindTimeoutSeconds with policy",
|
||||
opts: []Option{
|
||||
WithAlgorithmSource(schedulerapi.SchedulerAlgorithmSource{
|
||||
Policy: &config.SchedulerPolicySource{},
|
||||
}),
|
||||
WithBindTimeoutSeconds(100),
|
||||
},
|
||||
wantPluginConfig: []config.PluginConfig{
|
||||
{
|
||||
Name: "VolumeBinding",
|
||||
Args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 100,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "PluginConfig is preferred",
|
||||
opts: []Option{
|
||||
WithBindTimeoutSeconds(100),
|
||||
WithProfiles(config.KubeSchedulerProfile{
|
||||
SchedulerName: v1.DefaultSchedulerName,
|
||||
PluginConfig: []config.PluginConfig{
|
||||
{
|
||||
Name: "VolumeBinding",
|
||||
Args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 200,
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
},
|
||||
wantPluginConfig: []config.PluginConfig{
|
||||
{
|
||||
Name: "VolumeBinding",
|
||||
Args: &config.VolumeBindingArgs{
|
||||
BindTimeoutSeconds: 200,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
client := fake.NewSimpleClientset()
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}))
|
||||
|
||||
opts := append(tt.opts, WithBuildFrameworkCapturer(func(p config.KubeSchedulerProfile) {
|
||||
if p.SchedulerName != v1.DefaultSchedulerName {
|
||||
t.Errorf("unexpected scheduler name (want %q, got %q)", v1.DefaultSchedulerName, p.SchedulerName)
|
||||
}
|
||||
if diff := cmp.Diff(tt.wantPluginConfig, p.PluginConfig); diff != "" {
|
||||
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
|
||||
}
|
||||
}))
|
||||
|
||||
_, err := New(
|
||||
client,
|
||||
informerFactory,
|
||||
informerFactory.Core().V1().Pods(),
|
||||
recorderFactory,
|
||||
make(chan struct{}),
|
||||
opts...,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("Error constructing: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -99,8 +99,14 @@ func getPluginSetByExtension(plugins *schedulerapi.Plugins, extension string) *s
|
||||
return initializeIfNeeded(&plugins.Bind)
|
||||
case "Reserve":
|
||||
return initializeIfNeeded(&plugins.Reserve)
|
||||
case "Unreserve":
|
||||
return initializeIfNeeded(&plugins.Unreserve)
|
||||
case "Permit":
|
||||
return initializeIfNeeded(&plugins.Permit)
|
||||
case "PreBind":
|
||||
return initializeIfNeeded(&plugins.PreBind)
|
||||
case "PostBind":
|
||||
return initializeIfNeeded(&plugins.PostBind)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
@ -143,7 +143,11 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
|
||||
{Name: "DefaultPodTopologySpread", Weight: 1},
|
||||
{Name: "TaintToleration", Weight: 1},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -233,7 +237,11 @@ kind: Policy
|
||||
{Name: "DefaultPodTopologySpread", Weight: 1},
|
||||
{Name: "TaintToleration", Weight: 1},
|
||||
},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"ReservePlugin": {{Name: "VolumeBinding"}},
|
||||
"UnreservePlugin": {{Name: "VolumeBinding"}},
|
||||
"PreBindPlugin": {{Name: "VolumeBinding"}},
|
||||
"BindPlugin": {{Name: "DefaultBinder"}},
|
||||
"PostBindPlugin": {{Name: "VolumeBinding"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user