Add test for kube-scheduler command setup

This commit is contained in:
notpad 2020-03-20 07:36:47 +08:00
parent 900143c6d4
commit 16015a691c
5 changed files with 393 additions and 72 deletions

View File

@ -3,6 +3,7 @@ package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -65,3 +66,15 @@ filegroup(
],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["server_test.go"],
embed = [":go_default_library"],
deps = [
"//cmd/kube-scheduler/app/options:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
],
)

View File

@ -119,33 +119,29 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cmd.Flags())
if len(args) != 0 {
fmt.Fprint(os.Stderr, "arguments are not supported\n")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if errs := opts.Validate(); len(errs) > 0 {
return utilerrors.NewAggregate(errs)
cc, sched, err := Setup(ctx, args, opts, registryOptions...)
if err != nil {
return err
}
if len(opts.WriteConfigTo) > 0 {
c := &schedulerserverconfig.Config{}
if err := opts.ApplyTo(c); err != nil {
return err
}
if err := options.WriteConfigFile(opts.WriteConfigTo, &c.ComponentConfig); err != nil {
if err := options.WriteConfigFile(opts.WriteConfigTo, &cc.ComponentConfig); err != nil {
return err
}
klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
return nil
}
c, err := opts.Config()
if err != nil {
return err
}
return Run(ctx, cc, sched)
}
// Get the completed config
cc := c.Complete()
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
// To help debugging, immediately log version
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
@ -154,45 +150,6 @@ func runCommand(cmd *cobra.Command, args []string, opts *options.Options, regist
return fmt.Errorf("unable to register configz: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
return Run(ctx, cc, registryOptions...)
}
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
// To help debugging, immediately log version
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
outOfTreeRegistry := make(framework.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return err
}
}
recorderFactory := getRecorderFactory(&cc)
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
if err != nil {
return err
}
// Prepare the event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {
cc.Broadcaster.StartRecordingToSink(ctx.Done())
@ -340,3 +297,52 @@ func WithPlugin(name string, factory framework.PluginFactory) Option {
return registry.Register(name, factory)
}
}
// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, args []string, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
if len(args) != 0 {
fmt.Fprint(os.Stderr, "arguments are not supported\n")
}
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
c, err := opts.Config()
if err != nil {
return nil, nil, err
}
// Get the completed config
cc := c.Complete()
outOfTreeRegistry := make(framework.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}
recorderFactory := getRecorderFactory(&cc)
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
if err != nil {
return nil, nil, err
}
return &cc, sched, nil
}

View File

