Switch scheduler to use generated listers/informers

Where possible, switch the scheduler to use generated listers and
informers. There are still some places where it probably makes more
sense to use one-off reflectors/informers (listing/watching just a
single node, listing/watching scheduled & unscheduled pods using a field
selector).
Andy Goldstein
2017-02-21 15:00:57 -05:00
parent bb7cc74069
commit 9d8d6ad16c
24 changed files with 417 additions and 221 deletions
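As context for the diffs below, here is a minimal sketch of the generated shared-informer pattern the scheduler is moving to: one factory built from the generated externalversions package, typed informers requested from it, and cache-backed reads served by their listers. The helper name and the explicit cache-sync wait are illustrative additions, not code from this commit; the import paths match the vendored packages referenced in the diff.

```go
package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
)

// countCachedNodes (hypothetical) shows the lister-backed read path: request a
// typed informer from the shared factory, start the factory, wait for its cache
// to sync, then read from the lister instead of querying the API server.
func countCachedNodes(client clientset.Interface, stop <-chan struct{}) (int, error) {
	factory := informers.NewSharedInformerFactory(client, 0)

	// Informers must be requested before Start; the factory only runs the
	// informers that have been asked for so far.
	nodeInformer := factory.Core().V1().Nodes()

	factory.Start(stop)
	if !cache.WaitForCacheSync(stop, nodeInformer.Informer().HasSynced) {
		return 0, fmt.Errorf("timed out waiting for the node cache to sync")
	}

	nodes, err := nodeInformer.Lister().List(labels.Everything())
	if err != nil {
		return 0, err
	}
	return len(nodes), nil
}
```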

@@ -18,6 +18,9 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library",
"//pkg/client/leaderelection:go_default_library",
"//pkg/client/leaderelection/resourcelock:go_default_library",
"//pkg/util/configz:go_default_library",

@@ -21,6 +21,8 @@ import (
"io/ioutil"
"os"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
"k8s.io/apimachinery/pkg/runtime"
@@ -69,8 +71,28 @@ func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) {
}
// createScheduler encapsulates the entire creation of a runnable scheduler.
func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) {
configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight)
func createScheduler(
s *options.SchedulerServer,
kubecli *clientset.Clientset,
nodeInformer coreinformers.NodeInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer extensionsinformers.ReplicaSetInformer,
serviceInformer coreinformers.ServiceInformer,
recorder record.EventRecorder,
) (*scheduler.Scheduler, error) {
configurator := factory.NewConfigFactory(
s.SchedulerName,
kubecli,
nodeInformer,
pvInformer,
pvcInformer,
replicationControllerInformer,
replicaSetInformer,
serviceInformer,
s.HardPodAffinitySymmetricWeight,
)
// Rebuild the configurator with a default Create(...) method.
configurator = &schedulerConfigurator{
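createScheduler now threads typed informer interfaces down to the factory instead of letting it build its own. Below is a sketch of a component written in this injected-informer style; the nodeWatcher type is hypothetical, while the NodeInformer methods used are the ones exercised in the factory diff further down.

```go
package example

import (
	"k8s.io/client-go/tools/cache"
	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)

// nodeWatcher accepts an already-constructed NodeInformer, registers its event
// handlers on the shared informer, and keeps the lister for cache-backed reads.
type nodeWatcher struct {
	lister corelisters.NodeLister
}

func newNodeWatcher(nodeInformer coreinformers.NodeInformer) *nodeWatcher {
	nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { /* react to a new node */ },
			DeleteFunc: func(obj interface{}) { /* react to a removed node */ },
		},
		0, // resync period for this handler (0 = none requested)
	)
	return &nodeWatcher{lister: nodeInformer.Lister()}
}
```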

@@ -14,6 +14,7 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//pkg/apis/componentconfig/install:go_default_library",
"//pkg/apis/componentconfig/v1alpha1:go_default_library",
"//pkg/client/leaderelection:go_default_library",
"//pkg/features:go_default_library",

@@ -27,6 +27,8 @@ import (
// add the kubernetes feature gates
_ "k8s.io/kubernetes/pkg/features"
// install the componentconfig api so we get its defaulting and conversion functions
_ "k8s.io/kubernetes/pkg/apis/componentconfig/install"
"github.com/spf13/pflag"
)

@@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/server/healthz"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/util/configz"
@@ -67,24 +68,47 @@ func Run(s *options.SchedulerServer) error {
if err != nil {
return fmt.Errorf("unable to create kube client: %v", err)
}
recorder := createRecorder(kubecli, s)
sched, err := createScheduler(s, kubecli, recorder)
informerFactory := informers.NewSharedInformerFactory(kubecli, 0)
sched, err := createScheduler(
s,
kubecli,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
recorder,
)
if err != nil {
return fmt.Errorf("error creating scheduler: %v", err)
}
go startHTTP(s)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
run := func(_ <-chan struct{}) {
sched.Run()
select {}
}
if !s.LeaderElection.LeaderElect {
run(nil)
panic("unreachable")
}
id, err := os.Hostname()
if err != nil {
return fmt.Errorf("unable to get hostname: %v", err)
}
// TODO: enable other lock types
rl := &resourcelock.EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{
@@ -97,6 +121,7 @@ func Run(s *options.SchedulerServer) error {
EventRecorder: recorder,
},
}
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
@@ -109,6 +134,7 @@ func Run(s *options.SchedulerServer) error {
},
},
})
panic("unreachable")
}
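The leader-election branch of Run() is only partially visible above. For orientation, here is a hedged sketch of that wiring using the same leaderelection and resourcelock packages the file imports; the endpoints namespace/name, the timing constants, and field names such as Client, LockConfig, and Identity are assumptions about this era's API rather than lines from the diff.

```go
package example

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
	"k8s.io/kubernetes/pkg/client/leaderelection"
	"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
)

// runWhenLeader sketches the leader-elected path: acquire an Endpoints-based
// lock and invoke run only while holding it. All concrete values are
// placeholders, not taken from this commit.
func runWhenLeader(kubecli clientset.Interface, recorder record.EventRecorder, id string, run func(stop <-chan struct{})) {
	rl := &resourcelock.EndpointsLock{
		EndpointsMeta: metav1.ObjectMeta{
			Namespace: "kube-system",
			Name:      "kube-scheduler",
		},
		Client: kubecli, // assumed field: the lock persists itself through this client
		LockConfig: resourcelock.ResourceLockConfig{
			Identity:      id, // e.g. the hostname obtained above
			EventRecorder: recorder,
		},
	}

	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() { panic("lost leadership") },
		},
	})
}
```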

@@ -40,6 +40,7 @@ go_library(
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/metrics:go_default_library",

@@ -19,12 +19,13 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",

@@ -25,12 +25,13 @@ import (
"time"
"github.com/golang/glog"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/legacylisters"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
@@ -58,13 +59,22 @@ type PersistentVolumeInfo interface {
GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error)
}
// CachedPersistentVolumeInfo implements PersistentVolumeInfo
type CachedPersistentVolumeInfo struct {
corelisters.PersistentVolumeLister
}
func (c *CachedPersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) {
return c.Get(pvID)
}
type PersistentVolumeClaimInfo interface {
GetPersistentVolumeClaimInfo(namespace string, name string) (*v1.PersistentVolumeClaim, error)
}
// CachedPersistentVolumeClaimInfo implements PersistentVolumeClaimInfo
type CachedPersistentVolumeClaimInfo struct {
*listers.StoreToPersistentVolumeClaimLister
corelisters.PersistentVolumeClaimLister
}
// GetPersistentVolumeClaimInfo fetches the claim in specified namespace with specified name
@@ -73,22 +83,22 @@ func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace
}
type CachedNodeInfo struct {
*listers.StoreToNodeLister
corelisters.NodeLister
}
// GetNodeInfo returns cached data for the node 'id'.
func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
node, exists, err := c.Get(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: id}})
node, err := c.Get(id)
if apierrors.IsNotFound(err) {
return nil, fmt.Errorf("node '%v' not found", id)
}
if err != nil {
return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err)
}
if !exists {
return nil, fmt.Errorf("node '%v' not found", id)
}
return node.(*v1.Node), nil
return node, nil
}
// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
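The predicates now wrap generated listers directly, and a missing object surfaces as an apierrors-style not-found error from the lister rather than an "exists" boolean from the store. The sketch below shows how a test might exercise the new CachedNodeInfo by feeding a generated lister from a plain indexer; the test itself is illustrative, not part of this commit.

```go
package example

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/api/v1"
	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)

// TestCachedNodeInfoFromIndexer feeds a generated NodeLister from a plain
// cache.Indexer, then wraps it in CachedNodeInfo the way the factory now does.
func TestCachedNodeInfoFromIndexer(t *testing.T) {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	if err := indexer.Add(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}); err != nil {
		t.Fatal(err)
	}

	info := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(indexer)}

	if _, err := info.GetNodeInfo("node-1"); err != nil {
		t.Fatalf("expected cached node, got error: %v", err)
	}
	if _, err := info.GetNodeInfo("missing"); err == nil {
		t.Fatal("expected a not-found error for an uncached node")
	}
}
```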

@@ -35,6 +35,7 @@ go_test(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/api/latest:go_default_library",
"//plugin/pkg/scheduler/factory:go_default_library",

@@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
@@ -345,8 +346,19 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
informerFactory := informers.NewSharedInformerFactory(client, 0)
if _, err := factory.NewConfigFactory(client, "some-scheduler-name", v1.DefaultHardPodAffinitySymmetricWeight).CreateFromConfig(policy); err != nil {
if _, err := factory.NewConfigFactory(
"some-scheduler-name",
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
).CreateFromConfig(policy); err != nil {
t.Errorf("%s: Error constructing: %v", v, err)
continue
}

@@ -17,10 +17,11 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/listers/extensions/v1beta1:go_default_library",
"//plugin/pkg/scheduler:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
@@ -56,6 +57,7 @@ go_test(
"//pkg/api/testing:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/api/latest:go_default_library",

@@ -32,10 +32,11 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
"k8s.io/kubernetes/pkg/controller/informers"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/plugin/pkg/scheduler"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@@ -58,32 +59,26 @@ type ConfigFactory struct {
// queue for pods that need scheduling
podQueue *cache.FIFO
// a means to list all known scheduled pods.
scheduledPodLister *listers.StoreToPodLister
scheduledPodLister corelisters.PodLister
// a means to list all known scheduled pods and pods assumed to have been scheduled.
podLister algorithm.PodLister
// a means to list all nodes
nodeLister *listers.StoreToNodeLister
nodeLister corelisters.NodeLister
// a means to list all PersistentVolumes
pVLister *listers.StoreToPVFetcher
pVLister corelisters.PersistentVolumeLister
// a means to list all PersistentVolumeClaims
pVCLister *listers.StoreToPersistentVolumeClaimLister
pVCLister corelisters.PersistentVolumeClaimLister
// a means to list all services
serviceLister *listers.StoreToServiceLister
serviceLister corelisters.ServiceLister
// a means to list all controllers
controllerLister *listers.StoreToReplicationControllerLister
controllerLister corelisters.ReplicationControllerLister
// a means to list all replicasets
replicaSetLister *listers.StoreToReplicaSetLister
replicaSetLister extensionslisters.ReplicaSetLister
// Close this to stop all reflectors
StopEverything chan struct{}
informerFactory informers.SharedInformerFactory
scheduledPodPopulator cache.Controller
nodePopulator cache.Controller
pvPopulator cache.Controller
pvcPopulator cache.Controller
servicePopulator cache.Controller
controllerPopulator cache.Controller
schedulerCache schedulercache.Cache
@@ -102,40 +97,41 @@ type ConfigFactory struct {
// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int) scheduler.Configurator {
func NewConfigFactory(
schedulerName string,
client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer extensionsinformers.ReplicaSetInformer,
serviceInformer coreinformers.ServiceInformer,
hardPodAffinitySymmetricWeight int,
) scheduler.Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
// TODO: pass this in as an argument...
informerFactory := informers.NewSharedInformerFactory(client, nil, 0)
pvcInformer := informerFactory.PersistentVolumeClaims()
c := &ConfigFactory{
client: client,
podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
scheduledPodLister: &listers.StoreToPodLister{},
informerFactory: informerFactory,
// Only nodes in the "Ready" condition with status == "True" are schedulable
nodeLister: &listers.StoreToNodeLister{},
pVLister: &listers.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
client: client,
podLister: schedulerCache,
podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
pVLister: pvInformer.Lister(),
pVCLister: pvcInformer.Lister(),
pvcPopulator: pvcInformer.Informer().GetController(),
serviceLister: &listers.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
controllerLister: &listers.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
replicaSetLister: &listers.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
serviceLister: serviceInformer.Lister(),
controllerLister: replicationControllerInformer.Lister(),
replicaSetLister: replicaSetInformer.Lister(),
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: schedulerName,
hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
}
c.podLister = schedulerCache
// On add/delete to the scheduled pods, remove from the assumed pods.
// We construct this here instead of in CreateFromKeys because
// ScheduledPodLister is something we provide to plug in functions that
// they may need to call.
c.scheduledPodLister.Indexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
var scheduledPodIndexer cache.Indexer
scheduledPodIndexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
c.createAssignedNonTerminatedPodLW(),
&v1.Pod{},
0,
@@ -146,48 +142,27 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
c.scheduledPodLister = corelisters.NewPodLister(scheduledPodIndexer)
c.nodeLister.Store, c.nodePopulator = cache.NewInformer(
c.createNodeLW(),
&v1.Node{},
0,
// Only nodes in the "Ready" condition with status == "True" are schedulable
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
},
0,
)
c.nodeLister = nodeInformer.Lister()
// TODO(harryz) need to fill all the handlers here and below for equivalence cache
c.pVLister.Store, c.pvPopulator = cache.NewInformer(
c.createPersistentVolumeLW(),
&v1.PersistentVolume{},
0,
cache.ResourceEventHandlerFuncs{},
)
c.serviceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
c.createServiceLW(),
&v1.Service{},
0,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
c.controllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
c.createControllerLW(),
&v1.ReplicationController{},
0,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return c
}
// GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
func (c *ConfigFactory) GetNodeStore() cache.Store {
return c.nodeLister.Store
func (c *ConfigFactory) GetNodeLister() corelisters.NodeLister {
return c.nodeLister
}
func (c *ConfigFactory) GetHardPodAffinitySymmetricWeight() int {
@@ -204,8 +179,8 @@ func (f *ConfigFactory) GetClient() clientset.Interface {
}
// GetScheduledPodListerIndexer provides a pod lister, mostly internal use, but may also be called by mock-tests.
func (c *ConfigFactory) GetScheduledPodListerIndexer() cache.Indexer {
return c.scheduledPodLister.Indexer
func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister {
return c.scheduledPodLister
}
// TODO(harryz) need to update all the handlers here and below for equivalence cache
@@ -394,7 +369,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()),
NodeLister: &nodePredicateLister{f.nodeLister},
Algorithm: algo,
Binder: &binder{f.client},
PodConditionUpdater: &podConditionUpdater{f.client},
@@ -406,6 +381,14 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
}, nil
}
type nodePredicateLister struct {
corelisters.NodeLister
}
func (n *nodePredicateLister) List() ([]*v1.Node, error) {
return n.ListWithPredicate(getNodeConditionPredicate())
}
func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) {
pluginArgs, err := f.getPluginArgs()
if err != nil {
@@ -448,10 +431,10 @@ func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) {
ControllerLister: f.controllerLister,
ReplicaSetLister: f.replicaSetLister,
// All fit predicates only need to consider schedulable nodes.
NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()),
NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.nodeLister},
PVInfo: f.pVLister,
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{StoreToPersistentVolumeClaimLister: f.pVCLister},
NodeLister: &nodePredicateLister{f.nodeLister},
NodeInfo: &predicates.CachedNodeInfo{NodeLister: f.nodeLister},
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: f.pVLister},
PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: f.pVCLister},
HardPodAffinitySymmetricWeight: f.hardPodAffinitySymmetricWeight,
}, nil
}
@@ -462,27 +445,6 @@ func (f *ConfigFactory) Run() {
// Begin populating scheduled pods.
go f.scheduledPodPopulator.Run(f.StopEverything)
// Begin populating nodes.
go f.nodePopulator.Run(f.StopEverything)
// Begin populating pv & pvc
go f.pvPopulator.Run(f.StopEverything)
go f.pvcPopulator.Run(f.StopEverything)
// Begin populating services
go f.servicePopulator.Run(f.StopEverything)
// Begin populating controllers
go f.controllerPopulator.Run(f.StopEverything)
// start informers...
f.informerFactory.Start(f.StopEverything)
// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.replicaSetLister.Indexer, 0).RunUntil(f.StopEverything)
}
func (f *ConfigFactory) getNextPod() *v1.Pod {
@@ -499,7 +461,7 @@ func (f *ConfigFactory) ResponsibleForPod(pod *v1.Pod) bool {
return f.schedulerName == pod.Spec.SchedulerName
}
func getNodeConditionPredicate() listers.NodeConditionPredicate {
func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
return func(node *v1.Node) bool {
for i := range node.Status.Conditions {
cond := &node.Status.Conditions[i]
@@ -542,39 +504,6 @@ func (factory *ConfigFactory) createAssignedNonTerminatedPodLW() *cache.ListWatc
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "pods", metav1.NamespaceAll, selector)
}
// createNodeLW returns a cache.ListWatch that gets all changes to nodes.
func (factory *ConfigFactory) createNodeLW() *cache.ListWatch {
// all nodes are considered to ensure that the scheduler cache has access to all nodes for lookups
// the NodeCondition is used to filter out the nodes that are not ready or unschedulable
// the filtered list is used as the super set of nodes to consider for scheduling
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "nodes", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// createPersistentVolumeLW returns a cache.ListWatch that gets all changes to persistentVolumes.
func (factory *ConfigFactory) createPersistentVolumeLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumes", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// createPersistentVolumeClaimLW returns a cache.ListWatch that gets all changes to persistentVolumeClaims.
func (factory *ConfigFactory) createPersistentVolumeClaimLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumeClaims", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to services.
func (factory *ConfigFactory) createServiceLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "services", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to controllers.
func (factory *ConfigFactory) createControllerLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "replicationControllers", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to replicasets.
func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch {
return cache.NewListWatchFromClient(factory.client.Extensions().RESTClient(), "replicasets", metav1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
return func(pod *v1.Pod, err error) {
if err == core.ErrNoNodesAvailable {
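The new nodePredicateLister narrows the generated NodeLister with ListWithPredicate, so both the scheduler config and the plugin args see only schedulable nodes. The sketch below shows that filtering pattern in isolation; readyNodes is hypothetical, and its predicate is a simplified stand-in for getNodeConditionPredicate, which applies a few more checks.

```go
package example

import (
	"k8s.io/kubernetes/pkg/api/v1"
	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)

// readyNodes returns only the cached nodes whose Ready condition is True,
// using the NodeConditionPredicate/ListWithPredicate hook on the generated lister.
func readyNodes(nodes corelisters.NodeLister) ([]*v1.Node, error) {
	isReady := corelisters.NodeConditionPredicate(func(node *v1.Node) bool {
		for i := range node.Status.Conditions {
			cond := &node.Status.Conditions[i]
			if cond.Type == v1.NodeReady {
				return cond.Status == v1.ConditionTrue
			}
		}
		return false
	})
	return nodes.ListWithPredicate(isReady)
}
```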

@@ -33,6 +33,7 @@ import (
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
@@ -49,7 +50,18 @@ func TestCreate(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
factory := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
factory.Create()
}
@@ -67,7 +79,18 @@ func TestCreateFromConfig(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
factory := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateOne)
@@ -108,7 +131,18 @@ func TestCreateFromEmptyConfig(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
factory := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
configData = []byte(`{}`)
if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil {
@@ -150,7 +184,19 @@ func TestDefaultErrorFunc(t *testing.T) {
mux.Handle(testapi.Default.ResourcePath("pods", "bar", "foo"), &handler)
server := httptest.NewServer(mux)
defer server.Close()
factory := NewConfigFactory(clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue)
@@ -247,9 +293,30 @@ func TestResponsibleForPod(t *testing.T) {
defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
// factory of "default-scheduler"
factoryDefaultScheduler := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factoryDefaultScheduler := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
// factory of "foo-scheduler"
factoryFooScheduler := NewConfigFactory(client, "foo-scheduler", v1.DefaultHardPodAffinitySymmetricWeight)
factoryFooScheduler := NewConfigFactory(
"foo-scheduler",
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
v1.DefaultHardPodAffinitySymmetricWeight,
)
// scheduler annotations to be tested
schedulerFitsDefault := "default-scheduler"
schedulerFitsFoo := "foo-scheduler"
@@ -305,7 +372,18 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
// defer server.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
// factory of "default-scheduler"
factory := NewConfigFactory(client, v1.DefaultSchedulerName, -1)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
-1,
)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: invalid hardPodAffinitySymmetricWeight, got nothing")
@@ -337,7 +415,18 @@ func TestInvalidFactoryArgs(t *testing.T) {
}
for _, test := range testCases {
factory := NewConfigFactory(client, v1.DefaultSchedulerName, test.hardPodAffinitySymmetricWeight)
informerFactory := informers.NewSharedInformerFactory(client, 0)
factory := NewConfigFactory(
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
test.hardPodAffinitySymmetricWeight,
)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: %s, got nothing", test.expectErr)

@@ -24,6 +24,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
@@ -69,9 +70,9 @@ type Configurator interface {
ResponsibleForPod(pod *v1.Pod) bool
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeStore() cache.Store
GetNodeLister() corelisters.NodeLister
GetClient() clientset.Interface
GetScheduledPodListerIndexer() cache.Indexer
GetScheduledPodLister() corelisters.PodLister
Run()
Create() (*Config, error)
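With the Configurator interface reshaped, test code reads cached objects through listers instead of reaching into stores and indexers. A minimal illustration; the helper is hypothetical.

```go
package example

import (
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler"
)

// cachedNodes reads the scheduler's node cache through the new
// GetNodeLister accessor rather than the old GetNodeStore cache.Store.
func cachedNodes(c scheduler.Configurator) ([]*v1.Node, error) {
	return c.GetNodeLister().List(labels.Everything())
}
```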