From 1763688d71ba18b333e9e3d190b0f03a513be57f Mon Sep 17 00:00:00 2001 From: Harsh Singh Date: Wed, 14 Oct 2020 17:21:36 +0530 Subject: [PATCH] Added config parameter for CPU threads --- .../app/options/options_test.go | 7 +++ cmd/kube-scheduler/app/server.go | 1 + pkg/scheduler/BUILD | 1 + .../apis/config/scheme/scheme_test.go | 2 + pkg/scheduler/apis/config/types.go | 3 ++ pkg/scheduler/apis/config/v1beta1/defaults.go | 5 ++ .../apis/config/v1beta1/defaults_test.go | 42 +++++++++++++++ .../config/v1beta1/zz_generated.conversion.go | 6 +++ .../apis/config/validation/validation.go | 4 ++ .../apis/config/validation/validation_test.go | 8 +++ pkg/scheduler/internal/parallelize/BUILD | 5 +- .../internal/parallelize/parallelism.go | 18 +++++-- .../internal/parallelize/parallelism_test.go | 54 +++++++++++++++++++ pkg/scheduler/scheduler.go | 9 ++++ .../kube-scheduler/config/v1beta1/types.go | 3 ++ .../config/v1beta1/zz_generated.deepcopy.go | 5 ++ 16 files changed, 168 insertions(+), 5 deletions(-) create mode 100644 pkg/scheduler/internal/parallelize/parallelism_test.go diff --git a/cmd/kube-scheduler/app/options/options_test.go b/cmd/kube-scheduler/app/options/options_test.go index e5f16b4fd4f..abe32ff8483 100644 --- a/cmd/kube-scheduler/app/options/options_test.go +++ b/cmd/kube-scheduler/app/options/options_test.go @@ -298,6 +298,7 @@ profiles: }, expectedUsername: "config", expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ + Parallelism: 16, AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource}, HealthzBindAddress: "0.0.0.0:10251", MetricsBindAddress: "0.0.0.0:10251", @@ -395,6 +396,7 @@ profiles: }, expectedUsername: "flag", expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ + Parallelism: 16, AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource}, HealthzBindAddress: "", // defaults empty when not running from config file MetricsBindAddress: "", // defaults empty when not running from config file @@ -459,6 +461,7 @@ profiles: Logs: logs.NewOptions(), }, expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ + Parallelism: 16, AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource}, HealthzBindAddress: "", // defaults empty when not running from config file MetricsBindAddress: "", // defaults empty when not running from config file @@ -498,6 +501,7 @@ profiles: }, expectedUsername: "config", expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ + Parallelism: 16, AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource}, HealthzBindAddress: "0.0.0.0:10251", MetricsBindAddress: "0.0.0.0:10251", @@ -572,6 +576,7 @@ profiles: }, expectedUsername: "config", expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ + Parallelism: 16, AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource}, HealthzBindAddress: "0.0.0.0:10251", MetricsBindAddress: "0.0.0.0:10251", @@ -648,6 +653,7 @@ profiles: }, expectedUsername: "flag", expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ + Parallelism: 16, AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource}, DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{ EnableProfiling: true, @@ -700,6 +706,7 @@ profiles: }, expectedUsername: "flag", expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{ + Parallelism: 16, AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource}, DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{ EnableProfiling: true, diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 1907b16be17..4d7f2747eac 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -322,6 +322,7 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds), scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds), scheduler.WithExtenders(cc.ComponentConfig.Extenders...), + scheduler.WithParallelism(cc.ComponentConfig.Parallelism), ) if err != nil { return nil, nil, err diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 81468356a43..6f0cfe4888a 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -25,6 +25,7 @@ go_library( "//pkg/scheduler/framework/runtime:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/debugger:go_default_library", + "//pkg/scheduler/internal/parallelize:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/profile:go_default_library", diff --git a/pkg/scheduler/apis/config/scheme/scheme_test.go b/pkg/scheduler/apis/config/scheme/scheme_test.go index a494de5f85c..a59b5184334 100644 --- a/pkg/scheduler/apis/config/scheme/scheme_test.go +++ b/pkg/scheduler/apis/config/scheme/scheme_test.go @@ -466,6 +466,7 @@ profiles: name: "v1beta1 in-tree and out-of-tree plugins from internal", version: v1beta1.SchemeGroupVersion, obj: &config.KubeSchedulerConfiguration{ + Parallelism: 8, Profiles: []config.KubeSchedulerProfile{ { PluginConfig: []config.PluginConfig{ @@ -521,6 +522,7 @@ leaderElection: resourceNamespace: "" retryPeriod: 0s metricsBindAddress: "" +parallelism: 8 percentageOfNodesToScore: 0 podInitialBackoffSeconds: 0 podMaxBackoffSeconds: 0 diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go index 2aaee6a866e..819bbe2adb0 100644 --- a/pkg/scheduler/apis/config/types.go +++ b/pkg/scheduler/apis/config/types.go @@ -55,6 +55,9 @@ const ( type KubeSchedulerConfiguration struct { metav1.TypeMeta + // Parallelism defines the amount of parallelism in algorithms for scheduling a Pods. Must be greater than 0. Defaults to 16 + Parallelism int32 + // AlgorithmSource specifies the scheduler algorithm source. // TODO(#87526): Remove AlgorithmSource from this package // DEPRECATED: AlgorithmSource is removed in the v1beta1 ComponentConfig diff --git a/pkg/scheduler/apis/config/v1beta1/defaults.go b/pkg/scheduler/apis/config/v1beta1/defaults.go index c41c399cbed..18dd9d9b6e2 100644 --- a/pkg/scheduler/apis/config/v1beta1/defaults.go +++ b/pkg/scheduler/apis/config/v1beta1/defaults.go @@ -44,6 +44,11 @@ func addDefaultingFuncs(scheme *runtime.Scheme) error { // SetDefaults_KubeSchedulerConfiguration sets additional defaults func SetDefaults_KubeSchedulerConfiguration(obj *v1beta1.KubeSchedulerConfiguration) { + + if obj.Parallelism == nil { + obj.Parallelism = pointer.Int32Ptr(16) + } + if len(obj.Profiles) == 0 { obj.Profiles = append(obj.Profiles, v1beta1.KubeSchedulerProfile{}) } diff --git a/pkg/scheduler/apis/config/v1beta1/defaults_test.go b/pkg/scheduler/apis/config/v1beta1/defaults_test.go index f334da90baa..bf7c69ba9bc 100644 --- a/pkg/scheduler/apis/config/v1beta1/defaults_test.go +++ b/pkg/scheduler/apis/config/v1beta1/defaults_test.go @@ -45,6 +45,7 @@ func TestSchedulerDefaults(t *testing.T) { name: "empty config", config: &v1beta1.KubeSchedulerConfiguration{}, expected: &v1beta1.KubeSchedulerConfiguration{ + Parallelism: pointer.Int32Ptr(16), HealthzBindAddress: pointer.StringPtr("0.0.0.0:10251"), MetricsBindAddress: pointer.StringPtr("0.0.0.0:10251"), DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{ @@ -85,6 +86,7 @@ func TestSchedulerDefaults(t *testing.T) { }, }, expected: &v1beta1.KubeSchedulerConfiguration{ + Parallelism: pointer.Int32Ptr(16), HealthzBindAddress: pointer.StringPtr("0.0.0.0:10251"), MetricsBindAddress: pointer.StringPtr("0.0.0.0:10251"), DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{ @@ -121,6 +123,7 @@ func TestSchedulerDefaults(t *testing.T) { { name: "two profiles", config: &v1beta1.KubeSchedulerConfiguration{ + Parallelism: pointer.Int32Ptr(16), Profiles: []v1beta1.KubeSchedulerProfile{ { PluginConfig: []v1beta1.PluginConfig{ @@ -140,6 +143,7 @@ func TestSchedulerDefaults(t *testing.T) { }, }, expected: &v1beta1.KubeSchedulerConfiguration{ + Parallelism: pointer.Int32Ptr(16), HealthzBindAddress: pointer.StringPtr("0.0.0.0:10251"), MetricsBindAddress: pointer.StringPtr("0.0.0.0:10251"), DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{ @@ -185,10 +189,12 @@ func TestSchedulerDefaults(t *testing.T) { { name: "metrics and healthz address with no port", config: &v1beta1.KubeSchedulerConfiguration{ + Parallelism: pointer.Int32Ptr(16), MetricsBindAddress: pointer.StringPtr("1.2.3.4"), HealthzBindAddress: pointer.StringPtr("1.2.3.4"), }, expected: &v1beta1.KubeSchedulerConfiguration{ + Parallelism: pointer.Int32Ptr(16), HealthzBindAddress: pointer.StringPtr("1.2.3.4:10251"), MetricsBindAddress: pointer.StringPtr("1.2.3.4:10251"), DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{ @@ -224,6 +230,7 @@ func TestSchedulerDefaults(t *testing.T) { HealthzBindAddress: pointer.StringPtr(":12345"), }, expected: &v1beta1.KubeSchedulerConfiguration{ + Parallelism: pointer.Int32Ptr(16), HealthzBindAddress: pointer.StringPtr("0.0.0.0:12345"), MetricsBindAddress: pointer.StringPtr("0.0.0.0:12345"), DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{ @@ -252,6 +259,41 @@ func TestSchedulerDefaults(t *testing.T) { }, }, }, + { + name: "set non default parallelism", + config: &v1beta1.KubeSchedulerConfiguration{ + Parallelism: pointer.Int32Ptr(8), + }, + expected: &v1beta1.KubeSchedulerConfiguration{ + Parallelism: pointer.Int32Ptr(8), + HealthzBindAddress: pointer.StringPtr("0.0.0.0:10251"), + MetricsBindAddress: pointer.StringPtr("0.0.0.0:10251"), + DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{ + EnableProfiling: &enable, + EnableContentionProfiling: &enable, + }, + LeaderElection: componentbaseconfig.LeaderElectionConfiguration{ + LeaderElect: pointer.BoolPtr(true), + LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, + RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, + RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, + ResourceLock: "leases", + ResourceNamespace: "kube-system", + ResourceName: "kube-scheduler", + }, + ClientConnection: componentbaseconfig.ClientConnectionConfiguration{ + QPS: 50, + Burst: 100, + ContentType: "application/vnd.kubernetes.protobuf", + }, + PercentageOfNodesToScore: pointer.Int32Ptr(0), + PodInitialBackoffSeconds: pointer.Int64Ptr(1), + PodMaxBackoffSeconds: pointer.Int64Ptr(10), + Profiles: []v1beta1.KubeSchedulerProfile{ + {SchedulerName: pointer.StringPtr("default-scheduler")}, + }, + }, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/scheduler/apis/config/v1beta1/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1beta1/zz_generated.conversion.go index edd6a18a749..46840442166 100644 --- a/pkg/scheduler/apis/config/v1beta1/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1beta1/zz_generated.conversion.go @@ -290,6 +290,9 @@ func Convert_config_InterPodAffinityArgs_To_v1beta1_InterPodAffinityArgs(in *con } func autoConvert_v1beta1_KubeSchedulerConfiguration_To_config_KubeSchedulerConfiguration(in *v1beta1.KubeSchedulerConfiguration, out *config.KubeSchedulerConfiguration, s conversion.Scope) error { + if err := metav1.Convert_Pointer_int32_To_int32(&in.Parallelism, &out.Parallelism, s); err != nil { + return err + } if err := v1alpha1.Convert_v1alpha1_LeaderElectionConfiguration_To_config_LeaderElectionConfiguration(&in.LeaderElection, &out.LeaderElection, s); err != nil { return err } @@ -330,6 +333,9 @@ func autoConvert_v1beta1_KubeSchedulerConfiguration_To_config_KubeSchedulerConfi } func autoConvert_config_KubeSchedulerConfiguration_To_v1beta1_KubeSchedulerConfiguration(in *config.KubeSchedulerConfiguration, out *v1beta1.KubeSchedulerConfiguration, s conversion.Scope) error { + if err := metav1.Convert_int32_To_Pointer_int32(&in.Parallelism, &out.Parallelism, s); err != nil { + return err + } // WARNING: in.AlgorithmSource requires manual conversion: does not exist in peer-type if err := v1alpha1.Convert_config_LeaderElectionConfiguration_To_v1alpha1_LeaderElectionConfiguration(&in.LeaderElection, &out.LeaderElection, s); err != nil { return err diff --git a/pkg/scheduler/apis/config/validation/validation.go b/pkg/scheduler/apis/config/validation/validation.go index e48ae5dc6b7..12feae0e524 100644 --- a/pkg/scheduler/apis/config/validation/validation.go +++ b/pkg/scheduler/apis/config/validation/validation.go @@ -38,6 +38,10 @@ func ValidateKubeSchedulerConfiguration(cc *config.KubeSchedulerConfiguration) f allErrs = append(allErrs, componentbasevalidation.ValidateLeaderElectionConfiguration(&cc.LeaderElection, field.NewPath("leaderElection"))...) profilesPath := field.NewPath("profiles") + if cc.Parallelism <= 0 { + allErrs = append(allErrs, field.Invalid(field.NewPath("parallelism"), cc.Parallelism, "should be an integer value greater than zero")) + } + if len(cc.Profiles) == 0 { allErrs = append(allErrs, field.Required(profilesPath, "")) } else { diff --git a/pkg/scheduler/apis/config/validation/validation_test.go b/pkg/scheduler/apis/config/validation/validation_test.go index b2a11fc9c29..d1053a40c4d 100644 --- a/pkg/scheduler/apis/config/validation/validation_test.go +++ b/pkg/scheduler/apis/config/validation/validation_test.go @@ -31,6 +31,7 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) { podInitialBackoffSeconds := int64(1) podMaxBackoffSeconds := int64(1) validConfig := &config.KubeSchedulerConfiguration{ + Parallelism: 8, HealthzBindAddress: "0.0.0.0:10254", MetricsBindAddress: "0.0.0.0:10254", ClientConnection: componentbaseconfig.ClientConnectionConfiguration{ @@ -91,6 +92,9 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) { }, } + invalidParallelismValue := validConfig.DeepCopy() + invalidParallelismValue.Parallelism = 0 + resourceNameNotSet := validConfig.DeepCopy() resourceNameNotSet.LeaderElection.ResourceName = "" @@ -152,6 +156,10 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) { expectedToFail: false, config: validConfig, }, + "bad-parallelism-invalid-value": { + expectedToFail: true, + config: invalidParallelismValue, + }, "bad-resource-name-not-set": { expectedToFail: true, config: resourceNameNotSet, diff --git a/pkg/scheduler/internal/parallelize/BUILD b/pkg/scheduler/internal/parallelize/BUILD index d78eb1121ed..1ce4de989d2 100644 --- a/pkg/scheduler/internal/parallelize/BUILD +++ b/pkg/scheduler/internal/parallelize/BUILD @@ -27,6 +27,9 @@ filegroup( go_test( name = "go_default_test", - srcs = ["error_channel_test.go"], + srcs = [ + "error_channel_test.go", + "parallelism_test.go", + ], embed = [":go_default_library"], ) diff --git a/pkg/scheduler/internal/parallelize/parallelism.go b/pkg/scheduler/internal/parallelize/parallelism.go index ab337deeca1..4eebc62e33b 100644 --- a/pkg/scheduler/internal/parallelize/parallelism.go +++ b/pkg/scheduler/internal/parallelize/parallelism.go @@ -23,21 +23,31 @@ import ( "k8s.io/client-go/util/workqueue" ) -const parallelism = 16 +var ( + parallelism = 16 +) + +// SetParallelism sets the parallelism for all scheduler algorithms. +// TODO(#95952): Remove global setter in favor of a struct that holds the configuration. +func SetParallelism(p int) { + parallelism = p +} // chunkSizeFor returns a chunk size for the given number of items to use for // parallel work. The size aims to produce good CPU utilization. -func chunkSizeFor(n int) workqueue.Options { +// returns max(1, min(sqrt(n), n/Parallelism)) +func chunkSizeFor(n int) int { s := int(math.Sqrt(float64(n))) + if r := n/parallelism + 1; s > r { s = r } else if s < 1 { s = 1 } - return workqueue.WithChunkSize(s) + return s } // Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms. func Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) { - workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece, chunkSizeFor(pieces)) + workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces))) } diff --git a/pkg/scheduler/internal/parallelize/parallelism_test.go b/pkg/scheduler/internal/parallelize/parallelism_test.go new file mode 100644 index 00000000000..4f22dea1062 --- /dev/null +++ b/pkg/scheduler/internal/parallelize/parallelism_test.go @@ -0,0 +1,54 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package parallelize + +import ( + "fmt" + "testing" +) + +func TestChunkSize(t *testing.T) { + tests := []struct { + input int + wantOutput int + }{ + { + input: 32, + wantOutput: 3, + }, + { + input: 16, + wantOutput: 2, + }, + { + input: 1, + wantOutput: 1, + }, + { + input: 0, + wantOutput: 1, + }, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("%d", test.input), func(t *testing.T) { + if chunkSizeFor(test.input) != test.wantOutput { + t.Errorf("Expected: %d, got: %d", test.wantOutput, chunkSizeFor(test.input)) + } + }) + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3ab718dde35..75a7e5528cf 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -42,6 +42,7 @@ import ( frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" + "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/profile" @@ -109,6 +110,14 @@ func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option { } } +// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16. +// TODO(#95952): Remove global setter in favor of a struct that holds the configuration. +func WithParallelism(threads int32) Option { + return func(o *schedulerOptions) { + parallelize.SetParallelism(int(threads)) + } +} + // WithAlgorithmSource sets schedulerAlgorithmSource for Scheduler, the default is a source with DefaultProvider. func WithAlgorithmSource(source schedulerapi.SchedulerAlgorithmSource) Option { return func(o *schedulerOptions) { diff --git a/staging/src/k8s.io/kube-scheduler/config/v1beta1/types.go b/staging/src/k8s.io/kube-scheduler/config/v1beta1/types.go index 87965c73436..b98090a1055 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1beta1/types.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1beta1/types.go @@ -44,6 +44,9 @@ const ( type KubeSchedulerConfiguration struct { metav1.TypeMeta `json:",inline"` + // Parallelism defines the amount of parallelism in algorithms for scheduling a Pods. Must be greater than 0. Defaults to 16 + Parallelism *int32 `json:"parallelism,omitempty"` + // LeaderElection defines the configuration of leader election client. LeaderElection componentbaseconfigv1alpha1.LeaderElectionConfiguration `json:"leaderElection"` diff --git a/staging/src/k8s.io/kube-scheduler/config/v1beta1/zz_generated.deepcopy.go b/staging/src/k8s.io/kube-scheduler/config/v1beta1/zz_generated.deepcopy.go index 4a5f5cd7bad..8bc3650b38b 100644 --- a/staging/src/k8s.io/kube-scheduler/config/v1beta1/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/kube-scheduler/config/v1beta1/zz_generated.deepcopy.go @@ -87,6 +87,11 @@ func (in *InterPodAffinityArgs) DeepCopyObject() runtime.Object { func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfiguration) { *out = *in out.TypeMeta = in.TypeMeta + if in.Parallelism != nil { + in, out := &in.Parallelism, &out.Parallelism + *out = new(int32) + **out = **in + } in.LeaderElection.DeepCopyInto(&out.LeaderElection) out.ClientConnection = in.ClientConnection if in.HealthzBindAddress != nil {