diff --git a/cmd/kube-scheduler/app/BUILD b/cmd/kube-scheduler/app/BUILD index eb071684b88..65394233c45 100644 --- a/cmd/kube-scheduler/app/BUILD +++ b/cmd/kube-scheduler/app/BUILD @@ -3,6 +3,7 @@ package(default_visibility = ["//visibility:public"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -65,3 +66,15 @@ filegroup( ], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["server_test.go"], + embed = [":go_default_library"], + deps = [ + "//cmd/kube-scheduler/app/options:go_default_library", + "//pkg/scheduler/apis/config:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", + "//vendor/github.com/spf13/pflag:go_default_library", + ], +) diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 306571053a3..7b866163e31 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -119,33 +119,29 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist verflag.PrintAndExitIfRequested() utilflag.PrintFlags(cmd.Flags()) - if len(args) != 0 { - fmt.Fprint(os.Stderr, "arguments are not supported\n") - } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - if errs := opts.Validate(); len(errs) > 0 { - return utilerrors.NewAggregate(errs) + cc, sched, err := Setup(ctx, args, opts, registryOptions...) + if err != nil { + return err } if len(opts.WriteConfigTo) > 0 { - c := &schedulerserverconfig.Config{} - if err := opts.ApplyTo(c); err != nil { - return err - } - if err := options.WriteConfigFile(opts.WriteConfigTo, &c.ComponentConfig); err != nil { + if err := options.WriteConfigFile(opts.WriteConfigTo, &cc.ComponentConfig); err != nil { return err } klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo) return nil } - c, err := opts.Config() - if err != nil { - return err - } + return Run(ctx, cc, sched) +} - // Get the completed config - cc := c.Complete() +// Run executes the scheduler based on the given configuration. It only returns on error or when context is done. +func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error { + // To help debugging, immediately log version + klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get()) // Configz registration. if cz, err := configz.New("componentconfig"); err == nil { @@ -154,45 +150,6 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist return fmt.Errorf("unable to register configz: %s", err) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - return Run(ctx, cc, registryOptions...) -} - -// Run executes the scheduler based on the given configuration. It only returns on error or when context is done. -func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error { - // To help debugging, immediately log version - klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get()) - - outOfTreeRegistry := make(framework.Registry) - for _, option := range outOfTreeRegistryOptions { - if err := option(outOfTreeRegistry); err != nil { - return err - } - } - - recorderFactory := getRecorderFactory(&cc) - // Create the scheduler. - sched, err := scheduler.New(cc.Client, - cc.InformerFactory, - cc.PodInformer, - recorderFactory, - ctx.Done(), - scheduler.WithProfiles(cc.ComponentConfig.Profiles...), - scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource), - scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption), - scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), - scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds), - scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), - scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds), - scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds), - scheduler.WithExtenders(cc.ComponentConfig.Extenders...), - ) - if err != nil { - return err - } - // Prepare the event broadcaster. if cc.Broadcaster != nil && cc.EventClient != nil { cc.Broadcaster.StartRecordingToSink(ctx.Done()) @@ -340,3 +297,52 @@ func WithPlugin(name string, factory framework.PluginFactory) Option { return registry.Register(name, factory) } } + +// Setup creates a completed config and a scheduler based on the command args and options +func Setup(ctx context.Context, args []string, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) { + if len(args) != 0 { + fmt.Fprint(os.Stderr, "arguments are not supported\n") + } + + if errs := opts.Validate(); len(errs) > 0 { + return nil, nil, utilerrors.NewAggregate(errs) + } + + c, err := opts.Config() + if err != nil { + return nil, nil, err + } + + // Get the completed config + cc := c.Complete() + + outOfTreeRegistry := make(framework.Registry) + for _, option := range outOfTreeRegistryOptions { + if err := option(outOfTreeRegistry); err != nil { + return nil, nil, err + } + } + + recorderFactory := getRecorderFactory(&cc) + // Create the scheduler. + sched, err := scheduler.New(cc.Client, + cc.InformerFactory, + cc.PodInformer, + recorderFactory, + ctx.Done(), + scheduler.WithProfiles(cc.ComponentConfig.Profiles...), + scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource), + scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption), + scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), + scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds), + scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), + scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds), + scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds), + scheduler.WithExtenders(cc.ComponentConfig.Extenders...), + ) + if err != nil { + return nil, nil, err + } + + return &cc, sched, nil +} diff --git a/cmd/kube-scheduler/app/server_test.go b/cmd/kube-scheduler/app/server_test.go new file mode 100644 index 00000000000..e6035cc3b7e --- /dev/null +++ b/cmd/kube-scheduler/app/server_test.go @@ -0,0 +1,298 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/spf13/pflag" + "k8s.io/kubernetes/cmd/kube-scheduler/app/options" + kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" +) + +func TestSetup(t *testing.T) { + // temp dir + tmpDir, err := ioutil.TempDir("", "scheduler-options") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + // https server + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(200) + w.Write([]byte(`ok`)) + })) + defer server.Close() + + configKubeconfig := filepath.Join(tmpDir, "config.kubeconfig") + if err := ioutil.WriteFile(configKubeconfig, []byte(fmt.Sprintf(` +apiVersion: v1 +kind: Config +clusters: +- cluster: + insecure-skip-tls-verify: true + server: %s + name: default +contexts: +- context: + cluster: default + user: default + name: default +current-context: default +users: +- name: default + user: + username: config +`, server.URL)), os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + v1alpha1Config := filepath.Join(tmpDir, "kubeconfig_v1alpha1.yaml") + if err := ioutil.WriteFile(v1alpha1Config, []byte(fmt.Sprintf(` +apiVersion: kubescheduler.config.k8s.io/v1alpha1 +kind: KubeSchedulerConfiguration +schedulerName: "my-old-scheduler" +clientConnection: + kubeconfig: "%s" +leaderElection: + leaderElect: true +hardPodAffinitySymmetricWeight: 3`, configKubeconfig)), os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + // plugin config + pluginConfigFile := filepath.Join(tmpDir, "plugin.yaml") + if err := ioutil.WriteFile(pluginConfigFile, []byte(fmt.Sprintf(` +apiVersion: kubescheduler.config.k8s.io/v1alpha2 +kind: KubeSchedulerConfiguration +clientConnection: + kubeconfig: "%s" +profiles: +- plugins: + preFilter: + enabled: + - name: NodeResourcesFit + - name: NodePorts + disabled: + - name: "*" + filter: + enabled: + - name: NodeResourcesFit + - name: NodePorts + disabled: + - name: "*" + preScore: + enabled: + - name: InterPodAffinity + - name: TaintToleration + disabled: + - name: "*" + score: + enabled: + - name: InterPodAffinity + - name: TaintToleration + disabled: + - name: "*" +`, configKubeconfig)), os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + // multiple profiles config + multiProfilesConfig := filepath.Join(tmpDir, "multi-profiles.yaml") + if err := ioutil.WriteFile(multiProfilesConfig, []byte(fmt.Sprintf(` +apiVersion: kubescheduler.config.k8s.io/v1alpha2 +kind: KubeSchedulerConfiguration +clientConnection: + kubeconfig: "%s" +profiles: +- schedulerName: "profile-default-plugins" +- schedulerName: "profile-disable-all-filter-and-score-plugins" + plugins: + preFilter: + disabled: + - name: "*" + filter: + disabled: + - name: "*" + preScore: + disabled: + - name: "*" + score: + disabled: + - name: "*" +`, configKubeconfig)), os.FileMode(0600)); err != nil { + t.Fatal(err) + } + + defaultPlugins := map[string][]kubeschedulerconfig.Plugin{ + "QueueSortPlugin": { + {Name: "PrioritySort"}, + }, + "PreFilterPlugin": { + {Name: "NodeResourcesFit"}, + {Name: "NodePorts"}, + {Name: "InterPodAffinity"}, + {Name: "PodTopologySpread"}, + }, + "FilterPlugin": { + {Name: "NodeUnschedulable"}, + {Name: "NodeResourcesFit"}, + {Name: "NodeName"}, + {Name: "NodePorts"}, + {Name: "NodeAffinity"}, + {Name: "VolumeRestrictions"}, + {Name: "TaintToleration"}, + {Name: "EBSLimits"}, + {Name: "GCEPDLimits"}, + {Name: "NodeVolumeLimits"}, + {Name: "AzureDiskLimits"}, + {Name: "VolumeBinding"}, + {Name: "VolumeZone"}, + {Name: "InterPodAffinity"}, + {Name: "PodTopologySpread"}, + }, + "PreScorePlugin": { + {Name: "InterPodAffinity"}, + {Name: "DefaultPodTopologySpread"}, + {Name: "TaintToleration"}, + {Name: "PodTopologySpread"}, + }, + "ScorePlugin": { + {Name: "NodeResourcesBalancedAllocation", Weight: 1}, + {Name: "ImageLocality", Weight: 1}, + {Name: "InterPodAffinity", Weight: 1}, + {Name: "NodeResourcesLeastAllocated", Weight: 1}, + {Name: "NodeAffinity", Weight: 1}, + {Name: "NodePreferAvoidPods", Weight: 10000}, + {Name: "DefaultPodTopologySpread", Weight: 1}, + {Name: "TaintToleration", Weight: 1}, + {Name: "PodTopologySpread", Weight: 1}, + }, + "BindPlugin": {{Name: "DefaultBinder"}}, + } + + testcases := []struct { + name string + flags []string + wantPlugins map[string]map[string][]kubeschedulerconfig.Plugin + }{ + { + name: "default config", + flags: []string{ + "--kubeconfig", configKubeconfig, + }, + wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{ + "default-scheduler": defaultPlugins, + }, + }, + { + name: "v1alpha1 config with SchedulerName and HardPodAffinitySymmetricWeight", + flags: []string{ + "--config", v1alpha1Config, + "--kubeconfig", configKubeconfig, + }, + wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{ + "my-old-scheduler": defaultPlugins, + }, + }, + { + name: "plugin config with single profile", + flags: []string{ + "--config", pluginConfigFile, + "--kubeconfig", configKubeconfig, + }, + wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{ + "default-scheduler": { + "BindPlugin": {{Name: "DefaultBinder"}}, + "FilterPlugin": {{Name: "NodeResourcesFit"}, {Name: "NodePorts"}}, + "PreFilterPlugin": {{Name: "NodeResourcesFit"}, {Name: "NodePorts"}}, + "PreScorePlugin": {{Name: "InterPodAffinity"}, {Name: "TaintToleration"}}, + "QueueSortPlugin": {{Name: "PrioritySort"}}, + "ScorePlugin": {{Name: "InterPodAffinity", Weight: 1}, {Name: "TaintToleration", Weight: 1}}, + }, + }, + }, + { + name: "plugin config with multiple profiles", + flags: []string{ + "--config", multiProfilesConfig, + "--kubeconfig", configKubeconfig, + }, + wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{ + "profile-default-plugins": defaultPlugins, + "profile-disable-all-filter-and-score-plugins": { + "BindPlugin": {{Name: "DefaultBinder"}}, + "QueueSortPlugin": {{Name: "PrioritySort"}}, + }, + }, + }, + { + name: "Deprecated SchedulerName flag", + flags: []string{ + "--kubeconfig", configKubeconfig, + "--scheduler-name", "my-scheduler", + }, + wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{ + "my-scheduler": defaultPlugins, + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + fs := pflag.NewFlagSet("test", pflag.PanicOnError) + opts, err := options.NewOptions() + if err != nil { + t.Fatal(err) + } + for _, f := range opts.Flags().FlagSets { + fs.AddFlagSet(f) + } + if err := fs.Parse(tc.flags); err != nil { + t.Fatal(err) + } + + var args []string + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cc, sched, err := Setup(ctx, args, opts) + if err != nil { + t.Fatal(err) + } + defer cc.SecureServing.Listener.Close() + defer cc.InsecureServing.Listener.Close() + + gotPlugins := make(map[string]map[string][]kubeschedulerconfig.Plugin) + for n, p := range sched.Profiles { + gotPlugins[n] = p.ListPlugins() + } + + if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" { + t.Errorf("unexpected plugins diff (-want, +got): %s", diff) + } + }) + } +} diff --git a/cmd/kube-scheduler/app/testing/BUILD b/cmd/kube-scheduler/app/testing/BUILD index 8df856388b1..72cc391ea30 100644 --- a/cmd/kube-scheduler/app/testing/BUILD +++ b/cmd/kube-scheduler/app/testing/BUILD @@ -9,6 +9,7 @@ go_library( "//cmd/kube-scheduler/app:go_default_library", "//cmd/kube-scheduler/app/config:go_default_library", "//cmd/kube-scheduler/app/options:go_default_library", + "//pkg/util/configz:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/cmd/kube-scheduler/app/testing/testserver.go b/cmd/kube-scheduler/app/testing/testserver.go index 3c362fd9c91..b6ae8fc34b6 100644 --- a/cmd/kube-scheduler/app/testing/testserver.go +++ b/cmd/kube-scheduler/app/testing/testserver.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/cmd/kube-scheduler/app" kubeschedulerconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" "k8s.io/kubernetes/cmd/kube-scheduler/app/options" + "k8s.io/kubernetes/pkg/util/configz" ) // TearDownFunc is to be called to tear down a test server. @@ -66,6 +67,7 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err if len(result.TmpDir) != 0 { os.RemoveAll(result.TmpDir) } + configz.Delete("componentconfig") } defer func() { if result.TearDownFn == nil { @@ -80,51 +82,52 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err fs := pflag.NewFlagSet("test", pflag.PanicOnError) - s, err := options.NewOptions() + opts, err := options.NewOptions() if err != nil { return TestServer{}, err } - namedFlagSets := s.Flags() + namedFlagSets := opts.Flags() for _, f := range namedFlagSets.FlagSets { fs.AddFlagSet(f) } fs.Parse(customFlags) - if s.SecureServing.BindPort != 0 { - s.SecureServing.Listener, s.SecureServing.BindPort, err = createListenerOnFreePort() + if opts.SecureServing.BindPort != 0 { + opts.SecureServing.Listener, opts.SecureServing.BindPort, err = createListenerOnFreePort() if err != nil { return result, fmt.Errorf("failed to create listener: %v", err) } - s.SecureServing.ServerCert.CertDirectory = result.TmpDir + opts.SecureServing.ServerCert.CertDirectory = result.TmpDir - t.Logf("kube-scheduler will listen securely on port %d...", s.SecureServing.BindPort) + t.Logf("kube-scheduler will listen securely on port %d...", opts.SecureServing.BindPort) } - if s.CombinedInsecureServing.BindPort != 0 { + if opts.CombinedInsecureServing.BindPort != 0 { listener, port, err := createListenerOnFreePort() if err != nil { return result, fmt.Errorf("failed to create listener: %v", err) } - s.CombinedInsecureServing.BindPort = port - s.CombinedInsecureServing.Healthz.Listener = listener - s.CombinedInsecureServing.Metrics.Listener = listener - t.Logf("kube-scheduler will listen insecurely on port %d...", s.CombinedInsecureServing.BindPort) + opts.CombinedInsecureServing.BindPort = port + opts.CombinedInsecureServing.Healthz.Listener = listener + opts.CombinedInsecureServing.Metrics.Listener = listener + t.Logf("kube-scheduler will listen insecurely on port %d...", opts.CombinedInsecureServing.BindPort) } - config, err := s.Config() + + cc, sched, err := app.Setup(ctx, []string{}, opts) if err != nil { return result, fmt.Errorf("failed to create config from options: %v", err) } errCh := make(chan error) go func(ctx context.Context) { - if err := app.Run(ctx, config.Complete()); err != nil { + if err := app.Run(ctx, cc, sched); err != nil { errCh <- err } }(ctx) t.Logf("Waiting for /healthz to be ok...") - client, err := kubernetes.NewForConfig(config.LoopbackClientConfig) + client, err := kubernetes.NewForConfig(cc.LoopbackClientConfig) if err != nil { return result, fmt.Errorf("failed to create a client: %v", err) } @@ -148,9 +151,9 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err } // from here the caller must call tearDown - result.LoopbackClientConfig = config.LoopbackClientConfig - result.Options = s - result.Config = config + result.LoopbackClientConfig = cc.LoopbackClientConfig + result.Options = opts + result.Config = cc.Config result.TearDownFn = tearDown return result, nil