mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Remove Configurator interface
The Configurator has been used as a holder for listers that tests need, which is not its purpose. By making the tests obtain listers from more appropriate places, such as informers, there is no need for various accessors to the Configurator. Also, FakeConfigurator is not being used anymore, so there's no need for an interface instead of a plain pointer. Signed-off-by: Aldo Culquicondor <acondor@google.com>
This commit is contained in:
parent
bb496d626b
commit
d935b1054c
@ -5,14 +5,11 @@ go_library(
|
|||||||
srcs = [
|
srcs = [
|
||||||
"eventhandlers.go",
|
"eventhandlers.go",
|
||||||
"scheduler.go",
|
"scheduler.go",
|
||||||
"testutil.go",
|
|
||||||
],
|
],
|
||||||
importpath = "k8s.io/kubernetes/pkg/scheduler",
|
importpath = "k8s.io/kubernetes/pkg/scheduler",
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/features:go_default_library",
|
"//pkg/features:go_default_library",
|
||||||
"//pkg/scheduler/algorithm:go_default_library",
|
|
||||||
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
|
||||||
"//pkg/scheduler/api:go_default_library",
|
"//pkg/scheduler/api:go_default_library",
|
||||||
"//pkg/scheduler/api/latest:go_default_library",
|
"//pkg/scheduler/api/latest:go_default_library",
|
||||||
"//pkg/scheduler/apis/config:go_default_library",
|
"//pkg/scheduler/apis/config:go_default_library",
|
||||||
@ -20,7 +17,6 @@ go_library(
|
|||||||
"//pkg/scheduler/factory:go_default_library",
|
"//pkg/scheduler/factory:go_default_library",
|
||||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||||
"//pkg/scheduler/internal/cache:go_default_library",
|
"//pkg/scheduler/internal/cache:go_default_library",
|
||||||
"//pkg/scheduler/internal/queue:go_default_library",
|
|
||||||
"//pkg/scheduler/metrics:go_default_library",
|
"//pkg/scheduler/metrics:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/storage/v1:go_default_library",
|
"//staging/src/k8s.io/api/storage/v1:go_default_library",
|
||||||
@ -28,7 +24,6 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
|
"//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library",
|
||||||
@ -37,7 +32,6 @@ go_library(
|
|||||||
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
|
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
|
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
|
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
|
@ -1097,7 +1097,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
|
|||||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||||
|
|
||||||
if _, err := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
|
if _, err := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
|
||||||
SchedulerName: "some-scheduler-name",
|
|
||||||
Client: client,
|
Client: client,
|
||||||
NodeInformer: informerFactory.Core().V1().Nodes(),
|
NodeInformer: informerFactory.Core().V1().Nodes(),
|
||||||
PodInformer: informerFactory.Core().V1().Pods(),
|
PodInformer: informerFactory.Core().V1().Pods(),
|
||||||
|
@ -27,8 +27,6 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
@ -22,12 +22,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
@ -139,32 +137,8 @@ type PodPreemptor interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Configurator defines I/O, caching, and other functionality needed to
|
// Configurator defines I/O, caching, and other functionality needed to
|
||||||
// construct a new scheduler. An implementation of this can be seen in
|
// construct a new scheduler.
|
||||||
// factory.go.
|
type Configurator struct {
|
||||||
type Configurator interface {
|
|
||||||
// Exposed for testing
|
|
||||||
GetHardPodAffinitySymmetricWeight() int32
|
|
||||||
|
|
||||||
// Predicate related accessors to be exposed for use by k8s.io/autoscaler/cluster-autoscaler
|
|
||||||
GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error)
|
|
||||||
GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error)
|
|
||||||
|
|
||||||
// Exposed for testing
|
|
||||||
GetClient() clientset.Interface
|
|
||||||
|
|
||||||
// TODO(#80216): Remove GetScheduledPodLister from the interface.
|
|
||||||
// Exposed for testing
|
|
||||||
GetScheduledPodLister() corelisters.PodLister
|
|
||||||
|
|
||||||
Create() (*Config, error)
|
|
||||||
CreateFromProvider(providerName string) (*Config, error)
|
|
||||||
CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
|
|
||||||
CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// configFactory is the default implementation of the scheduler.Configurator interface.
|
|
||||||
// TODO(#80216): Remove pod lister.
|
|
||||||
type configFactory struct {
|
|
||||||
client clientset.Interface
|
client clientset.Interface
|
||||||
// a means to list all known scheduled pods.
|
// a means to list all known scheduled pods.
|
||||||
scheduledPodLister corelisters.PodLister
|
scheduledPodLister corelisters.PodLister
|
||||||
@ -196,10 +170,6 @@ type configFactory struct {
|
|||||||
|
|
||||||
schedulerCache internalcache.Cache
|
schedulerCache internalcache.Cache
|
||||||
|
|
||||||
// SchedulerName of a scheduler is used to select which pods will be
|
|
||||||
// processed by this scheduler, based on pods's "spec.schedulerName".
|
|
||||||
schedulerName string
|
|
||||||
|
|
||||||
// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
|
// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
|
||||||
// corresponding to every RequiredDuringScheduling affinity rule.
|
// corresponding to every RequiredDuringScheduling affinity rule.
|
||||||
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
|
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
|
||||||
@ -226,7 +196,6 @@ type configFactory struct {
|
|||||||
|
|
||||||
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
|
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
|
||||||
type ConfigFactoryArgs struct {
|
type ConfigFactoryArgs struct {
|
||||||
SchedulerName string
|
|
||||||
Client clientset.Interface
|
Client clientset.Interface
|
||||||
NodeInformer coreinformers.NodeInformer
|
NodeInformer coreinformers.NodeInformer
|
||||||
PodInformer coreinformers.PodInformer
|
PodInformer coreinformers.PodInformer
|
||||||
@ -251,7 +220,7 @@ type ConfigFactoryArgs struct {
|
|||||||
|
|
||||||
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
|
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
|
||||||
// return the interface.
|
// return the interface.
|
||||||
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
|
||||||
stopEverything := args.StopCh
|
stopEverything := args.StopCh
|
||||||
if stopEverything == nil {
|
if stopEverything == nil {
|
||||||
stopEverything = wait.NeverStop
|
stopEverything = wait.NeverStop
|
||||||
@ -274,7 +243,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
|||||||
csiNodeLister = args.CSINodeInformer.Lister()
|
csiNodeLister = args.CSINodeInformer.Lister()
|
||||||
}
|
}
|
||||||
|
|
||||||
c := &configFactory{
|
c := &Configurator{
|
||||||
client: args.Client,
|
client: args.Client,
|
||||||
podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework),
|
podQueue: internalqueue.NewSchedulingQueue(stopEverything, framework),
|
||||||
pVLister: args.PvInformer.Lister(),
|
pVLister: args.PvInformer.Lister(),
|
||||||
@ -289,7 +258,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
|||||||
framework: framework,
|
framework: framework,
|
||||||
schedulerCache: schedulerCache,
|
schedulerCache: schedulerCache,
|
||||||
StopEverything: stopEverything,
|
StopEverything: stopEverything,
|
||||||
schedulerName: args.SchedulerName,
|
|
||||||
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
|
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
|
||||||
disablePreemption: args.DisablePreemption,
|
disablePreemption: args.DisablePreemption,
|
||||||
percentageOfNodesToScore: args.PercentageOfNodesToScore,
|
percentageOfNodesToScore: args.PercentageOfNodesToScore,
|
||||||
@ -299,9 +267,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
|||||||
// Setup volume binder
|
// Setup volume binder
|
||||||
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
|
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
|
||||||
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
|
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
|
||||||
// ScheduledPodLister is something we provide to plug-in functions that
|
|
||||||
// they may need to call.
|
|
||||||
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
|
|
||||||
|
|
||||||
// Setup cache debugger
|
// Setup cache debugger
|
||||||
debugger := cachedebugger.New(
|
debugger := cachedebugger.New(
|
||||||
@ -319,31 +284,18 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
|
|||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) GetHardPodAffinitySymmetricWeight() int32 {
|
// GetHardPodAffinitySymmetricWeight is exposed for testing.
|
||||||
|
func (c *Configurator) GetHardPodAffinitySymmetricWeight() int32 {
|
||||||
return c.hardPodAffinitySymmetricWeight
|
return c.hardPodAffinitySymmetricWeight
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) GetSchedulerName() string {
|
|
||||||
return c.schedulerName
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetClient provides a kubernetes Client, mostly internal use, but may also be called by mock-tests.
|
|
||||||
func (c *configFactory) GetClient() clientset.Interface {
|
|
||||||
return c.client
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetScheduledPodLister provides a pod lister, mostly internal use, but may also be called by mock-tests.
|
|
||||||
func (c *configFactory) GetScheduledPodLister() corelisters.PodLister {
|
|
||||||
return c.scheduledPodLister
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create creates a scheduler with the default algorithm provider.
|
// Create creates a scheduler with the default algorithm provider.
|
||||||
func (c *configFactory) Create() (*Config, error) {
|
func (c *Configurator) Create() (*Config, error) {
|
||||||
return c.CreateFromProvider(DefaultProvider)
|
return c.CreateFromProvider(DefaultProvider)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a scheduler from the name of a registered algorithm provider.
|
// CreateFromProvider creates a scheduler from the name of a registered algorithm provider.
|
||||||
func (c *configFactory) CreateFromProvider(providerName string) (*Config, error) {
|
func (c *Configurator) CreateFromProvider(providerName string) (*Config, error) {
|
||||||
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
|
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
|
||||||
provider, err := GetAlgorithmProvider(providerName)
|
provider, err := GetAlgorithmProvider(providerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -352,8 +304,8 @@ func (c *configFactory) CreateFromProvider(providerName string) (*Config, error)
|
|||||||
return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
|
return c.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a scheduler from the configuration file
|
// CreateFromConfig creates a scheduler from the configuration file
|
||||||
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
|
func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
|
||||||
klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
|
klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
|
||||||
|
|
||||||
// validate the policy configuration
|
// validate the policy configuration
|
||||||
@ -430,8 +382,8 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, e
|
|||||||
return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
|
return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a scheduler from a set of registered fit predicate keys and priority keys.
|
// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
|
||||||
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
|
func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
|
||||||
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
|
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
|
||||||
|
|
||||||
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
|
if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
|
||||||
@ -443,12 +395,12 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
|
priorityConfigs, err := c.getPriorityFunctionConfigs(priorityKeys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
priorityMetaProducer, err := c.GetPriorityMetadataProducer()
|
priorityMetaProducer, err := c.getPriorityMetadataProducer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -514,7 +466,7 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, error) {
|
func (c *Configurator) getPriorityFunctionConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, error) {
|
||||||
pluginArgs, err := c.getPluginArgs()
|
pluginArgs, err := c.getPluginArgs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -523,7 +475,7 @@ func (c *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]
|
|||||||
return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
|
return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) GetPriorityMetadataProducer() (priorities.PriorityMetadataProducer, error) {
|
func (c *Configurator) getPriorityMetadataProducer() (priorities.PriorityMetadataProducer, error) {
|
||||||
pluginArgs, err := c.getPluginArgs()
|
pluginArgs, err := c.getPluginArgs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -532,7 +484,9 @@ func (c *configFactory) GetPriorityMetadataProducer() (priorities.PriorityMetada
|
|||||||
return getPriorityMetadataProducer(*pluginArgs)
|
return getPriorityMetadataProducer(*pluginArgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) {
|
// GetPredicateMetadataProducer returns a function to build Predicate Metadata.
|
||||||
|
// It is used by the scheduler and other components, such as k8s.io/autoscaler/cluster-autoscaler.
|
||||||
|
func (c *Configurator) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) {
|
||||||
pluginArgs, err := c.getPluginArgs()
|
pluginArgs, err := c.getPluginArgs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -540,7 +494,9 @@ func (c *configFactory) GetPredicateMetadataProducer() (predicates.PredicateMeta
|
|||||||
return getPredicateMetadataProducer(*pluginArgs)
|
return getPredicateMetadataProducer(*pluginArgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) {
|
// GetPredicates returns the predicate functions.
|
||||||
|
// It is used by the scheduler and other components, such as k8s.io/autoscaler/cluster-autoscaler.
|
||||||
|
func (c *Configurator) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) {
|
||||||
pluginArgs, err := c.getPluginArgs()
|
pluginArgs, err := c.getPluginArgs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -549,7 +505,7 @@ func (c *configFactory) GetPredicates(predicateKeys sets.String) (map[string]pre
|
|||||||
return getFitPredicateFunctions(predicateKeys, *pluginArgs)
|
return getFitPredicateFunctions(predicateKeys, *pluginArgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
func (c *Configurator) getPluginArgs() (*PluginFactoryArgs, error) {
|
||||||
return &PluginFactoryArgs{
|
return &PluginFactoryArgs{
|
||||||
PodLister: c.schedulerCache,
|
PodLister: c.schedulerCache,
|
||||||
ServiceLister: c.serviceLister,
|
ServiceLister: c.serviceLister,
|
||||||
@ -567,65 +523,6 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// assignedPodLister filters the pods returned from a PodLister to
|
|
||||||
// only include those that have a node name set.
|
|
||||||
type assignedPodLister struct {
|
|
||||||
corelisters.PodLister
|
|
||||||
}
|
|
||||||
|
|
||||||
// List lists all Pods in the indexer for a given namespace.
|
|
||||||
func (l assignedPodLister) List(selector labels.Selector) ([]*v1.Pod, error) {
|
|
||||||
list, err := l.PodLister.List(selector)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
filtered := make([]*v1.Pod, 0, len(list))
|
|
||||||
for _, pod := range list {
|
|
||||||
if len(pod.Spec.NodeName) > 0 {
|
|
||||||
filtered = append(filtered, pod)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return filtered, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// List lists all Pods in the indexer for a given namespace.
|
|
||||||
func (l assignedPodLister) Pods(namespace string) corelisters.PodNamespaceLister {
|
|
||||||
return assignedPodNamespaceLister{l.PodLister.Pods(namespace)}
|
|
||||||
}
|
|
||||||
|
|
||||||
// assignedPodNamespaceLister filters the pods returned from a PodNamespaceLister to
|
|
||||||
// only include those that have a node name set.
|
|
||||||
type assignedPodNamespaceLister struct {
|
|
||||||
corelisters.PodNamespaceLister
|
|
||||||
}
|
|
||||||
|
|
||||||
// List lists all Pods in the indexer for a given namespace.
|
|
||||||
func (l assignedPodNamespaceLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
|
|
||||||
list, err := l.PodNamespaceLister.List(selector)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
filtered := make([]*v1.Pod, 0, len(list))
|
|
||||||
for _, pod := range list {
|
|
||||||
if len(pod.Spec.NodeName) > 0 {
|
|
||||||
filtered = append(filtered, pod)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return filtered, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get retrieves the Pod from the indexer for a given namespace and name.
|
|
||||||
func (l assignedPodNamespaceLister) Get(name string) (*v1.Pod, error) {
|
|
||||||
pod, err := l.PodNamespaceLister.Get(name)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(pod.Spec.NodeName) > 0 {
|
|
||||||
return pod, nil
|
|
||||||
}
|
|
||||||
return nil, errors.NewNotFound(schema.GroupResource{Resource: string(v1.ResourcePods)}, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
type podInformer struct {
|
type podInformer struct {
|
||||||
informer cache.SharedIndexInformer
|
informer cache.SharedIndexInformer
|
||||||
}
|
}
|
||||||
|
@ -476,10 +476,9 @@ func TestInvalidFactoryArgs(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) Configurator {
|
func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) *Configurator {
|
||||||
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
||||||
return NewConfigFactory(&ConfigFactoryArgs{
|
return NewConfigFactory(&ConfigFactoryArgs{
|
||||||
v1.DefaultSchedulerName,
|
|
||||||
client,
|
client,
|
||||||
informerFactory.Core().V1().Nodes(),
|
informerFactory.Core().V1().Nodes(),
|
||||||
informerFactory.Core().V1().Pods(),
|
informerFactory.Core().V1().Pods(),
|
||||||
@ -608,7 +607,7 @@ func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
f := &configFactory{}
|
f := &Configurator{}
|
||||||
binderFunc := getBinderFunc(f.client, extenders)
|
binderFunc := getBinderFunc(f.client, extenders)
|
||||||
binder := binderFunc(pod)
|
binder := binderFunc(pod)
|
||||||
|
|
||||||
|
@ -144,7 +144,6 @@ func New(client clientset.Interface,
|
|||||||
}
|
}
|
||||||
// Set up the configurator which can create schedulers from configs.
|
// Set up the configurator which can create schedulers from configs.
|
||||||
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
|
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
|
||||||
SchedulerName: options.schedulerName,
|
|
||||||
Client: client,
|
Client: client,
|
||||||
NodeInformer: nodeInformer,
|
NodeInformer: nodeInformer,
|
||||||
PodInformer: podInformer,
|
PodInformer: podInformer,
|
||||||
|
@ -57,12 +57,14 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
|
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
|
||||||
)
|
)
|
||||||
|
|
||||||
// EmptyFramework is an empty framework used in tests.
|
var (
|
||||||
// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
|
emptyPluginRegistry = framework.Registry{}
|
||||||
var EmptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, EmptyPluginConfig)
|
// emptyPluginConfig is an empty plugin config used in tests.
|
||||||
|
emptyPluginConfig []kubeschedulerconfig.PluginConfig
|
||||||
// EmptyPluginConfig is an empty plugin config used in tests.
|
// emptyFramework is an empty framework used in tests.
|
||||||
var EmptyPluginConfig = []kubeschedulerconfig.PluginConfig{}
|
// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
|
||||||
|
emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, emptyPluginConfig)
|
||||||
|
)
|
||||||
|
|
||||||
type fakeBinder struct {
|
type fakeBinder struct {
|
||||||
b func(binding *v1.Binding) error
|
b func(binding *v1.Binding) error
|
||||||
@ -195,9 +197,9 @@ func TestSchedulerCreation(t *testing.T) {
|
|||||||
eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
|
eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
|
||||||
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
|
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
|
||||||
stopCh,
|
stopCh,
|
||||||
EmptyPluginRegistry,
|
emptyPluginRegistry,
|
||||||
nil,
|
nil,
|
||||||
EmptyPluginConfig,
|
emptyPluginConfig,
|
||||||
WithBindTimeoutSeconds(defaultBindTimeout))
|
WithBindTimeoutSeconds(defaultBindTimeout))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -299,7 +301,7 @@ func TestScheduler(t *testing.T) {
|
|||||||
NextPod: func() *v1.Pod {
|
NextPod: func() *v1.Pod {
|
||||||
return item.sendPod
|
return item.sendPod
|
||||||
},
|
},
|
||||||
Framework: EmptyFramework,
|
Framework: emptyFramework,
|
||||||
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
|
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
|
||||||
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
|
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
|
||||||
})
|
})
|
||||||
@ -645,7 +647,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
|||||||
predicates.EmptyPredicateMetadataProducer,
|
predicates.EmptyPredicateMetadataProducer,
|
||||||
[]priorities.PriorityConfig{},
|
[]priorities.PriorityConfig{},
|
||||||
priorities.EmptyPriorityMetadataProducer,
|
priorities.EmptyPriorityMetadataProducer,
|
||||||
EmptyFramework,
|
emptyFramework,
|
||||||
[]algorithm.SchedulerExtender{},
|
[]algorithm.SchedulerExtender{},
|
||||||
nil,
|
nil,
|
||||||
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
|
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
|
||||||
@ -677,7 +679,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
|||||||
Recorder: &events.FakeRecorder{},
|
Recorder: &events.FakeRecorder{},
|
||||||
PodConditionUpdater: fakePodConditionUpdater{},
|
PodConditionUpdater: fakePodConditionUpdater{},
|
||||||
PodPreemptor: fakePodPreemptor{},
|
PodPreemptor: fakePodPreemptor{},
|
||||||
Framework: EmptyFramework,
|
Framework: emptyFramework,
|
||||||
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
|
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -691,7 +693,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
|||||||
}
|
}
|
||||||
|
|
||||||
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
|
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
|
||||||
framework, _ := framework.NewFramework(EmptyPluginRegistry, nil, []kubeschedulerconfig.PluginConfig{})
|
framework, _ := framework.NewFramework(emptyPluginRegistry, nil, []kubeschedulerconfig.PluginConfig{})
|
||||||
algo := core.NewGenericScheduler(
|
algo := core.NewGenericScheduler(
|
||||||
scache,
|
scache,
|
||||||
internalqueue.NewSchedulingQueue(nil, nil),
|
internalqueue.NewSchedulingQueue(nil, nil),
|
||||||
|
@ -1,90 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package scheduler
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
|
||||||
corelisters "k8s.io/client-go/listers/core/v1"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
|
||||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
|
||||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
|
||||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
|
||||||
)
|
|
||||||
|
|
||||||
// FakeConfigurator is an implementation for test.
|
|
||||||
type FakeConfigurator struct {
|
|
||||||
Config *factory.Config
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPredicateMetadataProducer is not implemented yet.
|
|
||||||
func (fc *FakeConfigurator) GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) {
|
|
||||||
return nil, fmt.Errorf("not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPredicates is not implemented yet.
|
|
||||||
func (fc *FakeConfigurator) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) {
|
|
||||||
return nil, fmt.Errorf("not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetHardPodAffinitySymmetricWeight is not implemented yet.
|
|
||||||
func (fc *FakeConfigurator) GetHardPodAffinitySymmetricWeight() int32 {
|
|
||||||
panic("not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
// MakeDefaultErrorFunc is not implemented yet.
|
|
||||||
func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetClient is not implemented yet.
|
|
||||||
func (fc *FakeConfigurator) GetClient() clientset.Interface {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetScheduledPodLister is not implemented yet.
|
|
||||||
func (fc *FakeConfigurator) GetScheduledPodLister() corelisters.PodLister {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create returns FakeConfigurator.Config
|
|
||||||
func (fc *FakeConfigurator) Create() (*factory.Config, error) {
|
|
||||||
return fc.Config, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateFromProvider returns FakeConfigurator.Config
|
|
||||||
func (fc *FakeConfigurator) CreateFromProvider(providerName string) (*factory.Config, error) {
|
|
||||||
return fc.Config, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateFromConfig returns FakeConfigurator.Config
|
|
||||||
func (fc *FakeConfigurator) CreateFromConfig(policy schedulerapi.Policy) (*factory.Config, error) {
|
|
||||||
return fc.Config, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateFromKeys returns FakeConfigurator.Config
|
|
||||||
func (fc *FakeConfigurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*factory.Config, error) {
|
|
||||||
return fc.Config, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// EmptyPluginRegistry is an empty plugin registry used in tests.
|
|
||||||
var EmptyPluginRegistry = framework.Registry{}
|
|
@ -84,7 +84,6 @@ func createConfiguratorArgsWithPodInformer(
|
|||||||
stopCh <-chan struct{},
|
stopCh <-chan struct{},
|
||||||
) *factory.ConfigFactoryArgs {
|
) *factory.ConfigFactoryArgs {
|
||||||
return &factory.ConfigFactoryArgs{
|
return &factory.ConfigFactoryArgs{
|
||||||
SchedulerName: schedulerName,
|
|
||||||
Client: clientSet,
|
Client: clientSet,
|
||||||
NodeInformer: informerFactory.Core().V1().Nodes(),
|
NodeInformer: informerFactory.Core().V1().Nodes(),
|
||||||
PodInformer: podInformer,
|
PodInformer: podInformer,
|
||||||
|
@ -42,6 +42,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
|
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
|
||||||
"//test/integration/framework:go_default_library",
|
"//test/integration/framework:go_default_library",
|
||||||
|
@ -25,7 +25,6 @@ import (
|
|||||||
storagev1beta1 "k8s.io/api/storage/v1beta1"
|
storagev1beta1 "k8s.io/api/storage/v1beta1"
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||||
"k8s.io/csi-translation-lib/plugins"
|
"k8s.io/csi-translation-lib/plugins"
|
||||||
@ -359,9 +358,9 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
|
|||||||
if b.N < minPods {
|
if b.N < minPods {
|
||||||
b.N = minPods
|
b.N = minPods
|
||||||
}
|
}
|
||||||
schedulerConfigFactory, finalFunc := mustSetupScheduler()
|
schedulerConfigArgs, finalFunc := mustSetupScheduler()
|
||||||
defer finalFunc()
|
defer finalFunc()
|
||||||
c := schedulerConfigFactory.GetClient()
|
c := schedulerConfigArgs.Client
|
||||||
|
|
||||||
nodePreparer := framework.NewIntegrationTestNodePreparer(
|
nodePreparer := framework.NewIntegrationTestNodePreparer(
|
||||||
c,
|
c,
|
||||||
@ -378,8 +377,9 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
|
|||||||
podCreator := testutils.NewTestPodCreator(c, config)
|
podCreator := testutils.NewTestPodCreator(c, config)
|
||||||
podCreator.CreatePods()
|
podCreator.CreatePods()
|
||||||
|
|
||||||
|
podLister := schedulerConfigArgs.PodInformer.Lister()
|
||||||
for {
|
for {
|
||||||
scheduled, err := schedulerConfigFactory.GetScheduledPodLister().List(labels.Everything())
|
scheduled, err := getScheduledPods(podLister)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("%v", err)
|
klog.Fatalf("%v", err)
|
||||||
}
|
}
|
||||||
@ -397,7 +397,7 @@ func benchmarkScheduling(numNodes, numExistingPods, minPods int,
|
|||||||
for {
|
for {
|
||||||
// This can potentially affect performance of scheduler, since List() is done under mutex.
|
// This can potentially affect performance of scheduler, since List() is done under mutex.
|
||||||
// TODO: Setup watch on apiserver and wait until all pods scheduled.
|
// TODO: Setup watch on apiserver and wait until all pods scheduled.
|
||||||
scheduled, err := schedulerConfigFactory.GetScheduledPodLister().List(labels.Everything())
|
scheduled, err := getScheduledPods(podLister)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("%v", err)
|
klog.Fatalf("%v", err)
|
||||||
}
|
}
|
||||||
|
@ -18,17 +18,19 @@ package benchmark
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"k8s.io/api/core/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
|
||||||
"k8s.io/klog"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
|
||||||
testutils "k8s.io/kubernetes/test/utils"
|
|
||||||
"math"
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
listers "k8s.io/client-go/listers/core/v1"
|
||||||
|
"k8s.io/klog"
|
||||||
|
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||||
|
testutils "k8s.io/kubernetes/test/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -101,22 +103,22 @@ func TestSchedule100Node3KPods(t *testing.T) {
|
|||||||
|
|
||||||
// testConfig contains the some input parameters needed for running test-suite
|
// testConfig contains the some input parameters needed for running test-suite
|
||||||
type testConfig struct {
|
type testConfig struct {
|
||||||
numPods int
|
numPods int
|
||||||
numNodes int
|
numNodes int
|
||||||
mutatedNodeTemplate *v1.Node
|
mutatedNodeTemplate *v1.Node
|
||||||
mutatedPodTemplate *v1.Pod
|
mutatedPodTemplate *v1.Pod
|
||||||
schedulerSupportFunctions factory.Configurator
|
schedulerSupport *factory.ConfigFactoryArgs
|
||||||
destroyFunc func()
|
destroyFunc func()
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBaseConfig returns baseConfig after initializing number of nodes and pods.
|
// getBaseConfig returns baseConfig after initializing number of nodes and pods.
|
||||||
func getBaseConfig(nodes int, pods int) *testConfig {
|
func getBaseConfig(nodes int, pods int) *testConfig {
|
||||||
schedulerConfigFactory, destroyFunc := mustSetupScheduler()
|
schedulerConfigArgs, destroyFunc := mustSetupScheduler()
|
||||||
return &testConfig{
|
return &testConfig{
|
||||||
schedulerSupportFunctions: schedulerConfigFactory,
|
schedulerSupport: schedulerConfigArgs,
|
||||||
destroyFunc: destroyFunc,
|
destroyFunc: destroyFunc,
|
||||||
numNodes: nodes,
|
numNodes: nodes,
|
||||||
numPods: pods,
|
numPods: pods,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,10 +134,11 @@ func schedulePods(config *testConfig) int32 {
|
|||||||
// We are interested in low scheduling rates (i.e. qps=2),
|
// We are interested in low scheduling rates (i.e. qps=2),
|
||||||
minQPS := int32(math.MaxInt32)
|
minQPS := int32(math.MaxInt32)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
podLister := config.schedulerSupport.PodInformer.Lister()
|
||||||
// Bake in time for the first pod scheduling event.
|
// Bake in time for the first pod scheduling event.
|
||||||
for {
|
for {
|
||||||
time.Sleep(50 * time.Millisecond)
|
time.Sleep(50 * time.Millisecond)
|
||||||
scheduled, err := config.schedulerSupportFunctions.GetScheduledPodLister().List(labels.Everything())
|
scheduled, err := getScheduledPods(podLister)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("%v", err)
|
klog.Fatalf("%v", err)
|
||||||
}
|
}
|
||||||
@ -153,7 +156,7 @@ func schedulePods(config *testConfig) int32 {
|
|||||||
// This can potentially affect performance of scheduler, since List() is done under mutex.
|
// This can potentially affect performance of scheduler, since List() is done under mutex.
|
||||||
// Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler.
|
// Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler.
|
||||||
// TODO: Setup watch on apiserver and wait until all pods scheduled.
|
// TODO: Setup watch on apiserver and wait until all pods scheduled.
|
||||||
scheduled, err := config.schedulerSupportFunctions.GetScheduledPodLister().List(labels.Everything())
|
scheduled, err := getScheduledPods(podLister)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("%v", err)
|
klog.Fatalf("%v", err)
|
||||||
}
|
}
|
||||||
@ -183,6 +186,20 @@ func schedulePods(config *testConfig) int32 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getScheduledPods(lister listers.PodLister) ([]*v1.Pod, error) {
|
||||||
|
all, err := lister.List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
scheduled := make([]*v1.Pod, 0, len(all))
|
||||||
|
for _, pod := range all {
|
||||||
|
if len(pod.Spec.NodeName) > 0 {
|
||||||
|
scheduled = append(scheduled, pod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return scheduled, nil
|
||||||
|
}
|
||||||
|
|
||||||
// mutateNodeTemplate returns the modified node needed for creation of nodes.
|
// mutateNodeTemplate returns the modified node needed for creation of nodes.
|
||||||
func (na nodeAffinity) mutateNodeTemplate(node *v1.Node) {
|
func (na nodeAffinity) mutateNodeTemplate(node *v1.Node) {
|
||||||
labels := make(map[string]string)
|
labels := make(map[string]string)
|
||||||
@ -220,19 +237,17 @@ func (na nodeAffinity) mutatePodTemplate(pod *v1.Pod) {
|
|||||||
// generateNodes generates nodes to be used for scheduling.
|
// generateNodes generates nodes to be used for scheduling.
|
||||||
func (inputConfig *schedulerPerfConfig) generateNodes(config *testConfig) {
|
func (inputConfig *schedulerPerfConfig) generateNodes(config *testConfig) {
|
||||||
for i := 0; i < inputConfig.NodeCount; i++ {
|
for i := 0; i < inputConfig.NodeCount; i++ {
|
||||||
config.schedulerSupportFunctions.GetClient().CoreV1().Nodes().Create(config.mutatedNodeTemplate)
|
config.schedulerSupport.Client.CoreV1().Nodes().Create(config.mutatedNodeTemplate)
|
||||||
|
|
||||||
}
|
}
|
||||||
for i := 0; i < config.numNodes-inputConfig.NodeCount; i++ {
|
for i := 0; i < config.numNodes-inputConfig.NodeCount; i++ {
|
||||||
config.schedulerSupportFunctions.GetClient().CoreV1().Nodes().Create(baseNodeTemplate)
|
config.schedulerSupport.Client.CoreV1().Nodes().Create(baseNodeTemplate)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// generatePods generates pods to be used for scheduling.
|
// generatePods generates pods to be used for scheduling.
|
||||||
func (inputConfig *schedulerPerfConfig) generatePods(config *testConfig) {
|
func (inputConfig *schedulerPerfConfig) generatePods(config *testConfig) {
|
||||||
testutils.CreatePod(config.schedulerSupportFunctions.GetClient(), "sample", inputConfig.PodCount, config.mutatedPodTemplate)
|
testutils.CreatePod(config.schedulerSupport.Client, "sample", inputConfig.PodCount, config.mutatedPodTemplate)
|
||||||
testutils.CreatePod(config.schedulerSupportFunctions.GetClient(), "sample", config.numPods-inputConfig.PodCount, basePodTemplate)
|
testutils.CreatePod(config.schedulerSupport.Client, "sample", config.numPods-inputConfig.PodCount, basePodTemplate)
|
||||||
}
|
}
|
||||||
|
|
||||||
// generatePodAndNodeTopology is the wrapper function for modifying both pods and node objects.
|
// generatePodAndNodeTopology is the wrapper function for modifying both pods and node objects.
|
||||||
|
@ -31,7 +31,7 @@ import (
|
|||||||
// remove resources after finished.
|
// remove resources after finished.
|
||||||
// Notes on rate limiter:
|
// Notes on rate limiter:
|
||||||
// - client rate limit is set to 5000.
|
// - client rate limit is set to 5000.
|
||||||
func mustSetupScheduler() (factory.Configurator, util.ShutdownFunc) {
|
func mustSetupScheduler() (*factory.ConfigFactoryArgs, util.ShutdownFunc) {
|
||||||
apiURL, apiShutdown := util.StartApiserver()
|
apiURL, apiShutdown := util.StartApiserver()
|
||||||
clientSet := clientset.NewForConfigOrDie(&restclient.Config{
|
clientSet := clientset.NewForConfigOrDie(&restclient.Config{
|
||||||
Host: apiURL,
|
Host: apiURL,
|
||||||
@ -39,11 +39,11 @@ func mustSetupScheduler() (factory.Configurator, util.ShutdownFunc) {
|
|||||||
QPS: 5000.0,
|
QPS: 5000.0,
|
||||||
Burst: 5000,
|
Burst: 5000,
|
||||||
})
|
})
|
||||||
schedulerConfig, schedulerShutdown := util.StartScheduler(clientSet)
|
schedulerConfigArgs, schedulerShutdown := util.StartScheduler(clientSet)
|
||||||
|
|
||||||
shutdownFunc := func() {
|
shutdownFunc := func() {
|
||||||
schedulerShutdown()
|
schedulerShutdown()
|
||||||
apiShutdown()
|
apiShutdown()
|
||||||
}
|
}
|
||||||
return schedulerConfig, shutdownFunc
|
return schedulerConfigArgs, shutdownFunc
|
||||||
}
|
}
|
||||||
|
@ -57,9 +57,9 @@ func StartApiserver() (string, ShutdownFunc) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface
|
// StartScheduler configures and starts a scheduler given a handle to the clientSet interface
|
||||||
// and event broadcaster. It returns a handle to the configurator for the running scheduler
|
// and event broadcaster. It returns a handle to the configurator args for the running scheduler
|
||||||
// and the shutdown function to stop it.
|
// and the shutdown function to stop it.
|
||||||
func StartScheduler(clientSet clientset.Interface) (factory.Configurator, ShutdownFunc) {
|
func StartScheduler(clientSet clientset.Interface) (*factory.ConfigFactoryArgs, ShutdownFunc) {
|
||||||
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
|
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
|
evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
|
||||||
@ -67,9 +67,10 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
|
|||||||
|
|
||||||
evtBroadcaster.StartRecordingToSink(stopCh)
|
evtBroadcaster.StartRecordingToSink(stopCh)
|
||||||
|
|
||||||
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory, stopCh)
|
configuratorArgs := createSchedulerConfiguratorArgs(clientSet, informerFactory, stopCh)
|
||||||
|
configurator := factory.NewConfigFactory(configuratorArgs)
|
||||||
|
|
||||||
config, err := schedulerConfigurator.CreateFromConfig(schedulerapi.Policy{})
|
config, err := configurator.CreateFromConfig(schedulerapi.Policy{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("Error creating scheduler: %v", err)
|
klog.Fatalf("Error creating scheduler: %v", err)
|
||||||
}
|
}
|
||||||
@ -95,18 +96,17 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo
|
|||||||
close(stopCh)
|
close(stopCh)
|
||||||
klog.Infof("destroyed scheduler")
|
klog.Infof("destroyed scheduler")
|
||||||
}
|
}
|
||||||
return schedulerConfigurator, shutdownFunc
|
return configuratorArgs, shutdownFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// createSchedulerConfigurator create a configurator for scheduler with given informer factory and default name.
|
// createSchedulerConfigurator create a configurator for scheduler with given informer factory.
|
||||||
func createSchedulerConfigurator(
|
func createSchedulerConfiguratorArgs(
|
||||||
clientSet clientset.Interface,
|
clientSet clientset.Interface,
|
||||||
informerFactory informers.SharedInformerFactory,
|
informerFactory informers.SharedInformerFactory,
|
||||||
stopCh <-chan struct{},
|
stopCh <-chan struct{},
|
||||||
) factory.Configurator {
|
) *factory.ConfigFactoryArgs {
|
||||||
|
|
||||||
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
|
return &factory.ConfigFactoryArgs{
|
||||||
SchedulerName: v1.DefaultSchedulerName,
|
|
||||||
Client: clientSet,
|
Client: clientSet,
|
||||||
NodeInformer: informerFactory.Core().V1().Nodes(),
|
NodeInformer: informerFactory.Core().V1().Nodes(),
|
||||||
PodInformer: informerFactory.Core().V1().Pods(),
|
PodInformer: informerFactory.Core().V1().Pods(),
|
||||||
@ -123,5 +123,5 @@ func createSchedulerConfigurator(
|
|||||||
DisablePreemption: false,
|
DisablePreemption: false,
|
||||||
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
||||||
StopCh: stopCh,
|
StopCh: stopCh,
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user