Surface kube config in scheduler framework handle

This commit is contained in:
Wei Huang 2021-03-29 12:27:27 -07:00
parent 3d48f0d1dd
commit e7f67b1a63
No known key found for this signature in database
GPG Key ID: BE5E9752F8B6E005
11 changed files with 55 additions and 14 deletions

View File

@ -41,6 +41,7 @@ type Config struct {
SecureServing *apiserver.SecureServingInfo SecureServing *apiserver.SecureServingInfo
Client clientset.Interface Client clientset.Interface
KubeConfig *restclient.Config
InformerFactory informers.SharedInformerFactory InformerFactory informers.SharedInformerFactory
//lint:ignore SA1019 this deprecated field still needs to be used for now. It will be removed once the migration is done. //lint:ignore SA1019 this deprecated field still needs to be used for now. It will be removed once the migration is done.

View File

@ -289,6 +289,7 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
} }
c.Client = client c.Client = client
c.KubeConfig = kubeConfig
c.InformerFactory = scheduler.NewInformerFactory(client, 0) c.InformerFactory = scheduler.NewInformerFactory(client, 0)
c.LeaderElection = leaderElectionConfig c.LeaderElection = leaderElectionConfig

View File

@ -323,6 +323,7 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
cc.InformerFactory, cc.InformerFactory,
recorderFactory, recorderFactory,
ctx.Done(), ctx.Done(),
scheduler.WithKubeConfig(cc.KubeConfig),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...), scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource), scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),

View File

