Merge pull request #32888 from deads2k/client-10-fixup-remaining-listers

Automatic merge from submit-queue

simplify RC and SVC listers

Make the RC and SVC listers use the common list functions that more closely match client APIs, are consistent with other listers, and avoid unnecessary copies.
This commit is contained in:
Kubernetes Submit Queue 2016-09-21 04:13:56 -07:00 committed by GitHub
commit 2d9d84dc64
23 changed files with 315 additions and 326 deletions

View File

@ -55,7 +55,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
}
e.serviceStore.Store, e.serviceController = cache.NewInformer(
e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.Core().Services(api.NamespaceAll).List(options)
@ -73,6 +73,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
},
DeleteFunc: e.enqueueService,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
e.podStore.Indexer, e.podController = cache.NewIndexerInformer(
@ -262,7 +263,7 @@ func (e *endpointController) syncService(key string) error {
defer func() {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := e.serviceStore.Store.GetByKey(key)
obj, exists, err := e.serviceStore.Indexer.GetByKey(key)
if err != nil || !exists {
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a

View File

@ -114,7 +114,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
},
)
cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = cache.NewInformer(
cachedClusterClient.serviceStore.Indexer, cachedClusterClient.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Services(v1.NamespaceAll).List(options)
@ -149,6 +149,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName)
},
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
cc.clientMap[clusterName] = cachedClusterClient
go cachedClusterClient.serviceController.Run(wait.NeverStop)

View File

@ -63,7 +63,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
// if serviceCache does not exists, that means the service is not created by federation, we should skip it
return nil
}
serviceInterface, exists, err := clusterCache.serviceStore.GetByKey(key)
serviceInterface, exists, err := clusterCache.serviceStore.Indexer.GetByKey(key)
if err != nil {
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.serviceQueue.Add(key)

View File

@ -144,7 +144,7 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
queue: workqueue.New(),
knownClusterSet: make(sets.String),
}
s.serviceStore.Store, s.serviceController = cache.NewInformer(
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return s.federationClient.Core().Services(v1.NamespaceAll).List(options)
@ -165,6 +165,7 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
},
DeleteFunc: s.enqueueService,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
s.clusterStore.Store, s.clusterController = cache.NewInformer(
&cache.ListWatch{
@ -816,7 +817,7 @@ func (s *ServiceController) syncService(key string) error {
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
}()
// obj holds the latest service info from apiserver
obj, exists, err := s.serviceStore.Store.GetByKey(key)
obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
if err != nil {
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
s.queue.Add(key)

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
)
// AppendFunc is used to add a matching item to whatever list the caller is using
type AppendFunc func(interface{})
func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
@ -136,116 +137,6 @@ func (s storeToNodeConditionLister) List() (nodes []*api.Node, err error) {
return
}
// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers.
type StoreToReplicationControllerLister struct {
Indexer
}
// Exists checks if the given rc exists in the store.
func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) {
_, exists, err := s.Indexer.Get(controller)
if err != nil {
return false, err
}
return exists, nil
}
// StoreToReplicationControllerLister lists all controllers in the store.
// TODO: converge on the interface in pkg/client
func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) {
for _, c := range s.Indexer.List() {
controllers = append(controllers, *(c.(*api.ReplicationController)))
}
return controllers, nil
}
func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer {
return storeReplicationControllersNamespacer{s.Indexer, namespace}
}
type storeReplicationControllersNamespacer struct {
indexer Indexer
namespace string
}
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) ([]api.ReplicationController, error) {
controllers := []api.ReplicationController{}
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
if selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return controllers, nil
}
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
// Ignore error; do slow search without index.
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return controllers, nil
}
for _, m := range items {
rc := *(m.(*api.ReplicationController))
if selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return controllers, nil
}
func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name)
}
return obj.(*api.ReplicationController), nil
}
// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found.
func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
var selector labels.Selector
var rc api.ReplicationController
if len(pod.Labels) == 0 {
err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
return
}
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}}
items, err := s.Indexer.Index(NamespaceIndex, key)
if err != nil {
return
}
for _, m := range items {
rc = *m.(*api.ReplicationController)
selector = labels.Set(rc.Spec.Selector).AsSelectorPreValidated()
// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
controllers = append(controllers, rc)
}
if len(controllers) == 0 {
err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments.
type StoreToDeploymentLister struct {
Indexer
@ -518,47 +409,6 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex
return
}
// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface
// The Store must contain (only) Services.
type StoreToServiceLister struct {
Store
}
func (s *StoreToServiceLister) List() (services api.ServiceList, err error) {
for _, m := range s.Store.List() {
services.Items = append(services.Items, *(m.(*api.Service)))
}
return services, nil
}
// TODO: Move this back to scheduler as a helper function that takes a Store,
// rather than a method of StoreToServiceLister.
func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) {
var selector labels.Selector
var service api.Service
for _, m := range s.Store.List() {
service = *m.(*api.Service)
// consider only services that are in the same namespace as the pod
if service.Namespace != pod.Namespace {
continue
}
if service.Spec.Selector == nil {
// services with nil selectors match nothing, not everything.
continue
}
selector = labels.Set(service.Spec.Selector).AsSelectorPreValidated()
if selector.Matches(labels.Set(pod.Labels)) {
services = append(services, service)
}
}
if len(services) == 0 {
err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// StoreToEndpointsLister makes a Store that lists endpoints.
type StoreToEndpointsLister struct {
Store

View File

@ -17,6 +17,8 @@ limitations under the License.
package cache
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/labels"
@ -25,24 +27,24 @@ import (
// TODO: generate these classes and methods for all resources of interest using
// a script. Can use "go generate" once 1.4 is supported by all users.
// StoreToPodLister makes a Store have the List method of the client.PodInterface
// The Store must contain (only) Pods.
//
// Lister makes an Index have the List method. The Stores must contain only the expected type
// Example:
// s := cache.NewStore()
// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"}
// r := cache.NewReflector(lw, &api.Pod{}, s).Run()
// l := StoreToPodLister{s}
// l.List()
// StoreToPodLister helps list pods
type StoreToPodLister struct {
Indexer Indexer
}
func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api.Pod, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
pods = append(pods, m.(*api.Pod))
ret = append(ret, m.(*api.Pod))
})
return pods, err
return ret, err
}
func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
@ -54,11 +56,11 @@ type storePodsNamespacer struct {
namespace string
}
func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) {
func (s storePodsNamespacer) List(selector labels.Selector) (ret []*api.Pod, err error) {
err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) {
pods = append(pods, m.(*api.Pod))
ret = append(ret, m.(*api.Pod))
})
return pods, err
return ret, err
}
func (s storePodsNamespacer) Get(name string) (*api.Pod, error) {
@ -71,3 +73,133 @@ func (s storePodsNamespacer) Get(name string) (*api.Pod, error) {
}
return obj.(*api.Pod), nil
}
// StoreToReplicationControllerLister helps list rcs
type StoreToReplicationControllerLister struct {
Indexer Indexer
}
func (s *StoreToReplicationControllerLister) List(selector labels.Selector) (ret []*api.ReplicationController, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
ret = append(ret, m.(*api.ReplicationController))
})
return ret, err
}
func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer {
return storeReplicationControllersNamespacer{s.Indexer, namespace}
}
type storeReplicationControllersNamespacer struct {
indexer Indexer
namespace string
}
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (ret []*api.ReplicationController, err error) {
err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*api.ReplicationController))
})
return ret, err
}
func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name)
}
return obj.(*api.ReplicationController), nil
}
// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found.
func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
return
}
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}}
items, err := s.Indexer.Index(NamespaceIndex, key)
if err != nil {
return
}
for _, m := range items {
rc := m.(*api.ReplicationController)
selector := labels.Set(rc.Spec.Selector).AsSelectorPreValidated()
// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
controllers = append(controllers, rc)
}
if len(controllers) == 0 {
err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// StoreToServiceLister helps list services
type StoreToServiceLister struct {
Indexer Indexer
}
func (s *StoreToServiceLister) List(selector labels.Selector) (ret []*api.Service, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
ret = append(ret, m.(*api.Service))
})
return ret, err
}
func (s *StoreToServiceLister) Services(namespace string) storeServicesNamespacer {
return storeServicesNamespacer{s.Indexer, namespace}
}
type storeServicesNamespacer struct {
indexer Indexer
namespace string
}
func (s storeServicesNamespacer) List(selector labels.Selector) (ret []*api.Service, err error) {
err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*api.Service))
})
return ret, err
}
func (s storeServicesNamespacer) Get(name string) (*api.Service, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("service"), name)
}
return obj.(*api.Service), nil
}
// TODO: Move this back to scheduler as a helper function that takes a Store,
// rather than a method of StoreToServiceLister.
func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
allServices, err := s.Services(pod.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
for i := range allServices {
service := allServices[i]
if service.Spec.Selector == nil {
// services with nil selectors match nothing, not everything.
continue
}
selector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
if selector.Matches(labels.Set(pod.Labels)) {
services = append(services, service)
}
}
return services, nil
}

