From 04db076e5fc9c4b7a8503fe2ce01716b01801cc9 Mon Sep 17 00:00:00 2001
From: Abhishek Gupta
Date: Fri, 12 Dec 2014 14:29:20 -0800
Subject: [PATCH] Enhancements to scheduler priority functions

- Modified the existing spreading priority to consider service pods explicitly
- Added a new priority function to spread pods across zones
---
 pkg/scheduler/listers.go | 37 +++
 pkg/scheduler/spreading.go | 109 ++++++++-
 pkg/scheduler/spreading_test.go | 222 ++++++++++++++++--
 .../algorithmprovider/defaults/defaults.go | 13 +-
 plugin/pkg/scheduler/factory/factory.go | 59 ++++-
 5 files changed, 399 insertions(+), 41 deletions(-)

diff --git a/pkg/scheduler/listers.go b/pkg/scheduler/listers.go
index 6c893dcce93..9e7829a8efe 100644
--- a/pkg/scheduler/listers.go
+++ b/pkg/scheduler/listers.go
@@ -17,6 +17,8 @@ limitations under the License.
 package scheduler
 
 import (
+	"fmt"
+
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 )
@@ -52,3 +54,38 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) {
 	}
 	return selected, nil
 }
+
+// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.
+type ServiceLister interface {
+	// Lists all the services
+	ListServices() (api.ServiceList, error)
+	// Gets the service for the given pod
+	GetPodService(api.Pod) (api.Service, error)
+}
+
+// FakeServiceLister implements ServiceLister on []api.Service for test purposes.
+type FakeServiceLister []api.Service
+
+// ListServices returns api.ServiceList, the list of all services.
+func (f FakeServiceLister) ListServices() (api.ServiceList, error) {
+	return api.ServiceList{Items: f}, nil
+}
+
+// GetPodService returns the service whose selector matches the labels on the given pod.
+// We are assuming a single service per pod.
+// In case of multiple services per pod, the first service found is returned.
+func (f FakeServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) {
+	var selector labels.Selector
+
+	for _, service := range f {
+		// consider only services that are in the same namespace as the pod
+		if service.Namespace != pod.Namespace {
+			continue
+		}
+		selector = labels.Set(service.Spec.Selector).AsSelector()
+		if selector.Matches(labels.Set(pod.Labels)) {
+			return service, nil
+		}
+	}
+	return service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+}
diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go
index 0c831484104..3afa6379731 100644
--- a/pkg/scheduler/spreading.go
+++ b/pkg/scheduler/spreading.go
@@ -17,28 +17,44 @@ limitations under the License.
 package scheduler
 
 import (
-	"math/rand"
-
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 )
 
+type ServiceSpread struct {
+	serviceLister ServiceLister
+}
+
+func NewServiceSpreadPriority(serviceLister ServiceLister) PriorityFunction {
+	serviceSpread := &ServiceSpread{
+		serviceLister: serviceLister,
+	}
+	return serviceSpread.CalculateSpreadPriority
+}
+
 // CalculateSpreadPriority spreads pods by minimizing the number of pods on the same machine with the same labels.
 // Importantly, if there are services in the system that span multiple heterogenous sets of pods, this spreading priority
 // may not provide optimal spreading for the members of that Service.
 // TODO: consider if we want to include Service label sets in the scheduling priority.
