Merge pull request #94636 from hprateek43/cpu_threads_parameter

Added config parameter for CPU threads
This commit is contained in:
Kubernetes Prow Robot 2020-10-29 21:04:05 -07:00 committed by GitHub
commit d0bee69fc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 168 additions and 5 deletions

View File

@ -298,6 +298,7 @@ profiles:
},
expectedUsername: "config",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
Parallelism: 16,
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource},
HealthzBindAddress: "0.0.0.0:10251",
MetricsBindAddress: "0.0.0.0:10251",
@ -395,6 +396,7 @@ profiles:
},
expectedUsername: "flag",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
Parallelism: 16,
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource},
HealthzBindAddress: "", // defaults empty when not running from config file
MetricsBindAddress: "", // defaults empty when not running from config file
@ -459,6 +461,7 @@ profiles:
Logs: logs.NewOptions(),
},
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
Parallelism: 16,
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource},
HealthzBindAddress: "", // defaults empty when not running from config file
MetricsBindAddress: "", // defaults empty when not running from config file
@ -498,6 +501,7 @@ profiles:
},
expectedUsername: "config",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
Parallelism: 16,
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource},
HealthzBindAddress: "0.0.0.0:10251",
MetricsBindAddress: "0.0.0.0:10251",
@ -572,6 +576,7 @@ profiles:
},
expectedUsername: "config",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
Parallelism: 16,
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource},
HealthzBindAddress: "0.0.0.0:10251",
MetricsBindAddress: "0.0.0.0:10251",
@ -648,6 +653,7 @@ profiles:
},
expectedUsername: "flag",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
Parallelism: 16,
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource},
DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{
EnableProfiling: true,
@ -700,6 +706,7 @@ profiles:
},
expectedUsername: "flag",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
Parallelism: 16,
AlgorithmSource: kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &defaultSource},
DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{
EnableProfiling: true,

View File

@ -322,6 +322,7 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
)
if err != nil {
return nil, nil, err

View File

@ -25,6 +25,7 @@ go_library(
"//pkg/scheduler/framework/runtime:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library",
"//pkg/scheduler/internal/parallelize:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/profile:go_default_library",

View File

@ -459,6 +459,7 @@ profiles:
name: "v1beta1 in-tree and out-of-tree plugins from internal",
version: v1beta1.SchemeGroupVersion,
obj: &config.KubeSchedulerConfiguration{
Parallelism: 8,
Profiles: []config.KubeSchedulerProfile{
{
PluginConfig: []config.PluginConfig{
@ -514,6 +515,7 @@ leaderElection:
resourceNamespace: ""
retryPeriod: 0s
metricsBindAddress: ""
parallelism: 8
percentageOfNodesToScore: 0
podInitialBackoffSeconds: 0
podMaxBackoffSeconds: 0

View File

@ -55,6 +55,9 @@ const (
type KubeSchedulerConfiguration struct {
metav1.TypeMeta
// Parallelism defines the amount of parallelism in algorithms for scheduling a Pods. Must be greater than 0. Defaults to 16
Parallelism int32
// AlgorithmSource specifies the scheduler algorithm source.
// TODO(#87526): Remove AlgorithmSource from this package
// DEPRECATED: AlgorithmSource is removed in the v1beta1 ComponentConfig

View File

@ -44,6 +44,11 @@ func addDefaultingFuncs(scheme *runtime.Scheme) error {
// SetDefaults_KubeSchedulerConfiguration sets additional defaults
func SetDefaults_KubeSchedulerConfiguration(obj *v1beta1.KubeSchedulerConfiguration) {
if obj.Parallelism == nil {
obj.Parallelism = pointer.Int32Ptr(16)
}
if len(obj.Profiles) == 0 {
obj.Profiles = append(obj.Profiles, v1beta1.KubeSchedulerProfile{})
}

View File

@ -45,6 +45,7 @@ func TestSchedulerDefaults(t *testing.T) {
name: "empty config",
config: &v1beta1.KubeSchedulerConfiguration{},
expected: &v1beta1.KubeSchedulerConfiguration{
Parallelism: pointer.Int32Ptr(16),
HealthzBindAddress: pointer.StringPtr("0.0.0.0:10251"),
MetricsBindAddress: pointer.StringPtr("0.0.0.0:10251"),
DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{
@ -85,6 +86,7 @@ func TestSchedulerDefaults(t *testing.T) {
},
},
expected: &v1beta1.KubeSchedulerConfiguration{
Parallelism: pointer.Int32Ptr(16),
HealthzBindAddress: pointer.StringPtr("0.0.0.0:10251"),
MetricsBindAddress: pointer.StringPtr("0.0.0.0:10251"),
DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{
@ -121,6 +123,7 @@ func TestSchedulerDefaults(t *testing.T) {
{
name: "two profiles",
config: &v1beta1.KubeSchedulerConfiguration{
Parallelism: pointer.Int32Ptr(16),
Profiles: []v1beta1.KubeSchedulerProfile{
{
PluginConfig: []v1beta1.PluginConfig{
@ -140,6 +143,7 @@ func TestSchedulerDefaults(t *testing.T) {
},
},
expected: &v1beta1.KubeSchedulerConfiguration{
Parallelism: pointer.Int32Ptr(16),
HealthzBindAddress: pointer.StringPtr("0.0.0.0:10251"),
MetricsBindAddress: pointer.StringPtr("0.0.0.0:10251"),
DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{
@ -185,10 +189,12 @@ func TestSchedulerDefaults(t *testing.T) {
{
name: "metrics and healthz address with no port",
config: &v1beta1.KubeSchedulerConfiguration{
Parallelism: pointer.Int32Ptr(16),
MetricsBindAddress: pointer.StringPtr("1.2.3.4"),
HealthzBindAddress: pointer.StringPtr("1.2.3.4"),
},
expected: &v1beta1.KubeSchedulerConfiguration{
Parallelism: pointer.Int32Ptr(16),
HealthzBindAddress: pointer.StringPtr("1.2.3.4:10251"),
MetricsBindAddress: pointer.StringPtr("1.2.3.4:10251"),
DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{
@ -224,6 +230,7 @@ func TestSchedulerDefaults(t *testing.T) {
HealthzBindAddress: pointer.StringPtr(":12345"),
},
expected: &v1beta1.KubeSchedulerConfiguration{
Parallelism: pointer.Int32Ptr(16),
HealthzBindAddress: pointer.StringPtr("0.0.0.0:12345"),
MetricsBindAddress: pointer.StringPtr("0.0.0.0:12345"),
DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{
@ -252,6 +259,41 @@ func TestSchedulerDefaults(t *testing.T) {
},
},
},
{
name: "set non default parallelism",
config: &v1beta1.KubeSchedulerConfiguration{
Parallelism: pointer.Int32Ptr(8),
},
expected: &v1beta1.KubeSchedulerConfiguration{
Parallelism: pointer.Int32Ptr(8),
HealthzBindAddress: pointer.StringPtr("0.0.0.0:10251"),
MetricsBindAddress: pointer.StringPtr("0.0.0.0:10251"),
DebuggingConfiguration: componentbaseconfig.DebuggingConfiguration{
EnableProfiling: &enable,
EnableContentionProfiling: &enable,
},
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: pointer.BoolPtr(true),
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceLock: "leases",
ResourceNamespace: "kube-system",
ResourceName: "kube-scheduler",
},
ClientConnection: componentbaseconfig.ClientConnectionConfiguration{
QPS: 50,
Burst: 100,
ContentType: "application/vnd.kubernetes.protobuf",
},
PercentageOfNodesToScore: pointer.Int32Ptr(0),
PodInitialBackoffSeconds: pointer.Int64Ptr(1),
PodMaxBackoffSeconds: pointer.Int64Ptr(10),
Profiles: []v1beta1.KubeSchedulerProfile{
{SchedulerName: pointer.StringPtr("default-scheduler")},
},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {

View File

@ -290,6 +290,9 @@ func Convert_config_InterPodAffinityArgs_To_v1beta1_InterPodAffinityArgs(in *con
}
func autoConvert_v1beta1_KubeSchedulerConfiguration_To_config_KubeSchedulerConfiguration(in *v1beta1.KubeSchedulerConfiguration, out *config.KubeSchedulerConfiguration, s conversion.Scope) error {
if err := metav1.Convert_Pointer_int32_To_int32(&in.Parallelism, &out.Parallelism, s); err != nil {
return err
}
if err := v1alpha1.Convert_v1alpha1_LeaderElectionConfiguration_To_config_LeaderElectionConfiguration(&in.LeaderElection, &out.LeaderElection, s); err != nil {
return err
}
@ -330,6 +333,9 @@ func autoConvert_v1beta1_KubeSchedulerConfiguration_To_config_KubeSchedulerConfi
}
func autoConvert_config_KubeSchedulerConfiguration_To_v1beta1_KubeSchedulerConfiguration(in *config.KubeSchedulerConfiguration, out *v1beta1.KubeSchedulerConfiguration, s conversion.Scope) error {
if err := metav1.Convert_int32_To_Pointer_int32(&in.Parallelism, &out.Parallelism, s); err != nil {
return err
}
// WARNING: in.AlgorithmSource requires manual conversion: does not exist in peer-type
if err := v1alpha1.Convert_config_LeaderElectionConfiguration_To_v1alpha1_LeaderElectionConfiguration(&in.LeaderElection, &out.LeaderElection, s); err != nil {
return err

View File

@ -38,6 +38,10 @@ func ValidateKubeSchedulerConfiguration(cc *config.KubeSchedulerConfiguration) f
allErrs = append(allErrs, componentbasevalidation.ValidateLeaderElectionConfiguration(&cc.LeaderElection, field.NewPath("leaderElection"))...)
profilesPath := field.NewPath("profiles")
if cc.Parallelism <= 0 {
allErrs = append(allErrs, field.Invalid(field.NewPath("parallelism"), cc.Parallelism, "should be an integer value greater than zero"))
}
if len(cc.Profiles) == 0 {
allErrs = append(allErrs, field.Required(profilesPath, ""))
} else {

View File

@ -31,6 +31,7 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
podInitialBackoffSeconds := int64(1)
podMaxBackoffSeconds := int64(1)
validConfig := &config.KubeSchedulerConfiguration{
Parallelism: 8,
HealthzBindAddress: "0.0.0.0:10254",
MetricsBindAddress: "0.0.0.0:10254",
ClientConnection: componentbaseconfig.ClientConnectionConfiguration{
@ -91,6 +92,9 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
},
}
invalidParallelismValue := validConfig.DeepCopy()
invalidParallelismValue.Parallelism = 0
resourceNameNotSet := validConfig.DeepCopy()
resourceNameNotSet.LeaderElection.ResourceName = ""
@ -152,6 +156,10 @@ func TestValidateKubeSchedulerConfiguration(t *testing.T) {
expectedToFail: false,
config: validConfig,
},
"bad-parallelism-invalid-value": {
expectedToFail: true,
config: invalidParallelismValue,
},
"bad-resource-name-not-set": {
expectedToFail: true,
config: resourceNameNotSet,

View File

@ -27,6 +27,9 @@ filegroup(
go_test(
name = "go_default_test",
srcs = ["error_channel_test.go"],
srcs = [
"error_channel_test.go",
"parallelism_test.go",
],
embed = [":go_default_library"],
)

View File

@ -23,21 +23,31 @@ import (
"k8s.io/client-go/util/workqueue"
)
const parallelism = 16
var (
parallelism = 16
)
// SetParallelism sets the parallelism for all scheduler algorithms.
// TODO(#95952): Remove global setter in favor of a struct that holds the configuration.
func SetParallelism(p int) {
parallelism = p
}
// chunkSizeFor returns a chunk size for the given number of items to use for
// parallel work. The size aims to produce good CPU utilization.
func chunkSizeFor(n int) workqueue.Options {
// returns max(1, min(sqrt(n), n/Parallelism))
func chunkSizeFor(n int) int {
s := int(math.Sqrt(float64(n)))
if r := n/parallelism + 1; s > r {
s = r
} else if s < 1 {
s = 1
}
return workqueue.WithChunkSize(s)
return s
}
// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
func Until(ctx context.Context, pieces int, doWorkPiece workqueue.DoWorkPieceFunc) {
workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece, chunkSizeFor(pieces))
workqueue.ParallelizeUntil(ctx, parallelism, pieces, doWorkPiece, workqueue.WithChunkSize(chunkSizeFor(pieces)))
}

View File

@ -0,0 +1,54 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package parallelize
import (
"fmt"
"testing"
)
func TestChunkSize(t *testing.T) {
tests := []struct {
input int
wantOutput int
}{
{
input: 32,
wantOutput: 3,
},
{
input: 16,
wantOutput: 2,
},
{
input: 1,
wantOutput: 1,
},
{
input: 0,
wantOutput: 1,
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("%d", test.input), func(t *testing.T) {
if chunkSizeFor(test.input) != test.wantOutput {
t.Errorf("Expected: %d, got: %d", test.wantOutput, chunkSizeFor(test.input))
}
})
}
}

View File

@ -42,6 +42,7 @@ import (
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile"
@ -109,6 +110,14 @@ func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
}
}
// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16.
// TODO(#95952): Remove global setter in favor of a struct that holds the configuration.
func WithParallelism(threads int32) Option {
return func(o *schedulerOptions) {
parallelize.SetParallelism(int(threads))
}
}
// WithAlgorithmSource sets schedulerAlgorithmSource for Scheduler, the default is a source with DefaultProvider.
func WithAlgorithmSource(source schedulerapi.SchedulerAlgorithmSource) Option {
return func(o *schedulerOptions) {

View File

@ -44,6 +44,9 @@ const (
type KubeSchedulerConfiguration struct {
metav1.TypeMeta `json:",inline"`
// Parallelism defines the amount of parallelism in algorithms for scheduling a Pods. Must be greater than 0. Defaults to 16
Parallelism *int32 `json:"parallelism,omitempty"`
// LeaderElection defines the configuration of leader election client.
LeaderElection componentbaseconfigv1alpha1.LeaderElectionConfiguration `json:"leaderElection"`

View File

@ -87,6 +87,11 @@ func (in *InterPodAffinityArgs) DeepCopyObject() runtime.Object {
func (in *KubeSchedulerConfiguration) DeepCopyInto(out *KubeSchedulerConfiguration) {
*out = *in
out.TypeMeta = in.TypeMeta
if in.Parallelism != nil {
in, out := &in.Parallelism, &out.Parallelism
*out = new(int32)
**out = **in
}
in.LeaderElection.DeepCopyInto(&out.LeaderElection)
out.ClientConnection = in.ClientConnection
if in.HealthzBindAddress != nil {