mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 15:58:37 +00:00
Merge pull request #40696 from jayunit100/sched_server_cleanup
Automatic merge from submit-queue (batch tested with PRs 40696, 39914, 40374) Cleanup scheduler server with an external config class **What this PR does / why we need it**: Some cleanup in cmd/server so that the parts which setup scheduler configuration are stored and separately tested. - additionally a simple unit test to check that erroneous configs return a non-nil error is included. - it also will make sure we avoid nil panics of schedulerConfiguration is misconfigured.
This commit is contained in:
commit
f191d8df2e
@ -5,11 +5,15 @@ licenses(["notice"])
|
|||||||
load(
|
load(
|
||||||
"@io_bazel_rules_go//go:def.bzl",
|
"@io_bazel_rules_go//go:def.bzl",
|
||||||
"go_library",
|
"go_library",
|
||||||
|
"go_test",
|
||||||
)
|
)
|
||||||
|
|
||||||
go_library(
|
go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = ["server.go"],
|
srcs = [
|
||||||
|
"configurator.go",
|
||||||
|
"server.go",
|
||||||
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
@ -53,3 +57,10 @@ filegroup(
|
|||||||
],
|
],
|
||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["configurator_test.go"],
|
||||||
|
library = ":go_default_library",
|
||||||
|
tags = ["automanaged"],
|
||||||
|
)
|
||||||
|
113
plugin/cmd/kube-scheduler/app/configurator.go
Normal file
113
plugin/cmd/kube-scheduler/app/configurator.go
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
|
|
||||||
|
restclient "k8s.io/client-go/rest"
|
||||||
|
"k8s.io/client-go/tools/clientcmd"
|
||||||
|
"k8s.io/client-go/tools/record"
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
|
|
||||||
|
clientv1 "k8s.io/client-go/pkg/api/v1"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
||||||
|
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
||||||
|
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
||||||
|
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
|
||||||
|
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
)
|
||||||
|
|
||||||
|
func createRecorder(kubecli *clientset.Clientset, s *options.SchedulerServer) record.EventRecorder {
|
||||||
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
|
eventBroadcaster.StartLogging(glog.Infof)
|
||||||
|
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events("")})
|
||||||
|
return eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: s.SchedulerName})
|
||||||
|
}
|
||||||
|
|
||||||
|
func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) {
|
||||||
|
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to build config from flags: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
kubeconfig.ContentType = s.ContentType
|
||||||
|
// Override kubeconfig qps/burst settings from flags
|
||||||
|
kubeconfig.QPS = s.KubeAPIQPS
|
||||||
|
kubeconfig.Burst = int(s.KubeAPIBurst)
|
||||||
|
|
||||||
|
cli, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "leader-election"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid API configuration: %v", err)
|
||||||
|
}
|
||||||
|
return cli, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// createScheduler encapsulates the entire creation of a runnable scheduler.
|
||||||
|
func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) {
|
||||||
|
configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)
|
||||||
|
|
||||||
|
// Rebuild the configurator with a default Create(...) method.
|
||||||
|
configurator = &schedulerConfigurator{
|
||||||
|
configurator,
|
||||||
|
s.PolicyConfigFile,
|
||||||
|
s.AlgorithmProvider}
|
||||||
|
|
||||||
|
return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
|
||||||
|
cfg.Recorder = recorder
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// schedulerConfigurator is an interface wrapper that provides default Configuration creation based on user
|
||||||
|
// provided config file.
|
||||||
|
type schedulerConfigurator struct {
|
||||||
|
scheduler.Configurator
|
||||||
|
policyFile string
|
||||||
|
algorithmProvider string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create implements the interface for the Configurator, hence it is exported even through the struct is not.
|
||||||
|
func (sc schedulerConfigurator) Create() (*scheduler.Config, error) {
|
||||||
|
if _, err := os.Stat(sc.policyFile); err != nil {
|
||||||
|
if sc.Configurator != nil {
|
||||||
|
return sc.Configurator.CreateFromProvider(sc.algorithmProvider)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("Configurator was nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// policy file is valid, try to create a configuration from it.
|
||||||
|
var policy schedulerapi.Policy
|
||||||
|
configData, err := ioutil.ReadFile(sc.policyFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to read policy config: %v", err)
|
||||||
|
}
|
||||||
|
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid configuration: %v", err)
|
||||||
|
}
|
||||||
|
return sc.CreateFromConfig(policy)
|
||||||
|
}
|
31
plugin/cmd/kube-scheduler/app/configurator_test.go
Normal file
31
plugin/cmd/kube-scheduler/app/configurator_test.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSchedulerConfiguratorFailure(t *testing.T) {
|
||||||
|
sc := &schedulerConfigurator{
|
||||||
|
// policyfile and algorithm are intentionally undefined.
|
||||||
|
}
|
||||||
|
_, error := sc.Create()
|
||||||
|
if error == nil {
|
||||||
|
t.Fatalf("Expected error message when creating with incomplete configurator.")
|
||||||
|
}
|
||||||
|
}
|
@ -19,7 +19,6 @@ package app
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/pprof"
|
"net/http/pprof"
|
||||||
@ -28,24 +27,13 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apiserver/pkg/server/healthz"
|
"k8s.io/apiserver/pkg/server/healthz"
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
||||||
clientv1 "k8s.io/client-go/pkg/api/v1"
|
|
||||||
restclient "k8s.io/client-go/rest"
|
|
||||||
"k8s.io/client-go/tools/clientcmd"
|
|
||||||
"k8s.io/client-go/tools/record"
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
|
||||||
"k8s.io/kubernetes/pkg/client/leaderelection"
|
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||||
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
||||||
"k8s.io/kubernetes/pkg/util/configz"
|
"k8s.io/kubernetes/pkg/util/configz"
|
||||||
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
|
||||||
"k8s.io/kubernetes/plugin/pkg/scheduler"
|
|
||||||
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
|
||||||
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
|
||||||
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
|
|
||||||
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@ -124,13 +112,6 @@ func Run(s *options.SchedulerServer) error {
|
|||||||
panic("unreachable")
|
panic("unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
func createRecorder(kubecli *clientset.Clientset, s *options.SchedulerServer) record.EventRecorder {
|
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
|
||||||
eventBroadcaster.StartLogging(glog.Infof)
|
|
||||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubecli.Core().RESTClient()).Events("")})
|
|
||||||
return eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: s.SchedulerName})
|
|
||||||
}
|
|
||||||
|
|
||||||
func startHTTP(s *options.SchedulerServer) {
|
func startHTTP(s *options.SchedulerServer) {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
healthz.InstallHandler(mux)
|
healthz.InstallHandler(mux)
|
||||||
@ -156,61 +137,3 @@ func startHTTP(s *options.SchedulerServer) {
|
|||||||
}
|
}
|
||||||
glog.Fatal(server.ListenAndServe())
|
glog.Fatal(server.ListenAndServe())
|
||||||
}
|
}
|
||||||
|
|
||||||
func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) {
|
|
||||||
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("unable to build config from flags: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
kubeconfig.ContentType = s.ContentType
|
|
||||||
// Override kubeconfig qps/burst settings from flags
|
|
||||||
kubeconfig.QPS = s.KubeAPIQPS
|
|
||||||
kubeconfig.Burst = int(s.KubeAPIBurst)
|
|
||||||
|
|
||||||
cli, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "leader-election"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid API configuration: %v", err)
|
|
||||||
}
|
|
||||||
return cli, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// schedulerConfigurator is an interface wrapper that provides default Configuration creation based on user
|
|
||||||
// provided config file.
|
|
||||||
type schedulerConfigurator struct {
|
|
||||||
scheduler.Configurator
|
|
||||||
policyFile string
|
|
||||||
algorithmProvider string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sc schedulerConfigurator) Create() (*scheduler.Config, error) {
|
|
||||||
if _, err := os.Stat(sc.policyFile); err != nil {
|
|
||||||
return sc.Configurator.CreateFromProvider(sc.algorithmProvider)
|
|
||||||
}
|
|
||||||
|
|
||||||
// policy file is valid, try to create a configuration from it.
|
|
||||||
var policy schedulerapi.Policy
|
|
||||||
configData, err := ioutil.ReadFile(sc.policyFile)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("unable to read policy config: %v", err)
|
|
||||||
}
|
|
||||||
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid configuration: %v", err)
|
|
||||||
}
|
|
||||||
return sc.CreateFromConfig(policy)
|
|
||||||
}
|
|
||||||
|
|
||||||
// createScheduler encapsulates the entire creation of a runnable scheduler.
|
|
||||||
func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) {
|
|
||||||
configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight, s.FailureDomains)
|
|
||||||
|
|
||||||
// Rebuild the configurator with a default Create(...) method.
|
|
||||||
configurator = &schedulerConfigurator{
|
|
||||||
configurator,
|
|
||||||
s.PolicyConfigFile,
|
|
||||||
s.AlgorithmProvider}
|
|
||||||
|
|
||||||
return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
|
|
||||||
cfg.Recorder = recorder
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user