mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-17 15:13:08 +00:00
Refactor scheduler config API
Refactor the kube-scheduler configuration API, command setup, and server setup according to the guidelines established in #32215 and using the kube-proxy refactor (#34727) as a model of a well factored component adhering to said guidelines. * Config API: clarify meaning and use of algorithm source by replacing modality derived from bools and string emptiness checks with an explicit AlgorithmSource type hierarchy. * Config API: consolidate client connection config with common structs. * Config API: split and simplify healthz/metrics server configuration. * Config API: clarify leader election configuration. * Config API: improve defaulting. * CLI: deprecate all flags except `--config`. * CLI: port all flags to new config API. * CLI: refactor to match kube-proxy Cobra command style. * Server: refactor away configurator.go to clarify application wiring. * Server: refactor to more clearly separate wiring/setup from running. Fixes #52428.
This commit is contained in:
@@ -1,196 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
appsinformers "k8s.io/client-go/informers/apps/v1beta1"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
|
||||
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
|
||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
||||
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
||||
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
||||
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
func createRecorder(kubecli *clientset.Clientset, s *options.SchedulerServer) record.EventRecorder {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.CoreV1().RESTClient()).Events("")})
|
||||
return eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: s.SchedulerName})
|
||||
}
|
||||
|
||||
func createClients(s *options.SchedulerServer) (*clientset.Clientset, *clientset.Clientset, error) {
|
||||
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to build config from flags: %v", err)
|
||||
}
|
||||
|
||||
kubeconfig.ContentType = s.ContentType
|
||||
// Override kubeconfig qps/burst settings from flags
|
||||
kubeconfig.QPS = s.KubeAPIQPS
|
||||
kubeconfig.Burst = int(s.KubeAPIBurst)
|
||||
kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "scheduler"))
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid API configuration: %v", err)
|
||||
}
|
||||
leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
|
||||
return kubeClient, leaderElectionClient, nil
|
||||
}
|
||||
|
||||
// CreateScheduler encapsulates the entire creation of a runnable scheduler.
|
||||
func CreateScheduler(
|
||||
s *options.SchedulerServer,
|
||||
kubecli clientset.Interface,
|
||||
nodeInformer coreinformers.NodeInformer,
|
||||
podInformer coreinformers.PodInformer,
|
||||
pvInformer coreinformers.PersistentVolumeInformer,
|
||||
pvcInformer coreinformers.PersistentVolumeClaimInformer,
|
||||
replicationControllerInformer coreinformers.ReplicationControllerInformer,
|
||||
replicaSetInformer extensionsinformers.ReplicaSetInformer,
|
||||
statefulSetInformer appsinformers.StatefulSetInformer,
|
||||
serviceInformer coreinformers.ServiceInformer,
|
||||
pdbInformer policyinformers.PodDisruptionBudgetInformer,
|
||||
recorder record.EventRecorder,
|
||||
) (*scheduler.Scheduler, error) {
|
||||
configurator := factory.NewConfigFactory(
|
||||
s.SchedulerName,
|
||||
kubecli,
|
||||
nodeInformer,
|
||||
podInformer,
|
||||
pvInformer,
|
||||
pvcInformer,
|
||||
replicationControllerInformer,
|
||||
replicaSetInformer,
|
||||
statefulSetInformer,
|
||||
serviceInformer,
|
||||
pdbInformer,
|
||||
s.HardPodAffinitySymmetricWeight,
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
|
||||
)
|
||||
|
||||
// Rebuild the configurator with a default Create(...) method.
|
||||
configurator = &schedulerConfigurator{
|
||||
configurator,
|
||||
s.PolicyConfigFile,
|
||||
s.AlgorithmProvider,
|
||||
s.PolicyConfigMapName,
|
||||
s.PolicyConfigMapNamespace,
|
||||
s.UseLegacyPolicyConfig,
|
||||
}
|
||||
|
||||
return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
|
||||
cfg.Recorder = recorder
|
||||
})
|
||||
}
|
||||
|
||||
// schedulerConfigurator is an interface wrapper that provides a way to create
|
||||
// a scheduler from a user provided config file or ConfigMap object.
|
||||
type schedulerConfigurator struct {
|
||||
scheduler.Configurator
|
||||
policyFile string
|
||||
algorithmProvider string
|
||||
policyConfigMap string
|
||||
policyConfigMapNamespace string
|
||||
useLegacyPolicyConfig bool
|
||||
}
|
||||
|
||||
// getSchedulerPolicyConfig finds and decodes scheduler's policy config. If no
|
||||
// such policy is found, it returns nil, nil.
|
||||
func (sc schedulerConfigurator) getSchedulerPolicyConfig() (*schedulerapi.Policy, error) {
|
||||
var configData []byte
|
||||
var policyConfigMapFound bool
|
||||
var policy schedulerapi.Policy
|
||||
|
||||
// If not in legacy mode, try to find policy ConfigMap.
|
||||
if !sc.useLegacyPolicyConfig && len(sc.policyConfigMap) != 0 {
|
||||
namespace := sc.policyConfigMapNamespace
|
||||
policyConfigMap, err := sc.GetClient().CoreV1().ConfigMaps(namespace).Get(sc.policyConfigMap, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error getting scheduler policy ConfigMap: %v.", err)
|
||||
}
|
||||
if policyConfigMap != nil {
|
||||
var configString string
|
||||
configString, policyConfigMapFound = policyConfigMap.Data[options.SchedulerPolicyConfigMapKey]
|
||||
if !policyConfigMapFound {
|
||||
return nil, fmt.Errorf("No element with key = '%v' is found in the ConfigMap 'Data'.", options.SchedulerPolicyConfigMapKey)
|
||||
}
|
||||
glog.V(5).Infof("Scheduler policy ConfigMap: %v", configString)
|
||||
configData = []byte(configString)
|
||||
}
|
||||
}
|
||||
|
||||
// If we are in legacy mode or ConfigMap name is empty, try to use policy
|
||||
// config file.
|
||||
if !policyConfigMapFound {
|
||||
if _, err := os.Stat(sc.policyFile); err != nil {
|
||||
// No config file is found.
|
||||
return nil, nil
|
||||
}
|
||||
var err error
|
||||
configData, err = ioutil.ReadFile(sc.policyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to read policy config: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
|
||||
return nil, fmt.Errorf("invalid configuration: %v", err)
|
||||
}
|
||||
return &policy, nil
|
||||
}
|
||||
|
||||
// Create implements the interface for the Configurator, hence it is exported
|
||||
// even though the struct is not.
|
||||
func (sc schedulerConfigurator) Create() (*scheduler.Config, error) {
|
||||
policy, err := sc.getSchedulerPolicyConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// If no policy is found, create scheduler from algorithm provider.
|
||||
if policy == nil {
|
||||
if sc.Configurator != nil {
|
||||
return sc.Configurator.CreateFromProvider(sc.algorithmProvider)
|
||||
}
|
||||
return nil, fmt.Errorf("Configurator was nil")
|
||||
}
|
||||
|
||||
return sc.CreateFromConfig(*policy)
|
||||
}
|
@@ -1,31 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSchedulerConfiguratorFailure(t *testing.T) {
|
||||
sc := &schedulerConfigurator{
|
||||
// policyfile and algorithm are intentionally undefined.
|
||||
}
|
||||
_, err := sc.Create()
|
||||
if err == nil {
|
||||
t.Fatalf("Expected error message when creating with incomplete configurator.")
|
||||
}
|
||||
}
|
@@ -1,97 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package options provides the scheduler flags
|
||||
package options
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
|
||||
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
||||
|
||||
// add the kubernetes feature gates
|
||||
_ "k8s.io/kubernetes/pkg/features"
|
||||
// install the componentconfig api so we get its defaulting and conversion functions
|
||||
_ "k8s.io/kubernetes/pkg/apis/componentconfig/install"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
// SchedulerPolicyConfigMapKey defines the key of the element in the
|
||||
// scheduler's policy ConfigMap that contains scheduler's policy config.
|
||||
const SchedulerPolicyConfigMapKey string = "policy.cfg"
|
||||
|
||||
// SchedulerServer has all the context and params needed to run a Scheduler
|
||||
type SchedulerServer struct {
|
||||
componentconfig.KubeSchedulerConfiguration
|
||||
// Master is the address of the Kubernetes API server (overrides any
|
||||
// value in kubeconfig).
|
||||
Master string
|
||||
// Kubeconfig is Path to kubeconfig file with authorization and master
|
||||
// location information.
|
||||
Kubeconfig string
|
||||
// Dynamic configuration for scheduler features.
|
||||
}
|
||||
|
||||
// NewSchedulerServer creates a new SchedulerServer with default parameters
|
||||
func NewSchedulerServer() *SchedulerServer {
|
||||
versioned := &v1alpha1.KubeSchedulerConfiguration{}
|
||||
legacyscheme.Scheme.Default(versioned)
|
||||
cfg := componentconfig.KubeSchedulerConfiguration{}
|
||||
legacyscheme.Scheme.Convert(versioned, &cfg, nil)
|
||||
cfg.LeaderElection.LeaderElect = true
|
||||
s := SchedulerServer{
|
||||
KubeSchedulerConfiguration: cfg,
|
||||
}
|
||||
return &s
|
||||
}
|
||||
|
||||
// AddFlags adds flags for a specific SchedulerServer to the specified FlagSet
|
||||
func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.Int32Var(&s.Port, "port", s.Port, "The port that the scheduler's http service runs on")
|
||||
fs.StringVar(&s.Address, "address", s.Address, "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
|
||||
fs.StringVar(&s.AlgorithmProvider, "algorithm-provider", s.AlgorithmProvider, "The scheduling algorithm provider to use, one of: "+factory.ListAlgorithmProviders())
|
||||
fs.StringVar(&s.PolicyConfigFile, "policy-config-file", s.PolicyConfigFile, "File with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config==true")
|
||||
usage := fmt.Sprintf("Name of the ConfigMap object that contains scheduler's policy configuration. It must exist in the system namespace before scheduler initialization if --use-legacy-policy-config==false. The config must be provided as the value of an element in 'Data' map with the key='%v'", SchedulerPolicyConfigMapKey)
|
||||
fs.StringVar(&s.PolicyConfigMapName, "policy-configmap", s.PolicyConfigMapName, usage)
|
||||
fs.StringVar(&s.PolicyConfigMapNamespace, "policy-configmap-namespace", s.PolicyConfigMapNamespace, "The namespace where policy ConfigMap is located. The system namespace will be used if this is not provided or is empty.")
|
||||
fs.BoolVar(&s.UseLegacyPolicyConfig, "use-legacy-policy-config", false, "When set to true, scheduler will ignore policy ConfigMap and uses policy config file")
|
||||
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
||||
fs.BoolVar(&s.EnableContentionProfiling, "contention-profiling", false, "Enable lock contention profiling, if profiling is enabled")
|
||||
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
||||
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
||||
fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "Content type of requests sent to apiserver.")
|
||||
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
|
||||
fs.Int32Var(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
|
||||
fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's \"spec.SchedulerName\".")
|
||||
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object.")
|
||||
fs.StringVar(&s.LockObjectName, "lock-object-name", s.LockObjectName, "Define the name of the lock object.")
|
||||
fs.IntVar(&s.HardPodAffinitySymmetricWeight, "hard-pod-affinity-symmetric-weight", api.DefaultHardPodAffinitySymmetricWeight,
|
||||
"RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule corresponding "+
|
||||
"to every RequiredDuringScheduling affinity rule. --hard-pod-affinity-symmetric-weight represents the weight of implicit PreferredDuringScheduling affinity rule.")
|
||||
fs.MarkDeprecated("hard-pod-affinity-symmetric-weight", "This option was moved to the policy configuration file")
|
||||
fs.StringVar(&s.FailureDomains, "failure-domains", kubeletapis.DefaultFailureDomains, "Indicate the \"all topologies\" set for an empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.")
|
||||
fs.MarkDeprecated("failure-domains", "Doesn't have any effect. Will be removed in future version.")
|
||||
leaderelectionconfig.BindFlags(&s.LeaderElection, fs)
|
||||
utilfeature.DefaultFeatureGate.AddFlag(fs)
|
||||
}
|
@@ -1,103 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package options
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
func TestAddFlags(t *testing.T) {
|
||||
f := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
|
||||
s := NewSchedulerServer()
|
||||
s.AddFlags(f)
|
||||
|
||||
args := []string{
|
||||
"--address=192.168.4.20",
|
||||
"--algorithm-provider=FooProvider",
|
||||
"--contention-profiling=true",
|
||||
"--failure-domains=kubernetes.io/hostname",
|
||||
"--hard-pod-affinity-symmetric-weight=0",
|
||||
"--kube-api-burst=80",
|
||||
"--kube-api-content-type=application/vnd.kubernetes.protobuf",
|
||||
"--kube-api-qps=40.0",
|
||||
"--kubeconfig=/foo/bar/kubeconfig",
|
||||
"--leader-elect=true",
|
||||
"--leader-elect-lease-duration=20s",
|
||||
"--leader-elect-renew-deadline=15s",
|
||||
"--leader-elect-resource-lock=endpoints",
|
||||
"--leader-elect-retry-period=3s",
|
||||
"--lock-object-name=test-lock-object-name",
|
||||
"--lock-object-namespace=test-lock-object-ns",
|
||||
"--master=192.168.4.20",
|
||||
"--policy-config-file=/foo/bar/policyconfig",
|
||||
"--policy-configmap=test-policy-configmap",
|
||||
"--policy-configmap-namespace=test-policy-configmap-ns",
|
||||
"--port=10000",
|
||||
"--profiling=false",
|
||||
"--scheduler-name=test-scheduler-name",
|
||||
"--use-legacy-policy-config=true",
|
||||
}
|
||||
|
||||
f.Parse(args)
|
||||
|
||||
expected := &SchedulerServer{
|
||||
KubeSchedulerConfiguration: componentconfig.KubeSchedulerConfiguration{
|
||||
Port: 10000,
|
||||
Address: "192.168.4.20",
|
||||
AlgorithmProvider: "FooProvider",
|
||||
PolicyConfigFile: "/foo/bar/policyconfig",
|
||||
EnableContentionProfiling: true,
|
||||
EnableProfiling: false,
|
||||
|
||||
ContentType: "application/vnd.kubernetes.protobuf",
|
||||
KubeAPIQPS: 40.0,
|
||||
KubeAPIBurst: 80,
|
||||
SchedulerName: "test-scheduler-name",
|
||||
LeaderElection: componentconfig.LeaderElectionConfiguration{
|
||||
ResourceLock: "endpoints",
|
||||
LeaderElect: true,
|
||||
LeaseDuration: metav1.Duration{Duration: 20 * time.Second},
|
||||
RenewDeadline: metav1.Duration{Duration: 15 * time.Second},
|
||||
RetryPeriod: metav1.Duration{Duration: 3 * time.Second},
|
||||
},
|
||||
|
||||
LockObjectNamespace: "test-lock-object-ns",
|
||||
LockObjectName: "test-lock-object-name",
|
||||
|
||||
PolicyConfigMapName: "test-policy-configmap",
|
||||
PolicyConfigMapNamespace: "test-policy-configmap-ns",
|
||||
UseLegacyPolicyConfig: true,
|
||||
|
||||
HardPodAffinitySymmetricWeight: 0,
|
||||
FailureDomains: "kubernetes.io/hostname",
|
||||
},
|
||||
Kubeconfig: "/foo/bar/kubeconfig",
|
||||
Master: "192.168.4.20",
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(expected, s) {
|
||||
t.Errorf("Got different run options than expected.\nDifference detected on:\n%s", diff.ObjectReflectDiff(expected, s))
|
||||
}
|
||||
}
|
@@ -18,37 +18,308 @@ limitations under the License.
|
||||
package app
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"os"
|
||||
"reflect"
|
||||
goruntime "runtime"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
clientgoclientset "k8s.io/client-go/kubernetes"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||
"k8s.io/client-go/tools/leaderelection"
|
||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
"k8s.io/kubernetes/pkg/util/configz"
|
||||
"k8s.io/kubernetes/pkg/version"
|
||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
||||
"k8s.io/kubernetes/pkg/version/verflag"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
||||
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
||||
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
|
||||
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// SchedulerServer has all the context and params needed to run a Scheduler
|
||||
type Options struct {
|
||||
// ConfigFile is the location of the scheduler server's configuration file.
|
||||
ConfigFile string
|
||||
|
||||
// config is the scheduler server's configuration object.
|
||||
config *componentconfig.KubeSchedulerConfiguration
|
||||
|
||||
scheme *runtime.Scheme
|
||||
codecs serializer.CodecFactory
|
||||
|
||||
// The fields below here are placeholders for flags that can't be directly
|
||||
// mapped into componentconfig.KubeSchedulerConfiguration.
|
||||
//
|
||||
// TODO remove these fields once the deprecated flags are removed.
|
||||
|
||||
// master is the address of the Kubernetes API server (overrides any
|
||||
// value in kubeconfig).
|
||||
master string
|
||||
healthzAddress string
|
||||
healthzPort int32
|
||||
policyConfigFile string
|
||||
policyConfigMapName string
|
||||
policyConfigMapNamespace string
|
||||
useLegacyPolicyConfig bool
|
||||
algorithmProvider string
|
||||
}
|
||||
|
||||
// AddFlags adds flags for a specific SchedulerServer to the specified FlagSet
|
||||
func AddFlags(options *Options, fs *pflag.FlagSet) {
|
||||
fs.StringVar(&options.ConfigFile, "config", options.ConfigFile, "The path to the configuration file.")
|
||||
|
||||
// All flags below here are deprecated and will eventually be removed.
|
||||
|
||||
fs.Int32Var(&options.healthzPort, "port", ports.SchedulerPort, "The port that the scheduler's http service runs on")
|
||||
fs.StringVar(&options.healthzAddress, "address", options.healthzAddress, "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
|
||||
fs.StringVar(&options.algorithmProvider, "algorithm-provider", options.algorithmProvider, "The scheduling algorithm provider to use, one of: "+factory.ListAlgorithmProviders())
|
||||
fs.StringVar(&options.policyConfigFile, "policy-config-file", options.policyConfigFile, "File with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config==true")
|
||||
usage := fmt.Sprintf("Name of the ConfigMap object that contains scheduler's policy configuration. It must exist in the system namespace before scheduler initialization if --use-legacy-policy-config==false. The config must be provided as the value of an element in 'Data' map with the key='%v'", componentconfig.SchedulerPolicyConfigMapKey)
|
||||
fs.StringVar(&options.policyConfigMapName, "policy-configmap", options.policyConfigMapName, usage)
|
||||
fs.StringVar(&options.policyConfigMapNamespace, "policy-configmap-namespace", options.policyConfigMapNamespace, "The namespace where policy ConfigMap is located. The system namespace will be used if this is not provided or is empty.")
|
||||
fs.BoolVar(&options.useLegacyPolicyConfig, "use-legacy-policy-config", false, "When set to true, scheduler will ignore policy ConfigMap and uses policy config file")
|
||||
fs.BoolVar(&options.config.EnableProfiling, "profiling", options.config.EnableProfiling, "Enable profiling via web interface host:port/debug/pprof/")
|
||||
fs.BoolVar(&options.config.EnableContentionProfiling, "contention-profiling", options.config.EnableContentionProfiling, "Enable lock contention profiling, if profiling is enabled")
|
||||
fs.StringVar(&options.master, "master", options.master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
||||
fs.StringVar(&options.config.ClientConnection.KubeConfigFile, "kubeconfig", options.config.ClientConnection.KubeConfigFile, "Path to kubeconfig file with authorization and master location information.")
|
||||
fs.StringVar(&options.config.ClientConnection.ContentType, "kube-api-content-type", options.config.ClientConnection.ContentType, "Content type of requests sent to apiserver.")
|
||||
fs.Float32Var(&options.config.ClientConnection.QPS, "kube-api-qps", options.config.ClientConnection.QPS, "QPS to use while talking with kubernetes apiserver")
|
||||
fs.IntVar(&options.config.ClientConnection.Burst, "kube-api-burst", options.config.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver")
|
||||
fs.StringVar(&options.config.SchedulerName, "scheduler-name", options.config.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's \"spec.SchedulerName\".")
|
||||
fs.StringVar(&options.config.LeaderElection.LockObjectNamespace, "lock-object-namespace", options.config.LeaderElection.LockObjectNamespace, "Define the namespace of the lock object.")
|
||||
fs.StringVar(&options.config.LeaderElection.LockObjectName, "lock-object-name", options.config.LeaderElection.LockObjectName, "Define the name of the lock object.")
|
||||
fs.IntVar(&options.config.HardPodAffinitySymmetricWeight, "hard-pod-affinity-symmetric-weight", options.config.HardPodAffinitySymmetricWeight,
|
||||
"RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule corresponding "+
|
||||
"to every RequiredDuringScheduling affinity rule. --hard-pod-affinity-symmetric-weight represents the weight of implicit PreferredDuringScheduling affinity rule.")
|
||||
fs.MarkDeprecated("hard-pod-affinity-symmetric-weight", "This option was moved to the policy configuration file")
|
||||
fs.StringVar(&options.config.FailureDomains, "failure-domains", options.config.FailureDomains, "Indicate the \"all topologies\" set for an empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.")
|
||||
fs.MarkDeprecated("failure-domains", "Doesn't have any effect. Will be removed in future version.")
|
||||
leaderelectionconfig.BindFlags(&options.config.LeaderElection.LeaderElectionConfiguration, fs)
|
||||
utilfeature.DefaultFeatureGate.AddFlag(fs)
|
||||
}
|
||||
|
||||
func NewOptions() (*Options, error) {
|
||||
o := &Options{
|
||||
config: new(componentconfig.KubeSchedulerConfiguration),
|
||||
}
|
||||
|
||||
o.scheme = runtime.NewScheme()
|
||||
o.codecs = serializer.NewCodecFactory(o.scheme)
|
||||
|
||||
if err := componentconfig.AddToScheme(o.scheme); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := componentconfigv1alpha1.AddToScheme(o.scheme); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (o *Options) Complete() error {
|
||||
if len(o.ConfigFile) == 0 {
|
||||
glog.Warning("WARNING: all flags than --config are deprecated. Please begin using a config file ASAP.")
|
||||
o.applyDeprecatedHealthzAddressToConfig()
|
||||
o.applyDeprecatedHealthzPortToConfig()
|
||||
o.applyDeprecatedAlgorithmSourceOptionsToConfig()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// applyDeprecatedHealthzAddressToConfig sets o.config.HealthzBindAddress and
|
||||
// o.config.MetricsBindAddress from flags passed on the command line based on
|
||||
// the following rules:
|
||||
//
|
||||
// 1. If --address is empty, leave the config as-is.
|
||||
// 2. Otherwise, use the value of --address for the address portion of
|
||||
// o.config.HealthzBindAddress
|
||||
func (o *Options) applyDeprecatedHealthzAddressToConfig() {
|
||||
if len(o.healthzAddress) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
_, port, err := net.SplitHostPort(o.config.HealthzBindAddress)
|
||||
if err != nil {
|
||||
glog.Fatalf("invalid healthz bind address %q: %v", o.config.HealthzBindAddress, err)
|
||||
}
|
||||
o.config.HealthzBindAddress = net.JoinHostPort(o.healthzAddress, port)
|
||||
o.config.MetricsBindAddress = net.JoinHostPort(o.healthzAddress, port)
|
||||
}
|
||||
|
||||
// applyDeprecatedHealthzPortToConfig sets o.config.HealthzBindAddress and
|
||||
// o.config.MetricsBindAddress from flags passed on the command line based on
|
||||
// the following rules:
|
||||
//
|
||||
// 1. If --port is -1, disable the healthz server.
|
||||
// 2. Otherwise, use the value of --port for the port portion of
|
||||
// o.config.HealthzBindAddress
|
||||
func (o *Options) applyDeprecatedHealthzPortToConfig() {
|
||||
if o.healthzPort == -1 {
|
||||
o.config.HealthzBindAddress = ""
|
||||
return
|
||||
}
|
||||
|
||||
host, _, err := net.SplitHostPort(o.config.HealthzBindAddress)
|
||||
if err != nil {
|
||||
glog.Fatalf("invalid healthz bind address %q: %v", o.config.HealthzBindAddress, err)
|
||||
}
|
||||
o.config.HealthzBindAddress = net.JoinHostPort(host, strconv.Itoa(int(o.healthzPort)))
|
||||
o.config.MetricsBindAddress = net.JoinHostPort(host, strconv.Itoa(int(o.healthzPort)))
|
||||
}
|
||||
|
||||
// applyDeprecatedAlgorithmSourceOptionsToConfig sets o.config.AlgorithmSource from
|
||||
// flags passed on the command line in the following precedence order:
|
||||
//
|
||||
// 1. --use-legacy-policy-config to use a policy file.
|
||||
// 2. --policy-configmap to use a policy config map value.
|
||||
// 3. --algorithm-provider to use a named algorithm provider.
|
||||
func (o *Options) applyDeprecatedAlgorithmSourceOptionsToConfig() {
|
||||
switch {
|
||||
case o.useLegacyPolicyConfig:
|
||||
o.config.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
|
||||
Policy: &componentconfig.SchedulerPolicySource{
|
||||
File: &componentconfig.SchedulerPolicyFileSource{
|
||||
Path: o.policyConfigFile,
|
||||
},
|
||||
},
|
||||
}
|
||||
case len(o.policyConfigMapName) > 0:
|
||||
o.config.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
|
||||
Policy: &componentconfig.SchedulerPolicySource{
|
||||
ConfigMap: &componentconfig.SchedulerPolicyConfigMapSource{
|
||||
Name: o.policyConfigMapName,
|
||||
Namespace: o.policyConfigMapNamespace,
|
||||
},
|
||||
},
|
||||
}
|
||||
case len(o.algorithmProvider) > 0:
|
||||
o.config.AlgorithmSource = componentconfig.SchedulerAlgorithmSource{
|
||||
Provider: &o.algorithmProvider,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Validate validates all the required options.
|
||||
func (o *Options) Validate(args []string) error {
|
||||
if len(args) != 0 {
|
||||
return errors.New("no arguments are supported")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadConfigFromFile loads the contents of file and decodes it as a
|
||||
// KubeSchedulerConfiguration object.
|
||||
func (o *Options) loadConfigFromFile(file string) (*componentconfig.KubeSchedulerConfiguration, error) {
|
||||
data, err := ioutil.ReadFile(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return o.loadConfig(data)
|
||||
}
|
||||
|
||||
// loadConfig decodes data as a KubeProxyConfiguration object.
|
||||
func (o *Options) loadConfig(data []byte) (*componentconfig.KubeSchedulerConfiguration, error) {
|
||||
configObj, gvk, err := o.codecs.UniversalDecoder().Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config, ok := configObj.(*componentconfig.KubeSchedulerConfiguration)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("got unexpected config type: %v", gvk)
|
||||
}
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func (o *Options) ApplyDefaults(in *componentconfig.KubeSchedulerConfiguration) (*componentconfig.KubeSchedulerConfiguration, error) {
|
||||
external, err := o.scheme.ConvertToVersion(in, componentconfigv1alpha1.SchemeGroupVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
o.scheme.Default(external)
|
||||
|
||||
internal, err := o.scheme.ConvertToVersion(external, componentconfig.SchemeGroupVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := internal.(*componentconfig.KubeSchedulerConfiguration)
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (o *Options) Run() error {
|
||||
config := o.config
|
||||
|
||||
if len(o.ConfigFile) > 0 {
|
||||
if c, err := o.loadConfigFromFile(o.ConfigFile); err != nil {
|
||||
return err
|
||||
} else {
|
||||
config = c
|
||||
}
|
||||
}
|
||||
|
||||
// Apply algorithms based on feature gates.
|
||||
// TODO: make configurable?
|
||||
algorithmprovider.ApplyFeatureGates()
|
||||
|
||||
server, err := NewSchedulerServer(config, o.master)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
return server.Run(stop)
|
||||
}
|
||||
|
||||
// NewSchedulerCommand creates a *cobra.Command object with default parameters
|
||||
func NewSchedulerCommand() *cobra.Command {
|
||||
s := options.NewSchedulerServer()
|
||||
s.AddFlags(pflag.CommandLine)
|
||||
opts, err := NewOptions()
|
||||
if err != nil {
|
||||
glog.Fatalf("unable to initialize command options: %v", err)
|
||||
}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "kube-scheduler",
|
||||
Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
|
||||
@@ -59,133 +330,373 @@ constraints, affinity and anti-affinity specifications, data locality, inter-wor
|
||||
interference, deadlines, and so on. Workload-specific requirements will be exposed
|
||||
through the API as necessary.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
verflag.PrintAndExitIfRequested()
|
||||
cmdutil.CheckErr(opts.Complete())
|
||||
cmdutil.CheckErr(opts.Validate(args))
|
||||
cmdutil.CheckErr(opts.Run())
|
||||
},
|
||||
}
|
||||
|
||||
opts.config, err = opts.ApplyDefaults(opts.config)
|
||||
if err != nil {
|
||||
glog.Fatalf("unable to apply config defaults: %v", err)
|
||||
}
|
||||
|
||||
flags := cmd.Flags()
|
||||
AddFlags(opts, flags)
|
||||
|
||||
cmd.MarkFlagFilename("config", "yaml", "yml", "json")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// Run runs the specified SchedulerServer. This should never exit.
|
||||
func Run(s *options.SchedulerServer) error {
|
||||
// SchedulerServer represents all the parameters required to start the
|
||||
// Kubernetes scheduler server.
|
||||
type SchedulerServer struct {
|
||||
SchedulerName string
|
||||
Client clientset.Interface
|
||||
InformerFactory informers.SharedInformerFactory
|
||||
PodInformer coreinformers.PodInformer
|
||||
AlgorithmSource componentconfig.SchedulerAlgorithmSource
|
||||
HardPodAffinitySymmetricWeight int
|
||||
EventClient v1core.EventsGetter
|
||||
Recorder record.EventRecorder
|
||||
Broadcaster record.EventBroadcaster
|
||||
// LeaderElection is optional.
|
||||
LeaderElection *leaderelection.LeaderElectionConfig
|
||||
// HealthzServer is optional.
|
||||
HealthzServer *http.Server
|
||||
// MetricsServer is optional.
|
||||
MetricsServer *http.Server
|
||||
}
|
||||
|
||||
// NewSchedulerServer creates a runnable SchedulerServer from configuration.
|
||||
func NewSchedulerServer(config *componentconfig.KubeSchedulerConfiguration, master string) (*SchedulerServer, error) {
|
||||
if config == nil {
|
||||
return nil, errors.New("config is required")
|
||||
}
|
||||
|
||||
// Configz registration.
|
||||
if c, err := configz.New("componentconfig"); err == nil {
|
||||
c.Set(config)
|
||||
} else {
|
||||
return nil, fmt.Errorf("unable to register configz: %s", err)
|
||||
}
|
||||
|
||||
// Prepare some Kube clients.
|
||||
client, leaderElectionClient, eventClient, err := createClients(config.ClientConnection, master)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Prepare event clients.
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: config.SchedulerName})
|
||||
|
||||
// Set up leader election if enabled.
|
||||
var leaderElectionConfig *leaderelection.LeaderElectionConfig
|
||||
if config.LeaderElection.LeaderElect {
|
||||
leaderElectionConfig, err = makeLeaderElectionConfig(config.LeaderElection, leaderElectionClient, recorder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare a healthz server. If the metrics bind address is the same as the
|
||||
// healthz bind address, consolidate the servers into one.
|
||||
var healthzServer *http.Server
|
||||
if len(config.HealthzBindAddress) != 0 {
|
||||
healthzServer = makeHealthzServer(config)
|
||||
}
|
||||
|
||||
// Prepare a separate metrics server only if the bind address differs from the
|
||||
// healthz bind address.
|
||||
var metricsServer *http.Server
|
||||
if len(config.MetricsBindAddress) > 0 && config.HealthzBindAddress != config.MetricsBindAddress {
|
||||
metricsServer = makeMetricsServer(config)
|
||||
}
|
||||
|
||||
return &SchedulerServer{
|
||||
SchedulerName: config.SchedulerName,
|
||||
Client: client,
|
||||
InformerFactory: informers.NewSharedInformerFactory(client, 0),
|
||||
PodInformer: factory.NewPodInformer(client, 0, config.SchedulerName),
|
||||
AlgorithmSource: config.AlgorithmSource,
|
||||
HardPodAffinitySymmetricWeight: config.HardPodAffinitySymmetricWeight,
|
||||
EventClient: eventClient,
|
||||
Recorder: recorder,
|
||||
Broadcaster: eventBroadcaster,
|
||||
LeaderElection: leaderElectionConfig,
|
||||
HealthzServer: healthzServer,
|
||||
MetricsServer: metricsServer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// makeLeaderElectionConfig builds a leader election configuration. It will
|
||||
// create a new resource lock associated with the configuration.
|
||||
func makeLeaderElectionConfig(config componentconfig.KubeSchedulerLeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get hostname: %v", err)
|
||||
}
|
||||
|
||||
rl, err := resourcelock.New(config.ResourceLock,
|
||||
config.LockObjectNamespace,
|
||||
config.LockObjectName,
|
||||
client.CoreV1(),
|
||||
resourcelock.ResourceLockConfig{
|
||||
Identity: hostname,
|
||||
EventRecorder: recorder,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't create resource lock: %v", err)
|
||||
}
|
||||
|
||||
return &leaderelection.LeaderElectionConfig{
|
||||
Lock: rl,
|
||||
LeaseDuration: config.LeaseDuration.Duration,
|
||||
RenewDeadline: config.RenewDeadline.Duration,
|
||||
RetryPeriod: config.RetryPeriod.Duration,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// makeHealthzServer creates a healthz server from the config, and will also
|
||||
// embed the metrics handler if the healthz and metrics address configurations
|
||||
// are the same.
|
||||
func makeHealthzServer(config *componentconfig.KubeSchedulerConfiguration) *http.Server {
|
||||
mux := http.NewServeMux()
|
||||
healthz.InstallHandler(mux)
|
||||
if config.HealthzBindAddress == config.MetricsBindAddress {
|
||||
configz.InstallHandler(mux)
|
||||
mux.Handle("/metrics", prometheus.Handler())
|
||||
}
|
||||
if config.EnableProfiling {
|
||||
mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
||||
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
||||
if config.EnableContentionProfiling {
|
||||
goruntime.SetBlockProfileRate(1)
|
||||
}
|
||||
}
|
||||
return &http.Server{
|
||||
Addr: config.HealthzBindAddress,
|
||||
Handler: mux,
|
||||
}
|
||||
}
|
||||
|
||||
// makeMetricsServer builds a metrics server from the config.
|
||||
func makeMetricsServer(config *componentconfig.KubeSchedulerConfiguration) *http.Server {
|
||||
mux := http.NewServeMux()
|
||||
configz.InstallHandler(mux)
|
||||
mux.Handle("/metrics", prometheus.Handler())
|
||||
if config.EnableProfiling {
|
||||
mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
||||
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
||||
if config.EnableContentionProfiling {
|
||||
goruntime.SetBlockProfileRate(1)
|
||||
}
|
||||
}
|
||||
return &http.Server{
|
||||
Addr: config.MetricsBindAddress,
|
||||
Handler: mux,
|
||||
}
|
||||
}
|
||||
|
||||
// createClients creates a kube client and an event client from the given config and masterOverride.
|
||||
// TODO remove masterOverride when CLI flags are removed.
|
||||
func createClients(config componentconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, clientset.Interface, v1core.EventsGetter, error) {
|
||||
if len(config.KubeConfigFile) == 0 && len(masterOverride) == 0 {
|
||||
glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
|
||||
}
|
||||
|
||||
// This creates a client, first loading any specified kubeconfig
|
||||
// file, and then overriding the Master flag, if non-empty.
|
||||
kubeConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
|
||||
&clientcmd.ClientConfigLoadingRules{ExplicitPath: config.KubeConfigFile},
|
||||
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterOverride}}).ClientConfig()
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
kubeConfig.AcceptContentTypes = config.AcceptContentTypes
|
||||
kubeConfig.ContentType = config.ContentType
|
||||
kubeConfig.QPS = config.QPS
|
||||
//TODO make config struct use int instead of int32?
|
||||
kubeConfig.Burst = int(config.Burst)
|
||||
|
||||
client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "scheduler"))
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "leader-election"))
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
eventClient, err := clientgoclientset.NewForConfig(kubeConfig)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
return client, leaderElectionClient, eventClient.CoreV1(), nil
|
||||
}
|
||||
|
||||
// Run runs the SchedulerServer. This should never exit.
|
||||
func (s *SchedulerServer) Run(stop chan struct{}) error {
|
||||
// To help debugging, immediately log version
|
||||
glog.Infof("Version: %+v", version.Get())
|
||||
|
||||
kubeClient, leaderElectionClient, err := createClients(s)
|
||||
// Build a scheduler config from the provided algorithm source.
|
||||
schedulerConfig, err := s.SchedulerConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create kube client: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
recorder := createRecorder(kubeClient, s)
|
||||
// Create the scheduler.
|
||||
sched := scheduler.NewFromConfig(schedulerConfig)
|
||||
|
||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
|
||||
// cache only non-terminal pods
|
||||
|
||||
podInformer := factory.NewPodInformer(kubeClient, 0, s.SchedulerName)
|
||||
// Apply algorithms based on feature gates.
|
||||
algorithmprovider.ApplyFeatureGates()
|
||||
|
||||
sched, err := CreateScheduler(
|
||||
s,
|
||||
kubeClient,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
podInformer,
|
||||
informerFactory.Core().V1().PersistentVolumes(),
|
||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
informerFactory.Core().V1().ReplicationControllers(),
|
||||
informerFactory.Extensions().V1beta1().ReplicaSets(),
|
||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
|
||||
recorder,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating scheduler: %v", err)
|
||||
// Prepare the event broadcaster.
|
||||
if !reflect.ValueOf(s.Broadcaster).IsNil() && !reflect.ValueOf(s.EventClient).IsNil() {
|
||||
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
|
||||
}
|
||||
|
||||
if s.Port != -1 {
|
||||
go startHTTP(s)
|
||||
// Start up the healthz server.
|
||||
if s.HealthzServer != nil {
|
||||
go wait.Until(func() {
|
||||
glog.Infof("starting healthz server on %v", s.HealthzServer.Addr)
|
||||
err := s.HealthzServer.ListenAndServe()
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to start healthz server: %v", err))
|
||||
}
|
||||
}, 5*time.Second, stop)
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
go podInformer.Informer().Run(stop)
|
||||
informerFactory.Start(stop)
|
||||
// Waiting for all cache to sync before scheduling.
|
||||
informerFactory.WaitForCacheSync(stop)
|
||||
controller.WaitForCacheSync("scheduler", stop, podInformer.Informer().HasSynced)
|
||||
// Start up the metrics server.
|
||||
if s.MetricsServer != nil {
|
||||
go wait.Until(func() {
|
||||
glog.Infof("starting metrics server on %v", s.MetricsServer.Addr)
|
||||
err := s.MetricsServer.ListenAndServe()
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to start metrics server: %v", err))
|
||||
}
|
||||
}, 5*time.Second, stop)
|
||||
}
|
||||
|
||||
// Start all informers.
|
||||
go s.PodInformer.Informer().Run(stop)
|
||||
s.InformerFactory.Start(stop)
|
||||
|
||||
// Wait for all caches to sync before scheduling.
|
||||
s.InformerFactory.WaitForCacheSync(stop)
|
||||
controller.WaitForCacheSync("scheduler", stop, s.PodInformer.Informer().HasSynced)
|
||||
|
||||
// Prepare a reusable run function.
|
||||
run := func(stopCh <-chan struct{}) {
|
||||
sched.Run()
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
if !s.LeaderElection.LeaderElect {
|
||||
run(stop)
|
||||
return fmt.Errorf("finished without leader elect")
|
||||
}
|
||||
|
||||
id, err := os.Hostname()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get hostname: %v", err)
|
||||
}
|
||||
|
||||
rl, err := resourcelock.New(s.LeaderElection.ResourceLock,
|
||||
s.LockObjectNamespace,
|
||||
s.LockObjectName,
|
||||
leaderElectionClient.CoreV1(),
|
||||
resourcelock.ResourceLockConfig{
|
||||
Identity: id,
|
||||
EventRecorder: recorder,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating lock: %v", err)
|
||||
}
|
||||
|
||||
leaderElector, err := leaderelection.NewLeaderElector(
|
||||
leaderelection.LeaderElectionConfig{
|
||||
Lock: rl,
|
||||
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
|
||||
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
|
||||
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
|
||||
Callbacks: leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: run,
|
||||
OnStoppedLeading: func() {
|
||||
utilruntime.HandleError(fmt.Errorf("lost master"))
|
||||
},
|
||||
// If leader election is enabled, run via LeaderElector until done and exit.
|
||||
if s.LeaderElection != nil {
|
||||
s.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: run,
|
||||
OnStoppedLeading: func() {
|
||||
utilruntime.HandleError(fmt.Errorf("lost master"))
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
leaderElector.Run()
|
||||
|
||||
return fmt.Errorf("lost lease")
|
||||
}
|
||||
|
||||
func startHTTP(s *options.SchedulerServer) {
|
||||
mux := http.NewServeMux()
|
||||
healthz.InstallHandler(mux)
|
||||
if s.EnableProfiling {
|
||||
mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
||||
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
||||
if s.EnableContentionProfiling {
|
||||
goruntime.SetBlockProfileRate(1)
|
||||
}
|
||||
}
|
||||
if c, err := configz.New("componentconfig"); err == nil {
|
||||
c.Set(s.KubeSchedulerConfiguration)
|
||||
} else {
|
||||
glog.Errorf("unable to register configz: %s", err)
|
||||
}
|
||||
configz.InstallHandler(mux)
|
||||
mux.Handle("/metrics", prometheus.Handler())
|
||||
leaderElector, err := leaderelection.NewLeaderElector(*s.LeaderElection)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't create leader elector: %v", err)
|
||||
}
|
||||
|
||||
server := &http.Server{
|
||||
Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
|
||||
Handler: mux,
|
||||
leaderElector.Run()
|
||||
|
||||
return fmt.Errorf("lost lease")
|
||||
}
|
||||
glog.Fatal(server.ListenAndServe())
|
||||
|
||||
// Leader election is disabled, so run inline until done.
|
||||
run(stop)
|
||||
return fmt.Errorf("finished without leader elect")
|
||||
}
|
||||
|
||||
// SchedulerConfig creates the scheduler configuration. This is exposed for use
|
||||
// by tests.
|
||||
func (s *SchedulerServer) SchedulerConfig() (*scheduler.Config, error) {
|
||||
// Set up the configurator which can create schedulers from configs.
|
||||
configurator := factory.NewConfigFactory(
|
||||
s.SchedulerName,
|
||||
s.Client,
|
||||
s.InformerFactory.Core().V1().Nodes(),
|
||||
s.PodInformer,
|
||||
s.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
s.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
s.InformerFactory.Core().V1().ReplicationControllers(),
|
||||
s.InformerFactory.Extensions().V1beta1().ReplicaSets(),
|
||||
s.InformerFactory.Apps().V1beta1().StatefulSets(),
|
||||
s.InformerFactory.Core().V1().Services(),
|
||||
s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
|
||||
s.HardPodAffinitySymmetricWeight,
|
||||
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
|
||||
)
|
||||
|
||||
source := s.AlgorithmSource
|
||||
var config *scheduler.Config
|
||||
switch {
|
||||
case source.Provider != nil:
|
||||
// Create the config from a named algorithm provider.
|
||||
sc, err := configurator.CreateFromProvider(*source.Provider)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
|
||||
}
|
||||
config = sc
|
||||
case source.Policy != nil:
|
||||
// Create the config from a user specified policy source.
|
||||
policy := &schedulerapi.Policy{}
|
||||
switch {
|
||||
case source.Policy.File != nil:
|
||||
// Use a policy serialized in a file.
|
||||
policyFile := source.Policy.File.Path
|
||||
_, err := os.Stat(policyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("missing policy config file %s", policyFile)
|
||||
}
|
||||
data, err := ioutil.ReadFile(policyFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't read policy config: %v", err)
|
||||
}
|
||||
err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid policy: %v", err)
|
||||
}
|
||||
case source.Policy.ConfigMap != nil:
|
||||
// Use a policy serialized in a config map value.
|
||||
policyRef := source.Policy.ConfigMap
|
||||
policyConfigMap, err := s.Client.CoreV1().ConfigMaps(policyRef.Namespace).Get(policyRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't get policy config map %s/%s: %v", policyRef.Namespace, policyRef.Name, err)
|
||||
}
|
||||
data, found := policyConfigMap.Data[componentconfig.SchedulerPolicyConfigMapKey]
|
||||
if !found {
|
||||
return nil, fmt.Errorf("missing policy config map value at key %q", componentconfig.SchedulerPolicyConfigMapKey)
|
||||
}
|
||||
err = runtime.DecodeInto(latestschedulerapi.Codec, []byte(data), policy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid policy: %v", err)
|
||||
}
|
||||
}
|
||||
sc, err := configurator.CreateFromConfig(*policy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
|
||||
}
|
||||
config = sc
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
|
||||
}
|
||||
// Additional tweaks to the config produced by the configurator.
|
||||
config.Recorder = s.Recorder
|
||||
return config, nil
|
||||
}
|
||||
|
@@ -17,27 +17,31 @@ limitations under the License.
|
||||
package main
|
||||
|
||||
import (
|
||||
"k8s.io/apiserver/pkg/util/flag"
|
||||
"k8s.io/apiserver/pkg/util/logs"
|
||||
"k8s.io/kubernetes/pkg/version/verflag"
|
||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app"
|
||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
||||
goflag "flag"
|
||||
"os"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
utilflag "k8s.io/apiserver/pkg/util/flag"
|
||||
"k8s.io/apiserver/pkg/util/logs"
|
||||
_ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
|
||||
_ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
|
||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app"
|
||||
)
|
||||
|
||||
func main() {
|
||||
s := options.NewSchedulerServer()
|
||||
s.AddFlags(pflag.CommandLine)
|
||||
command := app.NewSchedulerCommand()
|
||||
|
||||
flag.InitFlags()
|
||||
// TODO: once we switch everything over to Cobra commands, we can go back to calling
|
||||
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
|
||||
// normalize func and add the go flag set by hand.
|
||||
pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
|
||||
pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
|
||||
// utilflag.InitFlags()
|
||||
logs.InitLogs()
|
||||
defer logs.FlushLogs()
|
||||
|
||||
verflag.PrintAndExitIfRequested()
|
||||
|
||||
if err := app.Run(s); err != nil {
|
||||
glog.Fatalf("scheduler app failed to run: %v", err)
|
||||
if err := command.Execute(); err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
@@ -151,6 +151,14 @@ func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Schedul
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// NewFromConfig returns a new scheduler using the provided Config.
|
||||
func NewFromConfig(config *Config) *Scheduler {
|
||||
metrics.Register()
|
||||
return &Scheduler{
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
|
||||
func (sched *Scheduler) Run() {
|
||||
if !sched.config.WaitForCacheSync() {
|
||||
|
Reference in New Issue
Block a user