@ -31,6 +31,7 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider" "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -57,7 +58,8 @@ type Binder interface {
// Configurator defines I/O, caching, and other functionality needed to // Configurator defines I/O, caching, and other functionality needed to
// construct a new scheduler. // construct a new scheduler.
type Configurator struct { type Configurator struct {
client clientset.Interface client clientset.Interface
kubeConfig *restclient.Config
recorderFactory profile.RecorderFactory recorderFactory profile.RecorderFactory
@ -137,6 +139,7 @@ func (c *Configurator) create() (*Scheduler, error) {
clusterEventMap := make(map[framework.ClusterEvent]sets.String) clusterEventMap := make(map[framework.ClusterEvent]sets.String)
profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory, profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
frameworkruntime.WithClientSet(c.client), frameworkruntime.WithClientSet(c.client),
frameworkruntime.WithKubeConfig(c.kubeConfig),
frameworkruntime.WithInformerFactory(c.informerFactory), frameworkruntime.WithInformerFactory(c.informerFactory),
frameworkruntime.WithSnapshotSharedLister(c.nodeInfoSnapshot), frameworkruntime.WithSnapshotSharedLister(c.nodeInfoSnapshot),
frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates), frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates),

View File

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize" "k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
@ -588,6 +589,9 @@ type Handle interface {
// ClientSet returns a kubernetes clientSet. // ClientSet returns a kubernetes clientSet.
ClientSet() clientset.Interface ClientSet() clientset.Interface
// KubeConfig returns the raw kube config.
KubeConfig() *restclient.Config
// EventRecorder returns an event recorder. // EventRecorder returns an event recorder.
EventRecorder() events.EventRecorder EventRecorder() events.EventRecorder

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/component-helpers/scheduling/corev1" "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -92,6 +93,7 @@ type frameworkImpl struct {
permitPlugins []framework.PermitPlugin permitPlugins []framework.PermitPlugin
clientSet clientset.Interface clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory informerFactory informers.SharedInformerFactory
@ -142,6 +144,7 @@ func (f *frameworkImpl) Extenders() []framework.Extender {
type frameworkOptions struct { type frameworkOptions struct {
clientSet clientset.Interface clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory informerFactory informers.SharedInformerFactory
snapshotSharedLister framework.SharedLister snapshotSharedLister framework.SharedLister
@ -164,6 +167,13 @@ func WithClientSet(clientSet clientset.Interface) Option {
} }
} }
// WithKubeConfig sets kubeConfig for the scheduling frameworkImpl.
func WithKubeConfig(kubeConfig *restclient.Config) Option {
return func(o *frameworkOptions) {
o.kubeConfig = kubeConfig
}
}
// WithEventRecorder sets clientSet for the scheduling frameworkImpl. // WithEventRecorder sets clientSet for the scheduling frameworkImpl.
func WithEventRecorder(recorder events.EventRecorder) Option { func WithEventRecorder(recorder events.EventRecorder) Option {
return func(o *frameworkOptions) { return func(o *frameworkOptions) {
@ -254,6 +264,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
pluginNameToWeightMap: make(map[string]int), pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(), waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet, clientSet: options.clientSet,
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder, eventRecorder: options.eventRecorder,
informerFactory: options.informerFactory, informerFactory: options.informerFactory,
metricsRecorder: options.metricsRecorder, metricsRecorder: options.metricsRecorder,
@ -1149,6 +1160,11 @@ func (f *frameworkImpl) ClientSet() clientset.Interface {
return f.clientSet return f.clientSet
} }
// KubeConfig returns a kubernetes config.
func (f *frameworkImpl) KubeConfig() *restclient.Config {
return f.kubeConfig
}
// EventRecorder returns an event recorder. // EventRecorder returns an event recorder.
func (f *frameworkImpl) EventRecorder() events.EventRecorder { func (f *frameworkImpl) EventRecorder() events.EventRecorder {
return f.eventRecorder return f.eventRecorder

View File

@ -32,6 +32,7 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1" coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2" "k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -89,6 +90,7 @@ type Scheduler struct {
} }
type schedulerOptions struct { type schedulerOptions struct {
kubeConfig *restclient.Config
schedulerAlgorithmSource schedulerapi.SchedulerAlgorithmSource schedulerAlgorithmSource schedulerapi.SchedulerAlgorithmSource
percentageOfNodesToScore int32 percentageOfNodesToScore int32
podInitialBackoffSeconds int64 podInitialBackoffSeconds int64
@ -104,6 +106,13 @@ type schedulerOptions struct {
// Option configures a Scheduler // Option configures a Scheduler
type Option func(*schedulerOptions) type Option func(*schedulerOptions)
// WithKubeConfig sets the kube config for Scheduler.
func WithKubeConfig(cfg *restclient.Config) Option {
return func(o *schedulerOptions) {
o.kubeConfig = cfg
}
}
// WithProfiles sets profiles for Scheduler. By default, there is one profile // WithProfiles sets profiles for Scheduler. By default, there is one profile
// with the name "default-scheduler". // with the name "default-scheduler".
func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option { func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
@ -214,6 +223,7 @@ func New(client clientset.Interface,
configurator := &Configurator{ configurator := &Configurator{
client: client, client: client,
kubeConfig: options.kubeConfig,
recorderFactory: recorderFactory, recorderFactory: recorderFactory,
informerFactory: informerFactory, informerFactory: informerFactory,
schedulerCache: schedulerCache, schedulerCache: schedulerCache,

View File

@ -176,7 +176,8 @@ func TestSchedulerCreation(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
s, err := New(client, s, err := New(
client,
informerFactory, informerFactory,
profile.NewRecorderFactory(eventBroadcaster), profile.NewRecorderFactory(eventBroadcaster),
stopCh, stopCh,
@ -456,7 +457,8 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
defer cancel() defer cancel()
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := New(client, sched, err := New(
client,
informerFactory, informerFactory,
profile.NewRecorderFactory(broadcaster), profile.NewRecorderFactory(broadcaster),
ctx.Done(), ctx.Done(),

View File

@ -92,7 +92,7 @@ type BindPlugin struct {
numBindCalled int numBindCalled int
PluginName string PluginName string
bindStatus *framework.Status bindStatus *framework.Status
client *clientset.Clientset client clientset.Interface
pluginInvokeEventChan chan pluginInvokeEvent pluginInvokeEventChan chan pluginInvokeEvent
} }

View File

@ -99,7 +99,7 @@ func mustSetupScheduler(config *config.KubeSchedulerConfiguration) (util.Shutdow
// Not all config options will be effective but only those mostly related with scheduler performance will // Not all config options will be effective but only those mostly related with scheduler performance will
// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`. // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
_, podInformer, schedulerShutdown := util.StartScheduler(client, config) _, podInformer, schedulerShutdown := util.StartScheduler(client, cfg, config)
fakePVControllerShutdown := util.StartFakePVController(client) fakePVControllerShutdown := util.StartFakePVController(client)
shutdownFunc := func() { shutdownFunc := func() {

View File

@ -75,7 +75,7 @@ func StartApiserver() (string, ShutdownFunc) {
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface // StartScheduler configures and starts a scheduler given a handle to the clientSet interface
// and event broadcaster. It returns the running scheduler, podInformer and the shutdown function to stop it. // and event broadcaster. It returns the running scheduler, podInformer and the shutdown function to stop it.
func StartScheduler(clientSet clientset.Interface, cfg *kubeschedulerconfig.KubeSchedulerConfiguration) (*scheduler.Scheduler, coreinformers.PodInformer, ShutdownFunc) { func StartScheduler(clientSet clientset.Interface, kubeConfig *restclient.Config, cfg *kubeschedulerconfig.KubeSchedulerConfiguration) (*scheduler.Scheduler, coreinformers.PodInformer, ShutdownFunc) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
informerFactory := scheduler.NewInformerFactory(clientSet, 0) informerFactory := scheduler.NewInformerFactory(clientSet, 0)
@ -89,6 +89,7 @@ func StartScheduler(clientSet clientset.Interface, cfg *kubeschedulerconfig.Kube
informerFactory, informerFactory,
profile.NewRecorderFactory(evtBroadcaster), profile.NewRecorderFactory(evtBroadcaster),
ctx.Done(), ctx.Done(),
scheduler.WithKubeConfig(kubeConfig),
scheduler.WithProfiles(cfg.Profiles...), scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithAlgorithmSource(cfg.AlgorithmSource), scheduler.WithAlgorithmSource(cfg.AlgorithmSource),
scheduler.WithPercentageOfNodesToScore(cfg.PercentageOfNodesToScore), scheduler.WithPercentageOfNodesToScore(cfg.PercentageOfNodesToScore),
@ -159,7 +160,8 @@ type TestContext struct {
CloseFn framework.CloseFunc CloseFn framework.CloseFunc
HTTPServer *httptest.Server HTTPServer *httptest.Server
NS *v1.Namespace NS *v1.Namespace
ClientSet *clientset.Clientset ClientSet clientset.Interface
KubeConfig *restclient.Config
InformerFactory informers.SharedInformerFactory InformerFactory informers.SharedInformerFactory
Scheduler *scheduler.Scheduler Scheduler *scheduler.Scheduler
Ctx context.Context Ctx context.Context
@ -349,14 +351,14 @@ func InitTestMaster(t *testing.T, nsPrefix string, admission admission.Interface
} }
// 2. Create kubeclient // 2. Create kubeclient
testCtx.ClientSet = clientset.NewForConfigOrDie( kubeConfig := &restclient.Config{
&restclient.Config{ QPS: -1, Host: s.URL,
QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{
ContentConfig: restclient.ContentConfig{ GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"},
GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"},
},
}, },
) }
testCtx.KubeConfig = kubeConfig
testCtx.ClientSet = clientset.NewForConfigOrDie(kubeConfig)
return &testCtx return &testCtx
} }
@ -403,6 +405,7 @@ func InitTestSchedulerWithOptions(
if policy != nil { if policy != nil {
opts = append(opts, scheduler.WithAlgorithmSource(CreateAlgorithmSourceFromPolicy(policy, testCtx.ClientSet))) opts = append(opts, scheduler.WithAlgorithmSource(CreateAlgorithmSourceFromPolicy(policy, testCtx.ClientSet)))
} }
opts = append(opts, scheduler.WithKubeConfig(testCtx.KubeConfig))
testCtx.Scheduler, err = scheduler.New( testCtx.Scheduler, err = scheduler.New(
testCtx.ClientSet, testCtx.ClientSet,
testCtx.InformerFactory, testCtx.InformerFactory,