From 5ecaeb325f9420d41462d32ed3e0e72d2521af58 Mon Sep 17 00:00:00 2001 From: kerthcet Date: Sun, 20 Mar 2022 23:57:26 +0800 Subject: [PATCH] refactor: remove configurator in scheduler Signed-off-by: kerthcet --- pkg/scheduler/factory.go | 186 ++++++++-------------------------- pkg/scheduler/factory_test.go | 50 --------- pkg/scheduler/scheduler.go | 87 +++++++++++----- 3 files changed, 103 insertions(+), 220 deletions(-) diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 012f85cdc6d..efcd71636b1 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -18,27 +18,19 @@ package scheduler import ( "context" - "errors" "fmt" - "time" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" - restclient "k8s.io/client-go/rest" "k8s.io/klog/v2" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" - frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" - cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" - "k8s.io/kubernetes/pkg/scheduler/profile" ) // Binder knows how to write a binding. @@ -46,154 +38,62 @@ type Binder interface { Bind(binding *v1.Binding) error } -// Configurator defines I/O, caching, and other functionality needed to -// construct a new scheduler. -type Configurator struct { - client clientset.Interface - kubeConfig *restclient.Config +func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) { + var fExtenders []framework.Extender + if len(extenders) == 0 { + return nil, nil + } - recorderFactory profile.RecorderFactory - - informerFactory informers.SharedInformerFactory - - // Close this to stop all reflectors - StopEverything <-chan struct{} - - schedulerCache internalcache.Cache - - componentConfigVersion string - - // Always check all predicates even if the middle of one predicate fails. - alwaysCheckAllPredicates bool - - // percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle. - percentageOfNodesToScore int32 - - podInitialBackoffSeconds int64 - - podMaxBackoffSeconds int64 - - podMaxUnschedulableQDuration time.Duration - - profiles []schedulerapi.KubeSchedulerProfile - registry frameworkruntime.Registry - nodeInfoSnapshot *internalcache.Snapshot - extenders []schedulerapi.Extender - frameworkCapturer FrameworkCapturer - parallellism int32 - // A "cluster event" -> "plugin names" map. - clusterEventMap map[framework.ClusterEvent]sets.String -} - -// create a scheduler from a set of registered plugins. -func (c *Configurator) create() (*Scheduler, error) { - var extenders []framework.Extender var ignoredExtendedResources []string - if len(c.extenders) != 0 { - var ignorableExtenders []framework.Extender - for ii := range c.extenders { - klog.V(2).InfoS("Creating extender", "extender", c.extenders[ii]) - extender, err := NewHTTPExtender(&c.extenders[ii]) - if err != nil { - return nil, err - } - if !extender.IsIgnorable() { - extenders = append(extenders, extender) - } else { - ignorableExtenders = append(ignorableExtenders, extender) - } - for _, r := range c.extenders[ii].ManagedResources { - if r.IgnoredByScheduler { - ignoredExtendedResources = append(ignoredExtendedResources, r.Name) - } + var ignorableExtenders []framework.Extender + for i := range extenders { + klog.V(2).InfoS("Creating extender", "extender", extenders[i]) + extender, err := NewHTTPExtender(&extenders[i]) + if err != nil { + return nil, err + } + if !extender.IsIgnorable() { + fExtenders = append(fExtenders, extender) + } else { + ignorableExtenders = append(ignorableExtenders, extender) + } + for _, r := range extenders[i].ManagedResources { + if r.IgnoredByScheduler { + ignoredExtendedResources = append(ignoredExtendedResources, r.Name) } } - // place ignorable extenders to the tail of extenders - extenders = append(extenders, ignorableExtenders...) } + // place ignorable extenders to the tail of extenders + fExtenders = append(fExtenders, ignorableExtenders...) // If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile. // This should only have an effect on ComponentConfig, where it is possible to configure Extenders and // plugin args (and in which case the extender ignored resources take precedence). - // For earlier versions, using both policy and custom plugin config is disallowed, so this should be the only - // plugin config for this plugin. - if len(ignoredExtendedResources) > 0 { - for i := range c.profiles { - prof := &c.profiles[i] - var found = false - for k := range prof.PluginConfig { - if prof.PluginConfig[k].Name == noderesources.Name { - // Update the existing args - pc := &prof.PluginConfig[k] - args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs) - if !ok { - return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args) - } - args.IgnoredResources = ignoredExtendedResources - found = true - break + if len(ignoredExtendedResources) == 0 { + return fExtenders, nil + } + + for i := range profiles { + prof := &profiles[i] + var found = false + for k := range prof.PluginConfig { + if prof.PluginConfig[k].Name == noderesources.Name { + // Update the existing args + pc := &prof.PluginConfig[k] + args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs) + if !ok { + return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args) } - } - if !found { - return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config") + args.IgnoredResources = ignoredExtendedResources + found = true + break } } + if !found { + return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config") + } } - - // The nominator will be passed all the way to framework instantiation. - nominator := internalqueue.NewPodNominator(c.informerFactory.Core().V1().Pods().Lister()) - profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory, - frameworkruntime.WithComponentConfigVersion(c.componentConfigVersion), - frameworkruntime.WithClientSet(c.client), - frameworkruntime.WithKubeConfig(c.kubeConfig), - frameworkruntime.WithInformerFactory(c.informerFactory), - frameworkruntime.WithSnapshotSharedLister(c.nodeInfoSnapshot), - frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates), - frameworkruntime.WithPodNominator(nominator), - frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(c.frameworkCapturer)), - frameworkruntime.WithClusterEventMap(c.clusterEventMap), - frameworkruntime.WithParallelism(int(c.parallellism)), - frameworkruntime.WithExtenders(extenders), - ) - if err != nil { - return nil, fmt.Errorf("initializing profiles: %v", err) - } - if len(profiles) == 0 { - return nil, errors.New("at least one profile is required") - } - // Profiles are required to have equivalent queue sort plugins. - lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc() - podQueue := internalqueue.NewSchedulingQueue( - lessFn, - c.informerFactory, - internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), - internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), - internalqueue.WithPodNominator(nominator), - internalqueue.WithClusterEventMap(c.clusterEventMap), - internalqueue.WithPodMaxUnschedulableQDuration(c.podMaxUnschedulableQDuration), - ) - - // Setup cache debugger. - debugger := cachedebugger.New( - c.informerFactory.Core().V1().Nodes().Lister(), - c.informerFactory.Core().V1().Pods().Lister(), - c.schedulerCache, - podQueue, - ) - debugger.ListenForSignal(c.StopEverything) - - sched := newScheduler( - c.schedulerCache, - extenders, - internalqueue.MakeNextPodFunc(podQueue), - MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), - c.StopEverything, - podQueue, - profiles, - c.client, - c.nodeInfoSnapshot, - c.percentageOfNodesToScore) - return sched, nil + return fExtenders, nil } // MakeDefaultErrorFunc construct a function to handle pod scheduler error diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 7a07aa7e686..8cdce31e37a 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -29,18 +29,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" - clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/events" extenderv1 "k8s.io/kube-scheduler/extender/v1" - schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" - frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" - frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" - "k8s.io/kubernetes/pkg/scheduler/profile" testingclock "k8s.io/utils/clock/testing" ) @@ -50,16 +44,6 @@ const ( testSchedulerName = "test-scheduler" ) -func TestCreate(t *testing.T) { - client := fake.NewSimpleClientset() - stopCh := make(chan struct{}) - defer close(stopCh) - factory := newConfigFactory(client, stopCh) - if _, err := factory.create(); err != nil { - t.Error(err) - } -} - func TestDefaultErrorFunc(t *testing.T) { testPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}} testPodUpdated := testPod.DeepCopy() @@ -260,40 +244,6 @@ func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v return nil } -func newConfigFactoryWithFrameworkRegistry( - client clientset.Interface, stopCh <-chan struct{}, - registry frameworkruntime.Registry) *Configurator { - informerFactory := informers.NewSharedInformerFactory(client, 0) - snapshot := internalcache.NewEmptySnapshot() - recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})) - return &Configurator{ - client: client, - informerFactory: informerFactory, - percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, - podInitialBackoffSeconds: podInitialBackoffDurationSeconds, - podMaxBackoffSeconds: podMaxBackoffDurationSeconds, - StopEverything: stopCh, - registry: registry, - profiles: []schedulerapi.KubeSchedulerProfile{ - { - SchedulerName: testSchedulerName, - Plugins: &schedulerapi.Plugins{ - QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, - Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}}, - }, - }, - }, - recorderFactory: recorderFactory, - nodeInfoSnapshot: snapshot, - clusterEventMap: make(map[framework.ClusterEvent]sets.String), - } -} - -func newConfigFactory(client clientset.Interface, stopCh <-chan struct{}) *Configurator { - return newConfigFactoryWithFrameworkRegistry(client, stopCh, - frameworkplugins.NewInTreeRegistry()) -} - type fakeExtender struct { isBinder bool interestedPodName string diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 2315b505c72..ed47a2356ef 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -18,6 +18,7 @@ package scheduler import ( "context" + "errors" "fmt" "math/rand" "strconv" @@ -49,6 +50,7 @@ import ( frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" + cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/profile" @@ -305,45 +307,76 @@ func New(client clientset.Interface, } options.profiles = cfg.Profiles } - schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything) registry := frameworkplugins.NewInTreeRegistry() if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil { return nil, err } + metrics.Register() + + extenders, err := buildExtenders(options.extenders, options.profiles) + if err != nil { + return nil, fmt.Errorf("couldn't build extenders: %w", err) + } + + podLister := informerFactory.Core().V1().Pods().Lister() + nodeLister := informerFactory.Core().V1().Nodes().Lister() + + // The nominator will be passed all the way to framework instantiation. + nominator := internalqueue.NewPodNominator(podLister) snapshot := internalcache.NewEmptySnapshot() clusterEventMap := make(map[framework.ClusterEvent]sets.String) - configurator := &Configurator{ - componentConfigVersion: options.componentConfigVersion, - client: client, - kubeConfig: options.kubeConfig, - recorderFactory: recorderFactory, - informerFactory: informerFactory, - schedulerCache: schedulerCache, - StopEverything: stopEverything, - percentageOfNodesToScore: options.percentageOfNodesToScore, - podInitialBackoffSeconds: options.podInitialBackoffSeconds, - podMaxBackoffSeconds: options.podMaxBackoffSeconds, - podMaxUnschedulableQDuration: options.podMaxUnschedulableQDuration, - profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...), - registry: registry, - nodeInfoSnapshot: snapshot, - extenders: options.extenders, - frameworkCapturer: options.frameworkCapturer, - parallellism: options.parallelism, - clusterEventMap: clusterEventMap, - } - - metrics.Register() - - // Create the config from component config - sched, err := configurator.create() + profiles, err := profile.NewMap(options.profiles, registry, recorderFactory, + frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), + frameworkruntime.WithClientSet(client), + frameworkruntime.WithKubeConfig(options.kubeConfig), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + frameworkruntime.WithPodNominator(nominator), + frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)), + frameworkruntime.WithClusterEventMap(clusterEventMap), + frameworkruntime.WithParallelism(int(options.parallelism)), + frameworkruntime.WithExtenders(extenders), + ) if err != nil { - return nil, fmt.Errorf("couldn't create scheduler: %v", err) + return nil, fmt.Errorf("initializing profiles: %v", err) } + if len(profiles) == 0 { + return nil, errors.New("at least one profile is required") + } + + podQueue := internalqueue.NewSchedulingQueue( + profiles[options.profiles[0].SchedulerName].QueueSortFunc(), + informerFactory, + internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second), + internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second), + internalqueue.WithPodNominator(nominator), + internalqueue.WithClusterEventMap(clusterEventMap), + internalqueue.WithPodMaxUnschedulableQDuration(options.podMaxUnschedulableQDuration), + ) + + schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything) + + // Setup cache debugger. + debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue) + debugger.ListenForSignal(stopEverything) + + sched := newScheduler( + schedulerCache, + extenders, + internalqueue.MakeNextPodFunc(podQueue), + MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache), + stopEverything, + podQueue, + profiles, + client, + snapshot, + options.percentageOfNodesToScore, + ) + addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap)) return sched, nil