mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 13:07:07 +00:00
* ctx logging involved in startup as per https://github.com/kubernetes/kubernetes/pull/111155#pullrequestreview-1283257121 * use klog.Background().Error in flag handling * revert scheduler_perf changes * reference issue in code comment * enable ctx logcheck for cmd/kube-scheduler
383 lines
14 KiB
Go
383 lines
14 KiB
Go
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package options
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/uuid"
|
|
apiserveroptions "k8s.io/apiserver/pkg/server/options"
|
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
"k8s.io/client-go/dynamic"
|
|
"k8s.io/client-go/dynamic/dynamicinformer"
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
restclient "k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"k8s.io/client-go/tools/events"
|
|
"k8s.io/client-go/tools/leaderelection"
|
|
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
|
"k8s.io/client-go/tools/record"
|
|
cliflag "k8s.io/component-base/cli/flag"
|
|
componentbaseconfig "k8s.io/component-base/config"
|
|
"k8s.io/component-base/config/options"
|
|
"k8s.io/component-base/logs"
|
|
logsapi "k8s.io/component-base/logs/api/v1"
|
|
"k8s.io/component-base/metrics"
|
|
"k8s.io/klog/v2"
|
|
schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
|
|
"k8s.io/kubernetes/pkg/scheduler"
|
|
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
|
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
|
|
netutils "k8s.io/utils/net"
|
|
)
|
|
|
|
// Options has all the params needed to run a Scheduler
|
|
type Options struct {
|
|
// The default values.
|
|
ComponentConfig *kubeschedulerconfig.KubeSchedulerConfiguration
|
|
|
|
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
|
|
Authentication *apiserveroptions.DelegatingAuthenticationOptions
|
|
Authorization *apiserveroptions.DelegatingAuthorizationOptions
|
|
Metrics *metrics.Options
|
|
Logs *logs.Options
|
|
Deprecated *DeprecatedOptions
|
|
LeaderElection *componentbaseconfig.LeaderElectionConfiguration
|
|
|
|
// ConfigFile is the location of the scheduler server's configuration file.
|
|
ConfigFile string
|
|
|
|
// WriteConfigTo is the path where the default configuration will be written.
|
|
WriteConfigTo string
|
|
|
|
Master string
|
|
|
|
// Flags hold the parsed CLI flags.
|
|
Flags *cliflag.NamedFlagSets
|
|
}
|
|
|
|
// NewOptions returns default scheduler app options.
|
|
func NewOptions() *Options {
|
|
o := &Options{
|
|
SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
|
|
Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
|
|
Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(),
|
|
Deprecated: &DeprecatedOptions{
|
|
PodMaxInUnschedulablePodsDuration: 5 * time.Minute,
|
|
},
|
|
LeaderElection: &componentbaseconfig.LeaderElectionConfiguration{
|
|
LeaderElect: true,
|
|
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
|
|
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
|
|
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
|
|
ResourceLock: "leases",
|
|
ResourceName: "kube-scheduler",
|
|
ResourceNamespace: "kube-system",
|
|
},
|
|
Metrics: metrics.NewOptions(),
|
|
Logs: logs.NewOptions(),
|
|
}
|
|
|
|
o.Authentication.TolerateInClusterLookupFailure = true
|
|
o.Authentication.RemoteKubeConfigFileOptional = true
|
|
o.Authorization.RemoteKubeConfigFileOptional = true
|
|
|
|
// Set the PairName but leave certificate directory blank to generate in-memory by default
|
|
o.SecureServing.ServerCert.CertDirectory = ""
|
|
o.SecureServing.ServerCert.PairName = "kube-scheduler"
|
|
o.SecureServing.BindPort = kubeschedulerconfig.DefaultKubeSchedulerPort
|
|
|
|
o.initFlags()
|
|
|
|
return o
|
|
}
|
|
|
|
// ApplyDeprecated obtains the deprecated CLI args and set them to `o.ComponentConfig` if specified.
|
|
func (o *Options) ApplyDeprecated() {
|
|
if o.Flags == nil {
|
|
return
|
|
}
|
|
// Obtain deprecated CLI args. Set them to cfg if specified in command line.
|
|
deprecated := o.Flags.FlagSet("deprecated")
|
|
if deprecated.Changed("profiling") {
|
|
o.ComponentConfig.EnableProfiling = o.Deprecated.EnableProfiling
|
|
}
|
|
if deprecated.Changed("contention-profiling") {
|
|
o.ComponentConfig.EnableContentionProfiling = o.Deprecated.EnableContentionProfiling
|
|
}
|
|
if deprecated.Changed("kubeconfig") {
|
|
o.ComponentConfig.ClientConnection.Kubeconfig = o.Deprecated.Kubeconfig
|
|
}
|
|
if deprecated.Changed("kube-api-content-type") {
|
|
o.ComponentConfig.ClientConnection.ContentType = o.Deprecated.ContentType
|
|
}
|
|
if deprecated.Changed("kube-api-qps") {
|
|
o.ComponentConfig.ClientConnection.QPS = o.Deprecated.QPS
|
|
}
|
|
if deprecated.Changed("kube-api-burst") {
|
|
o.ComponentConfig.ClientConnection.Burst = o.Deprecated.Burst
|
|
}
|
|
if deprecated.Changed("lock-object-namespace") {
|
|
o.ComponentConfig.LeaderElection.ResourceNamespace = o.Deprecated.ResourceNamespace
|
|
}
|
|
if deprecated.Changed("lock-object-name") {
|
|
o.ComponentConfig.LeaderElection.ResourceName = o.Deprecated.ResourceName
|
|
}
|
|
}
|
|
|
|
// ApplyLeaderElectionTo obtains the CLI args related with leaderelection, and override the values in `cfg`.
|
|
// Then the `cfg` object is injected into the `options` object.
|
|
func (o *Options) ApplyLeaderElectionTo(cfg *kubeschedulerconfig.KubeSchedulerConfiguration) {
|
|
if o.Flags == nil {
|
|
return
|
|
}
|
|
// Obtain CLI args related with leaderelection. Set them to `cfg` if specified in command line.
|
|
leaderelection := o.Flags.FlagSet("leader election")
|
|
if leaderelection.Changed("leader-elect") {
|
|
cfg.LeaderElection.LeaderElect = o.LeaderElection.LeaderElect
|
|
}
|
|
if leaderelection.Changed("leader-elect-lease-duration") {
|
|
cfg.LeaderElection.LeaseDuration = o.LeaderElection.LeaseDuration
|
|
}
|
|
if leaderelection.Changed("leader-elect-renew-deadline") {
|
|
cfg.LeaderElection.RenewDeadline = o.LeaderElection.RenewDeadline
|
|
}
|
|
if leaderelection.Changed("leader-elect-retry-period") {
|
|
cfg.LeaderElection.RetryPeriod = o.LeaderElection.RetryPeriod
|
|
}
|
|
if leaderelection.Changed("leader-elect-resource-lock") {
|
|
cfg.LeaderElection.ResourceLock = o.LeaderElection.ResourceLock
|
|
}
|
|
if leaderelection.Changed("leader-elect-resource-name") {
|
|
cfg.LeaderElection.ResourceName = o.LeaderElection.ResourceName
|
|
}
|
|
if leaderelection.Changed("leader-elect-resource-namespace") {
|
|
cfg.LeaderElection.ResourceNamespace = o.LeaderElection.ResourceNamespace
|
|
}
|
|
|
|
o.ComponentConfig = cfg
|
|
}
|
|
|
|
// initFlags initializes flags by section name.
|
|
func (o *Options) initFlags() {
|
|
if o.Flags != nil {
|
|
return
|
|
}
|
|
|
|
nfs := cliflag.NamedFlagSets{}
|
|
fs := nfs.FlagSet("misc")
|
|
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file.")
|
|
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
|
|
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
|
|
|
o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
|
|
o.Authentication.AddFlags(nfs.FlagSet("authentication"))
|
|
o.Authorization.AddFlags(nfs.FlagSet("authorization"))
|
|
o.Deprecated.AddFlags(nfs.FlagSet("deprecated"))
|
|
options.BindLeaderElectionFlags(o.LeaderElection, nfs.FlagSet("leader election"))
|
|
utilfeature.DefaultMutableFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
|
|
o.Metrics.AddFlags(nfs.FlagSet("metrics"))
|
|
logsapi.AddFlags(o.Logs, nfs.FlagSet("logs"))
|
|
|
|
o.Flags = &nfs
|
|
}
|
|
|
|
// ApplyTo applies the scheduler options to the given scheduler app configuration.
|
|
func (o *Options) ApplyTo(logger klog.Logger, c *schedulerappconfig.Config) error {
|
|
if len(o.ConfigFile) == 0 {
|
|
// If the --config arg is not specified, honor the deprecated as well as leader election CLI args.
|
|
o.ApplyDeprecated()
|
|
o.ApplyLeaderElectionTo(o.ComponentConfig)
|
|
c.ComponentConfig = *o.ComponentConfig
|
|
} else {
|
|
cfg, err := loadConfigFromFile(logger, o.ConfigFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// If the --config arg is specified, honor the leader election CLI args only.
|
|
o.ApplyLeaderElectionTo(cfg)
|
|
|
|
if err := validation.ValidateKubeSchedulerConfiguration(cfg); err != nil {
|
|
return err
|
|
}
|
|
|
|
c.ComponentConfig = *cfg
|
|
}
|
|
|
|
// Build kubeconfig first to so that if it fails, it doesn't cause leaking
|
|
// goroutines (started from initializing secure serving - which underneath
|
|
// creates a queue which in its constructor starts a goroutine).
|
|
kubeConfig, err := createKubeConfig(c.ComponentConfig.ClientConnection, o.Master)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.KubeConfig = kubeConfig
|
|
|
|
if err := o.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil {
|
|
return err
|
|
}
|
|
if o.SecureServing != nil && (o.SecureServing.BindPort != 0 || o.SecureServing.Listener != nil) {
|
|
if err := o.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {
|
|
return err
|
|
}
|
|
if err := o.Authorization.ApplyTo(&c.Authorization); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
o.Metrics.Apply()
|
|
|
|
// Apply value independently instead of using ApplyDeprecated() because it can't be configured via ComponentConfig.
|
|
if o.Deprecated != nil {
|
|
c.PodMaxInUnschedulablePodsDuration = o.Deprecated.PodMaxInUnschedulablePodsDuration
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Validate validates all the required options.
|
|
func (o *Options) Validate() []error {
|
|
var errs []error
|
|
|
|
if err := validation.ValidateKubeSchedulerConfiguration(o.ComponentConfig); err != nil {
|
|
errs = append(errs, err.Errors()...)
|
|
}
|
|
errs = append(errs, o.SecureServing.Validate()...)
|
|
errs = append(errs, o.Authentication.Validate()...)
|
|
errs = append(errs, o.Authorization.Validate()...)
|
|
errs = append(errs, o.Metrics.Validate()...)
|
|
|
|
return errs
|
|
}
|
|
|
|
// Config return a scheduler config object
|
|
func (o *Options) Config(ctx context.Context) (*schedulerappconfig.Config, error) {
|
|
logger := klog.FromContext(ctx)
|
|
if o.SecureServing != nil {
|
|
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil {
|
|
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
|
|
}
|
|
}
|
|
|
|
c := &schedulerappconfig.Config{}
|
|
if err := o.ApplyTo(logger, c); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Prepare kube clients.
|
|
client, eventClient, err := createClients(c.KubeConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)
|
|
|
|
// Set up leader election if enabled.
|
|
var leaderElectionConfig *leaderelection.LeaderElectionConfig
|
|
if c.ComponentConfig.LeaderElection.LeaderElect {
|
|
// Use the scheduler name in the first profile to record leader election.
|
|
schedulerName := corev1.DefaultSchedulerName
|
|
if len(c.ComponentConfig.Profiles) != 0 {
|
|
schedulerName = c.ComponentConfig.Profiles[0].SchedulerName
|
|
}
|
|
coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(schedulerName)
|
|
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, c.KubeConfig, coreRecorder)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
c.Client = client
|
|
c.InformerFactory = scheduler.NewInformerFactory(client, 0)
|
|
dynClient := dynamic.NewForConfigOrDie(c.KubeConfig)
|
|
c.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, corev1.NamespaceAll, nil)
|
|
c.LeaderElection = leaderElectionConfig
|
|
|
|
return c, nil
|
|
}
|
|
|
|
// makeLeaderElectionConfig builds a leader election configuration. It will
|
|
// create a new resource lock associated with the configuration.
|
|
func makeLeaderElectionConfig(config componentbaseconfig.LeaderElectionConfiguration, kubeConfig *restclient.Config, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to get hostname: %v", err)
|
|
}
|
|
// add a uniquifier so that two processes on the same host don't accidentally both become active
|
|
id := hostname + "_" + string(uuid.NewUUID())
|
|
|
|
rl, err := resourcelock.NewFromKubeconfig(config.ResourceLock,
|
|
config.ResourceNamespace,
|
|
config.ResourceName,
|
|
resourcelock.ResourceLockConfig{
|
|
Identity: id,
|
|
EventRecorder: recorder,
|
|
},
|
|
kubeConfig,
|
|
config.RenewDeadline.Duration)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't create resource lock: %v", err)
|
|
}
|
|
|
|
return &leaderelection.LeaderElectionConfig{
|
|
Lock: rl,
|
|
LeaseDuration: config.LeaseDuration.Duration,
|
|
RenewDeadline: config.RenewDeadline.Duration,
|
|
RetryPeriod: config.RetryPeriod.Duration,
|
|
WatchDog: leaderelection.NewLeaderHealthzAdaptor(time.Second * 20),
|
|
Name: "kube-scheduler",
|
|
ReleaseOnCancel: true,
|
|
}, nil
|
|
}
|
|
|
|
// createKubeConfig creates a kubeConfig from the given config and masterOverride.
|
|
// TODO remove masterOverride when CLI flags are removed.
|
|
func createKubeConfig(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (*restclient.Config, error) {
|
|
kubeConfig, err := clientcmd.BuildConfigFromFlags(masterOverride, config.Kubeconfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
kubeConfig.DisableCompression = true
|
|
kubeConfig.AcceptContentTypes = config.AcceptContentTypes
|
|
kubeConfig.ContentType = config.ContentType
|
|
kubeConfig.QPS = config.QPS
|
|
kubeConfig.Burst = int(config.Burst)
|
|
|
|
return kubeConfig, nil
|
|
}
|
|
|
|
// createClients creates a kube client and an event client from the given kubeConfig
|
|
func createClients(kubeConfig *restclient.Config) (clientset.Interface, clientset.Interface, error) {
|
|
client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "scheduler"))
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
eventClient, err := clientset.NewForConfig(kubeConfig)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return client, eventClient, nil
|
|
}
|