Merge pull request #69663 from sttts/sttts-scheduler-secure-serving

scheduler: enable secure port and authn/z
This commit is contained in:
k8s-ci-robot 2018-11-08 17:36:14 -08:00 committed by GitHub
commit be800e623a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 507 additions and 148 deletions

View File

@ -61,6 +61,7 @@ filegroup(
":package-srcs",
"//cmd/kube-scheduler/app/config:all-srcs",
"//cmd/kube-scheduler/app/options:all-srcs",
"//cmd/kube-scheduler/app/testing:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -12,6 +12,7 @@ go_library(
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
],

View File

@ -22,6 +22,7 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -32,6 +33,9 @@ type Config struct {
// config is the scheduler server's configuration object.
ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
// LoopbackClientConfig is a config for a privileged loopback connection
LoopbackClientConfig *restclient.Config
InsecureServing *apiserver.DeprecatedInsecureServingInfo // nil will disable serving on an insecure port
InsecureMetricsServing *apiserver.DeprecatedInsecureServingInfo // non-nil if metrics should be served independently
Authentication apiserver.AuthenticationInfo
@ -70,5 +74,7 @@ func (c *Config) Complete() CompletedConfig {
c.InsecureMetricsServing.Name = "metrics"
}
apiserver.AuthorizeClientBearerToken(c.LoopbackClientConfig, &c.Authentication, &c.Authorization)
return CompletedConfig{&cc}
}

View File

