diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index f98dff6e4a2..a8eb8f57700 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -75,17 +75,41 @@ func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) { return nil, fmt.Errorf("minion '%v' is not in cache", id) } -// StoreToServiceLister makes a Store have the List method of the client.ServiceInterface +// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface // The Store must contain (only) Services. type StoreToServiceLister struct { Store } -func (s *StoreToServiceLister) List() (svcs api.ServiceList, err error) { +func (s *StoreToServiceLister) List() (services api.ServiceList, err error) { for _, m := range s.Store.List() { - svcs.Items = append(svcs.Items, *(m.(*api.Service))) + services.Items = append(services.Items, *(m.(*api.Service))) } - return svcs, nil + return services, nil +} + +// TODO: Move this back to scheduler as a helper function that takes a Store, +// rather than a method of StoreToServiceLister. +func (s *StoreToServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) { + var selector labels.Selector + var service api.Service + + for _, m := range s.Store.List() { + service = *m.(*api.Service) + // consider only services that are in the same namespace as the pod + if service.Namespace != pod.Namespace { + continue + } + selector = labels.Set(service.Spec.Selector).AsSelector() + if selector.Matches(labels.Set(pod.Labels)) { + services = append(services, service) + } + } + if len(services) == 0 { + err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + + return } // TODO: add StoreToEndpointsLister for use in kube-proxy. diff --git a/pkg/scheduler/listers.go b/pkg/scheduler/listers.go index e6655ef9acf..10d887c9417 100644 --- a/pkg/scheduler/listers.go +++ b/pkg/scheduler/listers.go @@ -58,7 +58,7 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) { // ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler. type ServiceLister interface { // Lists all the services - ListServices() (api.ServiceList, error) + List() (api.ServiceList, error) // Gets the services for the given pod GetPodServices(api.Pod) ([]api.Service, error) } @@ -67,7 +67,7 @@ type ServiceLister interface { type FakeServiceLister []api.Service // FakeServiceLister returns api.ServiceList, the list of all services. -func (f FakeServiceLister) ListServices() (api.ServiceList, error) { +func (f FakeServiceLister) List() (api.ServiceList, error) { return api.ServiceList{Items: f}, nil } diff --git a/pkg/scheduler/predicates_test.go b/pkg/scheduler/predicates_test.go index 27f3345da3f..5f91ba248f8 100644 --- a/pkg/scheduler/predicates_test.go +++ b/pkg/scheduler/predicates_test.go @@ -32,9 +32,6 @@ func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*api.Node, error) { return &node, nil } -<<<<<<< HEAD -func makeResources(milliCPU int64, memory int64) api.NodeResources { -======= type FakeNodeListInfo []api.Node func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) { @@ -46,8 +43,7 @@ func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) { return nil, fmt.Errorf("Unable to find node: %s", nodeName) } -func makeResources(milliCPU int, memory int) api.NodeResources { ->>>>>>> e0101c2... Adding service affinity predicate +func makeResources(milliCPU int64, memory int64) api.NodeResources { return api.NodeResources{ Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), diff --git a/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go index 47db0acf0b4..ee023355d47 100644 --- a/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go +++ b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go @@ -38,7 +38,7 @@ func affinityPredicates() util.StringSet { "NoDiskConflict", // Ensures that all pods within the same service are hosted on minions within the same region as defined by the "region" label factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})), - // Fit is defined based on the presence/absence of the "region" label on a minion, regardless of value. + // Fit is defined based on the presence of the "region" label on a minion, regardless of value. factory.RegisterFitPredicate("NodeLabelPredicate", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)), ) } @@ -48,9 +48,8 @@ func affinityPriorities() util.StringSet { "LeastRequestedPriority", "ServiceSpreadingPriority", // spreads pods belonging to the same service across minions in different zones - // region and zone can be nested infrastructure topology levels and defined by labels on minions factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 2), - // Prioritize nodes based on the presence/absence of a label on a minion, regardless of value. + // Prioritize nodes based on the presence of the "zone" label on a minion, regardless of value. factory.RegisterPriorityFunction("NodeLabelPriority", algorithm.NewNodeLabelPriority("zone", true), 1), ) } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index f53314021b7..d538c181378 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -187,12 +187,12 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { return &nodeEnumerator{list}, nil } -// createServiceLW returns a listWatch that gets all changes to services. -func (factory *ConfigFactory) createServiceLW() *listWatch { - return &listWatch{ - client: factory.Client, - fieldSelector: parseSelectorOrDie(""), - resource: "services", +// createServiceLW returns a cache.ListWatch that gets all changes to services. +func (factory *ConfigFactory) createServiceLW() *cache.ListWatch { + return &cache.ListWatch{ + Client: factory.Client, + FieldSelector: parseSelectorOrDie(""), + Resource: "services", } } @@ -226,40 +226,6 @@ type nodeEnumerator struct { *api.NodeList } -// storeToServiceLister turns a store into a service lister. The store must contain (only) services. -type storeToServiceLister struct { - cache.Store -} - -func (s *storeToServiceLister) ListServices() (services api.ServiceList, err error) { - for _, m := range s.List() { - services.Items = append(services.Items, *(m.(*api.Service))) - } - return services, nil -} - -func (s *storeToServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) { - var selector labels.Selector - var service api.Service - - for _, m := range s.List() { - service = *m.(*api.Service) - // consider only services that are in the same namespace as the pod - if service.Namespace != pod.Namespace { - continue - } - selector = labels.Set(service.Spec.Selector).AsSelector() - if selector.Matches(labels.Set(pod.Labels)) { - services = append(services, service) - } - } - if len(services) == 0 { - err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) - } - - return -} - // Len returns the number of items in the node list. func (ne *nodeEnumerator) Len() int { if ne.NodeList == nil {