-func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { - pods, err := podLister.List(labels.SelectorFromSet(pod.Labels)) - if err != nil { - return nil, err +func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var maxCount int + var pods []api.Pod + var err error + + service, err := s.serviceLister.GetPodService(pod) + if err == nil { + selector := labels.SelectorFromSet(service.Spec.Selector) + pods, err = podLister.ListPods(selector) + if err != nil { + return nil, err + } } + minions, err := minionLister.List() if err != nil { return nil, err } - var maxCount int - var fScore float32 = 10.0 counts := map[string]int{} if len(pods) > 0 { for _, pod := range pods { @@ -54,6 +70,8 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini //score int - scale of 0-10 // 0 being the lowest priority and 10 being the highest for _, minion := range minions.Items { + // initializing to the default/max minion score of 10 + fScore := float32(10) if maxCount > 0 { fScore = 10 * (float32(maxCount-counts[minion.Name]) / float32(maxCount)) } @@ -62,6 +80,77 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini return result, nil } -func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler { - return NewGenericScheduler(predicates, []PriorityConfig{{Function: CalculateSpreadPriority, Weight: 1}}, podLister, random) +type ZoneSpread struct { + serviceLister ServiceLister + zoneLabel string +} + +func NewZoneSpreadPriority(serviceLister ServiceLister, zoneLabel string) PriorityFunction { + zoneSpread := &ZoneSpread{ + serviceLister: serviceLister, + zoneLabel: zoneLabel, + } + return zoneSpread.ZoneSpreadPriority +} + +func (z *ZoneSpread) ZoneSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var service api.Service + var pods []api.Pod + var err error + + service, err = z.serviceLister.GetPodService(pod) + if err == nil { + selector := labels.SelectorFromSet(service.Spec.Selector) + pods, err = podLister.ListPods(selector) + if err != nil { + return nil, err + } + } + + minions, err := minionLister.List() + if err != nil { + return nil, err + } + + // find the zones that the minions belong to + openMinions := []string{} + zonedMinions := map[string]string{} + for _, minion := range minions.Items { + if labels.Set(minion.Labels).Has(z.zoneLabel) { + zone := labels.Set(minion.Labels).Get(z.zoneLabel) + zonedMinions[minion.Name] = zone + } else { + openMinions = append(openMinions, minion.Name) + } + } + + podCounts := map[string]int{} + numServicePods := len(pods) + if numServicePods > 0 { + for _, pod := range pods { + zone, exists := zonedMinions[pod.Status.Host] + if !exists { + continue + } + podCounts[zone]++ + } + } + + result := []HostPriority{} + //score int - scale of 0-10 + // 0 being the lowest priority and 10 being the highest + for minion := range zonedMinions { + // initializing to the default/max minion score of 10 + fScore := float32(10) + if numServicePods > 0 { + fScore = 10 * (float32(numServicePods-podCounts[zonedMinions[minion]]) / float32(numServicePods)) + } + result = append(result, HostPriority{host: minion, score: int(fScore)}) + } + // add the open minions with a score of 0 + for _, minion := range openMinions { + result = append(result, 
HostPriority{host: minion, score: 0}) + } + + return result, nil } diff --git a/pkg/scheduler/spreading_test.go b/pkg/scheduler/spreading_test.go index 0e9c9093464..5c2614d4798 100644 --- a/pkg/scheduler/spreading_test.go +++ b/pkg/scheduler/spreading_test.go @@ -18,12 +18,13 @@ package scheduler import ( "reflect" + "sort" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -func TestSpreadPriority(t *testing.T) { +func TestServiceSpreadPriority(t *testing.T) { labels1 := map[string]string{ "foo": "bar", "baz": "blah", @@ -32,16 +33,17 @@ func TestSpreadPriority(t *testing.T) { "bar": "foo", "baz": "blah", } - machine1Status := api.PodStatus{ + zone1Status := api.PodStatus{ Host: "machine1", } - machine2Status := api.PodStatus{ + zone2Status := api.PodStatus{ Host: "machine2", } tests := []struct { pod api.Pod pods []api.Pod nodes []string + services []api.Service expectedList HostPriorityList test string }{ @@ -52,55 +54,72 @@ func TestSpreadPriority(t *testing.T) { }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, - pods: []api.Pod{{Status: machine1Status}}, + pods: []api.Pod{{Status: zone1Status}}, nodes: []string{"machine1", "machine2"}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, - test: "no labels", + test: "no services", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, - pods: []api.Pod{{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, + pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, - test: "different labels", + test: "different services", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{ - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}}, - test: "one label match", + test: "two pods, one service pod", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{ - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, - test: "two label matches on different machines", + test: "three pods, two service pods on different machines", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{ - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, - {Status: machine2Status, ObjectMeta: 
api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []HostPriority{{"machine1", 5}, {"machine2", 0}}, - test: "three label matches", + test: "four pods, three service pods", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 5}}, + test: "service with partial pod label matches", }, } for _, test := range tests { - list, err := CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeMinionList(test.nodes))) + serviceSpread := ServiceSpread{serviceLister: FakeServiceLister(test.services)} + list, err := serviceSpread.CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeMinionList(test.nodes))) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -109,3 +128,166 @@ func TestSpreadPriority(t *testing.T) { } } } + +func TestZoneSpreadPriority(t *testing.T) { + labels1 := map[string]string{ + "foo": "bar", + "baz": "blah", + } + labels2 := map[string]string{ + "bar": "foo", + "baz": "blah", + } + zone1 := map[string]string{ + "zone": "zone1", + } + zone2 := map[string]string{ + "zone": "zone2", + } + nozone := map[string]string{ + "name": "value", + } + zone0Status := api.PodStatus{ + Host: "machine01", + } + zone1Status := api.PodStatus{ + Host: "machine11", + } + zone2Status := api.PodStatus{ + Host: "machine21", + } + labeledNodes := map[string]map[string]string{ + "machine01": nozone, "machine02": nozone, + "machine11": zone1, "machine12": zone1, + "machine21": zone2, "machine22": zone2, + } + tests := []struct { + pod api.Pod + pods []api.Pod + nodes map[string]map[string]string + services []api.Service + expectedList HostPriorityList + test string + }{ + { + nodes: labeledNodes, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 10}, {"machine22", 10}, + {"machine01", 0}, {"machine02", 0}}, + test: "nothing scheduled", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{{Status: zone1Status}}, + nodes: labeledNodes, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 10}, {"machine22", 10}, + {"machine01", 0}, {"machine02", 0}}, + test: "no services", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 10}, {"machine22", 10}, + {"machine01", 0}, {"machine02", 0}}, + test: "different services", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: 
zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 0}, {"machine22", 0}, + {"machine01", 0}, {"machine02", 0}}, + test: "three pods, one service pod", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 5}, {"machine12", 5}, + {"machine21", 5}, {"machine22", 5}, + {"machine01", 0}, {"machine02", 0}}, + test: "three pods, two service pods on different machines", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 6}, {"machine12", 6}, + {"machine21", 3}, {"machine22", 3}, + {"machine01", 0}, {"machine02", 0}}, + test: "four pods, three service pods", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + expectedList: []HostPriority{{"machine11", 3}, {"machine12", 3}, + {"machine21", 6}, {"machine22", 6}, + {"machine01", 0}, {"machine02", 0}}, + test: "service with partial pod label matches", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 7}, {"machine12", 7}, + {"machine21", 5}, {"machine22", 5}, + {"machine01", 0}, {"machine02", 0}}, + test: "service pod on non-zoned minion", + }, + } + + for _, test := range tests { + zoneSpread := ZoneSpread{serviceLister: FakeServiceLister(test.services), zoneLabel: "zone"} + list, err := zoneSpread.ZoneSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeLabeledMinionList(test.nodes))) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort the two lists to avoid failures on account of different ordering + sort.Sort(test.expectedList) + sort.Sort(list) + if !reflect.DeepEqual(test.expectedList, list) { + t.Errorf("%s: expected %#v, got 
%#v", test.test, test.expectedList, list) + } + } +} + +func makeLabeledMinionList(nodeMap map[string]map[string]string) (result api.NodeList) { + nodes := []api.Node{} + for nodeName, labels := range nodeMap { + nodes = append(nodes, api.Node{ObjectMeta: api.ObjectMeta{Name: nodeName, Labels: labels}}) + } + return api.NodeList{Items: nodes} +} diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index e93af9a4e32..69d5397ccf3 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -31,11 +31,11 @@ func defaultPredicates() util.StringSet { return util.NewStringSet( // Fit is defined based on the absence of port conflicts. factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts), - // Fit is determined by resource availability + // Fit is determined by resource availability. factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)), - // Fit is determined by non-conflicting disk volumes + // Fit is determined by non-conflicting disk volumes. factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict), - // Fit is determined by node selector query + // Fit is determined by node selector query. factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)), // Fit is determined by the presence of the Host parameter and a string match factory.RegisterFitPredicate("HostName", algorithm.PodFitsHost), @@ -46,8 +46,11 @@ func defaultPriorities() util.StringSet { return util.NewStringSet( // Prioritize nodes by least requested utilization. factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1), - // spreads pods by minimizing the number of pods on the same minion with the same labels. - factory.RegisterPriorityFunction("SpreadingPriority", algorithm.CalculateSpreadPriority, 1), + // spreads pods by minimizing the number of pods (belonging to the same service) on the same minion. + factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1), + // spreads pods belonging to the same service across minions in different zones + // TODO: remove the hardcoding of the "zone" label and move it to a constant + factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewZoneSpreadPriority(factory.ServiceLister, "zone"), 1), // EqualPriority is a prioritizer function that gives an equal weight of one to all minions factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0), ) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 80c226f5e02..b6a4afb14a6 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -35,8 +35,9 @@ import ( ) var ( - PodLister = &cache.StoreToPodLister{cache.NewStore()} - MinionLister = &cache.StoreToNodeLister{cache.NewStore()} + PodLister = &cache.StoreToPodLister{cache.NewStore()} + MinionLister = &cache.StoreToNodeLister{cache.NewStore()} + ServiceLister = &cache.StoreToServiceLister{cache.NewStore()} ) // ConfigFactory knows how to fill out a scheduler config with its support functions. 
@@ -48,15 +49,18 @@ type ConfigFactory struct { PodLister *cache.StoreToPodLister // a means to list all minions MinionLister *cache.StoreToNodeLister + // a means to list all services + ServiceLister *cache.StoreToServiceLister } // NewConfigFactory initializes the factory. func NewConfigFactory(client *client.Client) *ConfigFactory { return &ConfigFactory{ - Client: client, - PodQueue: cache.NewFIFO(), - PodLister: PodLister, - MinionLister: MinionLister, + Client: client, + PodQueue: cache.NewFIFO(), + PodLister: PodLister, + MinionLister: MinionLister, + ServiceLister: ServiceLister, } } @@ -105,6 +109,11 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run() } + // Watch and cache all service objects. Scheduler needs to find all pods + // created by the same service, so that it can spread them correctly. + // Cache this locally. + cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store).Run() + r := rand.New(rand.NewSource(time.Now().UnixNano())) algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r) @@ -178,6 +187,15 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { return &nodeEnumerator{list}, nil } +// createServiceLW returns a listWatch that gets all changes to services. +func (factory *ConfigFactory) createServiceLW() *listWatch { + return &listWatch{ + client: factory.Client, + fieldSelector: parseSelectorOrDie(""), + resource: "services", + } +} + func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { return func(pod *api.Pod, err error) { glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err) @@ -208,6 +226,35 @@ type nodeEnumerator struct { *api.NodeList } +// storeToServiceLister turns a store into a service lister. The store must contain (only) services. +type storeToServiceLister struct { + cache.Store +} + +func (s *storeToServiceLister) ListServices() (services api.ServiceList, err error) { + for _, m := range s.List() { + services.Items = append(services.Items, *(m.(*api.Service))) + } + return services, nil +} + +func (s *storeToServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) { + var selector labels.Selector + + for _, m := range s.List() { + service = *m.(*api.Service) + // consider only services that are in the same namespace as the pod + if service.Namespace != pod.Namespace { + continue + } + selector = labels.Set(service.Spec.Selector).AsSelector() + if selector.Matches(labels.Set(pod.Labels)) { + return service, nil + } + } + return service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) +} + // Len returns the number of items in the node list. func (ne *nodeEnumerator) Len() int { if ne.NodeList == nil {