refactor: remove configurator in scheduler

Signed-off-by: kerthcet <kerthcet@gmail.com>
Author: kerthcet
Date: 2022-03-20 23:57:26 +08:00
Parent: a504daa048
Commit: 5ecaeb325f
3 changed files with 103 additions and 220 deletions
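Summary: the Configurator type and its create() method are removed from pkg/scheduler/factory.go. The extender wiring create() performed moves into a new buildExtenders helper in the same file, while New() in pkg/scheduler/scheduler.go now assembles the profiles, scheduling queue, scheduler cache, cache debugger, and the Scheduler itself directly from its functional options. The Configurator-based test fixtures are dropped as well. Hedged usage sketches follow each file's diff below.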

pkg/scheduler/factory.go

@@ -18,27 +18,19 @@ package scheduler
 import (
 	"context"
-	"errors"
 	"fmt"
-	"time"

 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	corelisters "k8s.io/client-go/listers/core/v1"
-	restclient "k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
-	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
-	cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
-	"k8s.io/kubernetes/pkg/scheduler/profile"
 )

 // Binder knows how to write a binding.
@@ -46,154 +38,62 @@ type Binder interface {
 	Bind(binding *v1.Binding) error
 }

-// Configurator defines I/O, caching, and other functionality needed to
-// construct a new scheduler.
-type Configurator struct {
-	client     clientset.Interface
-	kubeConfig *restclient.Config
+func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
+	var fExtenders []framework.Extender
+	if len(extenders) == 0 {
+		return nil, nil
+	}

-	recorderFactory profile.RecorderFactory
-	informerFactory informers.SharedInformerFactory
-
-	// Close this to stop all reflectors
-	StopEverything <-chan struct{}
-
-	schedulerCache internalcache.Cache
-
-	componentConfigVersion string
-
-	// Always check all predicates even if the middle of one predicate fails.
-	alwaysCheckAllPredicates bool
-
-	// percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
-	percentageOfNodesToScore int32
-
-	podInitialBackoffSeconds     int64
-	podMaxBackoffSeconds         int64
-	podMaxUnschedulableQDuration time.Duration
-
-	profiles          []schedulerapi.KubeSchedulerProfile
-	registry          frameworkruntime.Registry
-	nodeInfoSnapshot  *internalcache.Snapshot
-	extenders         []schedulerapi.Extender
-	frameworkCapturer FrameworkCapturer
-	parallellism      int32
-
-	// A "cluster event" -> "plugin names" map.
-	clusterEventMap map[framework.ClusterEvent]sets.String
-}
-
-// create a scheduler from a set of registered plugins.
-func (c *Configurator) create() (*Scheduler, error) {
-	var extenders []framework.Extender
 	var ignoredExtendedResources []string
-	if len(c.extenders) != 0 {
-		var ignorableExtenders []framework.Extender
-		for ii := range c.extenders {
-			klog.V(2).InfoS("Creating extender", "extender", c.extenders[ii])
-			extender, err := NewHTTPExtender(&c.extenders[ii])
-			if err != nil {
-				return nil, err
-			}
-			if !extender.IsIgnorable() {
-				extenders = append(extenders, extender)
-			} else {
-				ignorableExtenders = append(ignorableExtenders, extender)
-			}
-			for _, r := range c.extenders[ii].ManagedResources {
-				if r.IgnoredByScheduler {
-					ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
-				}
-			}
-		}
-		// place ignorable extenders to the tail of extenders
-		extenders = append(extenders, ignorableExtenders...)
-	}
+	var ignorableExtenders []framework.Extender
+	for i := range extenders {
+		klog.V(2).InfoS("Creating extender", "extender", extenders[i])
+		extender, err := NewHTTPExtender(&extenders[i])
+		if err != nil {
+			return nil, err
+		}
+		if !extender.IsIgnorable() {
+			fExtenders = append(fExtenders, extender)
+		} else {
+			ignorableExtenders = append(ignorableExtenders, extender)
+		}
+		for _, r := range extenders[i].ManagedResources {
+			if r.IgnoredByScheduler {
+				ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
+			}
+		}
+	}
+	// place ignorable extenders to the tail of extenders
+	fExtenders = append(fExtenders, ignorableExtenders...)

 	// If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
 	// This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
 	// plugin args (and in which case the extender ignored resources take precedence).
 	// For earlier versions, using both policy and custom plugin config is disallowed, so this should be the only
 	// plugin config for this plugin.
-	if len(ignoredExtendedResources) > 0 {
-		for i := range c.profiles {
-			prof := &c.profiles[i]
-			var found = false
-			for k := range prof.PluginConfig {
-				if prof.PluginConfig[k].Name == noderesources.Name {
-					// Update the existing args
-					pc := &prof.PluginConfig[k]
-					args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
-					if !ok {
-						return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
-					}
-					args.IgnoredResources = ignoredExtendedResources
-					found = true
-					break
-				}
-			}
-			if !found {
-				return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
-			}
-		}
-	}
-
-	// The nominator will be passed all the way to framework instantiation.
-	nominator := internalqueue.NewPodNominator(c.informerFactory.Core().V1().Pods().Lister())
-	profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
-		frameworkruntime.WithComponentConfigVersion(c.componentConfigVersion),
-		frameworkruntime.WithClientSet(c.client),
-		frameworkruntime.WithKubeConfig(c.kubeConfig),
-		frameworkruntime.WithInformerFactory(c.informerFactory),
-		frameworkruntime.WithSnapshotSharedLister(c.nodeInfoSnapshot),
-		frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates),
-		frameworkruntime.WithPodNominator(nominator),
-		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(c.frameworkCapturer)),
-		frameworkruntime.WithClusterEventMap(c.clusterEventMap),
-		frameworkruntime.WithParallelism(int(c.parallellism)),
-		frameworkruntime.WithExtenders(extenders),
-	)
-	if err != nil {
-		return nil, fmt.Errorf("initializing profiles: %v", err)
-	}
-	if len(profiles) == 0 {
-		return nil, errors.New("at least one profile is required")
-	}
-	// Profiles are required to have equivalent queue sort plugins.
-	lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()
-	podQueue := internalqueue.NewSchedulingQueue(
-		lessFn,
-		c.informerFactory,
-		internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
-		internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
-		internalqueue.WithPodNominator(nominator),
-		internalqueue.WithClusterEventMap(c.clusterEventMap),
-		internalqueue.WithPodMaxUnschedulableQDuration(c.podMaxUnschedulableQDuration),
-	)
-	// Setup cache debugger.
-	debugger := cachedebugger.New(
-		c.informerFactory.Core().V1().Nodes().Lister(),
-		c.informerFactory.Core().V1().Pods().Lister(),
-		c.schedulerCache,
-		podQueue,
-	)
-	debugger.ListenForSignal(c.StopEverything)
-	sched := newScheduler(
-		c.schedulerCache,
-		extenders,
-		internalqueue.MakeNextPodFunc(podQueue),
-		MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
-		c.StopEverything,
-		podQueue,
-		profiles,
-		c.client,
-		c.nodeInfoSnapshot,
-		c.percentageOfNodesToScore)
-	return sched, nil
+	if len(ignoredExtendedResources) == 0 {
+		return fExtenders, nil
+	}
+
+	for i := range profiles {
+		prof := &profiles[i]
+		var found = false
+		for k := range prof.PluginConfig {
+			if prof.PluginConfig[k].Name == noderesources.Name {
+				// Update the existing args
+				pc := &prof.PluginConfig[k]
+				args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
+				if !ok {
+					return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
+				}
+				args.IgnoredResources = ignoredExtendedResources
+				found = true
+				break
+			}
+		}
+		if !found {
+			return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
+		}
+	}
+	return fExtenders, nil
 }

 // MakeDefaultErrorFunc construct a function to handle pod scheduler error
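As an illustration of the new helper's contract, here is a minimal test sketch in the same package (buildExtenders is unexported). The extender endpoint and resource name are hypothetical; the assertions follow the logic above: non-ignorable extenders are returned, and resources marked IgnoredByScheduler are copied into the profile's NodeResourcesFitArgs, which buildExtenders mutates in place.

// Hypothetical sketch, assuming it lives in a _test.go file of package scheduler:
func TestBuildExtendersIgnoredResources(t *testing.T) {
	fitArgs := &schedulerapi.NodeResourcesFitArgs{}
	profiles := []schedulerapi.KubeSchedulerProfile{{
		SchedulerName: "default-scheduler",
		PluginConfig: []schedulerapi.PluginConfig{{
			Name: noderesources.Name,
			Args: fitArgs,
		}},
	}}
	extenders := []schedulerapi.Extender{{
		URLPrefix: "http://127.0.0.1:9000/scheduler", // hypothetical endpoint
		ManagedResources: []schedulerapi.ExtenderManagedResource{{
			Name:               "example.com/foo", // hypothetical extended resource
			IgnoredByScheduler: true,
		}},
	}}

	fExtenders, err := buildExtenders(extenders, profiles)
	if err != nil {
		t.Fatal(err)
	}
	if len(fExtenders) != 1 {
		t.Fatalf("expected 1 framework extender, got %d", len(fExtenders))
	}
	// The profile's plugin args are updated in place.
	if len(fitArgs.IgnoredResources) != 1 || fitArgs.IgnoredResources[0] != "example.com/foo" {
		t.Fatalf("ignored resources not plumbed into NodeResourcesFitArgs: %v", fitArgs.IgnoredResources)
	}
}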

pkg/scheduler/factory_test.go

@@ -29,18 +29,12 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/informers"
-	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/tools/events"
 	extenderv1 "k8s.io/kube-scheduler/extender/v1"
-	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
-	frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
-	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
-	"k8s.io/kubernetes/pkg/scheduler/profile"
 	testingclock "k8s.io/utils/clock/testing"
 )
@@ -50,16 +44,6 @@ const (
 	testSchedulerName = "test-scheduler"
 )

-func TestCreate(t *testing.T) {
-	client := fake.NewSimpleClientset()
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-	factory := newConfigFactory(client, stopCh)
-	if _, err := factory.create(); err != nil {
-		t.Error(err)
-	}
-}
-
 func TestDefaultErrorFunc(t *testing.T) {
 	testPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}
 	testPodUpdated := testPod.DeepCopy()
@@ -260,40 +244,6 @@ func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
 	return nil
 }

-func newConfigFactoryWithFrameworkRegistry(
-	client clientset.Interface, stopCh <-chan struct{},
-	registry frameworkruntime.Registry) *Configurator {
-	informerFactory := informers.NewSharedInformerFactory(client, 0)
-	snapshot := internalcache.NewEmptySnapshot()
-	recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}))
-	return &Configurator{
-		client:                   client,
-		informerFactory:          informerFactory,
-		percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
-		podInitialBackoffSeconds: podInitialBackoffDurationSeconds,
-		podMaxBackoffSeconds:     podMaxBackoffDurationSeconds,
-		StopEverything:           stopCh,
-		registry:                 registry,
-		profiles: []schedulerapi.KubeSchedulerProfile{
-			{
-				SchedulerName: testSchedulerName,
-				Plugins: &schedulerapi.Plugins{
-					QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
-					Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
-				},
-			},
-		},
-		recorderFactory:  recorderFactory,
-		nodeInfoSnapshot: snapshot,
-		clusterEventMap:  make(map[framework.ClusterEvent]sets.String),
-	}
-}
-
-func newConfigFactory(client clientset.Interface, stopCh <-chan struct{}) *Configurator {
-	return newConfigFactoryWithFrameworkRegistry(client, stopCh,
-		frameworkplugins.NewInTreeRegistry())
-}
-
 type fakeExtender struct {
 	isBinder          bool
 	interestedPodName string
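With TestCreate and the Configurator fixtures gone, equivalent smoke coverage would go through New directly. A minimal sketch, assuming the New signature shown in scheduler.go below, that a nil dynamic informer factory is acceptable when no dynamically-typed resources are watched, and that the imports the removed helpers used (events, profile, schedulerapi) are kept available for it:

// Hypothetical replacement for the removed TestCreate:
func TestNewSchedulerSmoke(t *testing.T) {
	client := fake.NewSimpleClientset()
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	recorderFactory := profile.NewRecorderFactory(
		events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}))
	stopCh := make(chan struct{})
	defer close(stopCh)

	_, err := New(
		client,
		informerFactory,
		nil, // dynInformerFactory: assumed unused with only in-tree core plugins
		recorderFactory,
		stopCh,
		WithProfiles(schedulerapi.KubeSchedulerProfile{
			SchedulerName: testSchedulerName,
			Plugins: &schedulerapi.Plugins{
				QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
				Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
			},
		}),
	)
	if err != nil {
		t.Error(err)
	}
}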

pkg/scheduler/scheduler.go

@@ -18,6 +18,7 @@ package scheduler
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math/rand"
 	"strconv"
@@ -49,6 +50,7 @@
 	frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
+	cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
 	"k8s.io/kubernetes/pkg/scheduler/profile"
@@ -305,45 +307,76 @@ func New(client clientset.Interface,
 		}
 		options.profiles = cfg.Profiles
 	}
-	schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)

 	registry := frameworkplugins.NewInTreeRegistry()
 	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
 		return nil, err
 	}

+	metrics.Register()
+
+	extenders, err := buildExtenders(options.extenders, options.profiles)
+	if err != nil {
+		return nil, fmt.Errorf("couldn't build extenders: %w", err)
+	}
+
+	podLister := informerFactory.Core().V1().Pods().Lister()
+	nodeLister := informerFactory.Core().V1().Nodes().Lister()
+
+	// The nominator will be passed all the way to framework instantiation.
+	nominator := internalqueue.NewPodNominator(podLister)
 	snapshot := internalcache.NewEmptySnapshot()
 	clusterEventMap := make(map[framework.ClusterEvent]sets.String)
-	configurator := &Configurator{
-		componentConfigVersion:       options.componentConfigVersion,
-		client:                       client,
-		kubeConfig:                   options.kubeConfig,
-		recorderFactory:              recorderFactory,
-		informerFactory:              informerFactory,
-		schedulerCache:               schedulerCache,
-		StopEverything:               stopEverything,
-		percentageOfNodesToScore:     options.percentageOfNodesToScore,
-		podInitialBackoffSeconds:     options.podInitialBackoffSeconds,
-		podMaxBackoffSeconds:         options.podMaxBackoffSeconds,
-		podMaxUnschedulableQDuration: options.podMaxUnschedulableQDuration,
-		profiles:                     append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
-		registry:                     registry,
-		nodeInfoSnapshot:             snapshot,
-		extenders:                    options.extenders,
-		frameworkCapturer:            options.frameworkCapturer,
-		parallellism:                 options.parallelism,
-		clusterEventMap:              clusterEventMap,
-	}
-	metrics.Register()
-	// Create the config from component config
-	sched, err := configurator.create()
+
+	profiles, err := profile.NewMap(options.profiles, registry, recorderFactory,
+		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
+		frameworkruntime.WithClientSet(client),
+		frameworkruntime.WithKubeConfig(options.kubeConfig),
+		frameworkruntime.WithInformerFactory(informerFactory),
+		frameworkruntime.WithSnapshotSharedLister(snapshot),
+		frameworkruntime.WithPodNominator(nominator),
+		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
+		frameworkruntime.WithClusterEventMap(clusterEventMap),
+		frameworkruntime.WithParallelism(int(options.parallelism)),
+		frameworkruntime.WithExtenders(extenders),
+	)
 	if err != nil {
-		return nil, fmt.Errorf("couldn't create scheduler: %v", err)
+		return nil, fmt.Errorf("initializing profiles: %v", err)
 	}
+
+	if len(profiles) == 0 {
+		return nil, errors.New("at least one profile is required")
+	}
+
+	podQueue := internalqueue.NewSchedulingQueue(
+		profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
+		informerFactory,
+		internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
+		internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
+		internalqueue.WithPodNominator(nominator),
+		internalqueue.WithClusterEventMap(clusterEventMap),
+		internalqueue.WithPodMaxUnschedulableQDuration(options.podMaxUnschedulableQDuration),
+	)
+
+	schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)
+
+	// Setup cache debugger.
+	debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
+	debugger.ListenForSignal(stopEverything)
+
+	sched := newScheduler(
+		schedulerCache,
+		extenders,
+		internalqueue.MakeNextPodFunc(podQueue),
+		MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
+		stopEverything,
+		podQueue,
+		profiles,
+		client,
+		snapshot,
+		options.percentageOfNodesToScore,
+	)
+
 	addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
 	return sched, nil
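One design note on the queue construction above: NewSchedulingQueue receives only the first profile's QueueSortFunc. That is safe because profile.NewMap rejects configurations whose queue-sort plugins or plugin args differ across profiles (the old code carried the same invariant as the comment "Profiles are required to have equivalent queue sort plugins"), so every profile shares one sort order. A sketch of a valid two-profile configuration; the second scheduler name is hypothetical:

// Profiles may differ in most plugins but must agree on QueueSort, so indexing
// the first profile's QueueSortFunc is representative of all of them.
profiles := []schedulerapi.KubeSchedulerProfile{
	{
		SchedulerName: "default-scheduler",
		Plugins: &schedulerapi.Plugins{
			QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
			Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
		},
	},
	{
		SchedulerName: "gpu-scheduler", // hypothetical second profile
		Plugins: &schedulerapi.Plugins{
			QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}}, // must match the first profile
			Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
		},
	},
}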