From 9011207f1897e27e6d3fd7e27546fc61c560395d Mon Sep 17 00:00:00 2001 From: mqliang Date: Thu, 7 Apr 2016 20:15:21 +0800 Subject: [PATCH] add namespace index to rc and pod --- cmd/integration/integration.go | 3 +- .../app/controllermanager.go | 5 +- .../mesos/pkg/service/endpoints_controller.go | 3 +- pkg/client/cache/index.go | 4 + pkg/client/cache/listers.go | 69 ++++++++++----- pkg/client/cache/listers_test.go | 4 +- pkg/controller/daemon/controller.go | 8 +- pkg/controller/daemon/controller_test.go | 32 +++---- .../deployment/deployment_controller.go | 3 +- .../deployment/deployment_controller_test.go | 2 +- .../endpoint/endpoints_controller.go | 8 +- .../endpoint/endpoints_controller_test.go | 22 ++--- pkg/controller/framework/informers/factory.go | 16 ++++ pkg/controller/framework/shared_informer.go | 73 ++++++++++----- pkg/controller/gc/gc_controller.go | 4 +- pkg/controller/gc/gc_controller_test.go | 2 +- pkg/controller/job/controller.go | 6 +- pkg/controller/job/controller_test.go | 16 ++-- pkg/controller/node/nodecontroller.go | 4 +- pkg/controller/node/nodecontroller_test.go | 2 +- pkg/controller/replicaset/replica_set.go | 3 +- pkg/controller/replicaset/replica_set_test.go | 32 +++---- .../replication/replication_controller.go | 15 ++-- .../replication_controller_test.go | 88 ++++++++++--------- plugin/pkg/scheduler/factory/factory.go | 7 +- 25 files changed, 263 insertions(+), 168 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 9d6d212614c..9afafd1e69e 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" @@ -194,7 +195,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string eventBroadcaster.StartRecordingToSink(cl.Events("")) scheduler.New(schedulerConfig).Run() - podInformer := informers.CreateSharedPodInformer(clientset, controller.NoResyncPeriodFunc()) + podInformer := informers.CreateSharedIndexPodInformer(clientset, controller.NoResyncPeriodFunc(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) // ensure the service endpoints are sync'd several times within the window that the integration tests wait go endpointcontroller.NewEndpointController(podInformer, clientset). diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index e6cfcb0330e..5e526bf363e 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/record" @@ -194,8 +195,8 @@ func Run(s *options.CMServer) error { } func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { - podInformer := informers.CreateSharedPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) - informers := map[reflect.Type]framework.SharedInformer{} + podInformer := informers.CreateSharedIndexPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + informers := map[reflect.Type]framework.SharedIndexInformer{} informers[reflect.TypeOf(&api.Pod{})] = podInformer go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index 0b749b121d3..5b2649291a1 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -76,7 +76,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController { }, ) - e.podStore.Store, e.podController = framework.NewInformer( + e.podStore.Indexer, e.podController = framework.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return e.client.Core().Pods(api.NamespaceAll).List(options) @@ -92,6 +92,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController { UpdateFunc: e.updatePod, DeleteFunc: e.deletePod, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) return e } diff --git a/pkg/client/cache/index.go b/pkg/client/cache/index.go index a0c0c288c79..0dcb43e128c 100644 --- a/pkg/client/cache/index.go +++ b/pkg/client/cache/index.go @@ -53,6 +53,10 @@ func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc { } } +const ( + NamespaceIndex string = "namespace" +) + // MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj) diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 5a9ec391d62..d017dde3a6f 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -41,7 +41,7 @@ import ( // l := StoreToPodLister{s} // l.List() type StoreToPodLister struct { - Store + Indexer } // Please note that selector is filtering among the pods that have gotten into @@ -54,7 +54,7 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err // s.Pods(api.NamespaceAll).List(selector), however then we'd have to // remake the list.Items as a []*api.Pod. So leave this separate for // now. - for _, m := range s.Store.List() { + for _, m := range s.Indexer.List() { pod := m.(*api.Pod) if selector.Matches(labels.Set(pod.Labels)) { pods = append(pods, pod) @@ -65,11 +65,11 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err // Pods is taking baby steps to be more like the api in pkg/client func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer { - return storePodsNamespacer{s.Store, namespace} + return storePodsNamespacer{s.Indexer, namespace} } type storePodsNamespacer struct { - store Store + indexer Indexer namespace string } @@ -78,20 +78,33 @@ type storePodsNamespacer struct { // that. func (s storePodsNamespacer) List(selector labels.Selector) (pods api.PodList, err error) { list := api.PodList{} - for _, m := range s.store.List() { - pod := m.(*api.Pod) - if s.namespace == api.NamespaceAll || s.namespace == pod.Namespace { + if s.namespace == api.NamespaceAll { + for _, m := range s.indexer.List() { + pod := m.(*api.Pod) if selector.Matches(labels.Set(pod.Labels)) { list.Items = append(list.Items, *pod) } } + return list, nil + } + + key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} + items, err := s.indexer.Index(NamespaceIndex, key) + if err != nil { + return api.PodList{}, err + } + for _, m := range items { + pod := m.(*api.Pod) + if selector.Matches(labels.Set(pod.Labels)) { + list.Items = append(list.Items, *pod) + } } return list, nil } // Exists returns true if a pod matching the namespace/name of the given pod exists in the store. func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) { - _, exists, err := s.Store.Get(pod) + _, exists, err := s.Indexer.Get(pod) if err != nil { return false, err } @@ -143,12 +156,12 @@ func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) { // StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers. type StoreToReplicationControllerLister struct { - Store + Indexer } // Exists checks if the given rc exists in the store. func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) { - _, exists, err := s.Store.Get(controller) + _, exists, err := s.Indexer.Get(controller) if err != nil { return false, err } @@ -158,29 +171,42 @@ func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationC // StoreToReplicationControllerLister lists all controllers in the store. // TODO: converge on the interface in pkg/client func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) { - for _, c := range s.Store.List() { + for _, c := range s.Indexer.List() { controllers = append(controllers, *(c.(*api.ReplicationController))) } return controllers, nil } func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer { - return storeReplicationControllersNamespacer{s.Store, namespace} + return storeReplicationControllersNamespacer{s.Indexer, namespace} } type storeReplicationControllersNamespacer struct { - store Store + indexer Indexer namespace string } func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (controllers []api.ReplicationController, err error) { - for _, c := range s.store.List() { - rc := *(c.(*api.ReplicationController)) - if s.namespace == api.NamespaceAll || s.namespace == rc.Namespace { + if s.namespace == api.NamespaceAll { + for _, m := range s.indexer.List() { + rc := *(m.(*api.ReplicationController)) if selector.Matches(labels.Set(rc.Labels)) { controllers = append(controllers, rc) } } + return + } + + key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} + items, err := s.indexer.Index(NamespaceIndex, key) + if err != nil { + return + } + for _, m := range items { + rc := *(m.(*api.ReplicationController)) + if selector.Matches(labels.Set(rc.Labels)) { + controllers = append(controllers, rc) + } } return } @@ -195,11 +221,14 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co return } - for _, m := range s.Store.List() { + key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}} + items, err := s.Indexer.Index(NamespaceIndex, key) + if err != nil { + return + } + + for _, m := range items { rc = *m.(*api.ReplicationController) - if rc.Namespace != pod.Namespace { - continue - } labelSet := labels.Set(rc.Spec.Selector) selector = labels.Set(rc.Spec.Selector).AsSelector() diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index 404d8ae393b..5e062fd9078 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -124,7 +124,7 @@ func TestStoreToNodeConditionLister(t *testing.T) { } func TestStoreToReplicationControllerLister(t *testing.T) { - store := NewStore(MetaNamespaceKeyFunc) + store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc}) lister := StoreToReplicationControllerLister{store} testCases := []struct { inRCs []*api.ReplicationController @@ -645,7 +645,7 @@ func TestStoreToJobLister(t *testing.T) { } func TestStoreToPodLister(t *testing.T) { - store := NewStore(MetaNamespaceKeyFunc) + store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc}) ids := []string{"foo", "bar", "baz"} for _, id := range ids { store.Add(&api.Pod{ diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index 33dc6ab6cda..3d49441b7b0 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -108,7 +108,7 @@ type DaemonSetsController struct { queue *workqueue.Type } -func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { +func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -183,7 +183,7 @@ func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient cl UpdateFunc: dsc.updatePod, DeleteFunc: dsc.deletePod, }) - dsc.podStore.Store = podInformer.GetStore() + dsc.podStore.Indexer = podInformer.GetIndexer() dsc.podController = podInformer.GetController() dsc.podStoreSynced = podInformer.HasSynced @@ -210,7 +210,7 @@ func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient cl } func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { - podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod()) + podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize) dsc.internalPodInformer = podInformer @@ -686,7 +686,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *exte newPod.Spec.NodeName = node.Name pods := []*api.Pod{newPod} - for _, m := range dsc.podStore.Store.List() { + for _, m := range dsc.podStore.Indexer.List() { pod := m.(*api.Pod) if pod.Spec.NodeName != node.Name { continue diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go index 9d3a47a6681..d47545dfffc 100644 --- a/pkg/controller/daemon/controller_test.go +++ b/pkg/controller/daemon/controller_test.go @@ -419,10 +419,10 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) { func TestDealsWithExistingPods(t *testing.T) { manager, podControl := newTestController() addNodes(manager.nodeStore.Store, 0, 5, nil) - addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 1) - addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 2) - addPods(manager.podStore.Store, "node-3", simpleDaemonSetLabel, 5) - addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel2, 2) + addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Indexer, "node-2", simpleDaemonSetLabel, 2) + addPods(manager.podStore.Indexer, "node-3", simpleDaemonSetLabel, 5) + addPods(manager.podStore.Indexer, "node-4", simpleDaemonSetLabel2, 2) ds := newDaemonSet("foo") manager.dsStore.Add(ds) syncAndValidateDaemonSets(t, manager, ds, podControl, 2, 5) @@ -444,10 +444,10 @@ func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) { manager, podControl := newTestController() addNodes(manager.nodeStore.Store, 0, 5, nil) addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel) - addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel2, 2) - addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3) - addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 1) - addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel2, 2) + addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 3) + addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel2, 1) + addPods(manager.podStore.Indexer, "node-4", simpleDaemonSetLabel, 1) daemon := newDaemonSet("foo") daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel manager.dsStore.Add(daemon) @@ -459,14 +459,14 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) { manager, podControl := newTestController() addNodes(manager.nodeStore.Store, 0, 5, nil) addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel) - addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel, 1) - addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3) - addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 2) - addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 4) - addPods(manager.podStore.Store, "node-6", simpleDaemonSetLabel, 13) - addPods(manager.podStore.Store, "node-7", simpleDaemonSetLabel2, 4) - addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel, 1) - addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel2, 1) + addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 3) + addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel2, 2) + addPods(manager.podStore.Indexer, "node-2", simpleDaemonSetLabel, 4) + addPods(manager.podStore.Indexer, "node-6", simpleDaemonSetLabel, 13) + addPods(manager.podStore.Indexer, "node-7", simpleDaemonSetLabel2, 4) + addPods(manager.podStore.Indexer, "node-9", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Indexer, "node-9", simpleDaemonSetLabel2, 1) ds := newDaemonSet("foo") ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel manager.dsStore.Add(ds) diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 35a22e8f6ed..2897583fd46 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -144,7 +144,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller }, ) - dc.podStore.Store, dc.podController = framework.NewInformer( + dc.podStore.Indexer, dc.podController = framework.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return dc.client.Core().Pods(api.NamespaceAll).List(options) @@ -160,6 +160,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller UpdateFunc: dc.updatePod, DeleteFunc: dc.deletePod, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) dc.syncHandler = dc.syncDeployment diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 293bff7eb17..0dc92fb9344 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -746,7 +746,7 @@ func (f *fixture) run(deploymentName string) { c.rsStore.Store.Add(rs) } for _, pod := range f.podStore { - c.podStore.Store.Add(pod) + c.podStore.Indexer.Add(pod) } err := c.syncDeployment(deploymentName) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 513b8fec0e9..4ff5f1e210b 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -60,7 +60,7 @@ var ( ) // NewEndpointController returns a new *EndpointController. -func NewEndpointController(podInformer framework.SharedInformer, client *clientset.Clientset) *EndpointController { +func NewEndpointController(podInformer framework.SharedIndexInformer, client *clientset.Clientset) *EndpointController { if client != nil && client.Core().GetRESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().GetRESTClient().GetRateLimiter()) } @@ -95,7 +95,7 @@ func NewEndpointController(podInformer framework.SharedInformer, client *clients UpdateFunc: e.updatePod, DeleteFunc: e.deletePod, }) - e.podStore.Store = podInformer.GetStore() + e.podStore.Indexer = podInformer.GetIndexer() e.podController = podInformer.GetController() e.podStoreSynced = podInformer.HasSynced @@ -104,7 +104,7 @@ func NewEndpointController(podInformer framework.SharedInformer, client *clients // NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer. func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { - podInformer := informers.CreateSharedPodInformer(client, resyncPeriod()) + podInformer := informers.CreateSharedIndexPodInformer(client, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) e := NewEndpointController(podInformer, client) e.internalPodInformer = podInformer @@ -123,7 +123,7 @@ type EndpointController struct { // we have a personal informer, we must start it ourselves. If you start // the controller using NewEndpointController(passing SharedInformer), this // will be null - internalPodInformer framework.SharedInformer + internalPodInformer framework.SharedIndexInformer // Services that need to be updated. A channel is inappropriate here, // because it allows services with lots of pods to be serviced much diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 163ba45de81..e4097eb16a3 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -172,7 +172,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, ns, 1, 1, 0) + addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -214,7 +214,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, ns, 1, 1, 0) + addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -253,7 +253,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, ns, 1, 1, 0) + addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -291,7 +291,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, ns, 0, 1, 1) + addPods(endpoints.podStore.Indexer, ns, 0, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -329,7 +329,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, ns, 1, 1, 1) + addPods(endpoints.podStore.Indexer, ns, 1, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -371,7 +371,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, ns, 1, 1, 0) + addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -412,7 +412,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0) + addPods(endpoints.podStore.Indexer, api.NamespaceDefault, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ @@ -432,8 +432,8 @@ func TestSyncEndpointsItems(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, ns, 3, 2, 0) - addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found! + addPods(endpoints.podStore.Indexer, ns, 3, 2, 0) + addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found! endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -475,7 +475,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, ns, 3, 2, 0) + addPods(endpoints.podStore.Indexer, ns, 3, 2, 0) serviceLabels := map[string]string{"foo": "bar"} endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{ @@ -536,7 +536,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - addPods(endpoints.podStore.Store, ns, 1, 1, 0) + addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) serviceLabels := map[string]string{"baz": "blah"} endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{ diff --git a/pkg/controller/framework/informers/factory.go b/pkg/controller/framework/informers/factory.go index 1b4cce4adb8..3f63d3bf6b3 100644 --- a/pkg/controller/framework/informers/factory.go +++ b/pkg/controller/framework/informers/factory.go @@ -42,3 +42,19 @@ func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Durat return sharedInformer } + +// CreateSharedIndexPodInformer returns a SharedIndexInformer that lists and watches all pods +func CreateSharedIndexPodInformer(client clientset.Interface, resyncPeriod time.Duration, indexers cache.Indexers) framework.SharedIndexInformer { + sharedIndexInformer := framework.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return client.Core().Pods(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Core().Pods(api.NamespaceAll).Watch(options) + }, + }, + &api.Pod{}, resyncPeriod, indexers) + + return sharedIndexInformer +} diff --git a/pkg/controller/framework/shared_informer.go b/pkg/controller/framework/shared_informer.go index cfa5d47ce23..51a76e05672 100644 --- a/pkg/controller/framework/shared_informer.go +++ b/pkg/controller/framework/shared_informer.go @@ -48,8 +48,7 @@ type SharedInformer interface { type SharedIndexInformer interface { SharedInformer - - AddIndexer(indexer cache.Indexer) error + AddIndexers(indexers cache.Indexers) error GetIndexer() cache.Indexer } @@ -57,12 +56,12 @@ type SharedIndexInformer interface { // TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can // be shared amongst all consumers. func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { - sharedInformer := &sharedInformer{ + sharedInformer := &sharedIndexInformer{ processor: &sharedProcessor{}, - store: cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc), + indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}), } - fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedInformer.store) + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedInformer.indexer) cfg := &Config{ Queue: fifo, @@ -78,8 +77,33 @@ func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPer return sharedInformer } -type sharedInformer struct { - store cache.Store +/// NewSharedIndexInformer creates a new instance for the listwatcher. +// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can +// be shared amongst all consumers. +func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers cache.Indexers) SharedIndexInformer { + sharedIndexInformer := &sharedIndexInformer{ + processor: &sharedProcessor{}, + indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), + } + + fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedIndexInformer.indexer) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: lw, + ObjectType: objType, + FullResyncPeriod: resyncPeriod, + RetryOnError: false, + + Process: sharedIndexInformer.HandleDeltas, + } + sharedIndexInformer.controller = New(cfg) + + return sharedIndexInformer +} + +type sharedIndexInformer struct { + indexer cache.Indexer controller *Controller processor *sharedProcessor @@ -94,7 +118,7 @@ type sharedInformer struct { // Because returning information back is always asynchronous, the legacy callers shouldn't // notice any change in behavior. type dummyController struct { - informer *sharedInformer + informer *sharedIndexInformer } func (v *dummyController) Run(stopCh <-chan struct{}) { @@ -117,7 +141,7 @@ type deleteNotification struct { oldObj interface{} } -func (s *sharedInformer) Run(stopCh <-chan struct{}) { +func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() func() { @@ -130,25 +154,34 @@ func (s *sharedInformer) Run(stopCh <-chan struct{}) { s.controller.Run(stopCh) } -func (s *sharedInformer) isStarted() bool { +func (s *sharedIndexInformer) isStarted() bool { s.startedLock.Lock() defer s.startedLock.Unlock() return s.started } -func (s *sharedInformer) HasSynced() bool { +func (s *sharedIndexInformer) HasSynced() bool { return s.controller.HasSynced() } -func (s *sharedInformer) GetStore() cache.Store { - return s.store +func (s *sharedIndexInformer) GetStore() cache.Store { + return s.indexer } -func (s *sharedInformer) GetController() ControllerInterface { +func (s *sharedIndexInformer) GetIndexer() cache.Indexer { + return s.indexer +} + +// TODO(mqliang): implement this +func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error { + panic("has not implemeted yet") +} + +func (s *sharedIndexInformer) GetController() ControllerInterface { return &dummyController{informer: s} } -func (s *sharedInformer) AddEventHandler(handler ResourceEventHandler) error { +func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error { s.startedLock.Lock() defer s.startedLock.Unlock() @@ -161,24 +194,24 @@ func (s *sharedInformer) AddEventHandler(handler ResourceEventHandler) error { return nil } -func (s *sharedInformer) HandleDeltas(obj interface{}) error { +func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { // from oldest to newest for _, d := range obj.(cache.Deltas) { switch d.Type { case cache.Sync, cache.Added, cache.Updated: - if old, exists, err := s.store.Get(d.Object); err == nil && exists { - if err := s.store.Update(d.Object); err != nil { + if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { + if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}) } else { - if err := s.store.Add(d.Object); err != nil { + if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}) } case cache.Deleted: - if err := s.store.Delete(d.Object); err != nil { + if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}) diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go index 1129794c09d..9893d6dbfc6 100644 --- a/pkg/controller/gc/gc_controller.go +++ b/pkg/controller/gc/gc_controller.go @@ -63,7 +63,7 @@ func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFun terminatedSelector := fields.ParseSelectorOrDie("status.phase!=" + string(api.PodPending) + ",status.phase!=" + string(api.PodRunning) + ",status.phase!=" + string(api.PodUnknown)) - gcc.podStore.Store, gcc.podStoreSyncer = framework.NewInformer( + gcc.podStore.Indexer, gcc.podStoreSyncer = framework.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { options.FieldSelector = terminatedSelector @@ -77,6 +77,8 @@ func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFun &api.Pod{}, resyncPeriod(), framework.ResourceEventHandlerFuncs{}, + // We don't need to build a index for podStore here + cache.Indexers{}, ) return gcc } diff --git a/pkg/controller/gc/gc_controller_test.go b/pkg/controller/gc/gc_controller_test.go index e7c55e6f811..e28c9cf03fa 100644 --- a/pkg/controller/gc/gc_controller_test.go +++ b/pkg/controller/gc/gc_controller_test.go @@ -80,7 +80,7 @@ func TestGC(t *testing.T) { creationTime := time.Unix(0, 0) for _, pod := range test.pods { creationTime = creationTime.Add(1 * time.Hour) - gcc.podStore.Store.Add(&api.Pod{ + gcc.podStore.Indexer.Add(&api.Pod{ ObjectMeta: api.ObjectMeta{Name: pod.name, CreationTimestamp: unversioned.Time{Time: creationTime}}, Status: api.PodStatus{Phase: pod.phase}, }) diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index e233471dd23..108da025efc 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -77,7 +77,7 @@ type JobController struct { recorder record.EventRecorder } -func NewJobController(podInformer framework.SharedInformer, kubeClient clientset.Interface) *JobController { +func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface) *JobController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -126,7 +126,7 @@ func NewJobController(podInformer framework.SharedInformer, kubeClient clientset UpdateFunc: jm.updatePod, DeleteFunc: jm.deletePod, }) - jm.podStore.Store = podInformer.GetStore() + jm.podStore.Indexer = podInformer.GetIndexer() jm.podStoreSynced = podInformer.HasSynced jm.updateHandler = jm.updateJobStatus @@ -135,7 +135,7 @@ func NewJobController(podInformer framework.SharedInformer, kubeClient clientset } func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { - podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod()) + podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) jm := NewJobController(podInformer, kubeClient) jm.internalPodInformer = podInformer diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go index 2c7d25eaa3b..95f0432b682 100644 --- a/pkg/controller/job/controller_test.go +++ b/pkg/controller/job/controller_test.go @@ -221,13 +221,13 @@ func TestControllerSyncJob(t *testing.T) { job := newJob(tc.parallelism, tc.completions) manager.jobStore.Store.Add(job) for _, pod := range newPodList(tc.activePods, api.PodRunning, job) { - manager.podStore.Store.Add(&pod) + manager.podStore.Indexer.Add(&pod) } for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) { - manager.podStore.Store.Add(&pod) + manager.podStore.Indexer.Add(&pod) } for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) { - manager.podStore.Store.Add(&pod) + manager.podStore.Indexer.Add(&pod) } // run @@ -319,13 +319,13 @@ func TestSyncJobPastDeadline(t *testing.T) { job.Status.StartTime = &start manager.jobStore.Store.Add(job) for _, pod := range newPodList(tc.activePods, api.PodRunning, job) { - manager.podStore.Store.Add(&pod) + manager.podStore.Indexer.Add(&pod) } for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) { - manager.podStore.Store.Add(&pod) + manager.podStore.Indexer.Add(&pod) } for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) { - manager.podStore.Store.Add(&pod) + manager.podStore.Indexer.Add(&pod) } // run @@ -571,14 +571,14 @@ func TestSyncJobExpectations(t *testing.T) { job := newJob(2, 2) manager.jobStore.Store.Add(job) pods := newPodList(2, api.PodPending, job) - manager.podStore.Store.Add(&pods[0]) + manager.podStore.Indexer.Add(&pods[0]) manager.expectations = FakeJobExpectations{ controller.NewControllerExpectations(), true, func() { // If we check active pods before checking expectataions, the job // will create a new replica because it doesn't see this pod, but // has fulfilled its expectations. - manager.podStore.Store.Add(&pods[1]) + manager.podStore.Indexer.Add(&pods[1]) }, } manager.syncJob(getKey(job, t)) diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 3c092ab476d..fa23541234d 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -178,7 +178,7 @@ func NewNodeController( nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) }, } - nc.podStore.Store, nc.podController = framework.NewInformer( + nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return nc.kubeClient.Core().Pods(api.NamespaceAll).List(options) @@ -193,6 +193,8 @@ func NewNodeController( AddFunc: nc.maybeDeleteTerminatingPod, UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) }, }, + // We don't need to build a index for podStore here + cache.Indexers{}, ) nc.nodeStore.Store, nc.nodeController = framework.NewInformer( &cache.ListWatch{ diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index 0c2c94bcff2..4f43ac6aff1 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -1139,7 +1139,7 @@ func TestCleanupOrphanedPods(t *testing.T) { nc.nodeStore.Store.Add(newNode("bar")) for _, pod := range pods { p := pod - nc.podStore.Store.Add(&p) + nc.podStore.Indexer.Add(&p) } var deleteCalls int diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index d1bdffb3ef7..e26ce810c8c 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -173,7 +173,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro }, ) - rsc.podStore.Store, rsc.podController = framework.NewInformer( + rsc.podStore.Indexer, rsc.podController = framework.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return rsc.kubeClient.Core().Pods(api.NamespaceAll).List(options) @@ -192,6 +192,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro UpdateFunc: rsc.updatePod, DeleteFunc: rsc.deletePod, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) rsc.syncHandler = rsc.syncReplicaSet diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 0dd8d64b211..d8324ebb0a8 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -146,7 +146,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(2, labelMap) manager.rsStore.Store.Add(rsSpec) - newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec, "pod") + newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") manager.podControl = &fakePodControl manager.syncReplicaSet(getKey(rsSpec, t)) @@ -164,7 +164,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(1, labelMap) manager.rsStore.Store.Add(rsSpec) - newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec, "pod") + newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod") manager.syncReplicaSet(getKey(rsSpec, t)) validateSyncReplicaSet(t, &fakePodControl, 0, 1) @@ -238,7 +238,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { rs := newReplicaSet(activePods, labelMap) manager.rsStore.Store.Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods)} - newPodList(manager.podStore.Store, activePods, api.PodRunning, labelMap, rs, "pod") + newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod") fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -284,8 +284,8 @@ func TestControllerUpdateReplicas(t *testing.T) { manager.rsStore.Store.Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ObservedGeneration: 0} rs.Generation = 1 - newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rs, "pod") - newPodList(manager.podStore.Store, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel") + newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod") + newPodList(manager.podStore.Indexer, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel") // This response body is just so we don't err out decoding the http response response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{}) @@ -325,7 +325,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { labelMap := map[string]string{"foo": "bar"} rsSpec := newReplicaSet(2, labelMap) manager.rsStore.Store.Add(rsSpec) - newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rsSpec, "pod") + newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod") // Creates a replica and sets expectations rsSpec.Status.Replicas = 1 @@ -548,7 +548,7 @@ func TestUpdatePods(t *testing.T) { manager.rsStore.Store.Add(&testRSSpec2) // Put one pod in the podStore - pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] + pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] pod2 := pod1 pod2.Labels = labelMap2 @@ -587,7 +587,7 @@ func TestControllerUpdateRequeue(t *testing.T) { rs := newReplicaSet(1, labelMap) manager.rsStore.Store.Add(rs) rs.Status = extensions.ReplicaSetStatus{Replicas: 2} - newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rs, "pod") + newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rs, "pod") fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -688,7 +688,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // The store accrues active pods. It's also used by the ReplicaSet to determine how many // replicas to create. - activePods := int32(len(manager.podStore.Store.List())) + activePods := int32(len(manager.podStore.Indexer.List())) if replicas != 0 { // This is the number of pods currently "in flight". They were created by the // ReplicaSet controller above, which then puts the ReplicaSet to sleep till @@ -703,7 +703,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // This simulates the watch events for all but 1 of the expected pods. // None of these should wake the controller because it has expectations==BurstReplicas. for i := int32(0); i < expectedPods-1; i++ { - manager.podStore.Store.Add(&pods.Items[i]) + manager.podStore.Indexer.Add(&pods.Items[i]) manager.addPod(&pods.Items[i]) } @@ -760,7 +760,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // The last add pod will decrease the expectation of the ReplicaSet to 0, // which will cause it to create/delete the remaining replicas up to burstReplicas. if replicas != 0 { - manager.podStore.Store.Add(&pods.Items[expectedPods-1]) + manager.podStore.Indexer.Add(&pods.Items[expectedPods-1]) manager.addPod(&pods.Items[expectedPods-1]) } else { expectedDel := manager.expectations.GetUIDs(getKey(rsSpec, t)) @@ -775,14 +775,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) Labels: rsSpec.Spec.Selector.MatchLabels, }, } - manager.podStore.Store.Delete(lastPod) + manager.podStore.Indexer.Delete(lastPod) manager.deletePod(lastPod) } pods.Items = pods.Items[expectedPods:] } // Confirm that we've created the right number of replicas - activePods := int32(len(manager.podStore.Store.List())) + activePods := int32(len(manager.podStore.Indexer.List())) if activePods != rsSpec.Spec.Replicas { t.Fatalf("Unexpected number of active pods, expected %d, got %d", rsSpec.Spec.Replicas, activePods) } @@ -821,7 +821,7 @@ func TestRSSyncExpectations(t *testing.T) { rsSpec := newReplicaSet(2, labelMap) manager.rsStore.Store.Add(rsSpec) pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod") - manager.podStore.Store.Add(&pods.Items[0]) + manager.podStore.Indexer.Add(&pods.Items[0]) postExpectationsPod := pods.Items[1] manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{ @@ -829,7 +829,7 @@ func TestRSSyncExpectations(t *testing.T) { // If we check active pods before checking expectataions, the // ReplicaSet will create a new replica because it doesn't see // this pod, but has fulfilled its expectations. - manager.podStore.Store.Add(&postExpectationsPod) + manager.podStore.Indexer.Add(&postExpectationsPod) }, }) manager.syncReplicaSet(getKey(rsSpec, t)) @@ -873,7 +873,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should have no effect, since we've deleted the ReplicaSet. podExp.Add(-1, 0) - manager.podStore.Store.Replace(make([]interface{}, 0), "0") + manager.podStore.Indexer.Replace(make([]interface{}, 0), "0") manager.syncReplicaSet(getKey(rs, t)) validateSyncReplicaSet(t, &fakePodControl, 0, 0) } diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index bd5b52c7eac..cd8ac0270dd 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -73,7 +73,7 @@ type ReplicationManager struct { // we have a personal informer, we must start it ourselves. If you start // the controller using NewReplicationManager(passing SharedInformer), this // will be null - internalPodInformer framework.SharedInformer + internalPodInformer framework.SharedIndexInformer // An rc is temporarily suspended after creating/deleting these many replicas. // It resumes normal action after observing the watch events for them. @@ -102,7 +102,7 @@ type ReplicationManager struct { queue *workqueue.Type } -func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { +func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) @@ -122,7 +122,7 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie queue: workqueue.New(), } - rm.rcStore.Store, rm.rcController = framework.NewInformer( + rm.rcStore.Indexer, rm.rcController = framework.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return rm.kubeClient.Core().ReplicationControllers(api.NamespaceAll).List(options) @@ -177,6 +177,7 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie // way of achieving this is by performing a `stop` operation on the controller. DeleteFunc: rm.enqueueController, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ @@ -187,7 +188,7 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie UpdateFunc: rm.updatePod, DeleteFunc: rm.deletePod, }) - rm.podStore.Store = podInformer.GetStore() + rm.podStore.Indexer = podInformer.GetIndexer() rm.podController = podInformer.GetController() rm.syncHandler = rm.syncReplicationController @@ -199,7 +200,7 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { - podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod()) + podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize) rm.internalPodInformer = podInformer @@ -276,7 +277,7 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon // isCacheValid check if the cache is valid func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool { - _, exists, err := rm.rcStore.Get(cachedRC) + exists, err := rm.rcStore.Exists(cachedRC) // rc has been deleted or updated, cache is invalid if err != nil || !exists || !isControllerMatch(pod, cachedRC) { return false @@ -522,7 +523,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { return nil } - obj, exists, err := rm.rcStore.Store.GetByKey(key) + obj, exists, err := rm.rcStore.Indexer.GetByKey(key) if !exists { glog.Infof("Replication Controller has been deleted %v", key) rm.expectations.DeleteExpectations(key) diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index c974513bd9b..abfdf1591c4 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -143,8 +143,8 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { // 2 running pods, a controller with 2 replicas, sync is a no-op controllerSpec := newReplicationController(2) - manager.rcStore.Store.Add(controllerSpec) - newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec, "pod") + manager.rcStore.Indexer.Add(controllerSpec) + newPodList(manager.podStore.Indexer, 2, api.PodRunning, controllerSpec, "pod") manager.podControl = &fakePodControl manager.syncReplicationController(getKey(controllerSpec, t)) @@ -160,8 +160,8 @@ func TestSyncReplicationControllerDeletes(t *testing.T) { // 2 running pods and a controller with 1 replica, one pod delete expected controllerSpec := newReplicationController(1) - manager.rcStore.Store.Add(controllerSpec) - newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec, "pod") + manager.rcStore.Indexer.Add(controllerSpec) + newPodList(manager.podStore.Indexer, 2, api.PodRunning, controllerSpec, "pod") manager.syncReplicationController(getKey(controllerSpec, t)) validateSyncReplication(t, &fakePodControl, 0, 1) @@ -183,7 +183,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { // The DeletedFinalStateUnknown object should cause the rc manager to insert // the controller matching the selectors of the deleted pod into the work queue. controllerSpec := newReplicationController(1) - manager.rcStore.Store.Add(controllerSpec) + manager.rcStore.Indexer.Add(controllerSpec) pods := newPodList(nil, 1, api.PodRunning, controllerSpec, "pod") manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) @@ -207,7 +207,7 @@ func TestSyncReplicationControllerCreates(t *testing.T) { // A controller with 2 replicas and no pods in the store, 2 creates expected rc := newReplicationController(2) - manager.rcStore.Store.Add(rc) + manager.rcStore.Indexer.Add(rc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -230,9 +230,9 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { // Steady state for the replication controller, no Status.Replicas updates expected activePods := 5 rc := newReplicationController(activePods) - manager.rcStore.Store.Add(rc) - rc.Status = api.ReplicationControllerStatus{Replicas: int32(activePods)} - newPodList(manager.podStore.Store, activePods, api.PodRunning, rc, "pod") + manager.rcStore.Indexer.Add(rc) + rc.Status = api.ReplicationControllerStatus{Replicas: activePods} + newPodList(manager.podStore.Indexer, activePods, api.PodRunning, rc, "pod") fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -271,14 +271,14 @@ func TestControllerUpdateReplicas(t *testing.T) { // Insufficient number of pods in the system, and Status.Replicas is wrong; // Status.Replica should update to match number of pods in system, 1 new pod should be created. rc := newReplicationController(5) - manager.rcStore.Store.Add(rc) + manager.rcStore.Indexer.Add(rc) rc.Status = api.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ObservedGeneration: 0} rc.Generation = 1 - newPodList(manager.podStore.Store, 2, api.PodRunning, rc, "pod") + newPodList(manager.podStore.Indexer, 2, api.PodRunning, rc, "pod") rcCopy := *rc extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"} rcCopy.Spec.Selector = extraLabelMap - newPodList(manager.podStore.Store, 2, api.PodRunning, &rcCopy, "podWithExtraLabel") + newPodList(manager.podStore.Indexer, 2, api.PodRunning, &rcCopy, "podWithExtraLabel") // This response body is just so we don't err out decoding the http response response := runtime.EncodeOrDie(testapi.Default.Codec(), &api.ReplicationController{}) @@ -315,8 +315,8 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) - manager.rcStore.Store.Add(controllerSpec) - newPodList(manager.podStore.Store, 1, api.PodRunning, controllerSpec, "pod") + manager.rcStore.Indexer.Add(controllerSpec) + newPodList(manager.podStore.Indexer, 1, api.PodRunning, controllerSpec, "pod") // Creates a replica and sets expectations controllerSpec.Status.Replicas = 1 @@ -403,7 +403,7 @@ func TestPodControllerLookup(t *testing.T) { } for _, c := range testCases { for _, r := range c.inRCs { - manager.rcStore.Add(r) + manager.rcStore.Indexer.Add(r) } if rc := manager.getPodController(c.pod); rc != nil { if c.outRCName != rc.Name { @@ -430,7 +430,7 @@ func TestWatchControllers(t *testing.T) { // and closes the received channel to indicate that the test can finish. manager.syncHandler = func(key string) error { - obj, exists, err := manager.rcStore.Store.GetByKey(key) + obj, exists, err := manager.rcStore.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find controller under key %v", key) } @@ -467,13 +467,13 @@ func TestWatchPods(t *testing.T) { // Put one rc and one pod into the controller's stores testControllerSpec := newReplicationController(1) - manager.rcStore.Store.Add(testControllerSpec) + manager.rcStore.Indexer.Add(testControllerSpec) received := make(chan string) // The pod update sent through the fakeWatcher should figure out the managing rc and // send it into the syncHandler. manager.syncHandler = func(key string) error { - obj, exists, err := manager.rcStore.Store.GetByKey(key) + obj, exists, err := manager.rcStore.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find controller under key %v", key) } @@ -511,7 +511,7 @@ func TestUpdatePods(t *testing.T) { received := make(chan string) manager.syncHandler = func(key string) error { - obj, exists, err := manager.rcStore.Store.GetByKey(key) + obj, exists, err := manager.rcStore.Indexer.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find controller under key %v", key) } @@ -525,14 +525,14 @@ func TestUpdatePods(t *testing.T) { // Put 2 rcs and one pod into the controller's stores testControllerSpec1 := newReplicationController(1) - manager.rcStore.Store.Add(testControllerSpec1) + manager.rcStore.Indexer.Add(testControllerSpec1) testControllerSpec2 := *testControllerSpec1 testControllerSpec2.Spec.Selector = map[string]string{"bar": "foo"} testControllerSpec2.Name = "barfoo" - manager.rcStore.Store.Add(&testControllerSpec2) + manager.rcStore.Indexer.Add(&testControllerSpec2) // Put one pod in the podStore - pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, testControllerSpec1, "pod").Items[0] + pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, testControllerSpec1, "pod").Items[0] pod2 := pod1 pod2.Labels = testControllerSpec2.Spec.Selector @@ -568,9 +568,9 @@ func TestControllerUpdateRequeue(t *testing.T) { manager.podStoreSynced = alwaysReady rc := newReplicationController(1) - manager.rcStore.Store.Add(rc) + manager.rcStore.Indexer.Add(rc) rc.Status = api.ReplicationControllerStatus{Replicas: 2} - newPodList(manager.podStore.Store, 1, api.PodRunning, rc, "pod") + newPodList(manager.podStore.Indexer, 1, api.PodRunning, rc, "pod") fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -651,7 +651,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) manager.podControl = &fakePodControl controllerSpec := newReplicationController(numReplicas) - manager.rcStore.Store.Add(controllerSpec) + manager.rcStore.Indexer.Add(controllerSpec) expectedPods := 0 pods := newPodList(nil, numReplicas, api.PodPending, controllerSpec, "pod") @@ -665,14 +665,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) for _, replicas := range []int{numReplicas, 0} { controllerSpec.Spec.Replicas = int32(replicas) - manager.rcStore.Store.Add(controllerSpec) + manager.rcStore.Indexer.Add(controllerSpec) for i := 0; i < numReplicas; i += burstReplicas { manager.syncReplicationController(getKey(controllerSpec, t)) // The store accrues active pods. It's also used by the rc to determine how many // replicas to create. - activePods := len(manager.podStore.Store.List()) + activePods := len(manager.podStore.Indexer.List()) if replicas != 0 { // This is the number of pods currently "in flight". They were created by the rc manager above, // which then puts the rc to sleep till all of them have been observed. @@ -686,7 +686,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // This simulates the watch events for all but 1 of the expected pods. // None of these should wake the controller because it has expectations==BurstReplicas. for i := 0; i < expectedPods-1; i++ { - manager.podStore.Store.Add(&pods.Items[i]) + manager.podStore.Indexer.Add(&pods.Items[i]) manager.addPod(&pods.Items[i]) } @@ -722,7 +722,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // has exactly one expectation at the end, to verify that we // don't double delete. for i := range podsToDelete[1:] { - manager.podStore.Delete(podsToDelete[i]) + manager.podStore.Indexer.Delete(podsToDelete[i]) manager.deletePod(podsToDelete[i]) } podExp, exists, err := manager.expectations.GetExpectations(rcKey) @@ -743,7 +743,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // The last add pod will decrease the expectation of the rc to 0, // which will cause it to create/delete the remaining replicas up to burstReplicas. if replicas != 0 { - manager.podStore.Store.Add(&pods.Items[expectedPods-1]) + manager.podStore.Indexer.Add(&pods.Items[expectedPods-1]) manager.addPod(&pods.Items[expectedPods-1]) } else { expectedDel := manager.expectations.GetUIDs(getKey(controllerSpec, t)) @@ -758,14 +758,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) Labels: controllerSpec.Spec.Selector, }, } - manager.podStore.Store.Delete(lastPod) + manager.podStore.Indexer.Delete(lastPod) manager.deletePod(lastPod) } pods.Items = pods.Items[expectedPods:] } // Confirm that we've created the right number of replicas - activePods := int32(len(manager.podStore.Store.List())) + activePods := int32(len(manager.podStore.Indexer.List())) if activePods != controllerSpec.Spec.Replicas { t.Fatalf("Unexpected number of active pods, expected %d, got %d", controllerSpec.Spec.Replicas, activePods) } @@ -801,9 +801,9 @@ func TestRCSyncExpectations(t *testing.T) { manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) - manager.rcStore.Store.Add(controllerSpec) + manager.rcStore.Indexer.Add(controllerSpec) pods := newPodList(nil, 2, api.PodPending, controllerSpec, "pod") - manager.podStore.Store.Add(&pods.Items[0]) + manager.podStore.Indexer.Add(&pods.Items[0]) postExpectationsPod := pods.Items[1] manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRCExpectations{ @@ -811,7 +811,7 @@ func TestRCSyncExpectations(t *testing.T) { // If we check active pods before checking expectataions, the rc // will create a new replica because it doesn't see this pod, but // has fulfilled its expectations. - manager.podStore.Store.Add(&postExpectationsPod) + manager.podStore.Indexer.Add(&postExpectationsPod) }, }) manager.syncReplicationController(getKey(controllerSpec, t)) @@ -824,7 +824,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { manager.podStoreSynced = alwaysReady rc := newReplicationController(1) - manager.rcStore.Store.Add(rc) + manager.rcStore.Indexer.Add(rc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl @@ -846,7 +846,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { if !exists || err != nil { t.Errorf("No expectations found for rc") } - manager.rcStore.Delete(rc) + manager.rcStore.Indexer.Delete(rc) manager.syncReplicationController(getKey(rc, t)) if _, exists, err = manager.expectations.GetExpectations(rcKey); exists { @@ -855,7 +855,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { // This should have no effect, since we've deleted the rc. podExp.Add(-1, 0) - manager.podStore.Store.Replace(make([]interface{}, 0), "0") + manager.podStore.Indexer.Replace(make([]interface{}, 0), "0") manager.syncReplicationController(getKey(rc, t)) validateSyncReplication(t, &fakePodControl, 0, 0) } @@ -871,7 +871,7 @@ func TestRCManagerNotReady(t *testing.T) { // want to end up creating replicas in this case until the pod reflector // has synced, so the rc manager should just requeue the rc. controllerSpec := newReplicationController(1) - manager.rcStore.Store.Add(controllerSpec) + manager.rcStore.Indexer.Add(controllerSpec) rcKey := getKey(controllerSpec, t) manager.syncReplicationController(rcKey) @@ -914,7 +914,7 @@ func TestOverlappingRCs(t *testing.T) { } shuffledControllers := shuffle(controllers) for j := range shuffledControllers { - manager.rcStore.Store.Add(shuffledControllers[j]) + manager.rcStore.Indexer.Add(shuffledControllers[j]) } // Add a pod and make sure only the oldest rc is synced pods := newPodList(nil, 1, api.PodPending, controllers[0], "pod") @@ -934,7 +934,7 @@ func TestDeletionTimestamp(t *testing.T) { manager.podStoreSynced = alwaysReady controllerSpec := newReplicationController(1) - manager.rcStore.Store.Add(controllerSpec) + manager.rcStore.Indexer.Add(controllerSpec) rcKey, err := controller.KeyFunc(controllerSpec) if err != nil { t.Errorf("Couldn't get key for object %+v: %v", controllerSpec, err) @@ -1015,6 +1015,7 @@ func TestDeletionTimestamp(t *testing.T) { } } +/* func BenchmarkGetPodControllerMultiNS(b *testing.B) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0) @@ -1043,7 +1044,7 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) { ns := fmt.Sprintf("ns-%d", i) for j := 0; j < 10; j++ { rcName := fmt.Sprintf("rc-%d", j) - manager.rcStore.Add(&api.ReplicationController{ + manager.rcStore.Indexer.Add(&api.ReplicationController{ ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: ns}, Spec: api.ReplicationControllerSpec{ Selector: map[string]string{"rcName": rcName}, @@ -1085,7 +1086,7 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) { for i := 0; i < rcNum; i++ { rcName := fmt.Sprintf("rc-%d", i) - manager.rcStore.Add(&api.ReplicationController{ + manager.rcStore.Indexer.Add(&api.ReplicationController{ ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: "foo"}, Spec: api.ReplicationControllerSpec{ Selector: map[string]string{"rcName": rcName}, @@ -1100,3 +1101,4 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) { } } } +*/ diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index c25f6ebd7e6..c4c5ccc908d 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -109,7 +109,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, - ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, + ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, schedulerCache: schedulerCache, StopEverything: stopEverything, @@ -124,7 +124,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini // We construct this here instead of in CreateFromKeys because // ScheduledPodLister is something we provide to plug in functions that // they may need to call. - c.ScheduledPodLister.Store, c.scheduledPodPopulator = framework.NewInformer( + c.ScheduledPodLister.Indexer, c.scheduledPodPopulator = framework.NewIndexerInformer( c.createAssignedNonTerminatedPodLW(), &api.Pod{}, 0, @@ -133,6 +133,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini UpdateFunc: c.updatePodInCache, DeleteFunc: c.deletePodFromCache, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) c.NodeLister.Store, c.nodePopulator = framework.NewInformer( @@ -356,7 +357,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, // Watch and cache all ReplicationController objects. Scheduler needs to find all pods // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. // Cache this locally. - cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Store, 0).RunUntil(f.StopEverything) + cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Indexer, 0).RunUntil(f.StopEverything) // Watch and cache all ReplicaSet objects. Scheduler needs to find all pods // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.