From 94eb52de33d513958bcdb70cc7595d71ecfa1fe3 Mon Sep 17 00:00:00 2001
From: gmarek
Date: Wed, 1 Jul 2015 16:26:54 +0200
Subject: [PATCH] Add spreading by controllers

---
 plugin/pkg/scheduler/algorithm/listers.go     | 38 ++++++++++-
 .../algorithm/priorities/priorities_test.go   |  2 +-
 ...ice_spreading.go => selector_spreading.go} | 66 ++++++++++++-------
 ...ing_test.go => selector_spreading_test.go} | 61 ++++++++++++++++-
 .../algorithmprovider/defaults/defaults.go    |  4 +-
 plugin/pkg/scheduler/factory/factory.go       | 26 ++++++--
 plugin/pkg/scheduler/factory/plugins.go       |  1 +
 7 files changed, 163 insertions(+), 35 deletions(-)
 rename plugin/pkg/scheduler/algorithm/priorities/{service_spreading.go => selector_spreading.go} (69%)
 rename plugin/pkg/scheduler/algorithm/priorities/{service_spreading_test.go => selector_spreading_test.go} (79%)

diff --git a/plugin/pkg/scheduler/algorithm/listers.go b/plugin/pkg/scheduler/algorithm/listers.go
index 50ecc996b08..66edac545b6 100644
--- a/plugin/pkg/scheduler/algorithm/listers.go
+++ b/plugin/pkg/scheduler/algorithm/listers.go
@@ -66,7 +66,7 @@ type ServiceLister interface {
 // FakeServiceLister implements ServiceLister on []api.Service for test purposes.
 type FakeServiceLister []api.Service
 
-// FakeServiceLister returns api.ServiceList, the list of all services.
+// List returns api.ServiceList, the list of all services.
 func (f FakeServiceLister) List() (api.ServiceList, error) {
 	return api.ServiceList{Items: f}, nil
 }
@@ -91,3 +91,39 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []api.Service,
 
 	return
 }
+
+// ControllerLister interface represents anything that can produce a list of ReplicationController; the list is consumed by a scheduler.
+type ControllerLister interface {
+	// Lists all the replication controllers
+	List() ([]api.ReplicationController, error)
+	// Gets the replication controllers for the given pod
+	GetPodControllers(*api.Pod) ([]api.ReplicationController, error)
+}
+
+// FakeControllerLister implements ControllerLister on []api.ReplicationController for test purposes.
+type FakeControllerLister []api.ReplicationController
+
+// List returns []api.ReplicationController, the list of all ReplicationControllers.
+func (f FakeControllerLister) List() ([]api.ReplicationController, error) {
+	return f, nil
+}
+
+// GetPodControllers gets the ReplicationControllers whose selectors match the labels of the given pod.
+func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
+	var selector labels.Selector
+
+	for _, controller := range f {
+		if controller.Namespace != pod.Namespace {
+			continue
+		}
+		selector = labels.Set(controller.Spec.Selector).AsSelector()
+		if selector.Matches(labels.Set(pod.Labels)) {
+			controllers = append(controllers, controller)
+		}
+	}
+	if len(controllers) == 0 {
+		err = fmt.Errorf("Could not find Replication Controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+	}
+
+	return
+}
diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
index 08fcfaf12c2..253095a4b09 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
@@ -132,7 +132,7 @@ func TestZeroLimit(t *testing.T) {
 		// This should match the configuration in defaultPriorities() in
 		// plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
 		// to test what's actually in production.
-		[]algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewServiceSpreadPriority(algorithm.FakeServiceLister([]api.Service{})), Weight: 1}},
+		[]algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{})), Weight: 1}},
 		algorithm.FakeMinionLister(api.NodeList{Items: test.nodes}))
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
diff --git a/plugin/pkg/scheduler/algorithm/priorities/service_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
similarity index 69%
rename from plugin/pkg/scheduler/algorithm/priorities/service_spreading.go
rename to plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
index 663f638ea19..0a37739f4c0 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/service_spreading.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
@@ -23,36 +23,49 @@ import (
 	"github.com/golang/glog"
 )
 
-type ServiceSpread struct {
-	serviceLister algorithm.ServiceLister
+type SelectorSpread struct {
+	serviceLister    algorithm.ServiceLister
+	controllerLister algorithm.ControllerLister
 }
 
-func NewServiceSpreadPriority(serviceLister algorithm.ServiceLister) algorithm.PriorityFunction {
-	serviceSpread := &ServiceSpread{
-		serviceLister: serviceLister,
+func NewSelectorSpreadPriority(serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister) algorithm.PriorityFunction {
+	selectorSpread := &SelectorSpread{
+		serviceLister:    serviceLister,
+		controllerLister: controllerLister,
 	}
-	return serviceSpread.CalculateSpreadPriority
+	return selectorSpread.CalculateSpreadPriority
 }
 
-// CalculateSpreadPriority spreads pods by minimizing the number of pods belonging to the same service
-// on the same machine.
-func (s *ServiceSpread) CalculateSpreadPriority(pod *api.Pod, podLister algorithm.PodLister, minionLister algorithm.MinionLister) (algorithm.HostPriorityList, error) {
+// CalculateSpreadPriority spreads pods by minimizing the number of pods belonging to the same service or replication controller on the same machine.
+// It counts the number of pods that run under the same Services or RCs as the pod being scheduled and tries to minimize the number of conflicts,
+// i.e. it pushes the scheduler towards a Node with the smallest number of pods that match the same selectors (from Services and RCs) as the pod being scheduled.
+func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, podLister algorithm.PodLister, minionLister algorithm.MinionLister) (algorithm.HostPriorityList, error) {
 	var maxCount int
-	var nsServicePods []*api.Pod
+	var nsPods []*api.Pod
 
+	selectors := make([]labels.Selector, 0)
 	services, err := s.serviceLister.GetPodServices(pod)
 	if err == nil {
-		// just use the first service and get the other pods within the service
-		// TODO: a separate predicate can be created that tries to handle all services for the pod
-		selector := labels.SelectorFromSet(services[0].Spec.Selector)
-		pods, err := podLister.List(selector)
+		for _, service := range services {
+			selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
+		}
+	}
+	controllers, err := s.controllerLister.GetPodControllers(pod)
+	if err == nil {
+		for _, controller := range controllers {
+			selectors = append(selectors, labels.SelectorFromSet(controller.Spec.Selector))
+		}
+	}
+
+	if len(selectors) > 0 {
+		pods, err := podLister.List(labels.Everything())
 		if err != nil {
 			return nil, err
 		}
 		// consider only the pods that belong to the same namespace
 		for _, nsPod := range pods {
 			if nsPod.Namespace == pod.Namespace {
-				nsServicePods = append(nsServicePods, nsPod)
+				nsPods = append(nsPods, nsPod)
 			}
 		}
 	}
@@ -63,12 +76,21 @@ func (s *ServiceSpread) CalculateSpreadPriority(pod *api.Pod, podLister algorith
 	}
 
 	counts := map[string]int{}
-	if len(nsServicePods) > 0 {
-		for _, pod := range nsServicePods {
-			counts[pod.Spec.NodeName]++
-			// Compute the maximum number of pods hosted on any minion
-			if counts[pod.Spec.NodeName] > maxCount {
-				maxCount = counts[pod.Spec.NodeName]
+	if len(nsPods) > 0 {
+		for _, pod := range nsPods {
+			matches := false
+			for _, selector := range selectors {
+				if selector.Matches(labels.Set(pod.ObjectMeta.Labels)) {
+					matches = true
+					break
+				}
+			}
+			if matches {
+				counts[pod.Spec.NodeName]++
+				// Compute the maximum number of pods hosted on any minion
+				if counts[pod.Spec.NodeName] > maxCount {
+					maxCount = counts[pod.Spec.NodeName]
+				}
 			}
 		}
 	}
@@ -84,7 +106,7 @@ func (s *ServiceSpread) CalculateSpreadPriority(pod *api.Pod, podLister algorith
 		}
 		result = append(result, algorithm.HostPriority{Host: minion.Name, Score: int(fScore)})
 		glog.V(10).Infof(
-			"%v -> %v: ServiceSpreadPriority, Score: (%d)", pod.Name, minion.Name, int(fScore),
+			"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, minion.Name, int(fScore),
 		)
 	}
 	return result, nil
diff --git a/plugin/pkg/scheduler/algorithm/priorities/service_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
similarity index 79%
rename from plugin/pkg/scheduler/algorithm/priorities/service_spreading_test.go
rename to plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
index 1e5a2660b17..369c1fbc6ef 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/service_spreading_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
@@ -25,7 +25,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithm"
 )
 
-func TestServiceSpreadPriority(t *testing.T) {
+func TestSelectorSpreadPriority(t *testing.T) {
 	labels1 := map[string]string{
 		"foo": "bar",
 		"baz": "blah",
@@ -44,6 +44,7 @@ func TestServiceSpreadPriority(t *testing.T) {
 		pod          *api.Pod
 		pods         []*api.Pod
 		nodes        []string
+		rcs          []api.ReplicationController
 		services     []api.Service
 		expectedList algorithm.HostPriorityList
 		test         string
@@ -158,11 +159,65 @@ func TestServiceSpreadPriority(t *testing.T) {
 			expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 5}},
 			test:         "service with partial pod label matches",
 		},
+		{
+			pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
+			pods: []*api.Pod{
+				{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
+				{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
+			},
+			nodes:    []string{"machine1", "machine2"},
+			services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
+			rcs:      []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
+			// "baz=blah" matches both labels1 and labels2, and "foo=bar" matches only labels1. This means that we want to
+			// spread across all of these pods, so the result should be exactly as above.
+			expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 5}},
+			test:         "partial pod label matches with service and replication controller",
+		},
+		{
+			pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}}},
+			pods: []*api.Pod{
+				{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
+				{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
+			},
+			nodes:    []string{"machine1", "machine2"},
+			services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
+			rcs:      []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
+			// Taken together, the Service and the Replication Controller match all Pods, so the result should be the same as above.
+			expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 5}},
+			test:         "disjoint service and replication controller should be treated equally",
+		},
+		{
+			pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
+			pods: []*api.Pod{
+				{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
+				{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
+			},
+			nodes: []string{"machine1", "machine2"},
+			rcs:   []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
+			// Both Nodes have one pod from the given RC, hence both get 0 score.
+			expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 0}},
+			test:         "replication controller with partial pod label matches",
+		},
+		{
+			pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
+			pods: []*api.Pod{
+				{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
+				{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
+				{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
+			},
+			nodes: []string{"machine1", "machine2"},
+			rcs:   []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}},
+			expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 5}},
+			test:         "another replication controller with partial pod label matches",
+		},
 	}
 
 	for _, test := range tests {
-		serviceSpread := ServiceSpread{serviceLister: algorithm.FakeServiceLister(test.services)}
-		list, err := serviceSpread.CalculateSpreadPriority(test.pod, algorithm.FakePodLister(test.pods), algorithm.FakeMinionLister(makeNodeList(test.nodes)))
+		selectorSpread := SelectorSpread{serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)}
+		list, err := selectorSpread.CalculateSpreadPriority(test.pod, algorithm.FakePodLister(test.pods), algorithm.FakeMinionLister(makeNodeList(test.nodes)))
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
index 1546412b78a..8d9cc9be9ce 100644
--- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
+++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -67,10 +67,10 @@ func defaultPriorities() util.StringSet {
 		factory.RegisterPriorityFunction("BalancedResourceAllocation", priorities.BalancedResourceAllocation, 1),
 		// spreads pods by minimizing the number of pods (belonging to the same service) on the same minion.
 		factory.RegisterPriorityConfigFactory(
-			"ServiceSpreadingPriority",
+			"SelectorSpreadPriority",
 			factory.PriorityConfigFactory{
 				Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
-					return priorities.NewServiceSpreadPriority(args.ServiceLister)
+					return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister)
 				},
 				Weight: 1,
 			},
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index b5faa39c4d5..a1485d1c7e8 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -58,6 +58,8 @@ type ConfigFactory struct {
 	NodeLister *cache.StoreToNodeLister
 	// a means to list all services
 	ServiceLister *cache.StoreToServiceLister
+	// a means to list all controllers
+	ControllerLister *cache.StoreToReplicationControllerLister
 
 	// Close this to stop all reflectors
 	StopEverything chan struct{}
@@ -75,9 +77,10 @@ func NewConfigFactory(client *client.Client) *ConfigFactory {
 		PodQueue:           cache.NewFIFO(cache.MetaNamespaceKeyFunc),
 		ScheduledPodLister: &cache.StoreToPodLister{},
 		// Only nodes in the "Ready" condition with status == "True" are schedulable
-		NodeLister:     &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
-		ServiceLister:  &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
-		StopEverything: make(chan struct{}),
+		NodeLister:       &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
+		ServiceLister:    &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
+		ControllerLister: &cache.StoreToReplicationControllerLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
+		StopEverything:   make(chan struct{}),
 	}
 	modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister)
 	c.modeler = modeler
@@ -160,8 +163,9 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
 func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
 	glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)
 	pluginArgs := PluginFactoryArgs{
-		PodLister:     f.PodLister,
-		ServiceLister: f.ServiceLister,
+		PodLister:        f.PodLister,
+		ServiceLister:    f.ServiceLister,
+		ControllerLister: f.ControllerLister,
 		// All fit predicates only need to consider schedulable nodes.
 		NodeLister: f.NodeLister.NodeCondition(api.NodeReady, api.ConditionTrue),
 		NodeInfo:   f.NodeLister,
@@ -187,10 +191,15 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
 	cache.NewReflector(f.createMinionLW(), &api.Node{}, f.NodeLister.Store, 0).RunUntil(f.StopEverything)
 
 	// Watch and cache all service objects. Scheduler needs to find all pods
-	// created by the same service, so that it can spread them correctly.
+	// created by the same services or ReplicationControllers, so that it can spread them correctly.
 	// Cache this locally.
 	cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything)
 
+	// Watch and cache all ReplicationController objects. Scheduler needs to find all pods
+	// created by the same services or ReplicationControllers, so that it can spread them correctly.
+	// Cache this locally.
+	cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Store, 0).RunUntil(f.StopEverything)
+
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 
 	algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r)
@@ -254,6 +263,11 @@ func (factory *ConfigFactory) createServiceLW() *cache.ListWatch {
 	return cache.NewListWatchFromClient(factory.Client, "services", api.NamespaceAll, parseSelectorOrDie(""))
 }
 
+// Returns a cache.ListWatch that gets all changes to controllers.
+func (factory *ConfigFactory) createControllerLW() *cache.ListWatch {
+	return cache.NewListWatchFromClient(factory.Client, "replicationControllers", api.NamespaceAll, parseSelectorOrDie(""))
+}
+
 func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
 	return func(pod *api.Pod, err error) {
 		if err == scheduler.ErrNoNodesAvailable {
diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go
index 78d206db4e0..16d34d8027d 100644
--- a/plugin/pkg/scheduler/factory/plugins.go
+++ b/plugin/pkg/scheduler/factory/plugins.go
@@ -35,6 +35,7 @@ import (
 type PluginFactoryArgs struct {
 	algorithm.PodLister
 	algorithm.ServiceLister
+	algorithm.ControllerLister
 	NodeLister algorithm.MinionLister
 	NodeInfo   predicates.NodeInfo
 }
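The change above boils down to: gather the label selectors of every Service and ReplicationController that selects the pod being scheduled, count how many already-running pods in the same namespace match any of those selectors on each node, and score nodes so that the least crowded one wins. Below is a minimal, self-contained Go sketch of that idea. The types and helpers (Pod, matches, spreadScores) are invented stand-ins for illustration rather than the real scheduler types, and the 10 * (maxCount - count) / maxCount normalization mirrors the formula the priority function uses; treat it as a sketch of the technique, not the actual implementation.

package main

import "fmt"

// Pod is a simplified stand-in for api.Pod: a node assignment plus labels.
type Pod struct {
	NodeName string
	Labels   map[string]string
}

// matches reports whether every key/value pair of the selector is present in the
// pod's labels, similar to labels.SelectorFromSet(sel).Matches(labels.Set(podLabels)).
func matches(selector, podLabels map[string]string) bool {
	for k, v := range selector {
		if podLabels[k] != v {
			return false
		}
	}
	return true
}

// spreadScores counts, per node, the pods that match ANY of the selectors collected
// from the Services and RCs owning the pod being scheduled, then turns the counts
// into 0-10 scores where emptier nodes score higher.
func spreadScores(nodes []string, pods []Pod, selectors []map[string]string) map[string]int {
	counts := map[string]int{}
	maxCount := 0
	for _, p := range pods {
		for _, sel := range selectors {
			if matches(sel, p.Labels) {
				counts[p.NodeName]++
				if counts[p.NodeName] > maxCount {
					maxCount = counts[p.NodeName]
				}
				break // count each pod at most once
			}
		}
	}

	scores := map[string]int{}
	for _, node := range nodes {
		score := 10 // assumed default when no pod matches anywhere
		if maxCount > 0 {
			score = int(10 * float32(maxCount-counts[node]) / float32(maxCount))
		}
		scores[node] = score
	}
	return scores
}

func main() {
	// Mirrors the "partial pod label matches" fixtures: two pods matching the
	// baz=blah selector sit on machine1 and one sits on machine2.
	pods := []Pod{
		{NodeName: "machine1", Labels: map[string]string{"baz": "blah"}},
		{NodeName: "machine1", Labels: map[string]string{"foo": "bar", "baz": "blah"}},
		{NodeName: "machine2", Labels: map[string]string{"foo": "bar", "baz": "blah"}},
	}
	selectors := []map[string]string{{"baz": "blah"}}
	fmt.Println(spreadScores([]string{"machine1", "machine2"}, pods, selectors))
	// Prints map[machine1:0 machine2:5]
}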
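As a quick check of the expected values in the new test cases: with the baz=blah selector (which, per the test comment, matches both labels1 and labels2) there are two matching pods on machine1 and one on machine2, so maxCount is 2 and the scores come out to 10 * (2 - 2) / 2 = 0 for machine1 and 10 * (2 - 1) / 2 = 5 for machine2. With the foo=bar selector, which matches only labels1, each node hosts exactly one matching pod, so maxCount is 1 and both nodes score 10 * (1 - 1) / 1 = 0, which is why that case expects {machine1: 0, machine2: 0}.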