diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index 4eef19c8815..f06923366a5 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -55,7 +55,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController { client: client, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"), } - e.serviceStore.Store, e.serviceController = cache.NewInformer( + e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return e.client.Core().Services(api.NamespaceAll).List(options) @@ -73,6 +73,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController { }, DeleteFunc: e.enqueueService, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) e.podStore.Indexer, e.podController = cache.NewIndexerInformer( @@ -262,7 +263,7 @@ func (e *endpointController) syncService(key string) error { defer func() { glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := e.serviceStore.Store.GetByKey(key) + obj, exists, err := e.serviceStore.Indexer.GetByKey(key) if err != nil || !exists { // Delete the corresponding endpoint, as the service has been deleted. // TODO: Please note that this will delete an endpoint when a diff --git a/federation/pkg/federation-controller/service/cluster_helper.go b/federation/pkg/federation-controller/service/cluster_helper.go index 90168d2dd7c..516851d580f 100644 --- a/federation/pkg/federation-controller/service/cluster_helper.go +++ b/federation/pkg/federation-controller/service/cluster_helper.go @@ -114,7 +114,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa }, ) - cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = cache.NewInformer( + cachedClusterClient.serviceStore.Indexer, cachedClusterClient.serviceController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { return clientset.Core().Services(v1.NamespaceAll).List(options) @@ -149,6 +149,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName) }, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) cc.clientMap[clusterName] = cachedClusterClient go cachedClusterClient.serviceController.Run(wait.NeverStop) diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 7ccd844b980..bb0178afffd 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -63,7 +63,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache // if serviceCache does not exists, that means the service is not created by federation, we should skip it return nil } - serviceInterface, exists, err := clusterCache.serviceStore.GetByKey(key) + serviceInterface, exists, err := clusterCache.serviceStore.Indexer.GetByKey(key) if err != nil { glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err) clusterCache.serviceQueue.Add(key) diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index cb4757467e3..7baefaf67ab 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -144,7 +144,7 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte queue: workqueue.New(), knownClusterSet: make(sets.String), } - s.serviceStore.Store, s.serviceController = cache.NewInformer( + s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { return s.federationClient.Core().Services(v1.NamespaceAll).List(options) @@ -165,6 +165,7 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte }, DeleteFunc: s.enqueueService, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) s.clusterStore.Store, s.clusterController = cache.NewInformer( &cache.ListWatch{ @@ -816,7 +817,7 @@ func (s *ServiceController) syncService(key string) error { glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime)) }() // obj holds the latest service info from apiserver - obj, exists, err := s.serviceStore.Store.GetByKey(key) + obj, exists, err := s.serviceStore.Indexer.GetByKey(key) if err != nil { glog.Errorf("Unable to retrieve service %v from store: %v", key, err) s.queue.Add(key) diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 849e39cef1e..6bdf3e3b7d5 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/labels" ) +// AppendFunc is used to add a matching item to whatever list the caller is using type AppendFunc func(interface{}) func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error { @@ -136,116 +137,6 @@ func (s storeToNodeConditionLister) List() (nodes []*api.Node, err error) { return } -// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers. -type StoreToReplicationControllerLister struct { - Indexer -} - -// Exists checks if the given rc exists in the store. -func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) { - _, exists, err := s.Indexer.Get(controller) - if err != nil { - return false, err - } - return exists, nil -} - -// StoreToReplicationControllerLister lists all controllers in the store. -// TODO: converge on the interface in pkg/client -func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) { - for _, c := range s.Indexer.List() { - controllers = append(controllers, *(c.(*api.ReplicationController))) - } - return controllers, nil -} - -func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer { - return storeReplicationControllersNamespacer{s.Indexer, namespace} -} - -type storeReplicationControllersNamespacer struct { - indexer Indexer - namespace string -} - -func (s storeReplicationControllersNamespacer) List(selector labels.Selector) ([]api.ReplicationController, error) { - controllers := []api.ReplicationController{} - - if s.namespace == api.NamespaceAll { - for _, m := range s.indexer.List() { - rc := *(m.(*api.ReplicationController)) - if selector.Matches(labels.Set(rc.Labels)) { - controllers = append(controllers, rc) - } - } - return controllers, nil - } - - key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} - items, err := s.indexer.Index(NamespaceIndex, key) - if err != nil { - // Ignore error; do slow search without index. - glog.Warningf("can not retrieve list of objects using index : %v", err) - for _, m := range s.indexer.List() { - rc := *(m.(*api.ReplicationController)) - if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) { - controllers = append(controllers, rc) - } - } - return controllers, nil - } - for _, m := range items { - rc := *(m.(*api.ReplicationController)) - if selector.Matches(labels.Set(rc.Labels)) { - controllers = append(controllers, rc) - } - } - return controllers, nil -} - -func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) { - obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) - if err != nil { - return nil, err - } - if !exists { - return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name) - } - return obj.(*api.ReplicationController), nil -} - -// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found. -func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) { - var selector labels.Selector - var rc api.ReplicationController - - if len(pod.Labels) == 0 { - err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name) - return - } - - key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}} - items, err := s.Indexer.Index(NamespaceIndex, key) - if err != nil { - return - } - - for _, m := range items { - rc = *m.(*api.ReplicationController) - selector = labels.Set(rc.Spec.Selector).AsSelectorPreValidated() - - // If an rc with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - continue - } - controllers = append(controllers, rc) - } - if len(controllers) == 0 { - err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) - } - return -} - // StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments. type StoreToDeploymentLister struct { Indexer @@ -518,47 +409,6 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex return } -// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface -// The Store must contain (only) Services. -type StoreToServiceLister struct { - Store -} - -func (s *StoreToServiceLister) List() (services api.ServiceList, err error) { - for _, m := range s.Store.List() { - services.Items = append(services.Items, *(m.(*api.Service))) - } - return services, nil -} - -// TODO: Move this back to scheduler as a helper function that takes a Store, -// rather than a method of StoreToServiceLister. -func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) { - var selector labels.Selector - var service api.Service - - for _, m := range s.Store.List() { - service = *m.(*api.Service) - // consider only services that are in the same namespace as the pod - if service.Namespace != pod.Namespace { - continue - } - if service.Spec.Selector == nil { - // services with nil selectors match nothing, not everything. - continue - } - selector = labels.Set(service.Spec.Selector).AsSelectorPreValidated() - if selector.Matches(labels.Set(pod.Labels)) { - services = append(services, service) - } - } - if len(services) == 0 { - err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) - } - - return -} - // StoreToEndpointsLister makes a Store that lists endpoints. type StoreToEndpointsLister struct { Store diff --git a/pkg/client/cache/listers_core.go b/pkg/client/cache/listers_core.go index 851c8834a10..ed766072592 100644 --- a/pkg/client/cache/listers_core.go +++ b/pkg/client/cache/listers_core.go @@ -17,6 +17,8 @@ limitations under the License. package cache import ( + "fmt" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/labels" @@ -25,24 +27,24 @@ import ( // TODO: generate these classes and methods for all resources of interest using // a script. Can use "go generate" once 1.4 is supported by all users. -// StoreToPodLister makes a Store have the List method of the client.PodInterface -// The Store must contain (only) Pods. -// +// Lister makes an Index have the List method. The Stores must contain only the expected type // Example: // s := cache.NewStore() // lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"} // r := cache.NewReflector(lw, &api.Pod{}, s).Run() // l := StoreToPodLister{s} // l.List() + +// StoreToPodLister helps list pods type StoreToPodLister struct { Indexer Indexer } -func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) { +func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api.Pod, err error) { err = ListAll(s.Indexer, selector, func(m interface{}) { - pods = append(pods, m.(*api.Pod)) + ret = append(ret, m.(*api.Pod)) }) - return pods, err + return ret, err } func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer { @@ -54,11 +56,11 @@ type storePodsNamespacer struct { namespace string } -func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) { +func (s storePodsNamespacer) List(selector labels.Selector) (ret []*api.Pod, err error) { err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) { - pods = append(pods, m.(*api.Pod)) + ret = append(ret, m.(*api.Pod)) }) - return pods, err + return ret, err } func (s storePodsNamespacer) Get(name string) (*api.Pod, error) { @@ -71,3 +73,133 @@ func (s storePodsNamespacer) Get(name string) (*api.Pod, error) { } return obj.(*api.Pod), nil } + +// StoreToReplicationControllerLister helps list rcs +type StoreToReplicationControllerLister struct { + Indexer Indexer +} + +func (s *StoreToReplicationControllerLister) List(selector labels.Selector) (ret []*api.ReplicationController, err error) { + err = ListAll(s.Indexer, selector, func(m interface{}) { + ret = append(ret, m.(*api.ReplicationController)) + }) + return ret, err +} + +func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer { + return storeReplicationControllersNamespacer{s.Indexer, namespace} +} + +type storeReplicationControllersNamespacer struct { + indexer Indexer + namespace string +} + +func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (ret []*api.ReplicationController, err error) { + err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*api.ReplicationController)) + }) + return ret, err +} + +func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name) + } + return obj.(*api.ReplicationController), nil +} + +// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found. +func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) { + if len(pod.Labels) == 0 { + err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name) + return + } + + key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}} + items, err := s.Indexer.Index(NamespaceIndex, key) + if err != nil { + return + } + + for _, m := range items { + rc := m.(*api.ReplicationController) + selector := labels.Set(rc.Spec.Selector).AsSelectorPreValidated() + + // If an rc with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + controllers = append(controllers, rc) + } + if len(controllers) == 0 { + err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + +// StoreToServiceLister helps list services +type StoreToServiceLister struct { + Indexer Indexer +} + +func (s *StoreToServiceLister) List(selector labels.Selector) (ret []*api.Service, err error) { + err = ListAll(s.Indexer, selector, func(m interface{}) { + ret = append(ret, m.(*api.Service)) + }) + return ret, err +} + +func (s *StoreToServiceLister) Services(namespace string) storeServicesNamespacer { + return storeServicesNamespacer{s.Indexer, namespace} +} + +type storeServicesNamespacer struct { + indexer Indexer + namespace string +} + +func (s storeServicesNamespacer) List(selector labels.Selector) (ret []*api.Service, err error) { + err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*api.Service)) + }) + return ret, err +} + +func (s storeServicesNamespacer) Get(name string) (*api.Service, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(api.Resource("service"), name) + } + return obj.(*api.Service), nil +} + +// TODO: Move this back to scheduler as a helper function that takes a Store, +// rather than a method of StoreToServiceLister. +func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) { + allServices, err := s.Services(pod.Namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + + for i := range allServices { + service := allServices[i] + if service.Spec.Selector == nil { + // services with nil selectors match nothing, not everything. + continue + } + selector := labels.Set(service.Spec.Selector).AsSelectorPreValidated() + if selector.Matches(labels.Set(pod.Labels)) { + services = append(services, service) + } + } + + return services, nil +} diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index 0c91e5ac21f..36223eabd96 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -128,7 +128,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { testCases := []struct { description string inRCs []*api.ReplicationController - list func(StoreToReplicationControllerLister) ([]api.ReplicationController, error) + list func(StoreToReplicationControllerLister) ([]*api.ReplicationController, error) outRCNames sets.String expectErr bool onlyIfIndexedByNamespace bool @@ -143,7 +143,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"}, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { return lister.ReplicationControllers(api.NamespaceAll).List(labels.Set{}.AsSelectorPreValidated()) }, outRCNames: sets.NewString("hmm", "foo"), @@ -158,7 +158,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"}, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { return lister.ReplicationControllers("hmm").List(labels.Set{}.AsSelectorPreValidated()) }, outRCNames: sets.NewString("hmm"), @@ -168,8 +168,8 @@ func TestStoreToReplicationControllerLister(t *testing.T) { inRCs: []*api.ReplicationController{ {ObjectMeta: api.ObjectMeta{Name: "basic"}}, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { - return lister.List() + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { + return lister.List(labels.Everything()) }, outRCNames: sets.NewString("basic"), }, @@ -183,7 +183,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { }, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"}, } @@ -199,7 +199,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"}, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -228,7 +228,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { }, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -754,7 +754,7 @@ func TestStoreToPodLister(t *testing.T) { } func TestStoreToServiceLister(t *testing.T) { - store := NewStore(MetaNamespaceKeyFunc) + store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc}) store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 21c7fc8257c..30879f0e7bb 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -80,7 +80,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"), } - e.serviceStore.Store, e.serviceController = cache.NewInformer( + e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return e.client.Core().Services(api.NamespaceAll).List(options) @@ -99,6 +99,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client }, DeleteFunc: e.enqueueService, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -335,7 +336,7 @@ func (e *EndpointController) syncService(key string) error { glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := e.serviceStore.Store.GetByKey(key) + obj, exists, err := e.serviceStore.Indexer.GetByKey(key) if err != nil || !exists { // Delete the corresponding endpoint, as the service has been deleted. // TODO: Please note that this will delete an endpoint when a diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 53451e072f2..0e479636202 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -110,7 +110,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}}, }) @@ -174,7 +174,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ Selector: map[string]string{}, @@ -216,7 +216,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ Selector: map[string]string{}, @@ -255,7 +255,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ Selector: map[string]string{}, @@ -293,7 +293,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, ns, 0, 1, 1) - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ Selector: map[string]string{}, @@ -331,7 +331,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, ns, 1, 1, 1) - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ Selector: map[string]string{}, @@ -373,7 +373,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, @@ -414,7 +414,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, api.NamespaceDefault, 1, 1, 0) - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, @@ -435,7 +435,7 @@ func TestSyncEndpointsItems(t *testing.T) { endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, ns, 3, 2, 0) addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found! - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ Selector: map[string]string{"foo": "bar"}, @@ -478,7 +478,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, ns, 3, 2, 0) serviceLabels := map[string]string{"foo": "bar"} - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{ Name: "foo", Namespace: ns, @@ -539,7 +539,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) serviceLabels := map[string]string{"baz": "blah"} - endpoints.serviceStore.Store.Add(&api.Service{ + endpoints.serviceStore.Indexer.Add(&api.Service{ ObjectMeta: api.ObjectMeta{ Name: "foo", Namespace: ns, diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index fafbad77e11..121ccb4e784 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -269,16 +269,16 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon } // update lookup cache - rm.lookupCache.Update(pod, &controllers[0]) + rm.lookupCache.Update(pod, controllers[0]) - return &controllers[0] + return controllers[0] } // isCacheValid check if the cache is valid func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool { - exists, err := rm.rcStore.Exists(cachedRC) + _, err := rm.rcStore.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name) // rc has been deleted or updated, cache is invalid - if err != nil || !exists || !isControllerMatch(pod, cachedRC) { + if err != nil || !isControllerMatch(pod, cachedRC) { return false } return true diff --git a/pkg/controller/replication/replication_controller_utils.go b/pkg/controller/replication/replication_controller_utils.go index e7e22795a06..a5ea91304da 100644 --- a/pkg/controller/replication/replication_controller_utils.go +++ b/pkg/controller/replication/replication_controller_utils.go @@ -71,7 +71,7 @@ func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface, } // OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker. -type OverlappingControllers []api.ReplicationController +type OverlappingControllers []*api.ReplicationController func (o OverlappingControllers) Len() int { return len(o) } func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] } diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 8af49cef8e5..f4010107a92 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -119,7 +119,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN }, workingQueue: workqueue.NewDelayingQueue(), } - s.serviceStore.Store, s.serviceController = cache.NewInformer( + s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { return s.kubeClient.Core().Services(api.NamespaceAll).List(options) @@ -141,6 +141,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN }, DeleteFunc: s.enqueueService, }, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) if err := s.init(); err != nil { return nil, err @@ -724,7 +725,7 @@ func (s *ServiceController) syncService(key string) error { glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime)) }() // obj holds the latest service info from apiserver - obj, exists, err := s.serviceStore.Store.GetByKey(key) + obj, exists, err := s.serviceStore.Indexer.GetByKey(key) if err != nil { glog.Infof("Unable to retrieve service %v from store: %v", key, err) s.workingQueue.Add(key) diff --git a/pkg/kubelet/envvars/envvars.go b/pkg/kubelet/envvars/envvars.go index 186a0a87c81..883acf61943 100644 --- a/pkg/kubelet/envvars/envvars.go +++ b/pkg/kubelet/envvars/envvars.go @@ -27,10 +27,10 @@ import ( // FromServices builds environment variables that a container is started with, // which tell the container where to find the services it may need, which are // provided as an argument. -func FromServices(services *api.ServiceList) []api.EnvVar { +func FromServices(services []*api.Service) []api.EnvVar { var result []api.EnvVar - for i := range services.Items { - service := &services.Items[i] + for i := range services { + service := services[i] // ignore services where ClusterIP is "None" or empty // the services passed to this method should be pre-filtered diff --git a/pkg/kubelet/envvars/envvars_test.go b/pkg/kubelet/envvars/envvars_test.go index 9818741be33..9a3876b6f88 100644 --- a/pkg/kubelet/envvars/envvars_test.go +++ b/pkg/kubelet/envvars/envvars_test.go @@ -25,63 +25,61 @@ import ( ) func TestFromServices(t *testing.T) { - sl := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo-bar"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - ClusterIP: "1.2.3.4", - Ports: []api.ServicePort{ - {Port: 8080, Protocol: "TCP"}, - }, + sl := []*api.Service{ + { + ObjectMeta: api.ObjectMeta{Name: "foo-bar"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + ClusterIP: "1.2.3.4", + Ports: []api.ServicePort{ + {Port: 8080, Protocol: "TCP"}, }, }, - { - ObjectMeta: api.ObjectMeta{Name: "abc-123"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - ClusterIP: "5.6.7.8", - Ports: []api.ServicePort{ - {Name: "u-d-p", Port: 8081, Protocol: "UDP"}, - {Name: "t-c-p", Port: 8081, Protocol: "TCP"}, - }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "abc-123"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + ClusterIP: "5.6.7.8", + Ports: []api.ServicePort{ + {Name: "u-d-p", Port: 8081, Protocol: "UDP"}, + {Name: "t-c-p", Port: 8081, Protocol: "TCP"}, }, }, - { - ObjectMeta: api.ObjectMeta{Name: "q-u-u-x"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - ClusterIP: "9.8.7.6", - Ports: []api.ServicePort{ - {Port: 8082, Protocol: "TCP"}, - {Name: "8083", Port: 8083, Protocol: "TCP"}, - }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "q-u-u-x"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + ClusterIP: "9.8.7.6", + Ports: []api.ServicePort{ + {Port: 8082, Protocol: "TCP"}, + {Name: "8083", Port: 8083, Protocol: "TCP"}, }, }, - { - ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-none"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - ClusterIP: "None", - Ports: []api.ServicePort{ - {Port: 8082, Protocol: "TCP"}, - }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-none"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + ClusterIP: "None", + Ports: []api.ServicePort{ + {Port: 8082, Protocol: "TCP"}, }, }, - { - ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-empty"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - ClusterIP: "", - Ports: []api.ServicePort{ - {Port: 8082, Protocol: "TCP"}, - }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-empty"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + ClusterIP: "", + Ports: []api.ServicePort{ + {Port: 8082, Protocol: "TCP"}, }, }, }, } - vars := envvars.FromServices(&sl) + vars := envvars.FromServices(sl) expected := []api.EnvVar{ {Name: "FOO_BAR_SERVICE_HOST", Value: "1.2.3.4"}, {Name: "FOO_BAR_SERVICE_PORT", Value: "8080"}, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 5d1286e4170..1d50e96e61c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -77,6 +77,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/util/queue" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/kubelet/volumemanager" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/security/apparmor" "k8s.io/kubernetes/pkg/types" @@ -371,7 +372,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub dockerExecHandler = &dockertools.NativeExecHandler{} } - serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc) + serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) if kubeClient != nil { // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather // than an interface. There is no way to construct a list+watcher using resource name. @@ -385,7 +386,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run() } - serviceLister := &cache.StoreToServiceLister{Store: serviceStore} + serviceLister := &cache.StoreToServiceLister{Indexer: serviceStore} nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc) if kubeClient != nil { @@ -777,7 +778,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub } type serviceLister interface { - List() (api.ServiceList, error) + List(labels.Selector) ([]*api.Service, error) } type nodeLister interface { @@ -1447,7 +1448,7 @@ var masterServices = sets.NewString("kubernetes") // pod in namespace ns should see. func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) { var ( - serviceMap = make(map[string]api.Service) + serviceMap = make(map[string]*api.Service) m = make(map[string]string) ) @@ -1457,15 +1458,16 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) { // Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars. return m, nil } - services, err := kl.serviceLister.List() + services, err := kl.serviceLister.List(labels.Everything()) if err != nil { return m, fmt.Errorf("failed to list services when setting up env vars.") } // project the services in namespace ns onto the master services - for _, service := range services.Items { + for i := range services { + service := services[i] // ignore services where ClusterIP is "None" or empty - if !api.IsServiceIPSet(&service) { + if !api.IsServiceIPSet(service) { continue } serviceName := service.Name @@ -1485,12 +1487,13 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) { } } } - services.Items = []api.Service{} - for _, service := range serviceMap { - services.Items = append(services.Items, service) + + mappedServices := []*api.Service{} + for key := range serviceMap { + mappedServices = append(mappedServices, serviceMap[key]) } - for _, e := range envvars.FromServices(&services) { + for _, e := range envvars.FromServices(mappedServices) { m[e.Name] = e.Value } return m, nil diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 92def577369..9ca42a117e6 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -61,6 +61,7 @@ import ( kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/queue" kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/clock" @@ -896,13 +897,11 @@ func TestDNSConfigurationParams(t *testing.T) { } type testServiceLister struct { - services []api.Service + services []*api.Service } -func (ls testServiceLister) List() (api.ServiceList, error) { - return api.ServiceList{ - Items: ls.services, - }, nil +func (ls testServiceLister) List(labels.Selector) ([]*api.Service, error) { + return ls.services, nil } type testNodeLister struct { @@ -938,8 +937,8 @@ func (e envs) Swap(i, j int) { e[i], e[j] = e[j], e[i] } func (e envs) Less(i, j int) bool { return e[i].Name < e[j].Name } -func buildService(name, namespace, clusterIP, protocol string, port int) api.Service { - return api.Service{ +func buildService(name, namespace, clusterIP, protocol string, port int) *api.Service { + return &api.Service{ ObjectMeta: api.ObjectMeta{Name: name, Namespace: namespace}, Spec: api.ServiceSpec{ Ports: []api.ServicePort{{ @@ -952,7 +951,7 @@ func buildService(name, namespace, clusterIP, protocol string, port int) api.Ser } func TestMakeEnvironmentVariables(t *testing.T) { - services := []api.Service{ + services := []*api.Service{ buildService("kubernetes", api.NamespaceDefault, "1.2.3.1", "TCP", 8081), buildService("test", "test1", "1.2.3.3", "TCP", 8083), buildService("kubernetes", "test2", "1.2.3.4", "TCP", 8084), diff --git a/plugin/pkg/scheduler/algorithm/listers.go b/plugin/pkg/scheduler/algorithm/listers.go index 7887614535d..066fe305997 100644 --- a/plugin/pkg/scheduler/algorithm/listers.go +++ b/plugin/pkg/scheduler/algorithm/listers.go @@ -63,24 +63,25 @@ func (f FakePodLister) List(s labels.Selector) (selected []*api.Pod, err error) // ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler. type ServiceLister interface { // Lists all the services - List() (api.ServiceList, error) + List(labels.Selector) ([]*api.Service, error) // Gets the services for the given pod - GetPodServices(*api.Pod) ([]api.Service, error) + GetPodServices(*api.Pod) ([]*api.Service, error) } // FakeServiceLister implements ServiceLister on []api.Service for test purposes. -type FakeServiceLister []api.Service +type FakeServiceLister []*api.Service // List returns api.ServiceList, the list of all services. -func (f FakeServiceLister) List() (api.ServiceList, error) { - return api.ServiceList{Items: f}, nil +func (f FakeServiceLister) List(labels.Selector) ([]*api.Service, error) { + return f, nil } // GetPodServices gets the services that have the selector that match the labels on the given pod -func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) { +func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) { var selector labels.Selector - for _, service := range f { + for i := range f { + service := f[i] // consider only services that are in the same namespace as the pod if service.Namespace != pod.Namespace { continue @@ -100,37 +101,38 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, // ControllerLister interface represents anything that can produce a list of ReplicationController; the list is consumed by a scheduler. type ControllerLister interface { // Lists all the replication controllers - List() ([]api.ReplicationController, error) + List(labels.Selector) ([]*api.ReplicationController, error) // Gets the services for the given pod - GetPodControllers(*api.Pod) ([]api.ReplicationController, error) + GetPodControllers(*api.Pod) ([]*api.ReplicationController, error) } // EmptyControllerLister implements ControllerLister on []api.ReplicationController returning empty data type EmptyControllerLister struct{} // List returns nil -func (f EmptyControllerLister) List() ([]api.ReplicationController, error) { +func (f EmptyControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) { return nil, nil } // GetPodControllers returns nil -func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) { +func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) { return nil, nil } // FakeControllerLister implements ControllerLister on []api.ReplicationController for test purposes. -type FakeControllerLister []api.ReplicationController +type FakeControllerLister []*api.ReplicationController // List returns []api.ReplicationController, the list of all ReplicationControllers. -func (f FakeControllerLister) List() ([]api.ReplicationController, error) { +func (f FakeControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) { return f, nil } // GetPodControllers gets the ReplicationControllers that have the selector that match the labels on the given pod -func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) { +func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) { var selector labels.Selector - for _, controller := range f { + for i := range f { + controller := f[i] if controller.Namespace != pod.Namespace { continue } diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index fb88a9bba81..ed7e96ad5e4 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -670,7 +670,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n // skip looking at other pods in the service if the current pod defines all the required affinity labels if !labelsExist { services, err := s.serviceLister.GetPodServices(pod) - if err == nil { + if err == nil && len(services) > 0 { // just use the first service and get the other pods within the service // TODO: a separate predicate can be created that tries to handle all services for the pod selector := labels.SelectorFromSet(services[0].Spec.Selector) diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index cb96b4c42a3..6f699bfaef1 100755 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -1209,7 +1209,7 @@ func TestServiceAffinity(t *testing.T) { tests := []struct { pod *api.Pod pods []*api.Pod - services []api.Service + services []*api.Service node *api.Node labels []string fits bool @@ -1240,7 +1240,7 @@ func TestServiceAffinity(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, node: &node1, - services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, fits: true, labels: []string{"region"}, test: "service pod on same node", @@ -1249,7 +1249,7 @@ func TestServiceAffinity(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, node: &node1, - services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, fits: true, labels: []string{"region"}, test: "service pod on different node, region match", @@ -1258,7 +1258,7 @@ func TestServiceAffinity(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, node: &node1, - services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, fits: false, labels: []string{"region"}, test: "service pod on different node, region mismatch", @@ -1267,7 +1267,7 @@ func TestServiceAffinity(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}, pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}}, node: &node1, - services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns2"}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns2"}}}, fits: true, labels: []string{"region"}, test: "service in different namespace, region mismatch", @@ -1276,7 +1276,7 @@ func TestServiceAffinity(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}, pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns2"}}}, node: &node1, - services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}}, fits: true, labels: []string{"region"}, test: "pod in different namespace, region mismatch", @@ -1285,7 +1285,7 @@ func TestServiceAffinity(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}, pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}}, node: &node1, - services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}}, fits: false, labels: []string{"region"}, test: "service and pod in same namespace, region mismatch", @@ -1294,7 +1294,7 @@ func TestServiceAffinity(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, node: &node1, - services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, fits: false, labels: []string{"region", "zone"}, test: "service pod on different node, multiple labels, not all match", @@ -1303,7 +1303,7 @@ func TestServiceAffinity(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine5"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, node: &node4, - services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, fits: true, labels: []string{"region", "zone"}, test: "service pod on different node, multiple labels, all match", diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go index 14bd4afea37..edd978a2f3e 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go @@ -194,7 +194,7 @@ func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister // The label to be considered is provided to the struct (ServiceAntiAffinity). func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) { var nsServicePods []*api.Pod - if services, err := s.serviceLister.GetPodServices(pod); err == nil { + if services, err := s.serviceLister.GetPodServices(pod); err == nil && len(services) > 0 { // just use the first service and get the other pods within the service // TODO: a separate predicate can be created that tries to handle all services for the pod selector := labels.SelectorFromSet(services[0].Spec.Selector) diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index 7ce57ef9335..13fc6542ef6 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -57,9 +57,9 @@ func TestSelectorSpreadPriority(t *testing.T) { pod *api.Pod pods []*api.Pod nodes []string - rcs []api.ReplicationController + rcs []*api.ReplicationController rss []extensions.ReplicaSet - services []api.Service + services []*api.Service expectedList schedulerapi.HostPriorityList test string }{ @@ -80,7 +80,7 @@ func TestSelectorSpreadPriority(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 10}}, test: "different services", }, @@ -91,7 +91,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}}, test: "two pods, one service pod", }, @@ -105,7 +105,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}}, test: "five pods, one service pod in no namespace", }, @@ -118,7 +118,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}}, test: "four pods, one service pod in default namespace", }, @@ -132,7 +132,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}}, test: "five pods, one service pod in specific namespace", }, @@ -144,7 +144,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}}, test: "three pods, two service pods on different machines", }, @@ -157,7 +157,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 5}, {Host: "machine2", Score: 0}}, test: "four pods, three service pods", }, @@ -169,7 +169,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, test: "service with partial pod label matches", }, @@ -181,8 +181,8 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, // "baz=blah" matches both labels1 and labels2, and "foo=bar" matches only labels 1. This means that we assume that we want to // do spreading between all pods. The result should be exactly as above. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, @@ -196,7 +196,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, @@ -210,8 +210,8 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}}, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, // Taken together Service and Replication Controller should match all Pods, hence result should be equal to one above. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, test: "disjoined service and replication controller should be treated equally", @@ -224,7 +224,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}}, rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, @@ -238,7 +238,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, // Both Nodes have one pod from the given RC, hence both get 0 score. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}}, test: "Replication controller with partial pod label matches", @@ -264,7 +264,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, test: "Another replication controller with partial pod label matches", }, @@ -344,9 +344,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { pod *api.Pod pods []*api.Pod nodes []string - rcs []api.ReplicationController + rcs []*api.ReplicationController rss []extensions.ReplicaSet - services []api.Service + services []*api.Service expectedList schedulerapi.HostPriorityList test string }{ @@ -378,7 +378,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { { pod: buildPod("", labels1, nil), pods: []*api.Pod{buildPod(nodeMachine1Zone1, labels2, nil)}, - services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, expectedList: []schedulerapi.HostPriority{ {Host: nodeMachine1Zone1, Score: 10}, {Host: nodeMachine1Zone2, Score: 10}, @@ -395,7 +395,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { buildPod(nodeMachine1Zone1, labels2, nil), buildPod(nodeMachine1Zone2, labels1, nil), }, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{ {Host: nodeMachine1Zone1, Score: 10}, {Host: nodeMachine1Zone2, Score: 0}, // Already have pod on machine @@ -415,7 +415,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { buildPod(nodeMachine1Zone3, labels2, nil), buildPod(nodeMachine2Zone3, labels1, nil), }, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{ {Host: nodeMachine1Zone1, Score: 10}, {Host: nodeMachine1Zone2, Score: 0}, // Pod on node @@ -434,7 +434,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { buildPod(nodeMachine2Zone2, labels2, nil), buildPod(nodeMachine1Zone3, labels1, nil), }, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{ {Host: nodeMachine1Zone1, Score: 0}, // Pod on node {Host: nodeMachine1Zone2, Score: 0}, // Pod on node @@ -453,7 +453,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { buildPod(nodeMachine1Zone3, labels1, nil), buildPod(nodeMachine2Zone2, labels2, nil), }, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{ {Host: nodeMachine1Zone1, Score: 0}, // Pod on node {Host: nodeMachine1Zone2, Score: 0}, // Pod on node @@ -471,7 +471,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { buildPod(nodeMachine1Zone2, labels1, controllerRef("ReplicationController", "name", "abc123")), buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")), }, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{ // Note that because we put two pods on the same node (nodeMachine1Zone3), // the values here are questionable for zone2, in particular for nodeMachine1Zone2. @@ -548,7 +548,7 @@ func TestZoneSpreadPriority(t *testing.T) { pod *api.Pod pods []*api.Pod nodes map[string]map[string]string - services []api.Service + services []*api.Service expectedList schedulerapi.HostPriorityList test string }{ @@ -573,7 +573,7 @@ func TestZoneSpreadPriority(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, nodes: labeledNodes, - services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 10}, {Host: "machine12", Score: 10}, {Host: "machine21", Score: 10}, {Host: "machine22", Score: 10}, {Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}}, @@ -587,7 +587,7 @@ func TestZoneSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: labeledNodes, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 10}, {Host: "machine12", Score: 10}, {Host: "machine21", Score: 0}, {Host: "machine22", Score: 0}, {Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}}, @@ -601,7 +601,7 @@ func TestZoneSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: labeledNodes, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 5}, {Host: "machine12", Score: 5}, {Host: "machine21", Score: 5}, {Host: "machine22", Score: 5}, {Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}}, @@ -616,7 +616,7 @@ func TestZoneSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, Namespace: "ns1"}}, }, nodes: labeledNodes, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 0}, {Host: "machine12", Score: 0}, {Host: "machine21", Score: 10}, {Host: "machine22", Score: 10}, {Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}}, @@ -631,7 +631,7 @@ func TestZoneSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: labeledNodes, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 6}, {Host: "machine12", Score: 6}, {Host: "machine21", Score: 3}, {Host: "machine22", Score: 3}, {Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}}, @@ -645,7 +645,7 @@ func TestZoneSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: labeledNodes, - services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 3}, {Host: "machine12", Score: 3}, {Host: "machine21", Score: 6}, {Host: "machine22", Score: 6}, {Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}}, @@ -660,7 +660,7 @@ func TestZoneSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: labeledNodes, - services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 7}, {Host: "machine12", Score: 7}, {Host: "machine21", Score: 5}, {Host: "machine22", Score: 5}, {Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}}, diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 550d1bd0615..f49cb99c4d1 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -108,7 +108,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini NodeLister: &cache.StoreToNodeLister{}, PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, - ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, + ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, schedulerCache: schedulerCache, @@ -401,7 +401,7 @@ func (f *ConfigFactory) Run() { // Watch and cache all service objects. Scheduler needs to find all pods // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. // Cache this locally. - cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything) + cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Indexer, 0).RunUntil(f.StopEverything) // Watch and cache all ReplicationController objects. Scheduler needs to find all pods // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index 46d34176f94..c47064ba7d0 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -493,8 +493,8 @@ func TestZeroRequest(t *testing.T) { { Function: algorithmpriorities.NewSelectorSpreadPriority( algorithm.FakePodLister(test.pods), - algorithm.FakeServiceLister([]api.Service{}), - algorithm.FakeControllerLister([]api.ReplicationController{}), + algorithm.FakeServiceLister([]*api.Service{}), + algorithm.FakeControllerLister([]*api.ReplicationController{}), algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})), Weight: 1, },