View File

@ -128,7 +128,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
testCases := []struct {
description string
inRCs []*api.ReplicationController
list func(StoreToReplicationControllerLister) ([]api.ReplicationController, error)
list func(StoreToReplicationControllerLister) ([]*api.ReplicationController, error)
outRCNames sets.String
expectErr bool
onlyIfIndexedByNamespace bool
@ -143,7 +143,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
return lister.ReplicationControllers(api.NamespaceAll).List(labels.Set{}.AsSelectorPreValidated())
},
outRCNames: sets.NewString("hmm", "foo"),
@ -158,7 +158,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
return lister.ReplicationControllers("hmm").List(labels.Set{}.AsSelectorPreValidated())
},
outRCNames: sets.NewString("hmm"),
@ -168,8 +168,8 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
inRCs: []*api.ReplicationController{
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
return lister.List()
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
return lister.List(labels.Everything())
},
outRCNames: sets.NewString("basic"),
},
@ -183,7 +183,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"},
}
@ -199,7 +199,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -228,7 +228,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -754,7 +754,7 @@ func TestStoreToPodLister(t *testing.T) {
}
func TestStoreToServiceLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{

View File

@ -80,7 +80,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
}
e.serviceStore.Store, e.serviceController = cache.NewInformer(
e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.Core().Services(api.NamespaceAll).List(options)
@ -99,6 +99,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client
},
DeleteFunc: e.enqueueService,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -335,7 +336,7 @@ func (e *EndpointController) syncService(key string) error {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := e.serviceStore.Store.GetByKey(key)
obj, exists, err := e.serviceStore.Indexer.GetByKey(key)
if err != nil || !exists {
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a

View File

@ -110,7 +110,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
})
@ -174,7 +174,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{},
@ -216,7 +216,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{},
@ -255,7 +255,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{},
@ -293,7 +293,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, ns, 0, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{},
@ -331,7 +331,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, ns, 1, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{},
@ -373,7 +373,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
@ -414,7 +414,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, api.NamespaceDefault, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
@ -435,7 +435,7 @@ func TestSyncEndpointsItems(t *testing.T) {
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found!
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
@ -478,7 +478,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: ns,
@ -539,7 +539,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Store.Add(&api.Service{
endpoints.serviceStore.Indexer.Add(&api.Service{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: ns,

View File

@ -269,16 +269,16 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon
}
// update lookup cache
rm.lookupCache.Update(pod, &controllers[0])
rm.lookupCache.Update(pod, controllers[0])
return &controllers[0]
return controllers[0]
}
// isCacheValid check if the cache is valid
func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool {
exists, err := rm.rcStore.Exists(cachedRC)
_, err := rm.rcStore.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name)
// rc has been deleted or updated, cache is invalid
if err != nil || !exists || !isControllerMatch(pod, cachedRC) {
if err != nil || !isControllerMatch(pod, cachedRC) {
return false
}
return true

View File

@ -71,7 +71,7 @@ func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface,
}
// OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker.
type OverlappingControllers []api.ReplicationController
type OverlappingControllers []*api.ReplicationController
func (o OverlappingControllers) Len() int { return len(o) }
func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

View File

@ -119,7 +119,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
},
workingQueue: workqueue.NewDelayingQueue(),
}
s.serviceStore.Store, s.serviceController = cache.NewInformer(
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return s.kubeClient.Core().Services(api.NamespaceAll).List(options)
@ -141,6 +141,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
},
DeleteFunc: s.enqueueService,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
if err := s.init(); err != nil {
return nil, err
@ -724,7 +725,7 @@ func (s *ServiceController) syncService(key string) error {
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
}()
// obj holds the latest service info from apiserver
obj, exists, err := s.serviceStore.Store.GetByKey(key)
obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
if err != nil {
glog.Infof("Unable to retrieve service %v from store: %v", key, err)
s.workingQueue.Add(key)

View File

@ -27,10 +27,10 @@ import (
// FromServices builds environment variables that a container is started with,
// which tell the container where to find the services it may need, which are
// provided as an argument.
func FromServices(services *api.ServiceList) []api.EnvVar {
func FromServices(services []*api.Service) []api.EnvVar {
var result []api.EnvVar
for i := range services.Items {
service := &services.Items[i]
for i := range services {
service := services[i]
// ignore services where ClusterIP is "None" or empty
// the services passed to this method should be pre-filtered

View File

@ -25,63 +25,61 @@ import (
)
func TestFromServices(t *testing.T) {
sl := api.ServiceList{
Items: []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "foo-bar"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "1.2.3.4",
Ports: []api.ServicePort{
{Port: 8080, Protocol: "TCP"},
},
sl := []*api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "foo-bar"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "1.2.3.4",
Ports: []api.ServicePort{
{Port: 8080, Protocol: "TCP"},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "abc-123"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "5.6.7.8",
Ports: []api.ServicePort{
{Name: "u-d-p", Port: 8081, Protocol: "UDP"},
{Name: "t-c-p", Port: 8081, Protocol: "TCP"},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "abc-123"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "5.6.7.8",
Ports: []api.ServicePort{
{Name: "u-d-p", Port: 8081, Protocol: "UDP"},
{Name: "t-c-p", Port: 8081, Protocol: "TCP"},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "q-u-u-x"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "9.8.7.6",
Ports: []api.ServicePort{
{Port: 8082, Protocol: "TCP"},
{Name: "8083", Port: 8083, Protocol: "TCP"},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "q-u-u-x"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "9.8.7.6",
Ports: []api.ServicePort{
{Port: 8082, Protocol: "TCP"},
{Name: "8083", Port: 8083, Protocol: "TCP"},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-none"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "None",
Ports: []api.ServicePort{
{Port: 8082, Protocol: "TCP"},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-none"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "None",
Ports: []api.ServicePort{
{Port: 8082, Protocol: "TCP"},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-empty"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "",
Ports: []api.ServicePort{
{Port: 8082, Protocol: "TCP"},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-empty"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
ClusterIP: "",
Ports: []api.ServicePort{
{Port: 8082, Protocol: "TCP"},
},
},
},
}
vars := envvars.FromServices(&sl)
vars := envvars.FromServices(sl)
expected := []api.EnvVar{
{Name: "FOO_BAR_SERVICE_HOST", Value: "1.2.3.4"},
{Name: "FOO_BAR_SERVICE_PORT", Value: "8080"},

View File

@ -77,6 +77,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/pkg/types"
@ -371,7 +372,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
dockerExecHandler = &dockertools.NativeExecHandler{}
}
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
@ -385,7 +386,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
}
cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
}
serviceLister := &cache.StoreToServiceLister{Store: serviceStore}
serviceLister := &cache.StoreToServiceLister{Indexer: serviceStore}
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
@ -777,7 +778,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
}
type serviceLister interface {
List() (api.ServiceList, error)
List(labels.Selector) ([]*api.Service, error)
}
type nodeLister interface {
@ -1447,7 +1448,7 @@ var masterServices = sets.NewString("kubernetes")
// pod in namespace ns should see.
func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
var (
serviceMap = make(map[string]api.Service)
serviceMap = make(map[string]*api.Service)
m = make(map[string]string)
)
@ -1457,15 +1458,16 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
// Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
return m, nil
}
services, err := kl.serviceLister.List()
services, err := kl.serviceLister.List(labels.Everything())
if err != nil {
return m, fmt.Errorf("failed to list services when setting up env vars.")
}
// project the services in namespace ns onto the master services
for _, service := range services.Items {
for i := range services {
service := services[i]
// ignore services where ClusterIP is "None" or empty
if !api.IsServiceIPSet(&service) {
if !api.IsServiceIPSet(service) {
continue
}
serviceName := service.Name
@ -1485,12 +1487,13 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
}
}
}
services.Items = []api.Service{}
for _, service := range serviceMap {
services.Items = append(services.Items, service)
mappedServices := []*api.Service{}
for key := range serviceMap {
mappedServices = append(mappedServices, serviceMap[key])
}
for _, e := range envvars.FromServices(&services) {
for _, e := range envvars.FromServices(mappedServices) {
m[e.Name] = e.Value
}
return m, nil

View File

@ -61,6 +61,7 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/clock"
@ -896,13 +897,11 @@ func TestDNSConfigurationParams(t *testing.T) {
}
type testServiceLister struct {
services []api.Service
services []*api.Service
}
func (ls testServiceLister) List() (api.ServiceList, error) {
return api.ServiceList{
Items: ls.services,
}, nil
func (ls testServiceLister) List(labels.Selector) ([]*api.Service, error) {
return ls.services, nil
}
type testNodeLister struct {
@ -938,8 +937,8 @@ func (e envs) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
func (e envs) Less(i, j int) bool { return e[i].Name < e[j].Name }
func buildService(name, namespace, clusterIP, protocol string, port int) api.Service {
return api.Service{
func buildService(name, namespace, clusterIP, protocol string, port int) *api.Service {
return &api.Service{
ObjectMeta: api.ObjectMeta{Name: name, Namespace: namespace},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
@ -952,7 +951,7 @@ func buildService(name, namespace, clusterIP, protocol string, port int) api.Ser
}
func TestMakeEnvironmentVariables(t *testing.T) {
services := []api.Service{
services := []*api.Service{
buildService("kubernetes", api.NamespaceDefault, "1.2.3.1", "TCP", 8081),
buildService("test", "test1", "1.2.3.3", "TCP", 8083),
buildService("kubernetes", "test2", "1.2.3.4", "TCP", 8084),

View File

@ -63,24 +63,25 @@ func (f FakePodLister) List(s labels.Selector) (selected []*api.Pod, err error)
// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.
type ServiceLister interface {
// Lists all the services
List() (api.ServiceList, error)
List(labels.Selector) ([]*api.Service, error)
// Gets the services for the given pod
GetPodServices(*api.Pod) ([]api.Service, error)
GetPodServices(*api.Pod) ([]*api.Service, error)
}
// FakeServiceLister implements ServiceLister on []api.Service for test purposes.
type FakeServiceLister []api.Service
type FakeServiceLister []*api.Service
// List returns api.ServiceList, the list of all services.
func (f FakeServiceLister) List() (api.ServiceList, error) {
return api.ServiceList{Items: f}, nil
func (f FakeServiceLister) List(labels.Selector) ([]*api.Service, error) {
return f, nil
}
// GetPodServices gets the services that have the selector that match the labels on the given pod
func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) {
func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
var selector labels.Selector
for _, service := range f {
for i := range f {
service := f[i]
// consider only services that are in the same namespace as the pod
if service.Namespace != pod.Namespace {
continue
@ -100,37 +101,38 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []api.Service,
// ControllerLister interface represents anything that can produce a list of ReplicationController; the list is consumed by a scheduler.
type ControllerLister interface {
// Lists all the replication controllers
List() ([]api.ReplicationController, error)
List(labels.Selector) ([]*api.ReplicationController, error)
// Gets the services for the given pod
GetPodControllers(*api.Pod) ([]api.ReplicationController, error)
GetPodControllers(*api.Pod) ([]*api.ReplicationController, error)
}
// EmptyControllerLister implements ControllerLister on []api.ReplicationController returning empty data
type EmptyControllerLister struct{}
// List returns nil
func (f EmptyControllerLister) List() ([]api.ReplicationController, error) {
func (f EmptyControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) {
return nil, nil
}
// GetPodControllers returns nil
func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) {
return nil, nil
}
// FakeControllerLister implements ControllerLister on []api.ReplicationController for test purposes.
type FakeControllerLister []api.ReplicationController
type FakeControllerLister []*api.ReplicationController
// List returns []api.ReplicationController, the list of all ReplicationControllers.
func (f FakeControllerLister) List() ([]api.ReplicationController, error) {
func (f FakeControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) {
return f, nil
}
// GetPodControllers gets the ReplicationControllers that have the selector that match the labels on the given pod
func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) {
var selector labels.Selector
for _, controller := range f {
for i := range f {
controller := f[i]
if controller.Namespace != pod.Namespace {
continue
}

View File

@ -670,7 +670,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
// skip looking at other pods in the service if the current pod defines all the required affinity labels
if !labelsExist {
services, err := s.serviceLister.GetPodServices(pod)
if err == nil {
if err == nil && len(services) > 0 {
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)

View File

@ -1209,7 +1209,7 @@ func TestServiceAffinity(t *testing.T) {
tests := []struct {
pod *api.Pod
pods []*api.Pod
services []api.Service
services []*api.Service
node *api.Node
labels []string
fits bool
@ -1240,7 +1240,7 @@ func TestServiceAffinity(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: &node1,
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: true,
labels: []string{"region"},
test: "service pod on same node",
@ -1249,7 +1249,7 @@ func TestServiceAffinity(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: &node1,
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: true,
labels: []string{"region"},
test: "service pod on different node, region match",
@ -1258,7 +1258,7 @@ func TestServiceAffinity(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: &node1,
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: false,
labels: []string{"region"},
test: "service pod on different node, region mismatch",
@ -1267,7 +1267,7 @@ func TestServiceAffinity(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}},
node: &node1,
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns2"}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns2"}}},
fits: true,
labels: []string{"region"},
test: "service in different namespace, region mismatch",
@ -1276,7 +1276,7 @@ func TestServiceAffinity(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns2"}}},
node: &node1,
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
fits: true,
labels: []string{"region"},
test: "pod in different namespace, region mismatch",
@ -1285,7 +1285,7 @@ func TestServiceAffinity(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}},
node: &node1,
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
fits: false,
labels: []string{"region"},
test: "service and pod in same namespace, region mismatch",
@ -1294,7 +1294,7 @@ func TestServiceAffinity(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: &node1,
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: false,
labels: []string{"region", "zone"},
test: "service pod on different node, multiple labels, not all match",
@ -1303,7 +1303,7 @@ func TestServiceAffinity(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine5"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: &node4,
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: true,
labels: []string{"region", "zone"},
test: "service pod on different node, multiple labels, all match",

View File

@ -194,7 +194,7 @@ func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister
// The label to be considered is provided to the struct (ServiceAntiAffinity).
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
var nsServicePods []*api.Pod
if services, err := s.serviceLister.GetPodServices(pod); err == nil {
if services, err := s.serviceLister.GetPodServices(pod); err == nil && len(services) > 0 {
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)

View File

@ -57,9 +57,9 @@ func TestSelectorSpreadPriority(t *testing.T) {
pod *api.Pod
pods []*api.Pod
nodes []string
rcs []api.ReplicationController
rcs []*api.ReplicationController
rss []extensions.ReplicaSet
services []api.Service
services []*api.Service
expectedList schedulerapi.HostPriorityList
test string
}{
@ -80,7 +80,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 10}},
test: "different services",
},
@ -91,7 +91,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
test: "two pods, one service pod",
},
@ -105,7 +105,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
test: "five pods, one service pod in no namespace",
},
@ -118,7 +118,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
test: "four pods, one service pod in default namespace",
},
@ -132,7 +132,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
test: "five pods, one service pod in specific namespace",
},
@ -144,7 +144,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}},
test: "three pods, two service pods on different machines",
},
@ -157,7 +157,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 5}, {Host: "machine2", Score: 0}},
test: "four pods, three service pods",
},
@ -169,7 +169,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "service with partial pod label matches",
},
@ -181,8 +181,8 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
// "baz=blah" matches both labels1 and labels2, and "foo=bar" matches only labels 1. This means that we assume that we want to
// do spreading between all pods. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
@ -196,7 +196,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
@ -210,8 +210,8 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
// Taken together Service and Replication Controller should match all Pods, hence result should be equal to one above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "disjoined service and replication controller should be treated equally",
@ -224,7 +224,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
@ -238,7 +238,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
// Both Nodes have one pod from the given RC, hence both get 0 score.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}},
test: "Replication controller with partial pod label matches",
@ -264,7 +264,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "Another replication controller with partial pod label matches",
},
@ -344,9 +344,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
pod *api.Pod
pods []*api.Pod
nodes []string
rcs []api.ReplicationController
rcs []*api.ReplicationController
rss []extensions.ReplicaSet
services []api.Service
services []*api.Service
expectedList schedulerapi.HostPriorityList
test string
}{
@ -378,7 +378,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
{
pod: buildPod("", labels1, nil),
pods: []*api.Pod{buildPod(nodeMachine1Zone1, labels2, nil)},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
expectedList: []schedulerapi.HostPriority{
{Host: nodeMachine1Zone1, Score: 10},
{Host: nodeMachine1Zone2, Score: 10},
@ -395,7 +395,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
buildPod(nodeMachine1Zone1, labels2, nil),
buildPod(nodeMachine1Zone2, labels1, nil),
},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{
{Host: nodeMachine1Zone1, Score: 10},
{Host: nodeMachine1Zone2, Score: 0}, // Already have pod on machine
@ -415,7 +415,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
buildPod(nodeMachine1Zone3, labels2, nil),
buildPod(nodeMachine2Zone3, labels1, nil),
},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{
{Host: nodeMachine1Zone1, Score: 10},
{Host: nodeMachine1Zone2, Score: 0}, // Pod on node
@ -434,7 +434,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
buildPod(nodeMachine2Zone2, labels2, nil),
buildPod(nodeMachine1Zone3, labels1, nil),
},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{
{Host: nodeMachine1Zone1, Score: 0}, // Pod on node
{Host: nodeMachine1Zone2, Score: 0}, // Pod on node
@ -453,7 +453,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
buildPod(nodeMachine1Zone3, labels1, nil),
buildPod(nodeMachine2Zone2, labels2, nil),
},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{
{Host: nodeMachine1Zone1, Score: 0}, // Pod on node
{Host: nodeMachine1Zone2, Score: 0}, // Pod on node
@ -471,7 +471,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
buildPod(nodeMachine1Zone2, labels1, controllerRef("ReplicationController", "name", "abc123")),
buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")),
},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{
// Note that because we put two pods on the same node (nodeMachine1Zone3),
// the values here are questionable for zone2, in particular for nodeMachine1Zone2.
@ -548,7 +548,7 @@ func TestZoneSpreadPriority(t *testing.T) {
pod *api.Pod
pods []*api.Pod
nodes map[string]map[string]string
services []api.Service
services []*api.Service
expectedList schedulerapi.HostPriorityList
test string
}{
@ -573,7 +573,7 @@ func TestZoneSpreadPriority(t *testing.T) {
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 10}, {Host: "machine12", Score: 10},
{Host: "machine21", Score: 10}, {Host: "machine22", Score: 10},
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
@ -587,7 +587,7 @@ func TestZoneSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 10}, {Host: "machine12", Score: 10},
{Host: "machine21", Score: 0}, {Host: "machine22", Score: 0},
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
@ -601,7 +601,7 @@ func TestZoneSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 5}, {Host: "machine12", Score: 5},
{Host: "machine21", Score: 5}, {Host: "machine22", Score: 5},
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
@ -616,7 +616,7 @@ func TestZoneSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, Namespace: "ns1"}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 0}, {Host: "machine12", Score: 0},
{Host: "machine21", Score: 10}, {Host: "machine22", Score: 10},
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
@ -631,7 +631,7 @@ func TestZoneSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 6}, {Host: "machine12", Score: 6},
{Host: "machine21", Score: 3}, {Host: "machine22", Score: 3},
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
@ -645,7 +645,7 @@ func TestZoneSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 3}, {Host: "machine12", Score: 3},
{Host: "machine21", Score: 6}, {Host: "machine22", Score: 6},
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
@ -660,7 +660,7 @@ func TestZoneSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 7}, {Host: "machine12", Score: 7},
{Host: "machine21", Score: 5}, {Host: "machine22", Score: 5},
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},

View File

@ -108,7 +108,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini
NodeLister: &cache.StoreToNodeLister{},
PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
schedulerCache: schedulerCache,
@ -401,7 +401,7 @@ func (f *ConfigFactory) Run() {
// Watch and cache all service objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything)
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Indexer, 0).RunUntil(f.StopEverything)
// Watch and cache all ReplicationController objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.

View File

@ -493,8 +493,8 @@ func TestZeroRequest(t *testing.T) {
{
Function: algorithmpriorities.NewSelectorSpreadPriority(
algorithm.FakePodLister(test.pods),
algorithm.FakeServiceLister([]api.Service{}),
algorithm.FakeControllerLister([]api.ReplicationController{}),
algorithm.FakeServiceLister([]*api.Service{}),
algorithm.FakeControllerLister([]*api.ReplicationController{}),
algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})),
Weight: 1,
},