From 04db076e5fc9c4b7a8503fe2ce01716b01801cc9 Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Fri, 12 Dec 2014 14:29:20 -0800 Subject: [PATCH 1/8] Enhancements to scheduler priority functions - Modified the existing spreading priority to consider service pods explicitly - Added a new priority function to spread pods across zones --- pkg/scheduler/listers.go | 37 +++ pkg/scheduler/spreading.go | 109 ++++++++- pkg/scheduler/spreading_test.go | 222 ++++++++++++++++-- .../algorithmprovider/defaults/defaults.go | 13 +- plugin/pkg/scheduler/factory/factory.go | 59 ++++- 5 files changed, 399 insertions(+), 41 deletions(-) diff --git a/pkg/scheduler/listers.go b/pkg/scheduler/listers.go index 6c893dcce93..9e7829a8efe 100644 --- a/pkg/scheduler/listers.go +++ b/pkg/scheduler/listers.go @@ -17,6 +17,8 @@ limitations under the License. package scheduler import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) @@ -52,3 +54,38 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) { } return selected, nil } + +// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler. +type ServiceLister interface { + // Lists all the services + ListServices() (api.ServiceList, error) + // Gets the service for the given pod + GetPodService(api.Pod) (api.Service, error) +} + +// FakeServiceLister implements ServiceLister on []api.Service for test purposes. +type FakeServiceLister []api.Service + +// FakeServiceLister returns api.ServiceList, the list of all services. +func (f FakeServiceLister) ListServices() (api.ServiceList, error) { + return api.ServiceList{Items: f}, nil +} + +// GetPodService gets the service that has the selector that can match the labels on the given pod +// We are assuming a single service per pod. +// In case of multiple services per pod, the first service found is returned +func (f FakeServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) { + var selector labels.Selector + + for _, service := range f { + // consider only services that are in the same namespace as the pod + if service.Namespace != pod.Namespace { + continue + } + selector = labels.Set(service.Spec.Selector).AsSelector() + if selector.Matches(labels.Set(pod.Labels)) { + return service, nil + } + } + return service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) +} diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index 0c831484104..3afa6379731 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -17,28 +17,44 @@ limitations under the License. package scheduler import ( - "math/rand" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) +type ServiceSpread struct { + serviceLister ServiceLister +} + +func NewServiceSpreadPriority(serviceLister ServiceLister) PriorityFunction { + serviceSpread := &ServiceSpread{ + serviceLister: serviceLister, + } + return serviceSpread.CalculateSpreadPriority +} + // CalculateSpreadPriority spreads pods by minimizing the number of pods on the same machine with the same labels. // Importantly, if there are services in the system that span multiple heterogenous sets of pods, this spreading priority // may not provide optimal spreading for the members of that Service. 
// TODO: consider if we want to include Service label sets in the scheduling priority. -func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { - pods, err := podLister.List(labels.SelectorFromSet(pod.Labels)) - if err != nil { - return nil, err +func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var maxCount int + var pods []api.Pod + var err error + + service, err := s.serviceLister.GetPodService(pod) + if err == nil { + selector := labels.SelectorFromSet(service.Spec.Selector) + pods, err = podLister.ListPods(selector) + if err != nil { + return nil, err + } } + minions, err := minionLister.List() if err != nil { return nil, err } - var maxCount int - var fScore float32 = 10.0 counts := map[string]int{} if len(pods) > 0 { for _, pod := range pods { @@ -54,6 +70,8 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini //score int - scale of 0-10 // 0 being the lowest priority and 10 being the highest for _, minion := range minions.Items { + // initializing to the default/max minion score of 10 + fScore := float32(10) if maxCount > 0 { fScore = 10 * (float32(maxCount-counts[minion.Name]) / float32(maxCount)) } @@ -62,6 +80,77 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini return result, nil } -func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler { - return NewGenericScheduler(predicates, []PriorityConfig{{Function: CalculateSpreadPriority, Weight: 1}}, podLister, random) +type ZoneSpread struct { + serviceLister ServiceLister + zoneLabel string +} + +func NewZoneSpreadPriority(serviceLister ServiceLister, zoneLabel string) PriorityFunction { + zoneSpread := &ZoneSpread{ + serviceLister: serviceLister, + zoneLabel: zoneLabel, + } + return zoneSpread.ZoneSpreadPriority +} + +func (z *ZoneSpread) ZoneSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var service api.Service + var pods []api.Pod + var err error + + service, err = z.serviceLister.GetPodService(pod) + if err == nil { + selector := labels.SelectorFromSet(service.Spec.Selector) + pods, err = podLister.ListPods(selector) + if err != nil { + return nil, err + } + } + + minions, err := minionLister.List() + if err != nil { + return nil, err + } + + // find the zones that the minions belong to + openMinions := []string{} + zonedMinions := map[string]string{} + for _, minion := range minions.Items { + if labels.Set(minion.Labels).Has(z.zoneLabel) { + zone := labels.Set(minion.Labels).Get(z.zoneLabel) + zonedMinions[minion.Name] = zone + } else { + openMinions = append(openMinions, minion.Name) + } + } + + podCounts := map[string]int{} + numServicePods := len(pods) + if numServicePods > 0 { + for _, pod := range pods { + zone, exists := zonedMinions[pod.Status.Host] + if !exists { + continue + } + podCounts[zone]++ + } + } + + result := []HostPriority{} + //score int - scale of 0-10 + // 0 being the lowest priority and 10 being the highest + for minion := range zonedMinions { + // initializing to the default/max minion score of 10 + fScore := float32(10) + if numServicePods > 0 { + fScore = 10 * (float32(numServicePods-podCounts[zonedMinions[minion]]) / float32(numServicePods)) + } + result = append(result, HostPriority{host: minion, score: int(fScore)}) + } + // add the open minions with a 
score of 0 + for _, minion := range openMinions { + result = append(result, HostPriority{host: minion, score: 0}) + } + + return result, nil } diff --git a/pkg/scheduler/spreading_test.go b/pkg/scheduler/spreading_test.go index 0e9c9093464..5c2614d4798 100644 --- a/pkg/scheduler/spreading_test.go +++ b/pkg/scheduler/spreading_test.go @@ -18,12 +18,13 @@ package scheduler import ( "reflect" + "sort" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -func TestSpreadPriority(t *testing.T) { +func TestServiceSpreadPriority(t *testing.T) { labels1 := map[string]string{ "foo": "bar", "baz": "blah", @@ -32,16 +33,17 @@ func TestSpreadPriority(t *testing.T) { "bar": "foo", "baz": "blah", } - machine1Status := api.PodStatus{ + zone1Status := api.PodStatus{ Host: "machine1", } - machine2Status := api.PodStatus{ + zone2Status := api.PodStatus{ Host: "machine2", } tests := []struct { pod api.Pod pods []api.Pod nodes []string + services []api.Service expectedList HostPriorityList test string }{ @@ -52,55 +54,72 @@ func TestSpreadPriority(t *testing.T) { }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, - pods: []api.Pod{{Status: machine1Status}}, + pods: []api.Pod{{Status: zone1Status}}, nodes: []string{"machine1", "machine2"}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, - test: "no labels", + test: "no services", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, - pods: []api.Pod{{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, + pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, - test: "different labels", + test: "different services", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{ - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}}, - test: "one label match", + test: "two pods, one service pod", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{ - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, - test: "two label matches on different machines", + test: "three pods, two service pods on different machines", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{ - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, - {Status: machine2Status, ObjectMeta: 
api.ObjectMeta{Labels: labels1}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []HostPriority{{"machine1", 5}, {"machine2", 0}}, - test: "three label matches", + test: "four pods, three service pods", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 5}}, + test: "service with partial pod label matches", }, } for _, test := range tests { - list, err := CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeMinionList(test.nodes))) + serviceSpread := ServiceSpread{serviceLister: FakeServiceLister(test.services)} + list, err := serviceSpread.CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeMinionList(test.nodes))) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -109,3 +128,166 @@ func TestSpreadPriority(t *testing.T) { } } } + +func TestZoneSpreadPriority(t *testing.T) { + labels1 := map[string]string{ + "foo": "bar", + "baz": "blah", + } + labels2 := map[string]string{ + "bar": "foo", + "baz": "blah", + } + zone1 := map[string]string{ + "zone": "zone1", + } + zone2 := map[string]string{ + "zone": "zone2", + } + nozone := map[string]string{ + "name": "value", + } + zone0Status := api.PodStatus{ + Host: "machine01", + } + zone1Status := api.PodStatus{ + Host: "machine11", + } + zone2Status := api.PodStatus{ + Host: "machine21", + } + labeledNodes := map[string]map[string]string{ + "machine01": nozone, "machine02": nozone, + "machine11": zone1, "machine12": zone1, + "machine21": zone2, "machine22": zone2, + } + tests := []struct { + pod api.Pod + pods []api.Pod + nodes map[string]map[string]string + services []api.Service + expectedList HostPriorityList + test string + }{ + { + nodes: labeledNodes, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 10}, {"machine22", 10}, + {"machine01", 0}, {"machine02", 0}}, + test: "nothing scheduled", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{{Status: zone1Status}}, + nodes: labeledNodes, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 10}, {"machine22", 10}, + {"machine01", 0}, {"machine02", 0}}, + test: "no services", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 10}, {"machine22", 10}, + {"machine01", 0}, {"machine02", 0}}, + test: "different services", + }, + { + pod: 
api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 0}, {"machine22", 0}, + {"machine01", 0}, {"machine02", 0}}, + test: "three pods, one service pod", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 5}, {"machine12", 5}, + {"machine21", 5}, {"machine22", 5}, + {"machine01", 0}, {"machine02", 0}}, + test: "three pods, two service pods on different machines", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 6}, {"machine12", 6}, + {"machine21", 3}, {"machine22", 3}, + {"machine01", 0}, {"machine02", 0}}, + test: "four pods, three service pods", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + expectedList: []HostPriority{{"machine11", 3}, {"machine12", 3}, + {"machine21", 6}, {"machine22", 6}, + {"machine01", 0}, {"machine02", 0}}, + test: "service with partial pod label matches", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 7}, {"machine12", 7}, + {"machine21", 5}, {"machine22", 5}, + {"machine01", 0}, {"machine02", 0}}, + test: "service pod on non-zoned minion", + }, + } + + for _, test := range tests { + zoneSpread := ZoneSpread{serviceLister: FakeServiceLister(test.services), zoneLabel: "zone"} + list, err := zoneSpread.ZoneSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeLabeledMinionList(test.nodes))) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort the two lists to avoid failures on account of different ordering + sort.Sort(test.expectedList) + sort.Sort(list) + 
if !reflect.DeepEqual(test.expectedList, list) { + t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) + } + } +} + +func makeLabeledMinionList(nodeMap map[string]map[string]string) (result api.NodeList) { + nodes := []api.Node{} + for nodeName, labels := range nodeMap { + nodes = append(nodes, api.Node{ObjectMeta: api.ObjectMeta{Name: nodeName, Labels: labels}}) + } + return api.NodeList{Items: nodes} +} diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index e93af9a4e32..69d5397ccf3 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -31,11 +31,11 @@ func defaultPredicates() util.StringSet { return util.NewStringSet( // Fit is defined based on the absence of port conflicts. factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts), - // Fit is determined by resource availability + // Fit is determined by resource availability. factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)), - // Fit is determined by non-conflicting disk volumes + // Fit is determined by non-conflicting disk volumes. factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict), - // Fit is determined by node selector query + // Fit is determined by node selector query. factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)), // Fit is determined by the presence of the Host parameter and a string match factory.RegisterFitPredicate("HostName", algorithm.PodFitsHost), @@ -46,8 +46,11 @@ func defaultPriorities() util.StringSet { return util.NewStringSet( // Prioritize nodes by least requested utilization. factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1), - // spreads pods by minimizing the number of pods on the same minion with the same labels. - factory.RegisterPriorityFunction("SpreadingPriority", algorithm.CalculateSpreadPriority, 1), + // spreads pods by minimizing the number of pods (belonging to the same service) on the same minion. + factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1), + // spreads pods belonging to the same service across minions in different zones + // TODO: remove the hardcoding of the "zone" label and move it to a constant + factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewZoneSpreadPriority(factory.ServiceLister, "zone"), 1), // EqualPriority is a prioritizer function that gives an equal weight of one to all minions factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0), ) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 80c226f5e02..b6a4afb14a6 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -35,8 +35,9 @@ import ( ) var ( - PodLister = &cache.StoreToPodLister{cache.NewStore()} - MinionLister = &cache.StoreToNodeLister{cache.NewStore()} + PodLister = &cache.StoreToPodLister{cache.NewStore()} + MinionLister = &cache.StoreToNodeLister{cache.NewStore()} + ServiceLister = &cache.StoreToServiceLister{cache.NewStore()} ) // ConfigFactory knows how to fill out a scheduler config with its support functions. 
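As a quick sanity check on the ZoneSpreadingPriority arithmetic registered above, the standalone sketch below (hypothetical helper name, plain maps instead of the real listers) reproduces the scoring used by the patch: each zoned minion gets 10 * (servicePods - podsInItsZone) / servicePods, and minions without the zone label get 0. For the "four pods, three service pods" test case (one service pod in zone1, two in zone2) this yields 6 for zone1 minions and 3 for zone2 minions, matching the expected list.

package main

import "fmt"

// zoneSpreadScores mirrors the patch's anti-affinity scoring: minions in a
// zone hosting fewer service pods score higher on the 0-10 scale.
func zoneSpreadScores(minionZones map[string]string, podsPerZone map[string]int, servicePods int) map[string]int {
	scores := map[string]int{}
	for minion, zone := range minionZones {
		score := 10.0 // default/max score when the service has no pods yet
		if servicePods > 0 {
			score = 10 * float64(servicePods-podsPerZone[zone]) / float64(servicePods)
		}
		scores[minion] = int(score)
	}
	return scores
}

func main() {
	minionZones := map[string]string{"machine11": "zone1", "machine21": "zone2"}
	podsPerZone := map[string]int{"zone1": 1, "zone2": 2}
	fmt.Println(zoneSpreadScores(minionZones, podsPerZone, 3)) // machine11:6 machine21:3
}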
@@ -48,15 +49,18 @@ type ConfigFactory struct { PodLister *cache.StoreToPodLister // a means to list all minions MinionLister *cache.StoreToNodeLister + // a means to list all services + ServiceLister *cache.StoreToServiceLister } // NewConfigFactory initializes the factory. func NewConfigFactory(client *client.Client) *ConfigFactory { return &ConfigFactory{ - Client: client, - PodQueue: cache.NewFIFO(), - PodLister: PodLister, - MinionLister: MinionLister, + Client: client, + PodQueue: cache.NewFIFO(), + PodLister: PodLister, + MinionLister: MinionLister, + ServiceLister: ServiceLister, } } @@ -105,6 +109,11 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run() } + // Watch and cache all service objects. Scheduler needs to find all pods + // created by the same service, so that it can spread them correctly. + // Cache this locally. + cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store).Run() + r := rand.New(rand.NewSource(time.Now().UnixNano())) algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r) @@ -178,6 +187,15 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { return &nodeEnumerator{list}, nil } +// createServiceLW returns a listWatch that gets all changes to services. +func (factory *ConfigFactory) createServiceLW() *listWatch { + return &listWatch{ + client: factory.Client, + fieldSelector: parseSelectorOrDie(""), + resource: "services", + } +} + func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { return func(pod *api.Pod, err error) { glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err) @@ -208,6 +226,35 @@ type nodeEnumerator struct { *api.NodeList } +// storeToServiceLister turns a store into a service lister. The store must contain (only) services. +type storeToServiceLister struct { + cache.Store +} + +func (s *storeToServiceLister) ListServices() (services api.ServiceList, err error) { + for _, m := range s.List() { + services.Items = append(services.Items, *(m.(*api.Service))) + } + return services, nil +} + +func (s *storeToServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) { + var selector labels.Selector + + for _, m := range s.List() { + service = *m.(*api.Service) + // consider only services that are in the same namespace as the pod + if service.Namespace != pod.Namespace { + continue + } + selector = labels.Set(service.Spec.Selector).AsSelector() + if selector.Matches(labels.Set(pod.Labels)) { + return service, nil + } + } + return service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) +} + // Len returns the number of items in the node list. 
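The GetPodService lookup above relies on standard label-selector semantics: a service selects a pod when every key/value pair in the service's selector also appears in the pod's labels. A minimal standalone illustration of that subset match, using plain maps rather than the real labels package (the helper name is made up for this sketch):

package main

import "fmt"

// selectorMatches reports whether every key/value pair in selector is present
// in podLabels -- the subset semantics behind labels.Set(...).AsSelector().Matches.
func selectorMatches(selector, podLabels map[string]string) bool {
	for k, v := range selector {
		if podLabels[k] != v {
			return false
		}
	}
	return true
}

func main() {
	selector := map[string]string{"foo": "bar"}
	podLabels := map[string]string{"foo": "bar", "baz": "blah"}
	fmt.Println(selectorMatches(selector, podLabels))                           // true
	fmt.Println(selectorMatches(map[string]string{"foo": "baz"}, podLabels))    // false
}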
func (ne *nodeEnumerator) Len() int { if ne.NodeList == nil { From 9dd7d2a0a17286b9b099abec889f16a6da1dde38 Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Mon, 22 Dec 2014 13:54:41 -0800 Subject: [PATCH 2/8] Adding label checker predicates and test cases --- pkg/scheduler/predicates.go | 37 +++++++ pkg/scheduler/predicates_test.go | 60 +++++++++++ pkg/scheduler/priorities.go | 45 ++++++++ pkg/scheduler/priorities_test.go | 100 ++++++++++++++++++ pkg/scheduler/spreading.go | 20 ++-- pkg/scheduler/spreading_test.go | 4 +- .../algorithmprovider/defaults/defaults.go | 2 +- .../labelchecker/labelchecker.go | 44 ++++++++ 8 files changed, 299 insertions(+), 13 deletions(-) create mode 100644 plugin/pkg/scheduler/algorithmprovider/labelchecker/labelchecker.go diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index fefe23c9fe1..5d721b58943 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -167,6 +167,43 @@ func PodFitsHost(pod api.Pod, existingPods []api.Pod, node string) (bool, error) return pod.Spec.Host == node, nil } +type NodeLabelChecker struct { + info NodeInfo + labels []string + presence bool +} + +func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) FitPredicate { + labelChecker := &NodeLabelChecker{ + info: info, + labels: labels, + presence: presence, + } + return labelChecker.CheckNodeLabelPresence +} + +// CheckNodeLabelPresence checks whether a particular label exists on a minion or not, regardless of its value +// Consider the cases where the minions are places in regions/zones/racks and these are identified by labels +// In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected +// +// Alternately, eliminating minions that have a certain label, regardless of value, is also useful +// A minion may have a label with "retiring" as key and the date as the value +// and it may be desirable to avoid scheduling new pods on this minion +func (n *NodeLabelChecker) CheckNodeLabelPresence(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + var exists bool + minion, err := n.info.GetNodeInfo(node) + if err != nil { + return false, err + } + for _, label := range n.labels { + exists = labels.Set(minion.Labels).Has(label) + if (exists && !n.presence) || (!exists && n.presence) { + return false, nil + } + } + return true, nil +} + func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { existingPorts := getUsedPorts(existingPods...) 
wantPorts := getUsedPorts(pod) diff --git a/pkg/scheduler/predicates_test.go b/pkg/scheduler/predicates_test.go index 5fe556207f1..9903a94ba2e 100644 --- a/pkg/scheduler/predicates_test.go +++ b/pkg/scheduler/predicates_test.go @@ -386,3 +386,63 @@ func TestPodFitsSelector(t *testing.T) { } } } + +func TestNodeLabelPresence(t *testing.T) { + label := map[string]string{"foo": "bar", "bar": "foo"} + tests := []struct { + pod api.Pod + existingPods []api.Pod + labels []string + presence bool + fits bool + test string + }{ + { + labels: []string{"baz"}, + presence: true, + fits: false, + test: "label does not match, presence true", + }, + { + labels: []string{"baz"}, + presence: false, + fits: true, + test: "label does not match, presence false", + }, + { + labels: []string{"foo", "baz"}, + presence: true, + fits: false, + test: "one label matches, presence true", + }, + { + labels: []string{"foo", "baz"}, + presence: false, + fits: false, + test: "one label matches, presence false", + }, + { + labels: []string{"foo", "bar"}, + presence: true, + fits: true, + test: "all labels match, presence true", + }, + { + labels: []string{"foo", "bar"}, + presence: false, + fits: false, + test: "all labels match, presence false", + }, + } + for _, test := range tests { + node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}} + labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence} + fits, err := labelChecker.CheckNodeLabelPresence(test.pod, test.existingPods, "machine") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if fits != test.fits { + t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits) + } + } +} diff --git a/pkg/scheduler/priorities.go b/pkg/scheduler/priorities.go index dc3250cb8a2..47d04117766 100644 --- a/pkg/scheduler/priorities.go +++ b/pkg/scheduler/priorities.go @@ -18,6 +18,7 @@ package scheduler import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/golang/glog" ) @@ -88,3 +89,47 @@ func LeastRequestedPriority(pod api.Pod, podLister PodLister, minionLister Minio } return list, nil } + +type NodeLabelPrioritizer struct { + label string + presence bool +} + +func NewNodeLabelPriority(label string, presence bool) PriorityFunction { + labelPrioritizer := &NodeLabelPrioritizer{ + label: label, + presence: presence, + } + return labelPrioritizer.CalculateNodeLabelPriority +} + +// CalculateNodeLabelPriority checks whether a particular label exists on a minion or not, regardless of its value +// Consider the cases where the minions are places in regions/zones/racks and these are identified by labels +// In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected +func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var score int + minions, err := minionLister.List() + if err != nil { + return nil, err + } + + // find the zones that the minions belong to + labeledMinions := map[string]bool{} + for _, minion := range minions.Items { + exists := labels.Set(minion.Labels).Has(n.label) + labeledMinions[minion.Name] = (exists && n.presence) || (!exists && !n.presence) + } + + result := []HostPriority{} + //score int - scale of 0-10 + // 0 being the lowest priority and 10 being the highest + for minionName, success := range labeledMinions { + if success { + score = 10 + } else { + score = 0 + } + result = append(result, 
HostPriority{host: minionName, score: score}) + } + return result, nil +} diff --git a/pkg/scheduler/priorities_test.go b/pkg/scheduler/priorities_test.go index 23f932e0a0d..8aa878586b4 100644 --- a/pkg/scheduler/priorities_test.go +++ b/pkg/scheduler/priorities_test.go @@ -18,6 +18,7 @@ package scheduler import ( "reflect" + "sort" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -238,3 +239,102 @@ func TestLeastRequested(t *testing.T) { } } } + +func TestNewNodeLabelPriority(t *testing.T) { + label1 := map[string]string{"foo": "bar"} + label2 := map[string]string{"bar": "foo"} + label3 := map[string]string{"bar": "baz"} + tests := []struct { + pod api.Pod + pods []api.Pod + nodes []api.Node + label string + presence bool + expectedList HostPriorityList + test string + }{ + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}, {"machine3", 0}}, + label: "baz", + presence: true, + test: "no match found, presence true", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}, {"machine3", 10}}, + label: "baz", + presence: false, + test: "no match found, presence false", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}}, + label: "foo", + presence: true, + test: "one match found, presence true", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}}, + label: "foo", + presence: false, + test: "one match found, presence false", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}}, + label: "bar", + presence: true, + test: "two matches found, presence true", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}}, + label: "bar", + presence: false, + test: "two matches found, presence false", + }, + } + + for _, test := range tests { + prioritizer := NodeLabelPrioritizer{ + label: test.label, + presence: test.presence, + } + list, err := prioritizer.CalculateNodeLabelPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(api.NodeList{Items: test.nodes})) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort the two lists to avoid failures on account 
of different ordering + sort.Sort(test.expectedList) + sort.Sort(list) + if !reflect.DeepEqual(test.expectedList, list) { + t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) + } + } +} diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index 3afa6379731..9ec98bb913a 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -80,25 +80,25 @@ func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister return result, nil } -type ZoneSpread struct { +type ServiceAntiAffinity struct { serviceLister ServiceLister - zoneLabel string + label string } -func NewZoneSpreadPriority(serviceLister ServiceLister, zoneLabel string) PriorityFunction { - zoneSpread := &ZoneSpread{ +func NewServiceAntiAffinityPriority(serviceLister ServiceLister, label string) PriorityFunction { + antiAffinity := &ServiceAntiAffinity{ serviceLister: serviceLister, - zoneLabel: zoneLabel, + label: label, } - return zoneSpread.ZoneSpreadPriority + return antiAffinity.CalculateAntiAffinityPriority } -func (z *ZoneSpread) ZoneSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { +func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { var service api.Service var pods []api.Pod var err error - service, err = z.serviceLister.GetPodService(pod) + service, err = s.serviceLister.GetPodService(pod) if err == nil { selector := labels.SelectorFromSet(service.Spec.Selector) pods, err = podLister.ListPods(selector) @@ -116,8 +116,8 @@ func (z *ZoneSpread) ZoneSpreadPriority(pod api.Pod, podLister PodLister, minion openMinions := []string{} zonedMinions := map[string]string{} for _, minion := range minions.Items { - if labels.Set(minion.Labels).Has(z.zoneLabel) { - zone := labels.Set(minion.Labels).Get(z.zoneLabel) + if labels.Set(minion.Labels).Has(s.label) { + zone := labels.Set(minion.Labels).Get(s.label) zonedMinions[minion.Name] = zone } else { openMinions = append(openMinions, minion.Name) diff --git a/pkg/scheduler/spreading_test.go b/pkg/scheduler/spreading_test.go index 5c2614d4798..fe891cf2504 100644 --- a/pkg/scheduler/spreading_test.go +++ b/pkg/scheduler/spreading_test.go @@ -270,8 +270,8 @@ func TestZoneSpreadPriority(t *testing.T) { } for _, test := range tests { - zoneSpread := ZoneSpread{serviceLister: FakeServiceLister(test.services), zoneLabel: "zone"} - list, err := zoneSpread.ZoneSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeLabeledMinionList(test.nodes))) + zoneSpread := ServiceAntiAffinity{serviceLister: FakeServiceLister(test.services), label: "zone"} + list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeLabeledMinionList(test.nodes))) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 69d5397ccf3..4ebe7f7ccc5 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -50,7 +50,7 @@ func defaultPriorities() util.StringSet { factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1), // spreads pods belonging to the same service across minions in different zones // TODO: remove the hardcoding of the "zone" label and move it to 
a constant - factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewZoneSpreadPriority(factory.ServiceLister, "zone"), 1), + factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 1), // EqualPriority is a prioritizer function that gives an equal weight of one to all minions factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0), ) diff --git a/plugin/pkg/scheduler/algorithmprovider/labelchecker/labelchecker.go b/plugin/pkg/scheduler/algorithmprovider/labelchecker/labelchecker.go new file mode 100644 index 00000000000..8419d24382e --- /dev/null +++ b/plugin/pkg/scheduler/algorithmprovider/labelchecker/labelchecker.go @@ -0,0 +1,44 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This is the default algorithm provider for the scheduler. +package labelchecker + +import ( + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" +) + +const Provider string = "LabelCheckerProvider" + +func init() { + factory.RegisterAlgorithmProvider(Provider, defaultPredicates(), defaultPriorities()) +} + +func defaultPredicates() util.StringSet { + return util.NewStringSet( + // Fit is defined based on the presence/absence of a label on a minion, regardless of value. + factory.RegisterFitPredicate("NodeLabelPredicate", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)), + ) +} + +func defaultPriorities() util.StringSet { + return util.NewStringSet( + // Prioritize nodes based on the presence/absence of a label on a minion, regardless of value. 
+ factory.RegisterPriorityFunction("NodeLabelPriority", algorithm.NewNodeLabelPriority("", true), 1), + ) +} From 3f722a3d8ec5855fa8e7b7f674793f991950cbcb Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Mon, 22 Dec 2014 15:55:31 -0800 Subject: [PATCH 3/8] Adding service affinity predicate --- pkg/scheduler/predicates.go | 78 ++++++++++- pkg/scheduler/predicates_test.go | 128 ++++++++++++++++++ .../algorithmprovider/affinity/affinity.go | 46 +++++++ .../algorithmprovider/defaults/defaults.go | 3 - 4 files changed, 251 insertions(+), 4 deletions(-) create mode 100644 plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index 5d721b58943..e31a7f44205 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -183,7 +183,7 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) FitPre } // CheckNodeLabelPresence checks whether a particular label exists on a minion or not, regardless of its value -// Consider the cases where the minions are places in regions/zones/racks and these are identified by labels +// Consider the cases where the minions are placed in regions/zones/racks and these are identified by labels // In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected // // Alternately, eliminating minions that have a certain label, regardless of value, is also useful @@ -204,6 +204,82 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod api.Pod, existingPods []ap return true, nil } +type ServiceAffinity struct { + podLister PodLister + serviceLister ServiceLister + nodeInfo NodeInfo + labels []string +} + +func NewServiceAffinityPredicate(podLister PodLister, serviceLister ServiceLister, nodeInfo NodeInfo, labels []string) FitPredicate { + affinity := &ServiceAffinity{ + podLister: podLister, + serviceLister: serviceLister, + nodeInfo: nodeInfo, + labels: labels, + } + return affinity.CheckServiceAffinity +} + +func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + var affinitySelector labels.Selector + + // check if the pod being scheduled has the affinity labels specified + affinityLabels := map[string]string{} + labelsExist := true + for _, l := range s.labels { + if labels.Set(pod.Labels).Has(l) { + affinityLabels[l] = labels.Set(pod.Labels).Get(l) + } else { + // the current pod does not specify all the labels, look in the existing service pods + labelsExist = false + } + } + + // skip looking at other pods in the service if the current pod defines all the required affinity labels + if !labelsExist { + service, err := s.serviceLister.GetPodService(pod) + if err == nil { + selector := labels.SelectorFromSet(service.Spec.Selector) + servicePods, err := s.podLister.ListPods(selector) + if err != nil { + return false, err + } + if len(servicePods) > 0 { + // consider any service pod and fetch the minion its hosted on + otherMinion, err := s.nodeInfo.GetNodeInfo(servicePods[0].Status.Host) + if err != nil { + return false, err + } + for _, l := range s.labels { + // If the pod being scheduled has the label value specified, do not override it + if _, exists := affinityLabels[l]; exists { + continue + } + if labels.Set(otherMinion.Labels).Has(l) { + affinityLabels[l] = labels.Set(otherMinion.Labels).Get(l) + } + } + } + } + } + + // if there are no existing pods in the service, consider all minions + if len(affinityLabels) == 0 { + affinitySelector = 
labels.Everything() + } else { + affinitySelector = labels.Set(affinityLabels).AsSelector() + } + + minion, err := s.nodeInfo.GetNodeInfo(node) + if err != nil { + return false, err + } + + // check if the minion matches the selector + return affinitySelector.Matches(labels.Set(minion.Labels)), nil +} + func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { existingPorts := getUsedPorts(existingPods...) wantPorts := getUsedPorts(pod) diff --git a/pkg/scheduler/predicates_test.go b/pkg/scheduler/predicates_test.go index 9903a94ba2e..521c82f99aa 100644 --- a/pkg/scheduler/predicates_test.go +++ b/pkg/scheduler/predicates_test.go @@ -17,6 +17,7 @@ limitations under the License. package scheduler import ( + "fmt" "reflect" "testing" @@ -31,7 +32,22 @@ func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*api.Node, error) { return &node, nil } +<<<<<<< HEAD func makeResources(milliCPU int64, memory int64) api.NodeResources { +======= +type FakeNodeListInfo []api.Node + +func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) { + for _, node := range nodes { + if node.Name == nodeName { + return &node, nil + } + } + return nil, fmt.Errorf("Unable to find node: %s", nodeName) +} + +func makeResources(milliCPU int, memory int) api.NodeResources { +>>>>>>> e0101c2... Adding service affinity predicate return api.NodeResources{ Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), @@ -446,3 +462,115 @@ func TestNodeLabelPresence(t *testing.T) { } } } + +func TestServiceAffinity(t *testing.T) { + selector := map[string]string{"foo": "bar"} + labels1 := map[string]string{ + "region": "r1", + "zone": "z11", + } + labels2 := map[string]string{ + "region": "r1", + "zone": "z12", + } + labels3 := map[string]string{ + "region": "r2", + "zone": "z21", + } + labels4 := map[string]string{ + "region": "r2", + "zone": "z22", + } + node1 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: labels1}} + node2 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: labels2}} + node3 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: labels3}} + node4 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine4", Labels: labels4}} + node5 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine5", Labels: labels4}} + tests := []struct { + pod api.Pod + pods []api.Pod + services []api.Service + node string + labels []string + fits bool + test string + }{ + { + node: "machine1", + fits: true, + labels: []string{"region"}, + test: "nothing scheduled", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r1"}}}, + node: "machine1", + fits: true, + labels: []string{"region"}, + test: "pod with region label match", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r2"}}}, + node: "machine1", + fits: false, + labels: []string{"region"}, + test: "pod with region label mismatch", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: true, + labels: []string{"region"}, + test: "service pod on same minion", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + 
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: true, + labels: []string{"region"}, + test: "service pod on different minion, region match", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: false, + labels: []string{"region"}, + test: "service pod on different minion, region mismatch", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: false, + labels: []string{"region", "zone"}, + test: "service pod on different minion, multiple labels, not all match", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine5"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine4", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: true, + labels: []string{"region", "zone"}, + test: "service pod on different minion, multiple labels, all match", + }, + } + + for _, test := range tests { + nodes := []api.Node{node1, node2, node3, node4, node5} + serviceAffinity := ServiceAffinity{FakePodLister(test.pods), FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels} + fits, err := serviceAffinity.CheckServiceAffinity(test.pod, []api.Pod{}, test.node) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if fits != test.fits { + t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits) + } + } +} diff --git a/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go new file mode 100644 index 00000000000..719e4795175 --- /dev/null +++ b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go @@ -0,0 +1,46 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This algorithm provider has predicates and priorities related to affinity/anti-affinity for the scheduler. 
+package affinity + +import ( + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" +) + +const Provider string = "AffinityProvider" + +func init() { + factory.RegisterAlgorithmProvider(Provider, defaultPredicates(), defaultPriorities()) +} + +func defaultPredicates() util.StringSet { + return util.NewStringSet( + // Fit is defined based on whether the minion has the specified label values as the pod being scheduled + // Alternately, if the pod does not specify any/all labels, the other pods in the service are looked at + factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})), + ) +} + +func defaultPriorities() util.StringSet { + return util.NewStringSet( + // spreads pods belonging to the same service across minions in different zones + // region and zone can be nested infrastructure topology levels and defined by labels on minions + factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 1), + ) +} diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 4ebe7f7ccc5..c97f933884b 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -48,9 +48,6 @@ func defaultPriorities() util.StringSet { factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1), // spreads pods by minimizing the number of pods (belonging to the same service) on the same minion. 
factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1), - // spreads pods belonging to the same service across minions in different zones - // TODO: remove the hardcoding of the "zone" label and move it to a constant - factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 1), // EqualPriority is a prioritizer function that gives an equal weight of one to all minions factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0), ) From 9e75a05df07a22d78db4aef4d9a1925e2c00c2ce Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Mon, 5 Jan 2015 14:51:22 -0800 Subject: [PATCH 4/8] Implementing PR feedback --- pkg/scheduler/listers.go | 18 ++++---- pkg/scheduler/predicates.go | 27 ++++++++--- pkg/scheduler/predicates_test.go | 4 +- pkg/scheduler/priorities.go | 7 ++- pkg/scheduler/spreading.go | 46 +++++++++---------- .../algorithmprovider/affinity/affinity.go | 24 +++++++--- .../labelchecker/labelchecker.go | 44 ------------------ .../scheduler/algorithmprovider/plugins.go | 1 + .../algorithmprovider/plugins_test.go | 2 + plugin/pkg/scheduler/factory/factory.go | 11 +++-- 10 files changed, 86 insertions(+), 98 deletions(-) delete mode 100644 plugin/pkg/scheduler/algorithmprovider/labelchecker/labelchecker.go diff --git a/pkg/scheduler/listers.go b/pkg/scheduler/listers.go index 9e7829a8efe..e6655ef9acf 100644 --- a/pkg/scheduler/listers.go +++ b/pkg/scheduler/listers.go @@ -59,8 +59,8 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) { type ServiceLister interface { // Lists all the services ListServices() (api.ServiceList, error) - // Gets the service for the given pod - GetPodService(api.Pod) (api.Service, error) + // Gets the services for the given pod + GetPodServices(api.Pod) ([]api.Service, error) } // FakeServiceLister implements ServiceLister on []api.Service for test purposes. @@ -71,10 +71,8 @@ func (f FakeServiceLister) ListServices() (api.ServiceList, error) { return api.ServiceList{Items: f}, nil } -// GetPodService gets the service that has the selector that can match the labels on the given pod -// We are assuming a single service per pod. 
-// In case of multiple services per pod, the first service found is returned -func (f FakeServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) { +// GetPodServices gets the services that have the selector that match the labels on the given pod +func (f FakeServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) { var selector labels.Selector for _, service := range f { @@ -84,8 +82,12 @@ func (f FakeServiceLister) GetPodService(pod api.Pod) (service api.Service, err } selector = labels.Set(service.Spec.Selector).AsSelector() if selector.Matches(labels.Set(pod.Labels)) { - return service, nil + services = append(services, service) } } - return service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + if len(services) == 0 { + err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + + return } diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index e31a7f44205..a626f2910e6 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -182,7 +182,12 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) FitPre return labelChecker.CheckNodeLabelPresence } -// CheckNodeLabelPresence checks whether a particular label exists on a minion or not, regardless of its value +// CheckNodeLabelPresence checks whether all of the specified labels exists on a minion or not, regardless of their value +// If "presence" is false, then returns false if any of the requested labels matches any of the minion's labels, +// otherwise returns true. +// If "presence" is true, then returns false if any of the requested labels does not match any of the minion's labels, +// otherwise returns true. +// // Consider the cases where the minions are placed in regions/zones/racks and these are identified by labels // In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected // @@ -195,8 +200,9 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod api.Pod, existingPods []ap if err != nil { return false, err } + minionLabels := labels.Set(minion.Labels) for _, label := range n.labels { - exists = labels.Set(minion.Labels).Has(label) + exists = minionLabels.Has(label) if (exists && !n.presence) || (!exists && n.presence) { return false, nil } @@ -221,15 +227,20 @@ func NewServiceAffinityPredicate(podLister PodLister, serviceLister ServiceListe return affinity.CheckServiceAffinity } +// CheckServiceAffinity ensures that only the minions that match the specified labels are considered for scheduling. +// The set of labels to be considered are provided to the struct (ServiceAffinity). +// The pod is checked for the labels and any missing labels are then checked in the minion +// that hosts the service pods (peers) for the given pod. 
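In other words, each affinity label is resolved in two steps: a value named in the pod's own NodeSelector wins, and anything still missing is copied from the labels of the minion that already hosts a peer pod of the same service; the candidate minion is then required to match the resolved set. A condensed standalone sketch of just the resolution step, with illustrative names and plain maps in place of the real listers:

package main

import "fmt"

// resolveAffinityLabels prefers the pod's node selector for each affinity key
// and falls back to the labels of the minion hosting an existing service pod.
func resolveAffinityLabels(keys []string, podNodeSelector, peerMinionLabels map[string]string) map[string]string {
	resolved := map[string]string{}
	for _, key := range keys {
		if v, ok := podNodeSelector[key]; ok {
			resolved[key] = v
		} else if v, ok := peerMinionLabels[key]; ok {
			resolved[key] = v
		}
	}
	return resolved
}

func main() {
	keys := []string{"region", "zone"}
	podSelector := map[string]string{"region": "r1"}
	peerMinion := map[string]string{"region": "r2", "zone": "z21"}
	fmt.Println(resolveAffinityLabels(keys, podSelector, peerMinion)) // region:r1 zone:z21
}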
func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { var affinitySelector labels.Selector - // check if the pod being scheduled has the affinity labels specified + // check if the pod being scheduled has the affinity labels specified in its NodeSelector affinityLabels := map[string]string{} + nodeSelector := labels.Set(pod.Spec.NodeSelector) labelsExist := true for _, l := range s.labels { - if labels.Set(pod.Labels).Has(l) { - affinityLabels[l] = labels.Set(pod.Labels).Get(l) + if nodeSelector.Has(l) { + affinityLabels[l] = nodeSelector.Get(l) } else { // the current pod does not specify all the labels, look in the existing service pods labelsExist = false @@ -238,9 +249,11 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.P // skip looking at other pods in the service if the current pod defines all the required affinity labels if !labelsExist { - service, err := s.serviceLister.GetPodService(pod) + services, err := s.serviceLister.GetPodServices(pod) if err == nil { - selector := labels.SelectorFromSet(service.Spec.Selector) + // just use the first service and get the other pods within the service + // TODO: a separate predicate can be created that tries to handle all services for the pod + selector := labels.SelectorFromSet(services[0].Spec.Selector) servicePods, err := s.podLister.ListPods(selector) if err != nil { return false, err diff --git a/pkg/scheduler/predicates_test.go b/pkg/scheduler/predicates_test.go index 521c82f99aa..27f3345da3f 100644 --- a/pkg/scheduler/predicates_test.go +++ b/pkg/scheduler/predicates_test.go @@ -502,14 +502,14 @@ func TestServiceAffinity(t *testing.T) { test: "nothing scheduled", }, { - pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r1"}}}, + pod: api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"region": "r1"}}}, node: "machine1", fits: true, labels: []string{"region"}, test: "pod with region label match", }, { - pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r2"}}}, + pod: api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"region": "r2"}}}, node: "machine1", fits: false, labels: []string{"region"}, diff --git a/pkg/scheduler/priorities.go b/pkg/scheduler/priorities.go index 47d04117766..6685358c949 100644 --- a/pkg/scheduler/priorities.go +++ b/pkg/scheduler/priorities.go @@ -103,9 +103,9 @@ func NewNodeLabelPriority(label string, presence bool) PriorityFunction { return labelPrioritizer.CalculateNodeLabelPriority } -// CalculateNodeLabelPriority checks whether a particular label exists on a minion or not, regardless of its value -// Consider the cases where the minions are places in regions/zones/racks and these are identified by labels -// In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected +// CalculateNodeLabelPriority checks whether a particular label exists on a minion or not, regardless of its value. +// If presence is true, prioritizes minions that have the specified label, regardless of value. +// If presence is false, prioritizes minions that do not have the specified label. 
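Concretely, the prioritizer described above is binary: with presence=true a minion carrying the label scores 10 and one without it scores 0, and presence=false inverts that. A standalone sketch of the per-minion score (illustrative helper, plain map for the minion's labels):

package main

import "fmt"

// labelPresenceScore returns 10 when the minion's label presence agrees with
// the desired presence flag, and 0 otherwise (the scheduler's 0-10 scale).
func labelPresenceScore(minionLabels map[string]string, label string, presence bool) int {
	_, exists := minionLabels[label]
	if exists == presence {
		return 10
	}
	return 0
}

func main() {
	fmt.Println(labelPresenceScore(map[string]string{"zone": "zone1"}, "zone", true)) // 10
	fmt.Println(labelPresenceScore(map[string]string{"name": "value"}, "zone", true)) // 0
}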
func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { var score int minions, err := minionLister.List() @@ -113,7 +113,6 @@ func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod api.Pod, podLister return nil, err } - // find the zones that the minions belong to labeledMinions := map[string]bool{} for _, minion := range minions.Items { exists := labels.Set(minion.Labels).Has(n.label) diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index 9ec98bb913a..a68896bf383 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -41,9 +41,11 @@ func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister var pods []api.Pod var err error - service, err := s.serviceLister.GetPodService(pod) + services, err := s.serviceLister.GetPodServices(pod) if err == nil { - selector := labels.SelectorFromSet(service.Spec.Selector) + // just use the first service and get the other pods within the service + // TODO: a separate predicate can be created that tries to handle all services for the pod + selector := labels.SelectorFromSet(services[0].Spec.Selector) pods, err = podLister.ListPods(selector) if err != nil { return nil, err @@ -94,13 +96,13 @@ func NewServiceAntiAffinityPriority(serviceLister ServiceLister, label string) P } func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { - var service api.Service var pods []api.Pod - var err error - service, err = s.serviceLister.GetPodService(pod) + services, err := s.serviceLister.GetPodServices(pod) if err == nil { - selector := labels.SelectorFromSet(service.Spec.Selector) + // just use the first service and get the other pods within the service + // TODO: a separate predicate can be created that tries to handle all services for the pod + selector := labels.SelectorFromSet(services[0].Spec.Selector) pods, err = podLister.ListPods(selector) if err != nil { return nil, err @@ -112,43 +114,41 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podList return nil, err } - // find the zones that the minions belong to - openMinions := []string{} - zonedMinions := map[string]string{} + // separate out the minions that have the label from the ones that don't + otherMinions := []string{} + labeledMinions := map[string]string{} for _, minion := range minions.Items { if labels.Set(minion.Labels).Has(s.label) { - zone := labels.Set(minion.Labels).Get(s.label) - zonedMinions[minion.Name] = zone + label := labels.Set(minion.Labels).Get(s.label) + labeledMinions[minion.Name] = label } else { - openMinions = append(openMinions, minion.Name) + otherMinions = append(otherMinions, minion.Name) } } podCounts := map[string]int{} - numServicePods := len(pods) - if numServicePods > 0 { - for _, pod := range pods { - zone, exists := zonedMinions[pod.Status.Host] - if !exists { - continue - } - podCounts[zone]++ + for _, pod := range pods { + zone, exists := labeledMinions[pod.Status.Host] + if !exists { + continue } + podCounts[zone]++ } + numServicePods := len(pods) result := []HostPriority{} //score int - scale of 0-10 // 0 being the lowest priority and 10 being the highest - for minion := range zonedMinions { + for minion := range labeledMinions { // initializing to the default/max minion score of 10 fScore := float32(10) if numServicePods > 0 { - fScore = 10 * 
(float32(numServicePods-podCounts[zonedMinions[minion]]) / float32(numServicePods)) + fScore = 10 * (float32(numServicePods-podCounts[labeledMinions[minion]]) / float32(numServicePods)) } result = append(result, HostPriority{host: minion, score: int(fScore)}) } // add the open minions with a score of 0 - for _, minion := range openMinions { + for _, minion := range otherMinions { result = append(result, HostPriority{host: minion, score: 0}) } diff --git a/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go index 719e4795175..47db0acf0b4 100644 --- a/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go +++ b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go @@ -23,24 +23,34 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" ) -const Provider string = "AffinityProvider" +const AffinityProvider string = "AffinityProvider" func init() { - factory.RegisterAlgorithmProvider(Provider, defaultPredicates(), defaultPriorities()) + factory.RegisterAlgorithmProvider(AffinityProvider, affinityPredicates(), affinityPriorities()) } -func defaultPredicates() util.StringSet { +func affinityPredicates() util.StringSet { return util.NewStringSet( - // Fit is defined based on whether the minion has the specified label values as the pod being scheduled - // Alternately, if the pod does not specify any/all labels, the other pods in the service are looked at + "HostName", + "MatchNodeSelector", + "PodFitsPorts", + "PodFitsResources", + "NoDiskConflict", + // Ensures that all pods within the same service are hosted on minions within the same region as defined by the "region" label factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})), + // Fit is defined based on the presence/absence of the "region" label on a minion, regardless of value. + factory.RegisterFitPredicate("NodeLabelPredicate", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)), ) } -func defaultPriorities() util.StringSet { +func affinityPriorities() util.StringSet { return util.NewStringSet( + "LeastRequestedPriority", + "ServiceSpreadingPriority", // spreads pods belonging to the same service across minions in different zones // region and zone can be nested infrastructure topology levels and defined by labels on minions - factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 1), + factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 2), + // Prioritize nodes based on the presence/absence of a label on a minion, regardless of value. + factory.RegisterPriorityFunction("NodeLabelPriority", algorithm.NewNodeLabelPriority("zone", true), 1), ) } diff --git a/plugin/pkg/scheduler/algorithmprovider/labelchecker/labelchecker.go b/plugin/pkg/scheduler/algorithmprovider/labelchecker/labelchecker.go deleted file mode 100644 index 8419d24382e..00000000000 --- a/plugin/pkg/scheduler/algorithmprovider/labelchecker/labelchecker.go +++ /dev/null @@ -1,44 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// This is the default algorithm provider for the scheduler. -package labelchecker - -import ( - algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" -) - -const Provider string = "LabelCheckerProvider" - -func init() { - factory.RegisterAlgorithmProvider(Provider, defaultPredicates(), defaultPriorities()) -} - -func defaultPredicates() util.StringSet { - return util.NewStringSet( - // Fit is defined based on the presence/absence of a label on a minion, regardless of value. - factory.RegisterFitPredicate("NodeLabelPredicate", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)), - ) -} - -func defaultPriorities() util.StringSet { - return util.NewStringSet( - // Prioritize nodes based on the presence/absence of a label on a minion, regardless of value. - factory.RegisterPriorityFunction("NodeLabelPriority", algorithm.NewNodeLabelPriority("", true), 1), - ) -} diff --git a/plugin/pkg/scheduler/algorithmprovider/plugins.go b/plugin/pkg/scheduler/algorithmprovider/plugins.go index d534b05c0bf..ac7123efe26 100644 --- a/plugin/pkg/scheduler/algorithmprovider/plugins.go +++ b/plugin/pkg/scheduler/algorithmprovider/plugins.go @@ -18,5 +18,6 @@ limitations under the License. package algorithmprovider import ( + _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/affinity" _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults" ) diff --git a/plugin/pkg/scheduler/algorithmprovider/plugins_test.go b/plugin/pkg/scheduler/algorithmprovider/plugins_test.go index 965635d8e7f..8a205961a72 100644 --- a/plugin/pkg/scheduler/algorithmprovider/plugins_test.go +++ b/plugin/pkg/scheduler/algorithmprovider/plugins_test.go @@ -19,12 +19,14 @@ package algorithmprovider import ( "testing" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/affinity" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" ) var ( algorithmProviderNames = []string{ factory.DefaultProvider, + affinity.AffinityProvider, } ) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index b6a4afb14a6..f53314021b7 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -238,8 +238,9 @@ func (s *storeToServiceLister) ListServices() (services api.ServiceList, err err return services, nil } -func (s *storeToServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) { +func (s *storeToServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) { var selector labels.Selector + var service api.Service for _, m := range s.List() { service = *m.(*api.Service) @@ -249,10 +250,14 @@ func (s *storeToServiceLister) GetPodService(pod api.Pod) (service api.Service, } selector = labels.Set(service.Spec.Selector).AsSelector() if selector.Matches(labels.Set(pod.Labels)) { - return service, nil + services = append(services, service) } } - return 
service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + if len(services) == 0 { + err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + + return } // Len returns the number of items in the node list. From 6fd0b181e3a899be283eae1195071432da7240af Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Wed, 7 Jan 2015 18:18:21 -0800 Subject: [PATCH 5/8] Rebased onto the latest changes to the scheduler code --- pkg/client/cache/listers.go | 32 +++++++++++-- pkg/scheduler/listers.go | 4 +- pkg/scheduler/predicates_test.go | 6 +-- .../algorithmprovider/affinity/affinity.go | 5 +- plugin/pkg/scheduler/factory/factory.go | 46 +++---------------- 5 files changed, 39 insertions(+), 54 deletions(-) diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index f98dff6e4a2..a8eb8f57700 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -75,17 +75,41 @@ func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) { return nil, fmt.Errorf("minion '%v' is not in cache", id) } -// StoreToServiceLister makes a Store have the List method of the client.ServiceInterface +// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface // The Store must contain (only) Services. type StoreToServiceLister struct { Store } -func (s *StoreToServiceLister) List() (svcs api.ServiceList, err error) { +func (s *StoreToServiceLister) List() (services api.ServiceList, err error) { for _, m := range s.Store.List() { - svcs.Items = append(svcs.Items, *(m.(*api.Service))) + services.Items = append(services.Items, *(m.(*api.Service))) } - return svcs, nil + return services, nil +} + +// TODO: Move this back to scheduler as a helper function that takes a Store, +// rather than a method of StoreToServiceLister. +func (s *StoreToServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) { + var selector labels.Selector + var service api.Service + + for _, m := range s.Store.List() { + service = *m.(*api.Service) + // consider only services that are in the same namespace as the pod + if service.Namespace != pod.Namespace { + continue + } + selector = labels.Set(service.Spec.Selector).AsSelector() + if selector.Matches(labels.Set(pod.Labels)) { + services = append(services, service) + } + } + if len(services) == 0 { + err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + + return } // TODO: add StoreToEndpointsLister for use in kube-proxy. diff --git a/pkg/scheduler/listers.go b/pkg/scheduler/listers.go index e6655ef9acf..10d887c9417 100644 --- a/pkg/scheduler/listers.go +++ b/pkg/scheduler/listers.go @@ -58,7 +58,7 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) { // ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler. type ServiceLister interface { // Lists all the services - ListServices() (api.ServiceList, error) + List() (api.ServiceList, error) // Gets the services for the given pod GetPodServices(api.Pod) ([]api.Service, error) } @@ -67,7 +67,7 @@ type ServiceLister interface { type FakeServiceLister []api.Service // FakeServiceLister returns api.ServiceList, the list of all services. 
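Taken together, the hunks above make the cache-backed lister usable anywhere the scheduler expects a ServiceLister; a hedged compile-time sketch of that relationship (the example package name is arbitrary):

package example

import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
)

// Compile-time assertion that StoreToServiceLister provides both List and
// GetPodServices with the signatures the scheduler-side interface now declares.
var _ scheduler.ServiceLister = &cache.StoreToServiceLister{}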
-func (f FakeServiceLister) ListServices() (api.ServiceList, error) { +func (f FakeServiceLister) List() (api.ServiceList, error) { return api.ServiceList{Items: f}, nil } diff --git a/pkg/scheduler/predicates_test.go b/pkg/scheduler/predicates_test.go index 27f3345da3f..5f91ba248f8 100644 --- a/pkg/scheduler/predicates_test.go +++ b/pkg/scheduler/predicates_test.go @@ -32,9 +32,6 @@ func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*api.Node, error) { return &node, nil } -<<<<<<< HEAD -func makeResources(milliCPU int64, memory int64) api.NodeResources { -======= type FakeNodeListInfo []api.Node func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) { @@ -46,8 +43,7 @@ func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) { return nil, fmt.Errorf("Unable to find node: %s", nodeName) } -func makeResources(milliCPU int, memory int) api.NodeResources { ->>>>>>> e0101c2... Adding service affinity predicate +func makeResources(milliCPU int64, memory int64) api.NodeResources { return api.NodeResources{ Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), diff --git a/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go index 47db0acf0b4..ee023355d47 100644 --- a/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go +++ b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go @@ -38,7 +38,7 @@ func affinityPredicates() util.StringSet { "NoDiskConflict", // Ensures that all pods within the same service are hosted on minions within the same region as defined by the "region" label factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})), - // Fit is defined based on the presence/absence of the "region" label on a minion, regardless of value. + // Fit is defined based on the presence of the "region" label on a minion, regardless of value. factory.RegisterFitPredicate("NodeLabelPredicate", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)), ) } @@ -48,9 +48,8 @@ func affinityPriorities() util.StringSet { "LeastRequestedPriority", "ServiceSpreadingPriority", // spreads pods belonging to the same service across minions in different zones - // region and zone can be nested infrastructure topology levels and defined by labels on minions factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 2), - // Prioritize nodes based on the presence/absence of a label on a minion, regardless of value. + // Prioritize nodes based on the presence of the "zone" label on a minion, regardless of value. factory.RegisterPriorityFunction("NodeLabelPriority", algorithm.NewNodeLabelPriority("zone", true), 1), ) } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index f53314021b7..d538c181378 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -187,12 +187,12 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { return &nodeEnumerator{list}, nil } -// createServiceLW returns a listWatch that gets all changes to services. 
-func (factory *ConfigFactory) createServiceLW() *listWatch { - return &listWatch{ - client: factory.Client, - fieldSelector: parseSelectorOrDie(""), - resource: "services", +// createServiceLW returns a cache.ListWatch that gets all changes to services. +func (factory *ConfigFactory) createServiceLW() *cache.ListWatch { + return &cache.ListWatch{ + Client: factory.Client, + FieldSelector: parseSelectorOrDie(""), + Resource: "services", } } @@ -226,40 +226,6 @@ type nodeEnumerator struct { *api.NodeList } -// storeToServiceLister turns a store into a service lister. The store must contain (only) services. -type storeToServiceLister struct { - cache.Store -} - -func (s *storeToServiceLister) ListServices() (services api.ServiceList, err error) { - for _, m := range s.List() { - services.Items = append(services.Items, *(m.(*api.Service))) - } - return services, nil -} - -func (s *storeToServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) { - var selector labels.Selector - var service api.Service - - for _, m := range s.List() { - service = *m.(*api.Service) - // consider only services that are in the same namespace as the pod - if service.Namespace != pod.Namespace { - continue - } - selector = labels.Set(service.Spec.Selector).AsSelector() - if selector.Matches(labels.Set(pod.Labels)) { - services = append(services, service) - } - } - if len(services) == 0 { - err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) - } - - return -} - // Len returns the number of items in the node list. func (ne *nodeEnumerator) Len() int { if ne.NodeList == nil { From 40df5f6db804b8a5b3a4f6117011629d8fe29533 Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Wed, 7 Jan 2015 22:18:22 -0800 Subject: [PATCH 6/8] Added a more formal comment for the service affinity predicate --- pkg/scheduler/predicates.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index a626f2910e6..babbd8037a0 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -231,6 +231,11 @@ func NewServiceAffinityPredicate(podLister PodLister, serviceLister ServiceListe // The set of labels to be considered are provided to the struct (ServiceAffinity). // The pod is checked for the labels and any missing labels are then checked in the minion // that hosts the service pods (peers) for the given pod. 
+// +// We add an implicit selector requiring some particular value V for label L to a pod, if: +// - L is listed in the ServiceAffinity object that is passed into the function +// - the pod does not have any NodeSelector for L +// - some other pod from the same service is already scheduled onto a minion that has value V for label L func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { var affinitySelector labels.Selector From c20d062d16e49d538bd657843a0b20da458eff34 Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Tue, 13 Jan 2015 09:30:16 -0800 Subject: [PATCH 7/8] Added comments and minor changes based on PR feedback --- pkg/scheduler/spreading.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index a68896bf383..8bd31cc5cb7 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -35,7 +35,6 @@ func NewServiceSpreadPriority(serviceLister ServiceLister) PriorityFunction { // CalculateSpreadPriority spreads pods by minimizing the number of pods on the same machine with the same labels. // Importantly, if there are services in the system that span multiple heterogenous sets of pods, this spreading priority // may not provide optimal spreading for the members of that Service. -// TODO: consider if we want to include Service label sets in the scheduling priority. func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { var maxCount int var pods []api.Pod @@ -95,6 +94,9 @@ func NewServiceAntiAffinityPriority(serviceLister ServiceLister, label string) P return antiAffinity.CalculateAntiAffinityPriority } +// CalculateAntiAffinityPriority spreads pods by minimizing the number of pods belonging to the same service +// on machines with the same value for a particular label. +// The label to be considered is provided to the struct (ServiceAntiAffinity). 
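Before the function below, a worked example of the score it computes, using made-up counts; minions without the label are appended with a score of 0, as in the existing loop:

package main

import "fmt"

func main() {
	// 4 pods of the service exist: 3 on minions labelled zone=us-east and 1 on
	// minions labelled zone=us-west, so us-east scores 10*(4-3)/4 = 2 and
	// us-west scores 10*(4-1)/4 = 7 (a higher score means better spreading).
	numServicePods := 4
	podCounts := map[string]int{"us-east": 3, "us-west": 1}
	for label, count := range podCounts {
		fScore := float32(10)
		if numServicePods > 0 {
			fScore = 10 * (float32(numServicePods-count) / float32(numServicePods))
		}
		fmt.Printf("zone %s -> score %d\n", label, int(fScore)) // us-east -> 2, us-west -> 7
	}
}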
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { var pods []api.Pod @@ -128,11 +130,11 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podList podCounts := map[string]int{} for _, pod := range pods { - zone, exists := labeledMinions[pod.Status.Host] + label, exists := labeledMinions[pod.Status.Host] if !exists { continue } - podCounts[zone]++ + podCounts[label]++ } numServicePods := len(pods) From dbac18a909b09f32bbee792fc748a7f97a4079f6 Mon Sep 17 00:00:00 2001 From: Abhishek Gupta Date: Tue, 13 Jan 2015 09:52:37 -0800 Subject: [PATCH 8/8] Rebasing onto latest code and fixing issues --- pkg/scheduler/predicates.go | 2 +- pkg/scheduler/spreading.go | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index babbd8037a0..a7137c30d60 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -259,7 +259,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.P // just use the first service and get the other pods within the service // TODO: a separate predicate can be created that tries to handle all services for the pod selector := labels.SelectorFromSet(services[0].Spec.Selector) - servicePods, err := s.podLister.ListPods(selector) + servicePods, err := s.podLister.List(selector) if err != nil { return false, err } diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index 8bd31cc5cb7..f72f19c93ff 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -32,9 +32,8 @@ func NewServiceSpreadPriority(serviceLister ServiceLister) PriorityFunction { return serviceSpread.CalculateSpreadPriority } -// CalculateSpreadPriority spreads pods by minimizing the number of pods on the same machine with the same labels. -// Importantly, if there are services in the system that span multiple heterogenous sets of pods, this spreading priority -// may not provide optimal spreading for the members of that Service. +// CalculateSpreadPriority spreads pods by minimizing the number of pods belonging to the same service +// on the same machine. func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { var maxCount int var pods []api.Pod @@ -45,7 +44,7 @@ func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister // just use the first service and get the other pods within the service // TODO: a separate predicate can be created that tries to handle all services for the pod selector := labels.SelectorFromSet(services[0].Spec.Selector) - pods, err = podLister.ListPods(selector) + pods, err = podLister.List(selector) if err != nil { return nil, err } @@ -105,7 +104,7 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podList // just use the first service and get the other pods within the service // TODO: a separate predicate can be created that tries to handle all services for the pod selector := labels.SelectorFromSet(services[0].Spec.Selector) - pods, err = podLister.ListPods(selector) + pods, err = podLister.List(selector) if err != nil { return nil, err }
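Finally, since several hunks in this series converge on GetPodServices, a test-style illustration of its contract: services in other namespaces are skipped, every service whose selector matches the pod's labels is returned, and an error comes back only when nothing matches. All names and the ServiceSpec literal below are illustrative:

package main

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
)

func main() {
	lister := scheduler.FakeServiceLister{
		{ObjectMeta: api.ObjectMeta{Name: "db", Namespace: "ns1"}, Spec: api.ServiceSpec{Selector: map[string]string{"app": "db"}}},
		{ObjectMeta: api.ObjectMeta{Name: "web", Namespace: "ns1"}, Spec: api.ServiceSpec{Selector: map[string]string{"app": "web"}}},
	}
	pod := api.Pod{ObjectMeta: api.ObjectMeta{Namespace: "ns1", Labels: map[string]string{"app": "web"}}}

	services, err := lister.GetPodServices(pod)
	// Expect exactly the "web" service and a nil error; a pod whose labels match
	// no selector would instead get an empty slice and a non-nil error.
	fmt.Println(len(services), err)
}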