mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #80590 from alculquicondor/refactor/configurator-pod
Remove Configurator interface
This commit is contained in:
commit
1a01d10f5f
@ -5,14 +5,11 @@ go_library(
|
||||
srcs = [
|
||||
"eventhandlers.go",
|
||||
"scheduler.go",
|
||||
"testutil.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/scheduler",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/scheduler/algorithm:go_default_library",
|
||||
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
||||
"//pkg/scheduler/api:go_default_library",
|
||||
"//pkg/scheduler/api/latest:go_default_library",
|
||||
"//pkg/scheduler/apis/config:go_default_library",
|
||||
@ -20,7 +17,6 @@ go_library(
|
||||
"//pkg/scheduler/factory:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//pkg/scheduler/internal/cache:go_default_library",
|
||||
"//pkg/scheduler/internal/queue:go_default_library",
|
||||
"//pkg/scheduler/metrics:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/storage/v1:go_default_library",
|
||||
@ -28,7 +24,6 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
|
||||
@ -37,7 +32,6 @@ go_library(
|
||||
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
|
@ -1097,7 +1097,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
|
||||
if _, err := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
|
||||
SchedulerName: "some-scheduler-name",
|
||||
Client: client,
|
||||
NodeInformer: informerFactory.Core().V1().Nodes(),
|
||||
PodInformer: informerFactory.Core().V1().Pods(),
|
||||
|
@ -27,8 +27,6 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
|
@ -22,12 +22,10 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
@ -139,32 +137,8 @@ type PodPreemptor interface {
|
||||
}
|
||||
|
||||
// Configurator defines I/O, caching, and other functionality needed to
|
||||
// construct a new scheduler. An implementation of this can be seen in
|
||||
// factory.go.
|
||||
type Configurator interface {
|
||||
// Exposed for testing
|
||||
GetHardPodAffinitySymmetricWeight() int32
|
||||
|
||||
// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
|
||||
GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error)
|
||||
GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error)
|
||||
|
||||
// Exposed for testing
|
||||
GetClient() clientset.Interface
|
||||
|
||||
// TODO(#80216): Remove GetScheduledPodLister from the interface.
|
||||
// Exposed for testing
|
||||
GetScheduledPodLister() corelisters.PodLister
|
||||
|
||||
Create() (*Config, error)
|
||||
CreateFromProvider(providerName string) (*Config, error)
|
||||
CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
|
||||
CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
|
||||
}
|
||||
|
||||
// configFactory is the default implementation of the scheduler.Configurator interface.
|
||||
// TODO(#80216): Remove pod lister.
|
||||
type configFactory struct {
|
||||
// construct a new scheduler.
|
||||
type Configurator struct {
|
||||
client clientset.Interface
|
||||
// a means to list all known scheduled pods.
|
||||
scheduledPodLister corelisters.PodLister
|
||||
@ -196,10 +170,6 @@ type configFactory struct {
|
||||
|
||||
schedulerCache internalcache.Cache
|
||||
|
||||
// SchedulerName of a scheduler is used to select which pods will be
|
||||
// processed by this scheduler, based on pods's "spec.schedulerName".
|
||||
schedulerName string
|
||||
|
||||
// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
|
||||
// corresponding to every RequiredDuringScheduling affinity rule.
|
||||
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
|
||||
@ -226,7 +196,6 @@ type configFactory struct {
|
||||
|
||||
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
|
||||
type ConfigFactoryArgs struct {
|
||||
SchedulerName string
|
||||
Client clientset.Interface
|
||||
NodeInformer coreinformers.NodeInformer
|
||||
PodInformer coreinformers.PodInformer
|
||||
@ -251,7 +220,7 @@ type ConfigFactoryArgs struct {
|
||||
|
||||
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
|
||||
// return the interface.
|
||||
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
||||
func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
|
||||
stopEverything := args.StopCh
|
||||
if stopEverything == nil {
|
||||
stopEverything = wait.NeverStop
|
||||
@ -274,7 +243,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
||||
csiNodeLister = args.CSINodeInformer.Lister()
|
||||
}
|
||||
|
||||
c := &configFactory{
|
||||
c := &Configurator{
|
||||
client: args.Client,
|
||||
podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework),
|
||||
pVLister: args.PvInformer.Lister(),
|
||||
@ -289,7 +258,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
||||
framework: framework,
|
||||
schedulerCache: schedulerCache,
|
||||
StopEverything: stopEverything,
|
||||
schedulerName: args.SchedulerName,
|
||||
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
|
||||
disablePreemption: args.DisablePreemption,
|
||||
percentageOfNodesToScore: args.PercentageOfNodesToScore,
|
||||
@ -299,9 +267,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
||||
// Setup volume binder
|
||||
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
|
||||
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
|
||||
// ScheduledPodLister is something we provide to plug-in functions that
|
||||
// they may need to call.
|
||||
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
|
||||
|
||||
// Setup cache debugger
|
||||
debugger := cachedebugger.New(
|
||||
@ -319,31 +284,18 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *configFactory) GetHardPodAffinitySymmetricWeight() int32 {
|
||||
// GetHardPodAffinitySymmetricWeight is exposed for testing.
|
||||
func (c *Configurator) GetHardPodAffinitySymmetricWeight() int32 {
|
||||
return c.hardPodAffinitySymmetricWeight
|
||||
}
|
||||
|
||||
func (c *configFactory) GetSchedulerName() string {
|
||||
return c.schedulerName
|
||||
}
|
||||
|
||||
// GetClient provides a kubernetes Client, mostly internal use, but may also be called by mock-tests.
|
||||
func (c *configFactory) GetClient() clientset.Interface {
|
||||
return c.client
|
||||
}
|
||||
|
||||
// GetScheduledPodLister provides a pod lister, mostly internal use, but may also be called by mock-tests.
|
||||
func (c *configFactory) GetScheduledPodLister() corelisters.PodLister {
|
||||
return c.scheduledPodLister
|
||||
}
|
||||
|
||||
// Create creates a scheduler with the default algorithm provider.
|
||||
func (c *configFactory) Create() (*Config, error) {
|
||||
func (c *Configurator) Create() (*Config, error) {
|
||||
return c.CreateFromProvider(DefaultProvider)
|
||||
}
|
||||
|
||||
// Creates a scheduler from the name of a registered algorithm provider.
|
||||
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
|
||||
// CreateFromProvider creates a scheduler from the name of a registered algorithm provider.
|
||||
func (c *Configurator) CreateFromProvider(providerName string) (*Config, error) {
|
||||
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
|
||||
provider, err := GetAlgorithmProvider(providerName)
|
||||
if err != nil {
|
||||
@ -352,8 +304,8 @@ func (c *configFactory) CreateFromProvider(providerName string) (*Config, error)
|
||||
return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
|
||||
}
|
||||
|
||||
// Creates a scheduler from the configuration file
|
||||
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
|
||||
// CreateFromConfig creates a scheduler from the configuration file
|
||||
func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
|
||||
klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
|
||||
|
||||
// validate the policy configuration
|
||||
@ -430,8 +382,8 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, e
|
||||
return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
|
||||
}
|
||||
|
||||
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
|
||||
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
|
||||
// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
|
||||
func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
|
||||
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
|
||||
|
||||
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
|
||||
@ -443,12 +395,12 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
|
||||
priorityConfigs, err := c.getPriorityFunctionConfigs(priorityKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
priorityMetaProducer, err := c.GetPriorityMetadataProducer()
|
||||
priorityMetaProducer, err := c.getPriorityMetadataProducer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -514,7 +466,7 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx
|
||||
}
|
||||
}
|
||||
|
||||
func (c *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, error) {
|
||||
func (c *Configurator) getPriorityFunctionConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, error) {
|
||||
pluginArgs, err := c.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -523,7 +475,7 @@ func (c *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]
|
||||
return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
|
||||
}
|
||||
|
||||
func (c *configFactory) GetPriorityMetadataProducer() (priorities.PriorityMetadataProducer, error) {
|
||||
func (c *Configurator) getPriorityMetadataProducer() (priorities.PriorityMetadataProducer, error) {
|
||||
pluginArgs, err := c.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -532,7 +484,9 @@ func (c *configFactory) GetPriorityMetadataProducer() (priorities.PriorityMetada
|
||||
return getPriorityMetadataProducer(*pluginArgs)
|
||||
}
|
||||
|
||||
func (c *configFactory) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) {
|
||||
// GetPredicateMetadataProducer returns a function to build Predicate Metadata.
|
||||
// It is used by the scheduler and other components, such as k8s.io/autoscaler/cluster-autoscaler.
|
||||
func (c *Configurator) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) {
|
||||
pluginArgs, err := c.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -540,7 +494,9 @@ func (c *configFactory) GetPredicateMetadataProducer() (predicates.PredicateMeta
|
||||
return getPredicateMetadataProducer(*pluginArgs)
|
||||
}
|
||||
|
||||
func (c *configFactory) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) {
|
||||
// GetPredicates returns the predicate functions.
|
||||
// It is used by the scheduler and other components, such as k8s.io/autoscaler/cluster-autoscaler.
|
||||
func (c *Configurator) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) {
|
||||
pluginArgs, err := c.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -549,7 +505,7 @@ func (c *configFactory) GetPredicates(predicateKeys sets.String) (map[string]pre
|
||||
return getFitPredicateFunctions(predicateKeys, *pluginArgs)
|
||||
}
|
||||
|
||||
func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
||||
func (c *Configurator) getPluginArgs() (*PluginFactoryArgs, error) {
|
||||
return &PluginFactoryArgs{
|
||||
PodLister: c.schedulerCache,
|
||||
ServiceLister: c.serviceLister,
|
||||
@ -568,65 +524,6 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// assignedPodLister filters the pods returned from a PodLister to
|
||||
// only include those that have a node name set.
|
||||
type assignedPodLister struct {
|
||||
corelisters.PodLister
|
||||
}
|
||||
|
||||
// List lists all Pods in the indexer for a given namespace.
|
||||
func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
|
||||
list, err := l.PodLister.List(selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filtered := make([]*v1.Pod, 0, len(list))
|
||||
for _, pod := range list {
|
||||
if len(pod.Spec.NodeName) > 0 {
|
||||
filtered = append(filtered, pod)
|
||||
}
|
||||
}
|
||||
return filtered, nil
|
||||
}
|
||||
|
||||
// List lists all Pods in the indexer for a given namespace.
|
||||
func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
|
||||
return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
|
||||
}
|
||||
|
||||
// assignedPodNamespaceLister filters the pods returned from a PodNamespaceLister to
|
||||
// only include those that have a node name set.
|
||||
type assignedPodNamespaceLister struct {
|
||||
corelisters.PodNamespaceLister
|
||||
}
|
||||
|
||||
// List lists all Pods in the indexer for a given namespace.
|
||||
func (l assignedPodNamespaceLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
|
||||
list, err := l.PodNamespaceLister.List(selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filtered := make([]*v1.Pod, 0, len(list))
|
||||
for _, pod := range list {
|
||||
if len(pod.Spec.NodeName) > 0 {
|
||||
filtered = append(filtered, pod)
|
||||
}
|
||||
}
|
||||
return filtered, nil
|
||||
}
|
||||
|
||||
// Get retrieves the Pod from the indexer for a given namespace and name.
|
||||
func (l assignedPodNamespaceLister) Get(name string) (*v1.Pod, error) {
|
||||
pod, err := l.PodNamespaceLister.Get(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(pod.Spec.NodeName) > 0 {
|
||||
return pod, nil
|
||||
}
|
||||
return nil, errors.NewNotFound(schema.GroupResource{Resource: string(v1.ResourcePods)}, name)
|
||||
}
|
||||
|
||||
type podInformer struct {
|
||||
informer cache.SharedIndexInformer
|
||||
}
|
||||
|
@ -476,10 +476,9 @@ func TestInvalidFactoryArgs(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) Configurator {
|
||||
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) *Configurator {
|
||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||
return NewConfigFactory(&ConfigFactoryArgs{
|
||||
v1.DefaultSchedulerName,
|
||||
client,
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
informerFactory.Core().V1().Pods(),
|
||||
@ -608,7 +607,7 @@ func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm
|
||||
},
|
||||
}
|
||||
|
||||
f := &configFactory{}
|
||||
f := &Configurator{}
|
||||
binderFunc := getBinderFunc(f.client, extenders)
|
||||
binder := binderFunc(pod)
|
||||
|
||||
|
@ -144,7 +144,6 @@ func New(client clientset.Interface,
|
||||
}
|
||||
// Set up the configurator which can create schedulers from configs.
|
||||
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
|
||||
SchedulerName: options.schedulerName,
|
||||
Client: client,
|
||||
NodeInformer: nodeInformer,
|
||||
PodInformer: podInformer,
|
||||
|
@ -57,12 +57,14 @@ import (
|
||||
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
|
||||
)
|
||||
|
||||
// EmptyFramework is an empty framework used in tests.
|
||||
// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
|
||||
var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig)
|
||||
|
||||
// EmptyPluginConfig is an empty plugin config used in tests.
|
||||
var EmptyPluginConfig = []kubeschedulerconfig.PluginConfig{}
|
||||
var (
|
||||
emptyPluginRegistry = framework.Registry{}
|
||||
// emptyPluginConfig is an empty plugin config used in tests.
|
||||
emptyPluginConfig []kubeschedulerconfig.PluginConfig
|
||||
// emptyFramework is an empty framework used in tests.
|
||||
// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
|
||||
emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, emptyPluginConfig)
|
||||
)
|
||||
|
||||
type fakeBinder struct {
|
||||
b func(binding *v1.Binding) error
|
||||
@ -195,9 +197,9 @@ func TestSchedulerCreation(t *testing.T) {
|
||||
eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
|
||||
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
|
||||
stopCh,
|
||||
EmptyPluginRegistry,
|
||||
emptyPluginRegistry,
|
||||
nil,
|
||||
EmptyPluginConfig,
|
||||
emptyPluginConfig,
|
||||
WithBindTimeoutSeconds(defaultBindTimeout))
|
||||
|
||||
if err != nil {
|
||||
@ -299,7 +301,7 @@ func TestScheduler(t *testing.T) {
|
||||
NextPod: func() *v1.Pod {
|
||||
return item.sendPod
|
||||
},
|
||||
Framework: EmptyFramework,
|
||||
Framework: emptyFramework,
|
||||
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
|
||||
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
|
||||
})
|
||||
@ -649,7 +651,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
||||
predicates.EmptyPredicateMetadataProducer,
|
||||
[]priorities.PriorityConfig{},
|
||||
priorities.EmptyPriorityMetadataProducer,
|
||||
EmptyFramework,
|
||||
emptyFramework,
|
||||
[]algorithm.SchedulerExtender{},
|
||||
nil,
|
||||
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
|
||||
@ -681,7 +683,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
||||
Recorder: &events.FakeRecorder{},
|
||||
PodConditionUpdater: fakePodConditionUpdater{},
|
||||
PodPreemptor: fakePodPreemptor{},
|
||||
Framework: EmptyFramework,
|
||||
Framework: emptyFramework,
|
||||
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
|
||||
}
|
||||
|
||||
@ -695,7 +697,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
||||
}
|
||||
|
||||
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
|
||||
framework, _ := framework.NewFramework(EmptyPluginRegistry, nil, []kubeschedulerconfig.PluginConfig{})
|
||||
framework, _ := framework.NewFramework(emptyPluginRegistry, nil, []kubeschedulerconfig.PluginConfig{})
|
||||
algo := core.NewGenericScheduler(
|
||||
scache,
|
||||
internalqueue.NewSchedulingQueue(nil, nil),
|
||||
|
@ -1,90 +0,0 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||
)
|
||||
|
||||
// FakeConfigurator is an implementation for test.
|
||||
type FakeConfigurator struct {
|
||||
Config *factory.Config
|
||||
}
|
||||
|
||||
// GetPredicateMetadataProducer is not implemented yet.
|
||||
func (fc *FakeConfigurator) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) {
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// GetPredicates is not implemented yet.
|
||||
func (fc *FakeConfigurator) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) {
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// GetHardPodAffinitySymmetricWeight is not implemented yet.
|
||||
func (fc *FakeConfigurator) GetHardPodAffinitySymmetricWeight() int32 {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// MakeDefaultErrorFunc is not implemented yet.
|
||||
func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetClient is not implemented yet.
|
||||
func (fc *FakeConfigurator) GetClient() clientset.Interface {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetScheduledPodLister is not implemented yet.
|
||||
func (fc *FakeConfigurator) GetScheduledPodLister() corelisters.PodLister {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create returns FakeConfigurator.Config
|
||||
func (fc *FakeConfigurator) Create() (*factory.Config, error) {
|
||||
return fc.Config, nil
|
||||
}
|
||||
|
||||
// CreateFromProvider returns FakeConfigurator.Config
|
||||
func (fc *FakeConfigurator) CreateFromProvider(providerName string) (*factory.Config, error) {
|
||||
return fc.Config, nil
|
||||
}
|
||||
|
||||
// CreateFromConfig returns FakeConfigurator.Config
|
||||
func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*factory.Config, error) {
|
||||
return fc.Config, nil
|
||||
}
|
||||
|
||||
// CreateFromKeys returns FakeConfigurator.Config
|
||||
func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*factory.Config, error) {
|
||||
return fc.Config, nil
|
||||
}
|
||||
|
||||
// EmptyPluginRegistry is an empty plugin registry used in tests.
|
||||
var EmptyPluginRegistry = framework.Registry{}
|
@ -84,7 +84,6 @@ func createConfiguratorArgsWithPodInformer(
|
||||
stopCh <-chan struct{},
|
||||
) *factory.ConfigFactoryArgs {
|
||||
return &factory.ConfigFactoryArgs{
|
||||
SchedulerName: schedulerName,
|
||||
Client: clientSet,
|
||||
NodeInformer: informerFactory.Core().V1().Nodes(),
|
||||
PodInformer: podInformer,
|
||||
|
@ -42,6 +42,7 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
|
||||
"//test/integration/framework:go_default_library",
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
storagev1beta1 "k8s.io/api/storage/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/csi-translation-lib/plugins"
|
||||
@ -359,9 +358,9 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
|
||||
if b.N < minPods {
|
||||
b.N = minPods
|
||||
}
|
||||
schedulerConfigFactory, finalFunc := mustSetupScheduler()
|
||||
schedulerConfigArgs, finalFunc := mustSetupScheduler()
|
||||
defer finalFunc()
|
||||
c := schedulerConfigFactory.GetClient()
|
||||
c := schedulerConfigArgs.Client
|
||||
|
||||
nodePreparer := framework.NewIntegrationTestNodePreparer(
|
||||
c,
|
||||
@ -378,8 +377,9 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
|
||||
podCreator := testutils.NewTestPodCreator(c, config)
|
||||
podCreator.CreatePods()
|
||||
|
||||
podLister := schedulerConfigArgs.PodInformer.Lister()
|
||||
for {
|
||||
scheduled, err := schedulerConfigFactory.GetScheduledPodLister().List(labels.Everything())
|
||||
scheduled, err := getScheduledPods(podLister)
|
||||
if err != nil {
|
||||
klog.Fatalf("%v", err)
|
||||
}
|
||||
@ -397,7 +397,7 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
|
||||
for {
|
||||
// This can potentially affect performance of scheduler, since List() is done under mutex.
|
||||
// TODO: Setup watch on apiserver and wait until all pods scheduled.
|
||||
scheduled, err := schedulerConfigFactory.GetScheduledPodLister().List(labels.Everything())
|
||||
scheduled, err := getScheduledPods(podLister)
|
||||
if err != nil {
|
||||
klog.Fatalf("%v", err)
|
||||
}
|
||||
|
@ -18,17 +18,19 @@ package benchmark
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||
testutils "k8s.io/kubernetes/test/utils"
|
||||
"math"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||
testutils "k8s.io/kubernetes/test/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -101,22 +103,22 @@ func TestSchedule100Node3KPods(t *testing.T) {
|
||||
|
||||
// testConfig contains the some input parameters needed for running test-suite
|
||||
type testConfig struct {
|
||||
numPods int
|
||||
numNodes int
|
||||
mutatedNodeTemplate *v1.Node
|
||||
mutatedPodTemplate *v1.Pod
|
||||
schedulerSupportFunctions factory.Configurator
|
||||
destroyFunc func()
|
||||
numPods int
|
||||
numNodes int
|
||||
mutatedNodeTemplate *v1.Node
|
||||
mutatedPodTemplate *v1.Pod
|
||||
schedulerSupport *factory.ConfigFactoryArgs
|
||||
destroyFunc func()
|
||||
}
|
||||
|
||||
// getBaseConfig returns baseConfig after initializing number of nodes and pods.
|
||||
func getBaseConfig(nodes int, pods int) *testConfig {
|
||||
schedulerConfigFactory, destroyFunc := mustSetupScheduler()
|
||||
schedulerConfigArgs, destroyFunc := mustSetupScheduler()
|
||||
return &testConfig{
|
||||
schedulerSupportFunctions: schedulerConfigFactory,
|
||||
destroyFunc: destroyFunc,
|
||||
numNodes: nodes,
|
||||
numPods: pods,
|
||||
schedulerSupport: schedulerConfigArgs,
|
||||
destroyFunc: destroyFunc,
|
||||
numNodes: nodes,
|
||||
numPods: pods,
|
||||
}
|
||||
}
|
||||
|
||||
@ -132,10 +134,11 @@ func schedulePods(config *testConfig) int32 {
|
||||
// We are interested in low scheduling rates (i.e. qps=2),
|
||||
minQPS := int32(math.MaxInt32)
|
||||
start := time.Now()
|
||||
podLister := config.schedulerSupport.PodInformer.Lister()
|
||||
// Bake in time for the first pod scheduling event.
|
||||
for {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
scheduled, err := config.schedulerSupportFunctions.GetScheduledPodLister().List(labels.Everything())
|
||||
scheduled, err := getScheduledPods(podLister)
|
||||
if err != nil {
|
||||
klog.Fatalf("%v", err)
|
||||
}
|
||||
@ -153,7 +156,7 @@ func schedulePods(config *testConfig) int32 {
|
||||
// This can potentially affect performance of scheduler, since List() is done under mutex.
|
||||
// Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler.
|
||||
// TODO: Setup watch on apiserver and wait until all pods scheduled.
|
||||
scheduled, err := config.schedulerSupportFunctions.GetScheduledPodLister().List(labels.Everything())
|
||||
scheduled, err := getScheduledPods(podLister)
|
||||
if err != nil {
|
||||
klog.Fatalf("%v", err)
|
||||
}
|
||||
@ -183,6 +186,20 @@ func schedulePods(config *testConfig) int32 {
|
||||
}
|
||||
}
|
||||
|
||||
func getScheduledPods(lister listers.PodLister) ([]*v1.Pod, error) {
|
||||
all, err := lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
scheduled := make([]*v1.Pod, 0, len(all))
|
||||
for _, pod := range all {
|
||||
if len(pod.Spec.NodeName) > 0 {
|
||||
scheduled = append(scheduled, pod)
|
||||
}
|
||||
}
|
||||
return scheduled, nil
|
||||
}
|
||||
|
||||
// mutateNodeTemplate returns the modified node needed for creation of nodes.
|
||||
func (na nodeAffinity) mutateNodeTemplate(node *v1.Node) {
|
||||
labels := make(map[string]string)
|
||||
@ -220,19 +237,17 @@ func (na nodeAffinity) mutatePodTemplate(pod *v1.Pod) {
|
||||
// generateNodes generates nodes to be used for scheduling.
|
||||
func (inputConfig *schedulerPerfConfig) generateNodes(config *testConfig) {
|
||||
for i := 0; i < inputConfig.NodeCount; i++ {
|
||||
config.schedulerSupportFunctions.GetClient().CoreV1().Nodes().Create(config.mutatedNodeTemplate)
|
||||
|
||||
config.schedulerSupport.Client.CoreV1().Nodes().Create(config.mutatedNodeTemplate)
|
||||
}
|
||||
for i := 0; i < config.numNodes-inputConfig.NodeCount; i++ {
|
||||
config.schedulerSupportFunctions.GetClient().CoreV1().Nodes().Create(baseNodeTemplate)
|
||||
|
||||
config.schedulerSupport.Client.CoreV1().Nodes().Create(baseNodeTemplate)
|
||||
}
|
||||
}
|
||||
|
||||
// generatePods generates pods to be used for scheduling.
|
||||
func (inputConfig *schedulerPerfConfig) generatePods(config *testConfig) {
|
||||
testutils.CreatePod(config.schedulerSupportFunctions.GetClient(), "sample", inputConfig.PodCount, config.mutatedPodTemplate)
|
||||
testutils.CreatePod(config.schedulerSupportFunctions.GetClient(), "sample", config.numPods-inputConfig.PodCount, basePodTemplate)
|
||||
testutils.CreatePod(config.schedulerSupport.Client, "sample", inputConfig.PodCount, config.mutatedPodTemplate)
|
||||
testutils.CreatePod(config.schedulerSupport.Client, "sample", config.numPods-inputConfig.PodCount, basePodTemplate)
|
||||
}
|
||||
|
||||
// generatePodAndNodeTopology is the wrapper function for modifying both pods and node objects.
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
// remove resources after finished.
|
||||
// Notes on rate limiter:
|
||||
// - client rate limit is set to 5000.
|
||||
func mustSetupScheduler() (factory.Configurator, util.ShutdownFunc) {
|
||||
func mustSetupScheduler() (*factory.ConfigFactoryArgs, util.ShutdownFunc) {
|
||||
apiURL, apiShutdown := util.StartApiserver()
|
||||
clientSet := clientset.NewForConfigOrDie(&restclient.Config{
|
||||
Host: apiURL,
|
||||
@ -39,11 +39,11 @@ func mustSetupScheduler() (factory.Configurator, util.ShutdownFunc) {
|
||||
QPS: 5000.0,
|
||||
Burst: 5000,
|
||||
})
|
||||
schedulerConfig, schedulerShutdown := util.StartScheduler(clientSet)
|
||||
schedulerConfigArgs, schedulerShutdown := util.StartScheduler(clientSet)
|
||||
|
||||
shutdownFunc := func() {
|
||||
schedulerShutdown()
|
||||
apiShutdown()
|
||||
}
|
||||
return schedulerConfig, shutdownFunc
|
||||
return schedulerConfigArgs, shutdownFunc
|
||||
}
|
||||
|
@ -57,9 +57,9 @@ func StartApiserver() (string, ShutdownFunc) {
|
||||
}
|
||||
|
||||
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface
|
||||
// and event broadcaster. It returns a handle to the configurator for the running scheduler
|
||||
// and event broadcaster. It returns a handle to the configurator args for the running scheduler
|
||||
// and the shutdown function to stop it.
|
||||
func StartScheduler(clientSet clientset.Interface) (factory.Configurator, ShutdownFunc) {
|
||||
func StartScheduler(clientSet clientset.Interface) (*factory.ConfigFactoryArgs, ShutdownFunc) {
|
||||
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
|
||||
stopCh := make(chan struct{})
|
||||
evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
|
||||
@ -67,9 +67,10 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
|
||||
|
||||
evtBroadcaster.StartRecordingToSink(stopCh)
|
||||
|
||||
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory, stopCh)
|
||||
configuratorArgs := createSchedulerConfiguratorArgs(clientSet, informerFactory, stopCh)
|
||||
configurator := factory.NewConfigFactory(configuratorArgs)
|
||||
|
||||
config, err := schedulerConfigurator.CreateFromConfig(schedulerapi.Policy{})
|
||||
config, err := configurator.CreateFromConfig(schedulerapi.Policy{})
|
||||
if err != nil {
|
||||
klog.Fatalf("Error creating scheduler: %v", err)
|
||||
}
|
||||
@ -95,18 +96,17 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
|
||||
close(stopCh)
|
||||
klog.Infof("destroyed scheduler")
|
||||
}
|
||||
return schedulerConfigurator, shutdownFunc
|
||||
return configuratorArgs, shutdownFunc
|
||||
}
|
||||
|
||||
// createSchedulerConfigurator create a configurator for scheduler with given informer factory and default name.
|
||||
func createSchedulerConfigurator(
|
||||
// createSchedulerConfigurator create a configurator for scheduler with given informer factory.
|
||||
func createSchedulerConfiguratorArgs(
|
||||
clientSet clientset.Interface,
|
||||
informerFactory informers.SharedInformerFactory,
|
||||
stopCh <-chan struct{},
|
||||
) factory.Configurator {
|
||||
) *factory.ConfigFactoryArgs {
|
||||
|
||||
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
|
||||
SchedulerName: v1.DefaultSchedulerName,
|
||||
return &factory.ConfigFactoryArgs{
|
||||
Client: clientSet,
|
||||
NodeInformer: informerFactory.Core().V1().Nodes(),
|
||||
PodInformer: informerFactory.Core().V1().Pods(),
|
||||
@ -123,5 +123,5 @@ func createSchedulerConfigurator(
|
||||
DisablePreemption: false,
|
||||
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
||||
StopCh: stopCh,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user