@ -14,6 +14,7 @@ go_library(
"//cmd/kube-scheduler/app/config:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/client/leaderelectionconfig:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/apis/config/v1alpha1:go_default_library",

View File

@ -31,8 +31,8 @@ import (
// CombinedInsecureServingOptions sets up to two insecure listeners for healthz and metrics. The flags
// override the ComponentConfig and DeprecatedInsecureServingOptions values for both.
type CombinedInsecureServingOptions struct {
Healthz *apiserveroptions.DeprecatedInsecureServingOptions
Metrics *apiserveroptions.DeprecatedInsecureServingOptions
Healthz *apiserveroptions.DeprecatedInsecureServingOptionsWithLoopback
Metrics *apiserveroptions.DeprecatedInsecureServingOptionsWithLoopback
BindPort int // overrides the structs above on ApplyTo, ignored on ApplyToFromLoadedConfig
BindAddress string // overrides the structs above on ApplyTo, ignored on ApplyToFromLoadedConfig
@ -60,11 +60,11 @@ func (o *CombinedInsecureServingOptions) applyTo(c *schedulerappconfig.Config, c
return err
}
if err := o.Healthz.ApplyTo(&c.InsecureServing); err != nil {
if err := o.Healthz.ApplyTo(&c.InsecureServing, &c.LoopbackClientConfig); err != nil {
return err
}
if o.Metrics != nil && (c.ComponentConfig.MetricsBindAddress != c.ComponentConfig.HealthzBindAddress || o.Healthz == nil) {
if err := o.Metrics.ApplyTo(&c.InsecureMetricsServing); err != nil {
if err := o.Metrics.ApplyTo(&c.InsecureMetricsServing, &c.LoopbackClientConfig); err != nil {
return err
}
}
@ -108,7 +108,7 @@ func (o *CombinedInsecureServingOptions) ApplyToFromLoadedConfig(c *schedulerapp
return o.applyTo(c, componentConfig)
}
func updateAddressFromDeprecatedInsecureServingOptions(addr *string, is *apiserveroptions.DeprecatedInsecureServingOptions) error {
func updateAddressFromDeprecatedInsecureServingOptions(addr *string, is *apiserveroptions.DeprecatedInsecureServingOptionsWithLoopback) error {
if is == nil {
*addr = ""
} else {
@ -124,7 +124,7 @@ func updateAddressFromDeprecatedInsecureServingOptions(addr *string, is *apiserv
return nil
}
func updateDeprecatedInsecureServingOptionsFromAddress(is *apiserveroptions.DeprecatedInsecureServingOptions, addr string) error {
func updateDeprecatedInsecureServingOptionsFromAddress(is *apiserveroptions.DeprecatedInsecureServingOptionsWithLoopback, addr string) error {
if is == nil {
return nil
}

View File

@ -46,8 +46,8 @@ func TestOptions_ApplyTo(t *testing.T) {
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{},
Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{},
Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
BindPort: 0,
},
},
@ -61,7 +61,7 @@ func TestOptions_ApplyTo(t *testing.T) {
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{},
Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
BindPort: 0,
},
},
@ -79,7 +79,7 @@ func TestOptions_ApplyTo(t *testing.T) {
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{},
Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
BindPort: 0,
},
},
@ -97,8 +97,8 @@ func TestOptions_ApplyTo(t *testing.T) {
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{},
Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{},
Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
BindPort: 0,
},
},
@ -118,8 +118,8 @@ func TestOptions_ApplyTo(t *testing.T) {
MetricsBindAddress: "1.2.3.4:1235",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{},
Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{},
Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
BindPort: 0,
},
},
@ -141,8 +141,8 @@ func TestOptions_ApplyTo(t *testing.T) {
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{},
Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{},
Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
BindPort: 1236,
BindAddress: "1.2.3.4",
},
@ -163,8 +163,8 @@ func TestOptions_ApplyTo(t *testing.T) {
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{},
Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{},
Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
BindAddress: "2.3.4.5",
BindPort: 1234,
},
@ -185,8 +185,8 @@ func TestOptions_ApplyTo(t *testing.T) {
MetricsBindAddress: "1.2.3.4:1234",
},
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{},
Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{},
Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{}).WithLoopback(),
BindAddress: "2.3.4.5",
BindPort: 0,
},

View File

@ -45,6 +45,7 @@ import (
schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
"k8s.io/kubernetes/pkg/master/ports"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
@ -56,7 +57,7 @@ type Options struct {
// The default values. These are overridden if ConfigFile is set or by values in InsecureServing.
ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
SecureServing *apiserveroptions.SecureServingOptions
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
CombinedInsecureServing *CombinedInsecureServingOptions
Authentication *apiserveroptions.DelegatingAuthenticationOptions
Authorization *apiserveroptions.DelegatingAuthorizationOptions
@ -85,25 +86,34 @@ func NewOptions() (*Options, error) {
o := &Options{
ComponentConfig: *cfg,
SecureServing: nil, // TODO: enable with apiserveroptions.NewSecureServingOptions()
SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
CombinedInsecureServing: &CombinedInsecureServingOptions{
Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{
Healthz: (&apiserveroptions.DeprecatedInsecureServingOptions{
BindNetwork: "tcp",
},
Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{
}).WithLoopback(),
Metrics: (&apiserveroptions.DeprecatedInsecureServingOptions{
BindNetwork: "tcp",
},
}).WithLoopback(),
BindPort: hport,
BindAddress: hhost,
},
Authentication: nil, // TODO: enable with apiserveroptions.NewDelegatingAuthenticationOptions()
Authorization: nil, // TODO: enable with apiserveroptions.NewDelegatingAuthorizationOptions()
Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(),
Deprecated: &DeprecatedOptions{
UseLegacyPolicyConfig: false,
PolicyConfigMapNamespace: metav1.NamespaceSystem,
},
}
o.Authentication.RemoteKubeConfigFileOptional = true
o.Authorization.RemoteKubeConfigFileOptional = true
o.Authorization.AlwaysAllowPaths = []string{"/healthz"}
// Set the PairName but leave certificate directory blank to generate in-memory by default
o.SecureServing.ServerCert.CertDirectory = ""
o.SecureServing.ServerCert.PairName = "kube-scheduler"
o.SecureServing.BindPort = ports.KubeSchedulerPort
return o, nil
}
@ -173,13 +183,19 @@ func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
}
}
if err := o.SecureServing.ApplyTo(&c.SecureServing); err != nil {
if err := o.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil {
return err
}
if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
return err
if o.SecureServing != nil && (o.SecureServing.BindPort != 0 || o.SecureServing.Listener != nil) {
if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
return err
}
if err := o.Authorization.ApplyTo(&c.Authorization); err != nil {
return err
}
}
return o.Authorization.ApplyTo(&c.Authorization)
return nil
}
// Validate validates all the required options.
@ -200,6 +216,12 @@ func (o *Options) Validate() []error {
// Config return a scheduler config object
func (o *Options) Config() (*schedulerappconfig.Config, error) {
if o.SecureServing != nil {
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
}
c := &schedulerappconfig.Config{}
if err := o.ApplyTo(c); err != nil {
return nil, err

View File

@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
apiserverconfig "k8s.io/apiserver/pkg/apis/config"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
@ -175,6 +176,29 @@ users:
}
return *cfg
}(),
SecureServing: (&apiserveroptions.SecureServingOptions{
ServerCert: apiserveroptions.GeneratableKeyCert{
CertDirectory: "/a/b/c",
PairName: "kube-scheduler",
},
HTTP2MaxStreamsPerConnection: 47,
}).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"},
ExtraHeaderPrefixes: []string{"x-remote-extra-"},
},
RemoteKubeConfigFileOptional: true,
},
Authorization: &apiserveroptions.DelegatingAuthorizationOptions{
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or /healthz/*
},
},
expectedUsername: "config",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
@ -233,6 +257,29 @@ users:
cfg.ClientConnection.Kubeconfig = flagKubeconfig
return *cfg
}(),
SecureServing: (&apiserveroptions.SecureServingOptions{
ServerCert: apiserveroptions.GeneratableKeyCert{
CertDirectory: "/a/b/c",
PairName: "kube-scheduler",
},
HTTP2MaxStreamsPerConnection: 47,
}).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second,
ClientCert: apiserveroptions.ClientCertAuthenticationOptions{},
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"},
ExtraHeaderPrefixes: []string{"x-remote-extra-"},
},
RemoteKubeConfigFileOptional: true,
},
Authorization: &apiserveroptions.DelegatingAuthorizationOptions{
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or /healthz/*
},
},
expectedUsername: "flag",
expectedConfig: kubeschedulerconfig.KubeSchedulerConfiguration{
@ -264,8 +311,32 @@ users:
},
},
{
name: "overridden master",
options: &Options{Master: insecureserver.URL},
name: "overridden master",
options: &Options{
Master: insecureserver.URL,
SecureServing: (&apiserveroptions.SecureServingOptions{
ServerCert: apiserveroptions.GeneratableKeyCert{
CertDirectory: "/a/b/c",
PairName: "kube-scheduler",
},
HTTP2MaxStreamsPerConnection: 47,
}).WithLoopback(),
Authentication: &apiserveroptions.DelegatingAuthenticationOptions{
CacheTTL: 10 * time.Second,
RequestHeader: apiserveroptions.RequestHeaderAuthenticationOptions{
UsernameHeaders: []string{"x-remote-user"},
GroupHeaders: []string{"x-remote-group"},
ExtraHeaderPrefixes: []string{"x-remote-extra-"},
},
RemoteKubeConfigFileOptional: true,
},
Authorization: &apiserveroptions.DelegatingAuthorizationOptions{
AllowCacheTTL: 10 * time.Second,
DenyCacheTTL: 10 * time.Second,
RemoteKubeConfigFileOptional: true,
AlwaysAllowPaths: []string{"/healthz"}, // note: this does not match /healthz/ or /healthz/*
},
},
expectedUsername: "none, http",
},
{

View File

@ -81,7 +81,7 @@ constraints, affinity and anti-affinity specifications, data locality, inter-wor
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
Run: func(cmd *cobra.Command, args []string) {
if err := run(cmd, args, opts); err != nil {
if err := runCommand(cmd, args, opts); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
@ -94,8 +94,8 @@ through the API as necessary.`,
return cmd
}
// run runs the scheduler.
func run(cmd *cobra.Command, args []string, opts *options.Options) error {
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cmd.Flags())
@ -136,36 +136,42 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error {
// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(c.ComponentConfig)
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}
return Run(cc, stopCh)
}
// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
var storageClassInformer storageinformers.StorageClassInformer
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
storageClassInformer = c.InformerFactory.Storage().V1().StorageClasses()
storageClassInformer = cc.InformerFactory.Storage().V1().StorageClasses()
}
// Create the scheduler.
sched, err := scheduler.New(c.Client,
c.InformerFactory.Core().V1().Nodes(),
c.PodInformer,
c.InformerFactory.Core().V1().PersistentVolumes(),
c.InformerFactory.Core().V1().PersistentVolumeClaims(),
c.InformerFactory.Core().V1().ReplicationControllers(),
c.InformerFactory.Apps().V1().ReplicaSets(),
c.InformerFactory.Apps().V1().StatefulSets(),
c.InformerFactory.Core().V1().Services(),
c.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
storageClassInformer,
c.Recorder,
c.ComponentConfig.AlgorithmSource,
scheduler.WithName(c.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(c.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(c.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(c.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(c.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*c.ComponentConfig.BindTimeoutSeconds))
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
if err != nil {
return err
}
@ -205,7 +211,7 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error {
cc.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)
// Prepare a reusable run function.
// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
@ -222,7 +228,7 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error {
}
}()
// If leader election is enabled, run via LeaderElector until done and exit.
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: run,
@ -240,7 +246,7 @@ func run(cmd *cobra.Command, args []string, opts *options.Options) error {
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so run inline until done.
// Leader election is disabled, so runCommand inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
}

View File

@ -0,0 +1,32 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["testserver.go"],
importpath = "k8s.io/kubernetes/cmd/kube-scheduler/app/testing",
visibility = ["//visibility:public"],
deps = [
"//cmd/kube-scheduler/app:go_default_library",
"//cmd/kube-scheduler/app/config:go_default_library",
"//cmd/kube-scheduler/app/options:go_default_library",
"//pkg/scheduler/algorithmprovider/defaults:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,183 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"fmt"
"io/ioutil"
"net"
"os"
"time"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-scheduler/app"
kubeschedulerconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
// import DefaultProvider
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
)
// TearDownFunc is to be called to tear down a test server.
type TearDownFunc func()
// TestServer return values supplied by kube-test-ApiServer
type TestServer struct {
LoopbackClientConfig *restclient.Config // Rest client config using the magic token
Options *options.Options
Config *kubeschedulerconfig.Config
TearDownFn TearDownFunc // TearDown function
TmpDir string // Temp Dir used, by the apiserver
}
// Logger allows t.Testing and b.Testing to be passed to StartTestServer and StartTestServerOrDie
type Logger interface {
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Logf(format string, args ...interface{})
}
// StartTestServer starts a kube-scheduler. A rest client config and a tear-down func,
// and location of the tmpdir are returned.
//
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, customFlags []string) (result TestServer, err error) {
stopCh := make(chan struct{})
tearDown := func() {
close(stopCh)
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)
}
}
defer func() {
if result.TearDownFn == nil {
tearDown()
}
}()
result.TmpDir, err = ioutil.TempDir("", "kube-scheduler")
if err != nil {
return result, fmt.Errorf("failed to create temp dir: %v", err)
}
fs := pflag.NewFlagSet("test", pflag.PanicOnError)
s, err := options.NewOptions()
if err != nil {
return TestServer{}, err
}
s.AddFlags(fs)
fs.Parse(customFlags)
if s.SecureServing.BindPort != 0 {
s.SecureServing.Listener, s.SecureServing.BindPort, err = createListenerOnFreePort()
if err != nil {
return result, fmt.Errorf("failed to create listener: %v", err)
}
s.SecureServing.ServerCert.CertDirectory = result.TmpDir
t.Logf("kube-scheduler will listen securely on port %d...", s.SecureServing.BindPort)
}
if s.CombinedInsecureServing.BindPort != 0 {
listener, port, err := createListenerOnFreePort()
if err != nil {
return result, fmt.Errorf("failed to create listener: %v", err)
}
s.CombinedInsecureServing.BindPort = port
s.CombinedInsecureServing.Healthz.Listener = listener
s.CombinedInsecureServing.Metrics.Listener = listener
t.Logf("kube-scheduler will listen insecurely on port %d...", s.CombinedInsecureServing.BindPort)
}
config, err := s.Config()
if err != nil {
return result, fmt.Errorf("failed to create config from options: %v", err)
}
errCh := make(chan error)
go func(stopCh <-chan struct{}) {
if err := app.Run(config.Complete(), stopCh); err != nil {
errCh <- err
}
}(stopCh)
t.Logf("Waiting for /healthz to be ok...")
client, err := kubernetes.NewForConfig(config.LoopbackClientConfig)
if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err)
}
err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) {
select {
case err := <-errCh:
return false, err
default:
}
result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do()
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
}
// from here the caller must call tearDown
result.LoopbackClientConfig = config.LoopbackClientConfig
result.Options = s
result.Config = config
result.TearDownFn = tearDown
return result, nil
}
// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
func StartTestServerOrDie(t Logger, flags []string) *TestServer {
result, err := StartTestServer(t, flags)
if err == nil {
return &result
}
t.Fatalf("failed to launch server: %v", err)
return nil
}
func createListenerOnFreePort() (net.Listener, int, error) {
ln, err := net.Listen("tcp", ":0")
if err != nil {
return nil, 0, err
}
// get port
tcpAddr, ok := ln.Addr().(*net.TCPAddr)
if !ok {
ln.Close()
return nil, 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
}
return ln, tcpAddr.Port, nil
}

View File

@ -23,9 +23,10 @@ const (
// KubeletPort is the default port for the kubelet server on each host machine.
// May be overridden by a flag at startup.
KubeletPort = 10250
// SchedulerPort is the default port for the scheduler status server.
// InsecureSchedulerPort is the default port for the scheduler status server.
// May be overridden by a flag at startup.
SchedulerPort = 10251
// Deprecated: use the secure KubeSchedulerPort instead.
InsecureSchedulerPort = 10251
// InsecureKubeControllerManagerPort is the default port for the controller manager status server.
// May be overridden by a flag at startup.
// Deprecated: use the secure KubeControllerManagerPort instead.
@ -49,4 +50,8 @@ const (
// CloudControllerManagerPort is the default port for the cloud controller manager server.
// This value may be overridden by a flag at startup.
CloudControllerManagerPort = 10258
// KubeSchedulerPort is the default port for the scheduler status server.
// May be overridden by a flag at startup.
KubeSchedulerPort = 10259
)

View File

@ -254,7 +254,7 @@ type componentStatusStorage struct {
func (s componentStatusStorage) serversToValidate() map[string]*componentstatus.Server {
serversToValidate := map[string]*componentstatus.Server{
"controller-manager": {Addr: "127.0.0.1", Port: ports.InsecureKubeControllerManagerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.InsecureSchedulerPort, Path: "/healthz"},
}
for ix, machine := range s.storageFactory.Backends() {

View File

@ -62,7 +62,7 @@ func SetDefaults_KubeSchedulerConfiguration(obj *kubescedulerconfigv1alpha1.Kube
}
obj.HealthzBindAddress = net.JoinHostPort(host, port)
} else {
obj.HealthzBindAddress = net.JoinHostPort("0.0.0.0", strconv.Itoa(ports.SchedulerPort))
obj.HealthzBindAddress = net.JoinHostPort("0.0.0.0", strconv.Itoa(ports.InsecureSchedulerPort))
}
if host, port, err := net.SplitHostPort(obj.MetricsBindAddress); err == nil {
@ -71,7 +71,7 @@ func SetDefaults_KubeSchedulerConfiguration(obj *kubescedulerconfigv1alpha1.Kube
}
obj.MetricsBindAddress = net.JoinHostPort(host, port)
} else {
obj.MetricsBindAddress = net.JoinHostPort("0.0.0.0", strconv.Itoa(ports.SchedulerPort))
obj.MetricsBindAddress = net.JoinHostPort("0.0.0.0", strconv.Itoa(ports.InsecureSchedulerPort))
}
if len(obj.LeaderElection.LockObjectNamespace) == 0 {

View File

@ -37,6 +37,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
@ -127,7 +128,7 @@ type Config struct {
Recorder record.EventRecorder
// Close this to shut down the scheduler.
StopEverything chan struct{}
StopEverything <-chan struct{}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
@ -200,7 +201,7 @@ type configFactory struct {
storageClassLister storagelisters.StorageClassLister
// Close this to stop all reflectors
StopEverything chan struct{}
StopEverything <-chan struct{}
scheduledPodsHasSynced cache.InformerSynced
@ -253,12 +254,16 @@ type ConfigFactoryArgs struct {
DisablePreemption bool
PercentageOfNodesToScore int32
BindTimeoutSeconds int64
StopCh <-chan struct{}
}
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
stopEverything := make(chan struct{})
stopEverything := args.StopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate

View File

@ -52,7 +52,9 @@ const (
func TestCreate(t *testing.T) {
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
factory.Create()
}
@ -63,7 +65,9 @@ func TestCreateFromConfig(t *testing.T) {
var policy schedulerapi.Policy
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateOne)
@ -101,7 +105,9 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
var policy schedulerapi.Policy
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateOne)
@ -140,7 +146,9 @@ func TestCreateFromEmptyConfig(t *testing.T) {
var policy schedulerapi.Policy
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
configData = []byte(`{}`)
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
@ -155,7 +163,9 @@ func TestCreateFromEmptyConfig(t *testing.T) {
// The predicate/priority from DefaultProvider will be used.
func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
RegisterFitPredicate("PredicateOne", PredicateOne)
RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
@ -188,7 +198,9 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
// Empty predicate/priority sets will be used.
func TestCreateFromConfigWithEmptyPredicatesOrPriorities(t *testing.T) {
client := fake.NewSimpleClientset()
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
RegisterFitPredicate("PredicateOne", PredicateOne)
RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
@ -240,7 +252,9 @@ func TestDefaultErrorFunc(t *testing.T) {
Spec: apitesting.V1DeepEqualSafePodSpec(),
}
client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
queue := &internalqueue.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue)
@ -370,7 +384,9 @@ func testBind(binding *v1.Binding, t *testing.T) {
func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
client := fake.NewSimpleClientset()
// factory of "default-scheduler"
factory := newConfigFactory(client, -1)
stopCh := make(chan struct{})
factory := newConfigFactory(client, -1, stopCh)
defer close(stopCh)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: invalid hardPodAffinitySymmetricWeight, got nothing")
@ -399,7 +415,9 @@ func TestInvalidFactoryArgs(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
factory := newConfigFactory(client, test.hardPodAffinitySymmetricWeight)
stopCh := make(chan struct{})
factory := newConfigFactory(client, test.hardPodAffinitySymmetricWeight, stopCh)
defer close(stopCh)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: %s, got nothing", test.expectErr)
@ -501,7 +519,7 @@ func TestSkipPodUpdate(t *testing.T) {
}
}
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32) Configurator {
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(&ConfigFactoryArgs{
v1.DefaultSchedulerName,
@ -521,6 +539,7 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight
disablePodPreemption,
schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds,
stopCh,
})
}

View File

@ -59,12 +59,6 @@ type Scheduler struct {
config *factory.Config
}
// StopEverything closes the scheduler config's StopEverything channel, to shut
// down the Scheduler.
func (sched *Scheduler) StopEverything() {
close(sched.config.StopEverything)
}
// Cache returns the cache in scheduler for test to check the data in scheduler.
func (sched *Scheduler) Cache() schedulerinternalcache.Cache {
return sched.config.SchedulerCache
@ -147,6 +141,7 @@ func New(client clientset.Interface,
storageClassInformer storageinformers.StorageClassInformer,
recorder record.EventRecorder,
schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
stopCh <-chan struct{},
opts ...func(o *schedulerOptions)) (*Scheduler, error) {
options := defaultSchedulerOptions
@ -230,6 +225,7 @@ func New(client clientset.Interface,
// Additional tweaks to the config produced by the configurator.
config.Recorder = recorder
config.DisablePreemption = options.disablePreemption
config.StopEverything = stopCh
// Create the scheduler.
sched := NewFromConfig(config)
return sched, nil

View File

@ -175,6 +175,8 @@ func TestSchedulerCreation(t *testing.T) {
factory.RegisterPriorityFunction("PriorityOne", PriorityOne, 1)
factory.RegisterAlgorithmProvider(testSource, sets.NewString("PredicateOne"), sets.NewString("PriorityOne"))
stopCh := make(chan struct{})
defer close(stopCh)
_, err := New(client,
informerFactory.Core().V1().Nodes(),
factory.NewPodInformer(client, 0),
@ -188,6 +190,7 @@ func TestSchedulerCreation(t *testing.T) {
informerFactory.Storage().V1().StorageClasses(),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
stopCh,
WithBindTimeoutSeconds(defaultBindTimeout))
if err != nil {

View File

@ -150,11 +150,11 @@ func (s *DeprecatedInsecureServingOptionsWithLoopback) ApplyTo(insecureServingIn
secureLoopbackClientConfig, err := (*insecureServingInfo).NewLoopbackClientConfig()
switch {
// if we failed and there's no fallback loopback client config, we need to fail
case err != nil && secureLoopbackClientConfig == nil:
case err != nil && *loopbackClientConfig == nil:
return err
// if we failed, but we already have a fallback loopback client config (usually insecure), allow it
case err != nil && secureLoopbackClientConfig != nil:
case err != nil && *loopbackClientConfig != nil:
default:
*loopbackClientConfig = secureLoopbackClientConfig

View File

@ -63,11 +63,11 @@ func (s *SecureServingOptionsWithLoopback) ApplyTo(secureServingInfo **server.Se
secureLoopbackClientConfig, err := (*secureServingInfo).NewLoopbackClientConfig(uuid.NewRandom().String(), certPem)
switch {
// if we failed and there's no fallback loopback client config, we need to fail
case err != nil && secureLoopbackClientConfig == nil:
case err != nil && *loopbackClientConfig == nil:
return err
// if we failed, but we already have a fallback loopback client config (usually insecure), allow it
case err != nil && secureLoopbackClientConfig != nil:
case err != nil && *loopbackClientConfig != nil:
default:
*loopbackClientConfig = secureLoopbackClientConfig

View File

@ -281,7 +281,7 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() {
// Requires master ssh access.
framework.SkipUnlessProviderIs("gce", "aws")
restarter := NewRestartConfig(
framework.GetMasterHost(), "kube-scheduler", ports.SchedulerPort, restartPollInterval, restartTimeout)
framework.GetMasterHost(), "kube-scheduler", ports.InsecureSchedulerPort, restartPollInterval, restartTimeout)
// Create pods while the scheduler is down and make sure the scheduler picks them up by
// scaling the rc to the same size.

View File

@ -127,7 +127,7 @@ func (g *MetricsGrabber) GrabFromScheduler() (SchedulerMetrics, error) {
if !g.registeredMaster {
return SchedulerMetrics{}, fmt.Errorf("Master's Kubelet is not registered. Skipping Scheduler's metrics gathering.")
}
output, err := g.getMetricsFromPod(g.client, fmt.Sprintf("%v-%v", "kube-scheduler", g.masterName), metav1.NamespaceSystem, ports.SchedulerPort)
output, err := g.getMetricsFromPod(g.client, fmt.Sprintf("%v-%v", "kube-scheduler", g.masterName), metav1.NamespaceSystem, ports.InsecureSchedulerPort)
if err != nil {
return SchedulerMetrics{}, err
}

View File

@ -611,7 +611,7 @@ func sendRestRequestToScheduler(c clientset.Interface, op string) (string, error
Context(ctx).
Namespace(metav1.NamespaceSystem).
Resource("pods").
Name(fmt.Sprintf("kube-scheduler-%v:%v", TestContext.CloudConfig.MasterName, ports.SchedulerPort)).
Name(fmt.Sprintf("kube-scheduler-%v:%v", TestContext.CloudConfig.MasterName, ports.InsecureSchedulerPort)).
SubResource("proxy").
Suffix("metrics").
Do().Raw()

View File

@ -180,7 +180,7 @@ var _ = SIGDescribe("Firewall rule", func() {
masterAddresses := framework.GetAllMasterAddresses(cs)
for _, masterAddress := range masterAddresses {
assertNotReachableHTTPTimeout(masterAddress, ports.InsecureKubeControllerManagerPort, gce.FirewallTestTcpTimeout)
assertNotReachableHTTPTimeout(masterAddress, ports.SchedulerPort, gce.FirewallTestTcpTimeout)
assertNotReachableHTTPTimeout(masterAddress, ports.InsecureSchedulerPort, gce.FirewallTestTcpTimeout)
}
assertNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletPort, gce.FirewallTestTcpTimeout)
assertNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletReadOnlyPort, gce.FirewallTestTcpTimeout)

View File

@ -41,7 +41,6 @@ filegroup(
"//test/integration/benchmark/jsonify:all-srcs",
"//test/integration/client:all-srcs",
"//test/integration/configmap:all-srcs",
"//test/integration/controllermanager:all-srcs",
"//test/integration/cronjob:all-srcs",
"//test/integration/daemonset:all-srcs",
"//test/integration/defaulttolerationseconds:all-srcs",
@ -66,6 +65,7 @@ filegroup(
"//test/integration/scheduler_perf:all-srcs",
"//test/integration/secrets:all-srcs",
"//test/integration/serviceaccount:all-srcs",
"//test/integration/serving:all-srcs",
"//test/integration/statefulset:all-srcs",
"//test/integration/storageclasses:all-srcs",
"//test/integration/tls:all-srcs",

View File

@ -349,7 +349,7 @@ func TestSchedulerExtender(t *testing.T) {
}
policy.APIVersion = "v1"
context = initTestScheduler(t, context, nil, false, &policy)
context = initTestScheduler(t, context, false, &policy)
defer cleanupTest(t, context)
DoTestPodScheduling(context.ns, t, clientSet)

View File

@ -522,7 +522,10 @@ func TestMultiScheduler(t *testing.T) {
informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0)
podInformer2 := factory.NewPodInformer(context.clientSet, 0)
schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2)
stopCh := make(chan struct{})
defer close(stopCh)
schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, stopCh)
schedulerConfig2, err := schedulerConfigFactory2.Create()
if err != nil {
t.Errorf("Couldn't create scheduler config: %v", err)
@ -530,12 +533,11 @@ func TestMultiScheduler(t *testing.T) {
eventBroadcaster2 := record.NewBroadcaster()
schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler})
eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet2.CoreV1().Events("")})
go podInformer2.Informer().Run(schedulerConfig2.StopEverything)
informerFactory2.Start(schedulerConfig2.StopEverything)
go podInformer2.Informer().Run(stopCh)
informerFactory2.Start(stopCh)
sched2, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig2}, nil...)
sched2.Run()
defer close(schedulerConfig2.StopEverything)
// 6. **check point-2**:
// - testPodWithAnnotationFitsFoo should be scheduled

View File

@ -85,13 +85,10 @@ func TestTaintNodeByCondition(t *testing.T) {
admission.SetExternalKubeClientSet(externalClientset)
admission.SetExternalKubeInformerFactory(externalInformers)
controllerCh := make(chan struct{})
defer close(controllerCh)
// Apply feature gates to enable TaintNodesByCondition
algorithmprovider.ApplyFeatureGates()
context = initTestScheduler(t, context, controllerCh, false, nil)
context = initTestScheduler(t, context, false, nil)
cs := context.clientSet
informers := context.informerFactory
nsName := context.ns.Name
@ -120,13 +117,13 @@ func TestTaintNodeByCondition(t *testing.T) {
t.Errorf("Failed to create node controller: %v", err)
return
}
go nc.Run(controllerCh)
go nc.Run(context.stopCh)
// Waiting for all controller sync.
externalInformers.Start(controllerCh)
externalInformers.WaitForCacheSync(controllerCh)
informers.Start(controllerCh)
informers.WaitForCacheSync(controllerCh)
externalInformers.Start(context.stopCh)
externalInformers.WaitForCacheSync(context.stopCh)
informers.Start(context.stopCh)
informers.WaitForCacheSync(context.stopCh)
// -------------------------------------------
// Test TaintNodeByCondition feature.

View File

@ -65,6 +65,7 @@ type TestContext struct {
schedulerConfigFactory factory.Configurator
schedulerConfig *factory.Config
scheduler *scheduler.Scheduler
stopCh chan struct{}
}
// createConfiguratorWithPodInformer creates a configurator for scheduler.
@ -73,6 +74,7 @@ func createConfiguratorWithPodInformer(
clientSet clientset.Interface,
podInformer coreinformers.PodInformer,
informerFactory informers.SharedInformerFactory,
stopCh <-chan struct{},
) factory.Configurator {
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: schedulerName,
@ -92,13 +94,16 @@ func createConfiguratorWithPodInformer(
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
BindTimeoutSeconds: 600,
StopCh: stopCh,
})
}
// initTestMasterAndScheduler initializes a test environment and creates a master with default
// configuration.
func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface) *TestContext {
var context TestContext
context := TestContext{
stopCh: make(chan struct{}),
}
// 1. Create master
h := &framework.MasterHolder{Initialized: make(chan struct{})}
@ -138,13 +143,12 @@ func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface
func initTestScheduler(
t *testing.T,
context *TestContext,
controllerCh chan struct{},
setPodInformer bool,
policy *schedulerapi.Policy,
) *TestContext {
// Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority
// feature gate is enabled at the same time.
return initTestSchedulerWithOptions(t, context, controllerCh, setPodInformer, policy, false, false, time.Second)
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, false, false, time.Second)
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -152,7 +156,6 @@ func initTestScheduler(
func initTestSchedulerWithOptions(
t *testing.T,
context *TestContext,
controllerCh chan struct{},
setPodInformer bool,
policy *schedulerapi.Policy,
disablePreemption bool,
@ -179,7 +182,7 @@ func initTestSchedulerWithOptions(
}
context.schedulerConfigFactory = createConfiguratorWithPodInformer(
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory)
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, context.stopCh)
var err error
@ -193,11 +196,6 @@ func initTestSchedulerWithOptions(
t.Fatalf("Couldn't create scheduler config: %v", err)
}
// set controllerCh if provided.
if controllerCh != nil {
context.schedulerConfig.StopEverything = controllerCh
}
// set DisablePreemption option
context.schedulerConfig.DisablePreemption = disablePreemption
@ -252,21 +250,21 @@ func initDisruptionController(context *TestContext) *disruption.DisruptionContro
// initTest initializes a test environment and creates master and scheduler with default
// configuration.
func initTest(t *testing.T, nsPrefix string) *TestContext {
return initTestScheduler(t, initTestMaster(t, nsPrefix, nil), nil, true, nil)
return initTestScheduler(t, initTestMaster(t, nsPrefix, nil), true, nil)
}
// initTestDisablePreemption initializes a test environment and creates master and scheduler with default
// configuration but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
return initTestSchedulerWithOptions(
t, initTestMaster(t, nsPrefix, nil), nil, true, nil, true, false, time.Second)
t, initTestMaster(t, nsPrefix, nil), true, nil, true, false, time.Second)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called
// at the end of a test.
func cleanupTest(t *testing.T, context *TestContext) {
// Kill the scheduler.
close(context.schedulerConfig.StopEverything)
close(context.stopCh)
// Cleanup nodes.
context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
framework.DeleteTestingNamespace(context.ns, context.httpServer, t)

View File

@ -901,9 +901,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
// Set feature gates
utilfeature.DefaultFeatureGate.SetFromMap(features)
controllerCh := make(chan struct{})
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), controllerCh, false, nil, false, disableEquivalenceCache, resyncPeriod)
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, false, disableEquivalenceCache, resyncPeriod)
clientset := context.clientSet
ns := context.ns.Name
@ -912,10 +910,10 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
if err != nil {
t.Fatalf("Failed to create PV controller: %v", err)
}
go ctrl.Run(controllerCh)
go ctrl.Run(context.stopCh)
// Start informer factory after all controllers are configured and running.
informerFactory.Start(controllerCh)
informerFactory.WaitForCacheSync(controllerCh)
informerFactory.Start(context.stopCh)
informerFactory.WaitForCacheSync(context.stopCh)
// Create shared objects
// Create nodes
@ -936,7 +934,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, features map[s
return &testConfig{
client: clientset,
ns: ns,
stop: controllerCh,
stop: context.stopCh,
teardown: func() {
deleteTestObjects(clientset, ns, nil)
cleanupTest(t, context)

View File

@ -20,6 +20,7 @@ go_test(
"//cmd/cloud-controller-manager/app/testing:go_default_library",
"//cmd/kube-apiserver/app/testing:go_default_library",
"//cmd/kube-controller-manager/app/testing:go_default_library",
"//cmd/kube-scheduler/app/testing:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//staging/src/k8s.io/api/rbac/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package controllermanager
package serving
import (
"testing"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package controllermanager
package serving
import (
"crypto/tls"
@ -33,15 +33,16 @@ import (
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
"k8s.io/client-go/kubernetes"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider"
cloudctrlmgrtesting "k8s.io/kubernetes/cmd/cloud-controller-manager/app/testing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
kubectrlmgrtesting "k8s.io/kubernetes/cmd/kube-controller-manager/app/testing"
kubeschedulertesting "k8s.io/kubernetes/cmd/kube-scheduler/app/testing"
"k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/test/integration/framework"
)
type controllerManagerTester interface {
type componentTester interface {
StartTestServer(t kubectrlmgrtesting.Logger, customFlags []string) (*options.SecureServingOptionsWithLoopback, *server.SecureServingInfo, *server.DeprecatedInsecureServingInfo, func(), error)
}
@ -65,7 +66,17 @@ func (cloudControllerManagerTester) StartTestServer(t kubectrlmgrtesting.Logger,
return gotResult.Options.SecureServing, gotResult.Config.SecureServing, gotResult.Config.InsecureServing, gotResult.TearDownFn, err
}
func TestControllerManagerServing(t *testing.T) {
type kubeSchedulerTester struct{}
func (kubeSchedulerTester) StartTestServer(t kubectrlmgrtesting.Logger, customFlags []string) (*options.SecureServingOptionsWithLoopback, *server.SecureServingInfo, *server.DeprecatedInsecureServingInfo, func(), error) {
gotResult, err := kubeschedulertesting.StartTestServer(t, customFlags)
if err != nil {
return nil, nil, nil, nil, err
}
return gotResult.Options.SecureServing, gotResult.Config.SecureServing, gotResult.Config.InsecureServing, gotResult.TearDownFn, err
}
func TestComponentSecureServingAndAuth(t *testing.T) {
if !cloudprovider.IsCloudProvider("fake") {
cloudprovider.RegisterCloudProvider("fake", fakeCloudProviderFactory)
}
@ -188,20 +199,21 @@ users:
tests := []struct {
name string
tester controllerManagerTester
tester componentTester
extraFlags []string
}{
{"kube-controller-manager", kubeControllerManagerTester{}, nil},
{"cloud-controller-manager", cloudControllerManagerTester{}, []string{"--cloud-provider=fake"}},
{"kube-scheduler", kubeSchedulerTester{}, nil},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testControllerManager(t, tt.tester, apiserverConfig.Name(), brokenApiserverConfig.Name(), token, tt.extraFlags)
testComponent(t, tt.tester, apiserverConfig.Name(), brokenApiserverConfig.Name(), token, tt.extraFlags)
})
}
}
func testControllerManager(t *testing.T, tester controllerManagerTester, kubeconfig, brokenKubeconfig, token string, extraFlags []string) {
func testComponent(t *testing.T, tester componentTester, kubeconfig, brokenKubeconfig, token string, extraFlags []string) {
tests := []struct {
name string
flags []string
@ -228,8 +240,7 @@ func testControllerManager(t *testing.T, tester controllerManagerTester, kubecon
"--kubeconfig", kubeconfig,
"--leader-elect=false",
}, "/healthz", true, false, intPtr(http.StatusOK), nil},
{"/metrics without auhn/z", []string{
"--kubeconfig", kubeconfig,
{"/metrics without authn/authz", []string{
"--kubeconfig", kubeconfig,
"--leader-elect=false",
"--port=10253",
@ -297,7 +308,7 @@ func testControllerManager(t *testing.T, tester controllerManagerTester, kubecon
serverCertPath := path.Join(secureOptions.ServerCert.CertDirectory, secureOptions.ServerCert.PairName+".crt")
serverCert, err := ioutil.ReadFile(serverCertPath)
if err != nil {
t.Fatalf("Failed to read controller-manager server cert %q: %v", serverCertPath, err)
t.Fatalf("Failed to read component server cert %q: %v", serverCertPath, err)
}
pool.AppendCertsFromPEM(serverCert)
tr := &http.Transport{
@ -316,13 +327,13 @@ func testControllerManager(t *testing.T, tester controllerManagerTester, kubecon
}
r, err := client.Do(req)
if err != nil {
t.Fatalf("failed to GET %s from controller-manager: %v", tt.path, err)
t.Fatalf("failed to GET %s from component: %v", tt.path, err)
}
body, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if got, expected := r.StatusCode, *tt.wantSecureCode; got != expected {
t.Fatalf("expected http %d at %s of controller-manager, got: %d %q", expected, tt.path, got, string(body))
t.Fatalf("expected http %d at %s of component, got: %d %q", expected, tt.path, got, string(body))
}
}
@ -332,12 +343,12 @@ func testControllerManager(t *testing.T, tester controllerManagerTester, kubecon
url := fmt.Sprintf("http://%s%s", insecureInfo.Listener.Addr().String(), tt.path)
r, err := http.Get(url)
if err != nil {
t.Fatalf("failed to GET %s from controller-manager: %v", tt.path, err)
t.Fatalf("failed to GET %s from component: %v", tt.path, err)
}
body, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if got, expected := r.StatusCode, *tt.wantInsecureCode; got != expected {
t.Fatalf("expected http %d at %s of controller-manager, got: %d %q", expected, tt.path, got, string(body))
t.Fatalf("expected http %d at %s of component, got: %d %q", expected, tt.path, got, string(body))
}
}
})

View File

@ -66,7 +66,8 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
evtWatch := evtBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
Interface: clientSet.CoreV1().Events("")})
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory)
stopCh := make(chan struct{})
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory, stopCh)
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *factory.Config) {
conf.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
@ -75,16 +76,13 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
glog.Fatalf("Error creating scheduler: %v", err)
}
stop := make(chan struct{})
informerFactory.Start(stop)
informerFactory.Start(stopCh)
sched.Run()
shutdownFunc := func() {
glog.Infof("destroying scheduler")
evtWatch.Stop()
sched.StopEverything()
close(stop)
close(stopCh)
glog.Infof("destroyed scheduler")
}
return schedulerConfigurator, shutdownFunc
@ -94,6 +92,7 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
func createSchedulerConfigurator(
clientSet clientset.Interface,
informerFactory informers.SharedInformerFactory,
stopCh <-chan struct{},
) factory.Configurator {
// Enable EnableEquivalenceClassCache for all integration tests.
utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")
@ -115,5 +114,6 @@ func createSchedulerConfigurator(
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
StopCh: stopCh,
})
}