@ -0,0 +1,298 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/spf13/pflag"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
func TestSetup(t *testing.T) {
// temp dir
tmpDir, err := ioutil.TempDir("", "scheduler-options")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
// https server
server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(200)
w.Write([]byte(`ok`))
}))
defer server.Close()
configKubeconfig := filepath.Join(tmpDir, "config.kubeconfig")
if err := ioutil.WriteFile(configKubeconfig, []byte(fmt.Sprintf(`
apiVersion: v1
kind: Config
clusters:
- cluster:
insecure-skip-tls-verify: true
server: %s
name: default
contexts:
- context:
cluster: default
user: default
name: default
current-context: default
users:
- name: default
user:
username: config
`, server.URL)), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
v1alpha1Config := filepath.Join(tmpDir, "kubeconfig_v1alpha1.yaml")
if err := ioutil.WriteFile(v1alpha1Config, []byte(fmt.Sprintf(`
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: "my-old-scheduler"
clientConnection:
kubeconfig: "%s"
leaderElection:
leaderElect: true
hardPodAffinitySymmetricWeight: 3`, configKubeconfig)), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
// plugin config
pluginConfigFile := filepath.Join(tmpDir, "plugin.yaml")
if err := ioutil.WriteFile(pluginConfigFile, []byte(fmt.Sprintf(`
apiVersion: kubescheduler.config.k8s.io/v1alpha2
kind: KubeSchedulerConfiguration
clientConnection:
kubeconfig: "%s"
profiles:
- plugins:
preFilter:
enabled:
- name: NodeResourcesFit
- name: NodePorts
disabled:
- name: "*"
filter:
enabled:
- name: NodeResourcesFit
- name: NodePorts
disabled:
- name: "*"
preScore:
enabled:
- name: InterPodAffinity
- name: TaintToleration
disabled:
- name: "*"
score:
enabled:
- name: InterPodAffinity
- name: TaintToleration
disabled:
- name: "*"
`, configKubeconfig)), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
// multiple profiles config
multiProfilesConfig := filepath.Join(tmpDir, "multi-profiles.yaml")
if err := ioutil.WriteFile(multiProfilesConfig, []byte(fmt.Sprintf(`
apiVersion: kubescheduler.config.k8s.io/v1alpha2
kind: KubeSchedulerConfiguration
clientConnection:
kubeconfig: "%s"
profiles:
- schedulerName: "profile-default-plugins"
- schedulerName: "profile-disable-all-filter-and-score-plugins"
plugins:
preFilter:
disabled:
- name: "*"
filter:
disabled:
- name: "*"
preScore:
disabled:
- name: "*"
score:
disabled:
- name: "*"
`, configKubeconfig)), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
defaultPlugins := map[string][]kubeschedulerconfig.Plugin{
"QueueSortPlugin": {
{Name: "PrioritySort"},
},
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
{Name: "PodTopologySpread"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
{Name: "PodTopologySpread"},
},
"PreScorePlugin": {
{Name: "InterPodAffinity"},
{Name: "DefaultPodTopologySpread"},
{Name: "TaintToleration"},
{Name: "PodTopologySpread"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
{Name: "PodTopologySpread", Weight: 1},
},
"BindPlugin": {{Name: "DefaultBinder"}},
}
testcases := []struct {
name string
flags []string
wantPlugins map[string]map[string][]kubeschedulerconfig.Plugin
}{
{
name: "default config",
flags: []string{
"--kubeconfig", configKubeconfig,
},
wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{
"default-scheduler": defaultPlugins,
},
},
{
name: "v1alpha1 config with SchedulerName and HardPodAffinitySymmetricWeight",
flags: []string{
"--config", v1alpha1Config,
"--kubeconfig", configKubeconfig,
},
wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{
"my-old-scheduler": defaultPlugins,
},
},
{
name: "plugin config with single profile",
flags: []string{
"--config", pluginConfigFile,
"--kubeconfig", configKubeconfig,
},
wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{
"default-scheduler": {
"BindPlugin": {{Name: "DefaultBinder"}},
"FilterPlugin": {{Name: "NodeResourcesFit"}, {Name: "NodePorts"}},
"PreFilterPlugin": {{Name: "NodeResourcesFit"}, {Name: "NodePorts"}},
"PreScorePlugin": {{Name: "InterPodAffinity"}, {Name: "TaintToleration"}},
"QueueSortPlugin": {{Name: "PrioritySort"}},
"ScorePlugin": {{Name: "InterPodAffinity", Weight: 1}, {Name: "TaintToleration", Weight: 1}},
},
},
},
{
name: "plugin config with multiple profiles",
flags: []string{
"--config", multiProfilesConfig,
"--kubeconfig", configKubeconfig,
},
wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{
"profile-default-plugins": defaultPlugins,
"profile-disable-all-filter-and-score-plugins": {
"BindPlugin": {{Name: "DefaultBinder"}},
"QueueSortPlugin": {{Name: "PrioritySort"}},
},
},
},
{
name: "Deprecated SchedulerName flag",
flags: []string{
"--kubeconfig", configKubeconfig,
"--scheduler-name", "my-scheduler",
},
wantPlugins: map[string]map[string][]kubeschedulerconfig.Plugin{
"my-scheduler": defaultPlugins,
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
fs := pflag.NewFlagSet("test", pflag.PanicOnError)
opts, err := options.NewOptions()
if err != nil {
t.Fatal(err)
}
for _, f := range opts.Flags().FlagSets {
fs.AddFlagSet(f)
}
if err := fs.Parse(tc.flags); err != nil {
t.Fatal(err)
}
var args []string
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cc, sched, err := Setup(ctx, args, opts)
if err != nil {
t.Fatal(err)
}
defer cc.SecureServing.Listener.Close()
defer cc.InsecureServing.Listener.Close()
gotPlugins := make(map[string]map[string][]kubeschedulerconfig.Plugin)
for n, p := range sched.Profiles {
gotPlugins[n] = p.ListPlugins()
}
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
})
}
}

View File

@ -9,6 +9,7 @@ go_library(
"//cmd/kube-scheduler/app:go_default_library",
"//cmd/kube-scheduler/app/config:go_default_library",
"//cmd/kube-scheduler/app/options:go_default_library",
"//pkg/util/configz:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/cmd/kube-scheduler/app"
kubeschedulerconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
"k8s.io/kubernetes/pkg/util/configz"
)
// TearDownFunc is to be called to tear down a test server.
@ -66,6 +67,7 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)
}
configz.Delete("componentconfig")
}
defer func() {
if result.TearDownFn == nil {
@ -80,51 +82,52 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
fs := pflag.NewFlagSet("test", pflag.PanicOnError)
s, err := options.NewOptions()
opts, err := options.NewOptions()
if err != nil {
return TestServer{}, err
}
namedFlagSets := s.Flags()
namedFlagSets := opts.Flags()
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
fs.Parse(customFlags)
if s.SecureServing.BindPort != 0 {
s.SecureServing.Listener, s.SecureServing.BindPort, err = createListenerOnFreePort()
if opts.SecureServing.BindPort != 0 {
opts.SecureServing.Listener, opts.SecureServing.BindPort, err = createListenerOnFreePort()
if err != nil {
return result, fmt.Errorf("failed to create listener: %v", err)
}
s.SecureServing.ServerCert.CertDirectory = result.TmpDir
opts.SecureServing.ServerCert.CertDirectory = result.TmpDir
t.Logf("kube-scheduler will listen securely on port %d...", s.SecureServing.BindPort)
t.Logf("kube-scheduler will listen securely on port %d...", opts.SecureServing.BindPort)
}
if s.CombinedInsecureServing.BindPort != 0 {
if opts.CombinedInsecureServing.BindPort != 0 {
listener, port, err := createListenerOnFreePort()
if err != nil {
return result, fmt.Errorf("failed to create listener: %v", err)
}
s.CombinedInsecureServing.BindPort = port
s.CombinedInsecureServing.Healthz.Listener = listener
s.CombinedInsecureServing.Metrics.Listener = listener
t.Logf("kube-scheduler will listen insecurely on port %d...", s.CombinedInsecureServing.BindPort)
opts.CombinedInsecureServing.BindPort = port
opts.CombinedInsecureServing.Healthz.Listener = listener
opts.CombinedInsecureServing.Metrics.Listener = listener
t.Logf("kube-scheduler will listen insecurely on port %d...", opts.CombinedInsecureServing.BindPort)
}
config, err := s.Config()
cc, sched, err := app.Setup(ctx, []string{}, opts)
if err != nil {
return result, fmt.Errorf("failed to create config from options: %v", err)
}
errCh := make(chan error)
go func(ctx context.Context) {
if err := app.Run(ctx, config.Complete()); err != nil {
if err := app.Run(ctx, cc, sched); err != nil {
errCh <- err
}
}(ctx)
t.Logf("Waiting for /healthz to be ok...")
client, err := kubernetes.NewForConfig(config.LoopbackClientConfig)
client, err := kubernetes.NewForConfig(cc.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
}
@ -148,9 +151,9 @@ func StartTestServer(t Logger, customFlags []string) (result TestServer, err err
}
// from here the caller must call tearDown
result.LoopbackClientConfig = config.LoopbackClientConfig
result.Options = s
result.Config = config
result.LoopbackClientConfig = cc.LoopbackClientConfig
result.Options = opts
result.Config = cc.Config
result.TearDownFn = tearDown
return result, nil