From 9d8d6ad16cba1e0dea9e195f4a212acc8f7d8470 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Tue, 21 Feb 2017 15:00:57 -0500 Subject: [PATCH] Switch scheduler to use generated listers/informers Where possible, switch the scheduler to use generated listers and informers. There are still some places where it probably makes more sense to use one-off reflectors/informers (listing/watching just a single node, listing/watching scheduled & unscheduled pods using a field selector). --- .../service/servicecontroller_test.go | 14 +- pkg/kubelet/BUILD | 2 +- pkg/kubelet/kubelet.go | 18 +- pkg/kubelet/kubelet_test.go | 29 ++- plugin/cmd/kube-scheduler/app/BUILD | 3 + plugin/cmd/kube-scheduler/app/configurator.go | 26 ++- plugin/cmd/kube-scheduler/app/options/BUILD | 1 + .../cmd/kube-scheduler/app/options/options.go | 2 + plugin/cmd/kube-scheduler/app/server.go | 28 ++- plugin/pkg/scheduler/BUILD | 1 + .../pkg/scheduler/algorithm/predicates/BUILD | 3 +- .../algorithm/predicates/predicates.go | 28 ++- .../algorithmprovider/defaults/BUILD | 1 + .../defaults/compatibility_test.go | 14 +- plugin/pkg/scheduler/factory/BUILD | 8 +- plugin/pkg/scheduler/factory/factory.go | 179 ++++++------------ plugin/pkg/scheduler/factory/factory_test.go | 105 +++++++++- plugin/pkg/scheduler/scheduler.go | 5 +- test/integration/scheduler/extender_test.go | 18 +- test/integration/scheduler/scheduler_test.go | 110 ++++++++--- test/integration/scheduler_perf/BUILD | 2 + .../scheduler_perf/scheduler_bench_test.go | 11 +- .../scheduler_perf/scheduler_test.go | 11 +- test/integration/scheduler_perf/util.go | 19 +- 24 files changed, 417 insertions(+), 221 deletions(-) diff --git a/pkg/controller/service/servicecontroller_test.go b/pkg/controller/service/servicecontroller_test.go index 066f02841a0..14f89b05e2f 100644 --- a/pkg/controller/service/servicecontroller_test.go +++ b/pkg/controller/service/servicecontroller_test.go @@ -195,7 +195,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { newService("s0", "333", v1.ServiceTypeLoadBalancer), }, expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ - {newService("s0", "333", v1.ServiceTypeLoadBalancer), nodes}, + {Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, { @@ -206,9 +206,9 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { newService("s2", "666", v1.ServiceTypeLoadBalancer), }, expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ - {newService("s0", "444", v1.ServiceTypeLoadBalancer), nodes}, - {newService("s1", "555", v1.ServiceTypeLoadBalancer), nodes}, - {newService("s2", "666", v1.ServiceTypeLoadBalancer), nodes}, + {Service: newService("s0", "444", v1.ServiceTypeLoadBalancer), Hosts: nodes}, + {Service: newService("s1", "555", v1.ServiceTypeLoadBalancer), Hosts: nodes}, + {Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, { @@ -220,8 +220,8 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { newService("s4", "123", v1.ServiceTypeClusterIP), }, expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ - {newService("s1", "888", v1.ServiceTypeLoadBalancer), nodes}, - {newService("s3", "999", v1.ServiceTypeLoadBalancer), nodes}, + {Service: newService("s1", "888", v1.ServiceTypeLoadBalancer), Hosts: nodes}, + {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, { @@ -231,7 +231,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { nil, }, expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ - {newService("s0", "234", v1.ServiceTypeLoadBalancer), nodes}, + {Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, } diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 18f9967c978..5a3dcf9594e 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -43,7 +43,7 @@ go_library( "//pkg/apis/componentconfig/v1alpha1:go_default_library", "//pkg/capabilities:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/legacylisters:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/features:go_default_library", "//pkg/fieldpath:go_default_library", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 31fed0bb33e..5f1bd580d33 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -55,7 +55,7 @@ import ( "k8s.io/kubernetes/pkg/apis/componentconfig" componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/legacylisters" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/features" internalapi "k8s.io/kubernetes/pkg/kubelet/api" @@ -374,21 +374,21 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub dockerExecHandler = &dockertools.NativeExecHandler{} } - serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) if kubeDeps.KubeClient != nil { serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "services", metav1.NamespaceAll, fields.Everything()) - cache.NewReflector(serviceLW, &v1.Service{}, serviceStore, 0).Run() + cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0).Run() } - serviceLister := &listers.StoreToServiceLister{Indexer: serviceStore} + serviceLister := corelisters.NewServiceLister(serviceIndexer) - nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) if kubeDeps.KubeClient != nil { fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector() nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector) - cache.NewReflector(nodeLW, &v1.Node{}, nodeStore, 0).Run() + cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0).Run() } - nodeLister := &listers.StoreToNodeLister{Store: nodeStore} - nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister} + nodeLister := corelisters.NewNodeLister(nodeIndexer) + nodeInfo := &predicates.CachedNodeInfo{NodeLister: nodeLister} // TODO: get the real node object of ourself, // and use the real node name and UID. @@ -803,7 +803,7 @@ type serviceLister interface { } type nodeLister interface { - List() (machines v1.NodeList, err error) + List(labels.Selector) ([]*v1.Node, error) } // Kubelet is the main kubelet implementation. diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index b342c61235b..3fb69e08620 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -401,26 +402,24 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { } type testNodeLister struct { - nodes []v1.Node + nodes []*v1.Node } type testNodeInfo struct { - nodes []v1.Node + nodes []*v1.Node } func (ls testNodeInfo) GetNodeInfo(id string) (*v1.Node, error) { for _, node := range ls.nodes { if node.Name == id { - return &node, nil + return node, nil } } return nil, fmt.Errorf("Node with name: %s does not exist", id) } -func (ls testNodeLister) List() (v1.NodeList, error) { - return v1.NodeList{ - Items: ls.nodes, - }, nil +func (ls testNodeLister) List(selector labels.Selector) ([]*v1.Node, error) { + return ls.nodes, nil } // Tests that we handle port conflicts correctly by setting the failed status in status map. @@ -432,7 +431,7 @@ func TestHandlePortConflicts(t *testing.T) { testKubelet.fakeCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) - kl.nodeLister = testNodeLister{nodes: []v1.Node{ + kl.nodeLister = testNodeLister{nodes: []*v1.Node{ { ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)}, Status: v1.NodeStatus{ @@ -442,7 +441,7 @@ func TestHandlePortConflicts(t *testing.T) { }, }, }} - kl.nodeInfo = testNodeInfo{nodes: []v1.Node{ + kl.nodeInfo = testNodeInfo{nodes: []*v1.Node{ { ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)}, Status: v1.NodeStatus{ @@ -487,7 +486,7 @@ func TestHandleHostNameConflicts(t *testing.T) { testKubelet.fakeCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) - kl.nodeLister = testNodeLister{nodes: []v1.Node{ + kl.nodeLister = testNodeLister{nodes: []*v1.Node{ { ObjectMeta: metav1.ObjectMeta{Name: "127.0.0.1"}, Status: v1.NodeStatus{ @@ -497,7 +496,7 @@ func TestHandleHostNameConflicts(t *testing.T) { }, }, }} - kl.nodeInfo = testNodeInfo{nodes: []v1.Node{ + kl.nodeInfo = testNodeInfo{nodes: []*v1.Node{ { ObjectMeta: metav1.ObjectMeta{Name: "127.0.0.1"}, Status: v1.NodeStatus{ @@ -535,7 +534,7 @@ func TestHandleNodeSelector(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet - nodes := []v1.Node{ + nodes := []*v1.Node{ { ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}, Status: v1.NodeStatus{ @@ -576,7 +575,7 @@ func TestHandleMemExceeded(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet - nodes := []v1.Node{ + nodes := []*v1.Node{ {ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}, Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(10, resource.DecimalSI), @@ -1874,7 +1873,7 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet - kl.nodeLister = testNodeLister{nodes: []v1.Node{ + kl.nodeLister = testNodeLister{nodes: []*v1.Node{ { ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)}, Status: v1.NodeStatus{ @@ -1884,7 +1883,7 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) { }, }, }} - kl.nodeInfo = testNodeInfo{nodes: []v1.Node{ + kl.nodeInfo = testNodeInfo{nodes: []*v1.Node{ { ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)}, Status: v1.NodeStatus{ diff --git a/plugin/cmd/kube-scheduler/app/BUILD b/plugin/cmd/kube-scheduler/app/BUILD index 20acdb4a383..9d9b01837b2 100644 --- a/plugin/cmd/kube-scheduler/app/BUILD +++ b/plugin/cmd/kube-scheduler/app/BUILD @@ -18,6 +18,9 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library", "//pkg/client/leaderelection:go_default_library", "//pkg/client/leaderelection/resourcelock:go_default_library", "//pkg/util/configz:go_default_library", diff --git a/plugin/cmd/kube-scheduler/app/configurator.go b/plugin/cmd/kube-scheduler/app/configurator.go index 337265d12e0..70fa5630f9f 100644 --- a/plugin/cmd/kube-scheduler/app/configurator.go +++ b/plugin/cmd/kube-scheduler/app/configurator.go @@ -21,6 +21,8 @@ import ( "io/ioutil" "os" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1" "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" "k8s.io/apimachinery/pkg/runtime" @@ -69,8 +71,28 @@ func createClient(s *options.SchedulerServer) (*clientset.Clientset, error) { } // createScheduler encapsulates the entire creation of a runnable scheduler. -func createScheduler(s *options.SchedulerServer, kubecli *clientset.Clientset, recorder record.EventRecorder) (*scheduler.Scheduler, error) { - configurator := factory.NewConfigFactory(kubecli, s.SchedulerName, s.HardPodAffinitySymmetricWeight) +func createScheduler( + s *options.SchedulerServer, + kubecli *clientset.Clientset, + nodeInformer coreinformers.NodeInformer, + pvInformer coreinformers.PersistentVolumeInformer, + pvcInformer coreinformers.PersistentVolumeClaimInformer, + replicationControllerInformer coreinformers.ReplicationControllerInformer, + replicaSetInformer extensionsinformers.ReplicaSetInformer, + serviceInformer coreinformers.ServiceInformer, + recorder record.EventRecorder, +) (*scheduler.Scheduler, error) { + configurator := factory.NewConfigFactory( + s.SchedulerName, + kubecli, + nodeInformer, + pvInformer, + pvcInformer, + replicationControllerInformer, + replicaSetInformer, + serviceInformer, + s.HardPodAffinitySymmetricWeight, + ) // Rebuild the configurator with a default Create(...) method. configurator = &schedulerConfigurator{ diff --git a/plugin/cmd/kube-scheduler/app/options/BUILD b/plugin/cmd/kube-scheduler/app/options/BUILD index 6d02773e718..3c1feba0af4 100644 --- a/plugin/cmd/kube-scheduler/app/options/BUILD +++ b/plugin/cmd/kube-scheduler/app/options/BUILD @@ -14,6 +14,7 @@ go_library( deps = [ "//pkg/api:go_default_library", "//pkg/apis/componentconfig:go_default_library", + "//pkg/apis/componentconfig/install:go_default_library", "//pkg/apis/componentconfig/v1alpha1:go_default_library", "//pkg/client/leaderelection:go_default_library", "//pkg/features:go_default_library", diff --git a/plugin/cmd/kube-scheduler/app/options/options.go b/plugin/cmd/kube-scheduler/app/options/options.go index 57f5b9e30ea..1d84938553a 100644 --- a/plugin/cmd/kube-scheduler/app/options/options.go +++ b/plugin/cmd/kube-scheduler/app/options/options.go @@ -27,6 +27,8 @@ import ( // add the kubernetes feature gates _ "k8s.io/kubernetes/pkg/features" + // install the componentconfig api so we get its defaulting and conversion functions + _ "k8s.io/kubernetes/pkg/apis/componentconfig/install" "github.com/spf13/pflag" ) diff --git a/plugin/cmd/kube-scheduler/app/server.go b/plugin/cmd/kube-scheduler/app/server.go index e35e4d32ce2..979b16e045c 100644 --- a/plugin/cmd/kube-scheduler/app/server.go +++ b/plugin/cmd/kube-scheduler/app/server.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/server/healthz" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" "k8s.io/kubernetes/pkg/util/configz" @@ -67,24 +68,47 @@ func Run(s *options.SchedulerServer) error { if err != nil { return fmt.Errorf("unable to create kube client: %v", err) } + recorder := createRecorder(kubecli, s) - sched, err := createScheduler(s, kubecli, recorder) + + informerFactory := informers.NewSharedInformerFactory(kubecli, 0) + + sched, err := createScheduler( + s, + kubecli, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + recorder, + ) if err != nil { return fmt.Errorf("error creating scheduler: %v", err) } + go startHTTP(s) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + run := func(_ <-chan struct{}) { sched.Run() select {} } + if !s.LeaderElection.LeaderElect { run(nil) panic("unreachable") } + id, err := os.Hostname() if err != nil { return fmt.Errorf("unable to get hostname: %v", err) } + // TODO: enable other lock types rl := &resourcelock.EndpointsLock{ EndpointsMeta: metav1.ObjectMeta{ @@ -97,6 +121,7 @@ func Run(s *options.SchedulerServer) error { EventRecorder: recorder, }, } + leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: s.LeaderElection.LeaseDuration.Duration, @@ -109,6 +134,7 @@ func Run(s *options.SchedulerServer) error { }, }, }) + panic("unreachable") } diff --git a/plugin/pkg/scheduler/BUILD b/plugin/pkg/scheduler/BUILD index 52055569d3e..a235d16123f 100644 --- a/plugin/pkg/scheduler/BUILD +++ b/plugin/pkg/scheduler/BUILD @@ -40,6 +40,7 @@ go_library( deps = [ "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/metrics:go_default_library", diff --git a/plugin/pkg/scheduler/algorithm/predicates/BUILD b/plugin/pkg/scheduler/algorithm/predicates/BUILD index b77a17843af..6181d417663 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/BUILD +++ b/plugin/pkg/scheduler/algorithm/predicates/BUILD @@ -19,12 +19,13 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", - "//pkg/client/legacylisters:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", "//pkg/kubelet/qos:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/util/runtime", diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 4be7fe0ca86..10cbe901c80 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -25,12 +25,13 @@ import ( "time" "github.com/golang/glog" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/client/legacylisters" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" @@ -58,13 +59,22 @@ type PersistentVolumeInfo interface { GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) } +// CachedPersistentVolumeInfo implements PersistentVolumeInfo +type CachedPersistentVolumeInfo struct { + corelisters.PersistentVolumeLister +} + +func (c *CachedPersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) { + return c.Get(pvID) +} + type PersistentVolumeClaimInfo interface { GetPersistentVolumeClaimInfo(namespace string, name string) (*v1.PersistentVolumeClaim, error) } // CachedPersistentVolumeClaimInfo implements PersistentVolumeClaimInfo type CachedPersistentVolumeClaimInfo struct { - *listers.StoreToPersistentVolumeClaimLister + corelisters.PersistentVolumeClaimLister } // GetPersistentVolumeClaimInfo fetches the claim in specified namespace with specified name @@ -73,22 +83,22 @@ func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace } type CachedNodeInfo struct { - *listers.StoreToNodeLister + corelisters.NodeLister } // GetNodeInfo returns cached data for the node 'id'. func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) { - node, exists, err := c.Get(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: id}}) + node, err := c.Get(id) + + if apierrors.IsNotFound(err) { + return nil, fmt.Errorf("node '%v' not found", id) + } if err != nil { return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err) } - if !exists { - return nil, fmt.Errorf("node '%v' not found", id) - } - - return node.(*v1.Node), nil + return node, nil } // Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/BUILD b/plugin/pkg/scheduler/algorithmprovider/defaults/BUILD index 99770b0471b..7b958a7733c 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/BUILD +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/BUILD @@ -35,6 +35,7 @@ go_test( "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/api/latest:go_default_library", "//plugin/pkg/scheduler/factory:go_default_library", diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go index fc87095a13a..2fe51736e11 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" "k8s.io/kubernetes/plugin/pkg/scheduler/factory" @@ -345,8 +346,19 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + informerFactory := informers.NewSharedInformerFactory(client, 0) - if _, err := factory.NewConfigFactory(client, "some-scheduler-name", v1.DefaultHardPodAffinitySymmetricWeight).CreateFromConfig(policy); err != nil { + if _, err := factory.NewConfigFactory( + "some-scheduler-name", + client, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ).CreateFromConfig(policy); err != nil { t.Errorf("%s: Error constructing: %v", v, err) continue } diff --git a/plugin/pkg/scheduler/factory/BUILD b/plugin/pkg/scheduler/factory/BUILD index 389d3d965c6..ff6eb68528f 100644 --- a/plugin/pkg/scheduler/factory/BUILD +++ b/plugin/pkg/scheduler/factory/BUILD @@ -17,10 +17,11 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", - "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/legacylisters:go_default_library", - "//pkg/controller/informers:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/extensions/v1beta1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", + "//pkg/client/listers/extensions/v1beta1:go_default_library", "//plugin/pkg/scheduler:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/algorithm/predicates:go_default_library", @@ -56,6 +57,7 @@ go_test( "//pkg/api/testing:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/api/latest:go_default_library", diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 1d97ad64aa9..76f91cc0084 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -32,10 +32,11 @@ import ( genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api/v1" - extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/legacylisters" - "k8s.io/kubernetes/pkg/controller/informers" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" + extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" @@ -58,32 +59,26 @@ type ConfigFactory struct { // queue for pods that need scheduling podQueue *cache.FIFO // a means to list all known scheduled pods. - scheduledPodLister *listers.StoreToPodLister + scheduledPodLister corelisters.PodLister // a means to list all known scheduled pods and pods assumed to have been scheduled. podLister algorithm.PodLister // a means to list all nodes - nodeLister *listers.StoreToNodeLister + nodeLister corelisters.NodeLister // a means to list all PersistentVolumes - pVLister *listers.StoreToPVFetcher + pVLister corelisters.PersistentVolumeLister // a means to list all PersistentVolumeClaims - pVCLister *listers.StoreToPersistentVolumeClaimLister + pVCLister corelisters.PersistentVolumeClaimLister // a means to list all services - serviceLister *listers.StoreToServiceLister + serviceLister corelisters.ServiceLister // a means to list all controllers - controllerLister *listers.StoreToReplicationControllerLister + controllerLister corelisters.ReplicationControllerLister // a means to list all replicasets - replicaSetLister *listers.StoreToReplicaSetLister + replicaSetLister extensionslisters.ReplicaSetLister // Close this to stop all reflectors StopEverything chan struct{} - informerFactory informers.SharedInformerFactory scheduledPodPopulator cache.Controller - nodePopulator cache.Controller - pvPopulator cache.Controller - pvcPopulator cache.Controller - servicePopulator cache.Controller - controllerPopulator cache.Controller schedulerCache schedulercache.Cache @@ -102,40 +97,41 @@ type ConfigFactory struct { // NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only // return the interface. -func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int) scheduler.Configurator { +func NewConfigFactory( + schedulerName string, + client clientset.Interface, + nodeInformer coreinformers.NodeInformer, + pvInformer coreinformers.PersistentVolumeInformer, + pvcInformer coreinformers.PersistentVolumeClaimInformer, + replicationControllerInformer coreinformers.ReplicationControllerInformer, + replicaSetInformer extensionsinformers.ReplicaSetInformer, + serviceInformer coreinformers.ServiceInformer, + hardPodAffinitySymmetricWeight int, +) scheduler.Configurator { stopEverything := make(chan struct{}) schedulerCache := schedulercache.New(30*time.Second, stopEverything) - // TODO: pass this in as an argument... - informerFactory := informers.NewSharedInformerFactory(client, nil, 0) - pvcInformer := informerFactory.PersistentVolumeClaims() - c := &ConfigFactory{ - client: client, - podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), - scheduledPodLister: &listers.StoreToPodLister{}, - informerFactory: informerFactory, - // Only nodes in the "Ready" condition with status == "True" are schedulable - nodeLister: &listers.StoreToNodeLister{}, - pVLister: &listers.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, + client: client, + podLister: schedulerCache, + podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), + pVLister: pvInformer.Lister(), pVCLister: pvcInformer.Lister(), - pvcPopulator: pvcInformer.Informer().GetController(), - serviceLister: &listers.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, - controllerLister: &listers.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, - replicaSetLister: &listers.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, + serviceLister: serviceInformer.Lister(), + controllerLister: replicationControllerInformer.Lister(), + replicaSetLister: replicaSetInformer.Lister(), schedulerCache: schedulerCache, StopEverything: stopEverything, schedulerName: schedulerName, hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight, } - c.podLister = schedulerCache - // On add/delete to the scheduled pods, remove from the assumed pods. // We construct this here instead of in CreateFromKeys because // ScheduledPodLister is something we provide to plug in functions that // they may need to call. - c.scheduledPodLister.Indexer, c.scheduledPodPopulator = cache.NewIndexerInformer( + var scheduledPodIndexer cache.Indexer + scheduledPodIndexer, c.scheduledPodPopulator = cache.NewIndexerInformer( c.createAssignedNonTerminatedPodLW(), &v1.Pod{}, 0, @@ -146,48 +142,27 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA }, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) + c.scheduledPodLister = corelisters.NewPodLister(scheduledPodIndexer) - c.nodeLister.Store, c.nodePopulator = cache.NewInformer( - c.createNodeLW(), - &v1.Node{}, - 0, + // Only nodes in the "Ready" condition with status == "True" are schedulable + nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: c.addNodeToCache, UpdateFunc: c.updateNodeInCache, DeleteFunc: c.deleteNodeFromCache, }, + 0, ) + c.nodeLister = nodeInformer.Lister() // TODO(harryz) need to fill all the handlers here and below for equivalence cache - c.pVLister.Store, c.pvPopulator = cache.NewInformer( - c.createPersistentVolumeLW(), - &v1.PersistentVolume{}, - 0, - cache.ResourceEventHandlerFuncs{}, - ) - - c.serviceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer( - c.createServiceLW(), - &v1.Service{}, - 0, - cache.ResourceEventHandlerFuncs{}, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - c.controllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer( - c.createControllerLW(), - &v1.ReplicationController{}, - 0, - cache.ResourceEventHandlerFuncs{}, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) return c } // GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests. -func (c *ConfigFactory) GetNodeStore() cache.Store { - return c.nodeLister.Store +func (c *ConfigFactory) GetNodeLister() corelisters.NodeLister { + return c.nodeLister } func (c *ConfigFactory) GetHardPodAffinitySymmetricWeight() int { @@ -204,8 +179,8 @@ func (f *ConfigFactory) GetClient() clientset.Interface { } // GetScheduledPodListerIndexer provides a pod lister, mostly internal use, but may also be called by mock-tests. -func (c *ConfigFactory) GetScheduledPodListerIndexer() cache.Indexer { - return c.scheduledPodLister.Indexer +func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister { + return c.scheduledPodLister } // TODO(harryz) need to update all the handlers here and below for equivalence cache @@ -394,7 +369,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return &scheduler.Config{ SchedulerCache: f.schedulerCache, // The scheduler only needs to consider schedulable nodes. - NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()), + NodeLister: &nodePredicateLister{f.nodeLister}, Algorithm: algo, Binder: &binder{f.client}, PodConditionUpdater: &podConditionUpdater{f.client}, @@ -406,6 +381,14 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, }, nil } +type nodePredicateLister struct { + corelisters.NodeLister +} + +func (n *nodePredicateLister) List() ([]*v1.Node, error) { + return n.ListWithPredicate(getNodeConditionPredicate()) +} + func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) { pluginArgs, err := f.getPluginArgs() if err != nil { @@ -448,10 +431,10 @@ func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) { ControllerLister: f.controllerLister, ReplicaSetLister: f.replicaSetLister, // All fit predicates only need to consider schedulable nodes. - NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()), - NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.nodeLister}, - PVInfo: f.pVLister, - PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{StoreToPersistentVolumeClaimLister: f.pVCLister}, + NodeLister: &nodePredicateLister{f.nodeLister}, + NodeInfo: &predicates.CachedNodeInfo{NodeLister: f.nodeLister}, + PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: f.pVLister}, + PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{PersistentVolumeClaimLister: f.pVCLister}, HardPodAffinitySymmetricWeight: f.hardPodAffinitySymmetricWeight, }, nil } @@ -462,27 +445,6 @@ func (f *ConfigFactory) Run() { // Begin populating scheduled pods. go f.scheduledPodPopulator.Run(f.StopEverything) - - // Begin populating nodes. - go f.nodePopulator.Run(f.StopEverything) - - // Begin populating pv & pvc - go f.pvPopulator.Run(f.StopEverything) - go f.pvcPopulator.Run(f.StopEverything) - - // Begin populating services - go f.servicePopulator.Run(f.StopEverything) - - // Begin populating controllers - go f.controllerPopulator.Run(f.StopEverything) - - // start informers... - f.informerFactory.Start(f.StopEverything) - - // Watch and cache all ReplicaSet objects. Scheduler needs to find all pods - // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. - // Cache this locally. - cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.replicaSetLister.Indexer, 0).RunUntil(f.StopEverything) } func (f *ConfigFactory) getNextPod() *v1.Pod { @@ -499,7 +461,7 @@ func (f *ConfigFactory) ResponsibleForPod(pod *v1.Pod) bool { return f.schedulerName == pod.Spec.SchedulerName } -func getNodeConditionPredicate() listers.NodeConditionPredicate { +func getNodeConditionPredicate() corelisters.NodeConditionPredicate { return func(node *v1.Node) bool { for i := range node.Status.Conditions { cond := &node.Status.Conditions[i] @@ -542,39 +504,6 @@ func (factory *ConfigFactory) createAssignedNonTerminatedPodLW() *cache.ListWatc return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "pods", metav1.NamespaceAll, selector) } -// createNodeLW returns a cache.ListWatch that gets all changes to nodes. -func (factory *ConfigFactory) createNodeLW() *cache.ListWatch { - // all nodes are considered to ensure that the scheduler cache has access to all nodes for lookups - // the NodeCondition is used to filter out the nodes that are not ready or unschedulable - // the filtered list is used as the super set of nodes to consider for scheduling - return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "nodes", metav1.NamespaceAll, fields.ParseSelectorOrDie("")) -} - -// createPersistentVolumeLW returns a cache.ListWatch that gets all changes to persistentVolumes. -func (factory *ConfigFactory) createPersistentVolumeLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumes", metav1.NamespaceAll, fields.ParseSelectorOrDie("")) -} - -// createPersistentVolumeClaimLW returns a cache.ListWatch that gets all changes to persistentVolumeClaims. -func (factory *ConfigFactory) createPersistentVolumeClaimLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumeClaims", metav1.NamespaceAll, fields.ParseSelectorOrDie("")) -} - -// Returns a cache.ListWatch that gets all changes to services. -func (factory *ConfigFactory) createServiceLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "services", metav1.NamespaceAll, fields.ParseSelectorOrDie("")) -} - -// Returns a cache.ListWatch that gets all changes to controllers. -func (factory *ConfigFactory) createControllerLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "replicationControllers", metav1.NamespaceAll, fields.ParseSelectorOrDie("")) -} - -// Returns a cache.ListWatch that gets all changes to replicasets. -func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.client.Extensions().RESTClient(), "replicasets", metav1.NamespaceAll, fields.ParseSelectorOrDie("")) -} - func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) { return func(pod *v1.Pod, err error) { if err == core.ErrNoNodesAvailable { diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 533fddc5ee9..640ed379f25 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -33,6 +33,7 @@ import ( apitesting "k8s.io/kubernetes/pkg/api/testing" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" @@ -49,7 +50,18 @@ func TestCreate(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - factory := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + informerFactory := informers.NewSharedInformerFactory(client, 0) + factory := NewConfigFactory( + v1.DefaultSchedulerName, + client, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) factory.Create() } @@ -67,7 +79,18 @@ func TestCreateFromConfig(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - factory := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + informerFactory := informers.NewSharedInformerFactory(client, 0) + factory := NewConfigFactory( + v1.DefaultSchedulerName, + client, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) // Pre-register some predicate and priority functions RegisterFitPredicate("PredicateOne", PredicateOne) @@ -108,7 +131,18 @@ func TestCreateFromEmptyConfig(t *testing.T) { server := httptest.NewServer(&handler) defer server.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - factory := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + informerFactory := informers.NewSharedInformerFactory(client, 0) + factory := NewConfigFactory( + v1.DefaultSchedulerName, + client, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) configData = []byte(`{}`) if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, &policy); err != nil { @@ -150,7 +184,19 @@ func TestDefaultErrorFunc(t *testing.T) { mux.Handle(testapi.Default.ResourcePath("pods", "bar", "foo"), &handler) server := httptest.NewServer(mux) defer server.Close() - factory := NewConfigFactory(clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + informerFactory := informers.NewSharedInformerFactory(client, 0) + factory := NewConfigFactory( + v1.DefaultSchedulerName, + client, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second) errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue) @@ -247,9 +293,30 @@ func TestResponsibleForPod(t *testing.T) { defer server.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) // factory of "default-scheduler" - factoryDefaultScheduler := NewConfigFactory(client, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + informerFactory := informers.NewSharedInformerFactory(client, 0) + factoryDefaultScheduler := NewConfigFactory( + v1.DefaultSchedulerName, + client, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) // factory of "foo-scheduler" - factoryFooScheduler := NewConfigFactory(client, "foo-scheduler", v1.DefaultHardPodAffinitySymmetricWeight) + factoryFooScheduler := NewConfigFactory( + "foo-scheduler", + client, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) // scheduler annotations to be tested schedulerFitsDefault := "default-scheduler" schedulerFitsFoo := "foo-scheduler" @@ -305,7 +372,18 @@ func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) { // defer server.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) // factory of "default-scheduler" - factory := NewConfigFactory(client, v1.DefaultSchedulerName, -1) + informerFactory := informers.NewSharedInformerFactory(client, 0) + factory := NewConfigFactory( + v1.DefaultSchedulerName, + client, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + -1, + ) _, err := factory.Create() if err == nil { t.Errorf("expected err: invalid hardPodAffinitySymmetricWeight, got nothing") @@ -337,7 +415,18 @@ func TestInvalidFactoryArgs(t *testing.T) { } for _, test := range testCases { - factory := NewConfigFactory(client, v1.DefaultSchedulerName, test.hardPodAffinitySymmetricWeight) + informerFactory := informers.NewSharedInformerFactory(client, 0) + factory := NewConfigFactory( + v1.DefaultSchedulerName, + client, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + test.hardPodAffinitySymmetricWeight, + ) _, err := factory.Create() if err == nil { t.Errorf("expected err: %s, got nothing", test.expectErr) diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index de0854ccbfa..1d37ef6d056 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -24,6 +24,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/metrics" @@ -69,9 +70,9 @@ type Configurator interface { ResponsibleForPod(pod *v1.Pod) bool // Needs to be exposed for things like integration tests where we want to make fake nodes. - GetNodeStore() cache.Store + GetNodeLister() corelisters.NodeLister GetClient() clientset.Interface - GetScheduledPodListerIndexer() cache.Indexer + GetScheduledPodLister() corelisters.PodLister Run() Create() (*Config, error) diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go index c99b8afa4a2..a3257f5f459 100644 --- a/test/integration/scheduler/extender_test.go +++ b/test/integration/scheduler/extender_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/plugin/pkg/scheduler" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" @@ -239,7 +240,18 @@ func TestSchedulerExtender(t *testing.T) { } policy.APIVersion = api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String() - schedulerConfigFactory := factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + informerFactory := informers.NewSharedInformerFactory(clientSet, 0) + schedulerConfigFactory := factory.NewConfigFactory( + v1.DefaultSchedulerName, + clientSet, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy) if err != nil { t.Fatalf("Couldn't create scheduler config: %v", err) @@ -247,7 +259,9 @@ func TestSchedulerExtender(t *testing.T) { eventBroadcaster := record.NewBroadcaster() schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientSet.Core().RESTClient()).Events("")}) - scheduler.New(schedulerConfig).Run() + scheduler := scheduler.New(schedulerConfig) + informerFactory.Start(schedulerConfig.StopEverything) + scheduler.Run() defer close(schedulerConfig.StopEverything) diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 41be177cc70..ec497e06861 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -37,6 +37,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/plugin/pkg/scheduler" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" "k8s.io/kubernetes/plugin/pkg/scheduler/factory" @@ -44,7 +46,7 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) -type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeStore cache.Store, c clientset.Interface) +type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) type nodeStateManager struct { makeSchedulable nodeMutationFunc @@ -59,8 +61,19 @@ func TestUnschedulableNodes(t *testing.T) { defer framework.DeleteTestingNamespace(ns, s, t) clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + informerFactory := informers.NewSharedInformerFactory(clientSet, 0) - schedulerConfigFactory := factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + schedulerConfigFactory := factory.NewConfigFactory( + v1.DefaultSchedulerName, + clientSet, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { t.Fatalf("Couldn't create scheduler config: %v", err) @@ -68,11 +81,12 @@ func TestUnschedulableNodes(t *testing.T) { eventBroadcaster := record.NewBroadcaster() schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}) eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")}) + informerFactory.Start(schedulerConfig.StopEverything) scheduler.New(schedulerConfig).Run() defer close(schedulerConfig.StopEverything) - DoTestUnschedulableNodes(t, clientSet, ns, schedulerConfigFactory.GetNodeStore()) + DoTestUnschedulableNodes(t, clientSet, ns, schedulerConfigFactory.GetNodeLister()) } func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { @@ -94,23 +108,23 @@ func podScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond // Wait till the passFunc confirms that the object it expects to see is in the store. // Used to observe reflected events. -func waitForReflection(t *testing.T, s cache.Store, key string, passFunc func(n interface{}) bool) error { +func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string, passFunc func(n interface{}) bool) error { nodes := []*v1.Node{} err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) { - if n, _, err := s.GetByKey(key); err == nil && passFunc(n) { + n, err := nodeLister.Get(key) + + switch { + case err == nil && passFunc(n): return true, nil - } else { - if err != nil { - t.Errorf("Unexpected error: %v", err) - } else { - if n == nil { - nodes = append(nodes, nil) - } else { - nodes = append(nodes, n.(*v1.Node)) - } - } - return false, nil + case errors.IsNotFound(err): + nodes = append(nodes, nil) + case err != nil: + t.Errorf("Unexpected error: %v", err) + default: + nodes = append(nodes, n) } + + return false, nil }) if err != nil { t.Logf("Logging consecutive node versions received from store:") @@ -121,7 +135,7 @@ func waitForReflection(t *testing.T, s cache.Store, key string, passFunc func(n return err } -func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Namespace, nodeStore cache.Store) { +func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Namespace, nodeLister corelisters.NodeLister) { // NOTE: This test cannot run in parallel, because it is creating and deleting // non-namespaced objects (Nodes). defer cs.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{}) @@ -167,12 +181,12 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names nodeModifications := []nodeStateManager{ // Test node.Spec.Unschedulable=true/false { - makeUnSchedulable: func(t *testing.T, n *v1.Node, s cache.Store, c clientset.Interface) { + makeUnSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) { n.Spec.Unschedulable = true if _, err := c.Core().Nodes().Update(n); err != nil { t.Fatalf("Failed to update node with unschedulable=true: %v", err) } - err = waitForReflection(t, s, nodeKey, func(node interface{}) bool { + err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool { // An unschedulable node should still be present in the store // Nodes that are unschedulable or that are not ready or // have their disk full (Node.Spec.Conditions) are excluded @@ -183,12 +197,12 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names t.Fatalf("Failed to observe reflected update for setting unschedulable=true: %v", err) } }, - makeSchedulable: func(t *testing.T, n *v1.Node, s cache.Store, c clientset.Interface) { + makeSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) { n.Spec.Unschedulable = false if _, err := c.Core().Nodes().Update(n); err != nil { t.Fatalf("Failed to update node with unschedulable=false: %v", err) } - err = waitForReflection(t, s, nodeKey, func(node interface{}) bool { + err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool { return node != nil && node.(*v1.Node).Spec.Unschedulable == false }) if err != nil { @@ -198,7 +212,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names }, // Test node.Status.Conditions=ConditionTrue/Unknown { - makeUnSchedulable: func(t *testing.T, n *v1.Node, s cache.Store, c clientset.Interface) { + makeUnSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) { n.Status = v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), @@ -208,14 +222,14 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names if _, err = c.Core().Nodes().UpdateStatus(n); err != nil { t.Fatalf("Failed to update node with bad status condition: %v", err) } - err = waitForReflection(t, s, nodeKey, func(node interface{}) bool { + err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool { return node != nil && node.(*v1.Node).Status.Conditions[0].Status == v1.ConditionUnknown }) if err != nil { t.Fatalf("Failed to observe reflected update for status condition update: %v", err) } }, - makeSchedulable: func(t *testing.T, n *v1.Node, s cache.Store, c clientset.Interface) { + makeSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) { n.Status = v1.NodeStatus{ Capacity: v1.ResourceList{ v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), @@ -225,7 +239,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names if _, err = c.Core().Nodes().UpdateStatus(n); err != nil { t.Fatalf("Failed to update node with healthy status condition: %v", err) } - err = waitForReflection(t, s, nodeKey, func(node interface{}) bool { + err = waitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool { return node != nil && node.(*v1.Node).Status.Conditions[0].Status == v1.ConditionTrue }) if err != nil { @@ -242,7 +256,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names } // Apply the unschedulable modification to the node, and wait for the reflection - mod.makeUnSchedulable(t, unSchedNode, nodeStore, cs) + mod.makeUnSchedulable(t, unSchedNode, nodeLister, cs) // Create the new pod, note that this needs to happen post unschedulable // modification or we have a race in the test. @@ -273,7 +287,7 @@ func DoTestUnschedulableNodes(t *testing.T, cs clientset.Interface, ns *v1.Names if err != nil { t.Fatalf("Failed to get node: %v", err) } - mod.makeSchedulable(t, schedNode, nodeStore, cs) + mod.makeSchedulable(t, schedNode, nodeLister, cs) // Wait until the pod is scheduled. err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(cs, myPod.Namespace, myPod.Name)) @@ -329,7 +343,18 @@ func TestMultiScheduler(t *testing.T) { // non-namespaced objects (Nodes). defer clientSet.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{}) - schedulerConfigFactory := factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + informerFactory := informers.NewSharedInformerFactory(clientSet, 0) + schedulerConfigFactory := factory.NewConfigFactory( + v1.DefaultSchedulerName, + clientSet, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { t.Fatalf("Couldn't create scheduler config: %v", err) @@ -337,6 +362,7 @@ func TestMultiScheduler(t *testing.T) { eventBroadcaster := record.NewBroadcaster() schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}) eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")}) + informerFactory.Start(schedulerConfig.StopEverything) scheduler.New(schedulerConfig).Run() // default-scheduler will be stopped later @@ -399,8 +425,19 @@ func TestMultiScheduler(t *testing.T) { // 5. create and start a scheduler with name "foo-scheduler" clientSet2 := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + informerFactory2 := informers.NewSharedInformerFactory(clientSet, 0) - schedulerConfigFactory2 := factory.NewConfigFactory(clientSet2, "foo-scheduler", v1.DefaultHardPodAffinitySymmetricWeight) + schedulerConfigFactory2 := factory.NewConfigFactory( + "foo-scheduler", + clientSet2, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) schedulerConfig2, err := schedulerConfigFactory2.Create() if err != nil { t.Errorf("Couldn't create scheduler config: %v", err) @@ -408,6 +445,7 @@ func TestMultiScheduler(t *testing.T) { eventBroadcaster2 := record.NewBroadcaster() schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(api.Scheme, clientv1.EventSource{Component: "foo-scheduler"}) eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet2.Core().RESTClient()).Events("")}) + informerFactory2.Start(schedulerConfig2.StopEverything) scheduler.New(schedulerConfig2).Run() defer close(schedulerConfig2.StopEverything) @@ -490,12 +528,23 @@ func TestAllocatable(t *testing.T) { // 1. create and start default-scheduler clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + informerFactory := informers.NewSharedInformerFactory(clientSet, 0) // NOTE: This test cannot run in parallel, because it is creating and deleting // non-namespaced objects (Nodes). defer clientSet.Core().Nodes().DeleteCollection(nil, metav1.ListOptions{}) - schedulerConfigFactory := factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + schedulerConfigFactory := factory.NewConfigFactory( + v1.DefaultSchedulerName, + clientSet, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) schedulerConfig, err := schedulerConfigFactory.Create() if err != nil { t.Fatalf("Couldn't create scheduler config: %v", err) @@ -503,6 +552,7 @@ func TestAllocatable(t *testing.T) { eventBroadcaster := record.NewBroadcaster() schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: v1.DefaultSchedulerName}) eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")}) + informerFactory.Start(schedulerConfig.StopEverything) scheduler.New(schedulerConfig).Run() // default-scheduler will be stopped later defer close(schedulerConfig.StopEverything) diff --git a/test/integration/scheduler_perf/BUILD b/test/integration/scheduler_perf/BUILD index 3e64b649875..cc68fab8786 100644 --- a/test/integration/scheduler_perf/BUILD +++ b/test/integration/scheduler_perf/BUILD @@ -16,6 +16,7 @@ go_library( "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", "//plugin/pkg/scheduler:go_default_library", "//plugin/pkg/scheduler/algorithmprovider:go_default_library", "//plugin/pkg/scheduler/factory:go_default_library", @@ -43,6 +44,7 @@ go_test( "//test/utils:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/labels", ], ) diff --git a/test/integration/scheduler_perf/scheduler_bench_test.go b/test/integration/scheduler_perf/scheduler_bench_test.go index b4f431906c1..d848219ee4c 100644 --- a/test/integration/scheduler_perf/scheduler_bench_test.go +++ b/test/integration/scheduler_perf/scheduler_bench_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/test/integration/framework" testutils "k8s.io/kubernetes/test/utils" @@ -74,7 +75,10 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) { podCreator.CreatePods() for { - scheduled := schedulerConfigFactory.GetScheduledPodListerIndexer().List() + scheduled, err := schedulerConfigFactory.GetScheduledPodLister().List(labels.Everything()) + if err != nil { + glog.Fatalf("%v", err) + } if len(scheduled) >= numScheduledPods { break } @@ -89,7 +93,10 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) { for { // This can potentially affect performance of scheduler, since List() is done under mutex. // TODO: Setup watch on apiserver and wait until all pods scheduled. - scheduled := schedulerConfigFactory.GetScheduledPodListerIndexer().List() + scheduled, err := schedulerConfigFactory.GetScheduledPodLister().List(labels.Everything()) + if err != nil { + glog.Fatalf("%v", err) + } if len(scheduled) >= numScheduledPods+b.N { break } diff --git a/test/integration/scheduler_perf/scheduler_test.go b/test/integration/scheduler_perf/scheduler_test.go index 93e332b3c79..ecc29915b6e 100644 --- a/test/integration/scheduler_perf/scheduler_test.go +++ b/test/integration/scheduler_perf/scheduler_test.go @@ -23,6 +23,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/test/integration/framework" testutils "k8s.io/kubernetes/test/utils" @@ -209,7 +210,10 @@ func schedulePods(config *testConfig) int32 { // Bake in time for the first pod scheduling event. for { time.Sleep(50 * time.Millisecond) - scheduled := config.schedulerSupportFunctions.GetScheduledPodListerIndexer().List() + scheduled, err := config.schedulerSupportFunctions.GetScheduledPodLister().List(labels.Everything()) + if err != nil { + glog.Fatalf("%v", err) + } // 30,000 pods -> wait till @ least 300 are scheduled to start measuring. // TODO Find out why sometimes there may be scheduling blips in the beggining. if len(scheduled) > config.numPods/100 { @@ -224,7 +228,10 @@ func schedulePods(config *testConfig) int32 { // This can potentially affect performance of scheduler, since List() is done under mutex. // Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler. // TODO: Setup watch on apiserver and wait until all pods scheduled. - scheduled := config.schedulerSupportFunctions.GetScheduledPodListerIndexer().List() + scheduled, err := config.schedulerSupportFunctions.GetScheduledPodLister().List(labels.Everything()) + if err != nil { + glog.Fatalf("%v", err) + } // We will be completed when all pods are done being scheduled. // return the worst-case-scenario interval that was seen during this time. diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index ff3550d4d1d..ed03e59cf56 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/plugin/pkg/scheduler" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" "k8s.io/kubernetes/plugin/pkg/scheduler/factory" @@ -58,7 +59,19 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy Burst: 5000, }) - schedulerConfigurator = factory.NewConfigFactory(clientSet, v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight) + informerFactory := informers.NewSharedInformerFactory(clientSet, 0) + + schedulerConfigurator = factory.NewConfigFactory( + v1.DefaultSchedulerName, + clientSet, + informerFactory.Core().V1().Nodes(), + informerFactory.Core().V1().PersistentVolumes(), + informerFactory.Core().V1().PersistentVolumeClaims(), + informerFactory.Core().V1().ReplicationControllers(), + informerFactory.Extensions().V1beta1().ReplicaSets(), + informerFactory.Core().V1().Services(), + v1.DefaultHardPodAffinitySymmetricWeight, + ) eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(clientSet.Core().RESTClient()).Events("")}) @@ -70,11 +83,15 @@ func mustSetupScheduler() (schedulerConfigurator scheduler.Configurator, destroy glog.Fatalf("Error creating scheduler: %v", err) } + stop := make(chan struct{}) + informerFactory.Start(stop) + sched.Run() destroyFunc = func() { glog.Infof("destroying") sched.StopEverything() + close(stop) s.Close() glog.Infof("destroyed") }