Enhancements to scheduler priority functions

- Modified the existing spreading priority to consider service pods explicitly
- Added a new priority function to spread pods across zones (see the sketch below)
Abhishek Gupta 2014-12-12 14:29:20 -08:00
parent 3ca6c231b2
commit 04db076e5f
5 changed files with 399 additions and 41 deletions
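Both additions share one shape, shown here as a condensed sketch pulled from the diffs below (the PriorityFunction signature is the one these methods satisfy; nothing here is new code beyond the trimming):

    // Each new priority is a small struct that closes over a ServiceLister and
    // hands one of its methods to the scheduler as a PriorityFunction, i.e. a
    // func(api.Pod, PodLister, MinionLister) (HostPriorityList, error).
    type ServiceSpread struct {
        serviceLister ServiceLister
    }

    func NewServiceSpreadPriority(serviceLister ServiceLister) PriorityFunction {
        serviceSpread := &ServiceSpread{serviceLister: serviceLister}
        return serviceSpread.CalculateSpreadPriority // method value used as the PriorityFunction
    }

NewZoneSpreadPriority follows the same pattern and additionally captures the label key ("zone") used to group minions.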


@@ -17,6 +17,8 @@ limitations under the License.
package scheduler
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
@@ -52,3 +54,38 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) {
}
return selected, nil
}
// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.
type ServiceLister interface {
// Lists all the services
ListServices() (api.ServiceList, error)
// Gets the service for the given pod
GetPodService(api.Pod) (api.Service, error)
}
// FakeServiceLister implements ServiceLister on []api.Service for test purposes.
type FakeServiceLister []api.Service
// ListServices returns api.ServiceList, the list of all services.
func (f FakeServiceLister) ListServices() (api.ServiceList, error) {
return api.ServiceList{Items: f}, nil
}
// GetPodService returns the service whose selector matches the labels of the given pod.
// We assume a single service per pod.
// If multiple services match, the first one found is returned.
func (f FakeServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) {
var selector labels.Selector
for _, service := range f {
// consider only services that are in the same namespace as the pod
if service.Namespace != pod.Namespace {
continue
}
selector = labels.Set(service.Spec.Selector).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
return service, nil
}
}
return service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
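A quick usage sketch for the lister added above (hypothetical values, not part of the commit; assumes it sits next to the fakes in the scheduler package, where "fmt" is imported). GetPodService walks the services and returns the first one in the pod's namespace whose selector matches the pod's labels:

    func demoGetPodService() { // hypothetical sketch
        frontend := api.Service{
            ObjectMeta: api.ObjectMeta{Name: "frontend", Namespace: "default"},
            Spec:       api.ServiceSpec{Selector: map[string]string{"app": "frontend"}},
        }
        lister := FakeServiceLister([]api.Service{frontend})

        pod := api.Pod{ObjectMeta: api.ObjectMeta{
            Namespace: "default",
            Labels:    map[string]string{"app": "frontend", "tier": "web"},
        }}

        if svc, err := lister.GetPodService(pod); err == nil {
            fmt.Println(svc.Name) // prints "frontend"
        }
    }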


@@ -17,28 +17,44 @@ limitations under the License.
package scheduler
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
type ServiceSpread struct {
serviceLister ServiceLister
}
func NewServiceSpreadPriority(serviceLister ServiceLister) PriorityFunction {
serviceSpread := &ServiceSpread{
serviceLister: serviceLister,
}
return serviceSpread.CalculateSpreadPriority
}
// CalculateSpreadPriority spreads pods by minimizing the number of pods on the same machine with the same labels.
// Importantly, if there are services in the system that span multiple heterogeneous sets of pods, this spreading priority
// may not provide optimal spreading for the members of that Service.
// TODO: consider if we want to include Service label sets in the scheduling priority.
func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
var maxCount int
var pods []api.Pod
var err error
service, err := s.serviceLister.GetPodService(pod)
if err == nil {
selector := labels.SelectorFromSet(service.Spec.Selector)
pods, err = podLister.ListPods(selector)
if err != nil {
return nil, err
}
}
minions, err := minionLister.List()
if err != nil {
return nil, err
}
counts := map[string]int{}
if len(pods) > 0 {
for _, pod := range pods {
@@ -54,6 +70,8 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini
//score int - scale of 0-10
// 0 being the lowest priority and 10 being the highest
for _, minion := range minions.Items {
// initializing to the default/max minion score of 10
fScore := float32(10)
if maxCount > 0 {
fScore = 10 * (float32(maxCount-counts[minion.Name]) / float32(maxCount))
}
@@ -62,6 +80,77 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini
return result, nil
}
type ZoneSpread struct {
serviceLister ServiceLister
zoneLabel string
}
func NewZoneSpreadPriority(serviceLister ServiceLister, zoneLabel string) PriorityFunction {
zoneSpread := &ZoneSpread{
serviceLister: serviceLister,
zoneLabel: zoneLabel,
}
return zoneSpread.ZoneSpreadPriority
}
func (z *ZoneSpread) ZoneSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
var service api.Service
var pods []api.Pod
var err error
service, err = z.serviceLister.GetPodService(pod)
if err == nil {
selector := labels.SelectorFromSet(service.Spec.Selector)
pods, err = podLister.ListPods(selector)
if err != nil {
return nil, err
}
}
minions, err := minionLister.List()
if err != nil {
return nil, err
}
// find the zones that the minions belong to
openMinions := []string{}
zonedMinions := map[string]string{}
for _, minion := range minions.Items {
if labels.Set(minion.Labels).Has(z.zoneLabel) {
zone := labels.Set(minion.Labels).Get(z.zoneLabel)
zonedMinions[minion.Name] = zone
} else {
openMinions = append(openMinions, minion.Name)
}
}
podCounts := map[string]int{}
numServicePods := len(pods)
if numServicePods > 0 {
for _, pod := range pods {
zone, exists := zonedMinions[pod.Status.Host]
if !exists {
continue
}
podCounts[zone]++
}
}
result := []HostPriority{}
//score int - scale of 0-10
// 0 being the lowest priority and 10 being the highest
for minion := range zonedMinions {
// initializing to the default/max minion score of 10
fScore := float32(10)
if numServicePods > 0 {
fScore = 10 * (float32(numServicePods-podCounts[zonedMinions[minion]]) / float32(numServicePods))
}
result = append(result, HostPriority{host: minion, score: int(fScore)})
}
// add the open minions with a score of 0
for _, minion := range openMinions {
result = append(result, HostPriority{host: minion, score: 0})
}
return result, nil
}
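For intuition about the 0-10 scale both functions use, a standalone worked example (the score helper is mine, not code from this commit); the numbers match the "four pods, three service pods" cases in the tests below:

    package main

    import "fmt"

    // score mirrors the scoring step above: hosts (or zones) carrying fewer of the
    // service's pods score closer to 10; the most loaded one scores lowest.
    func score(total, count int) int {
        if total == 0 {
            return 10 // default/max score when nothing relevant is scheduled yet
        }
        return int(10 * float32(total-count) / float32(total))
    }

    func main() {
        // ServiceSpread: machine1 hosts 1 service pod, machine2 hosts 2, so maxCount = 2.
        fmt.Println(score(2, 1), score(2, 2)) // 5 0
        // ZoneSpread: 3 service pods total, 1 in zone1 and 2 in zone2.
        fmt.Println(score(3, 1), score(3, 2)) // 6 3 (unzoned minions get a flat 0)
    }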


@@ -18,12 +18,13 @@ package scheduler
import (
"reflect"
"sort"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func TestServiceSpreadPriority(t *testing.T) {
labels1 := map[string]string{
"foo": "bar",
"baz": "blah",
@@ -32,16 +33,17 @@ func TestSpreadPriority(t *testing.T) {
"bar": "foo",
"baz": "blah",
}
zone1Status := api.PodStatus{
Host: "machine1",
}
zone2Status := api.PodStatus{
Host: "machine2",
}
tests := []struct {
pod api.Pod
pods []api.Pod
nodes []string
services []api.Service
expectedList HostPriorityList
test string
}{
@@ -52,55 +54,72 @@ func TestSpreadPriority(t *testing.T) {
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: zone1Status}},
nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}},
test: "no services",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}},
test: "different services",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}},
test: "two pods, one service pod",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}},
test: "three pods, two service pods on different machines",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine1", 5}, {"machine2", 0}},
test: "four pods, three service pods",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "service with partial pod label matches",
},
}
for _, test := range tests {
serviceSpread := ServiceSpread{serviceLister: FakeServiceLister(test.services)}
list, err := serviceSpread.CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeMinionList(test.nodes)))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -109,3 +128,166 @@ func TestSpreadPriority(t *testing.T) {
}
}
}
func TestZoneSpreadPriority(t *testing.T) {
labels1 := map[string]string{
"foo": "bar",
"baz": "blah",
}
labels2 := map[string]string{
"bar": "foo",
"baz": "blah",
}
zone1 := map[string]string{
"zone": "zone1",
}
zone2 := map[string]string{
"zone": "zone2",
}
nozone := map[string]string{
"name": "value",
}
zone0Status := api.PodStatus{
Host: "machine01",
}
zone1Status := api.PodStatus{
Host: "machine11",
}
zone2Status := api.PodStatus{
Host: "machine21",
}
labeledNodes := map[string]map[string]string{
"machine01": nozone, "machine02": nozone,
"machine11": zone1, "machine12": zone1,
"machine21": zone2, "machine22": zone2,
}
tests := []struct {
pod api.Pod
pods []api.Pod
nodes map[string]map[string]string
services []api.Service
expectedList HostPriorityList
test string
}{
{
nodes: labeledNodes,
expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10},
{"machine21", 10}, {"machine22", 10},
{"machine01", 0}, {"machine02", 0}},
test: "nothing scheduled",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: zone1Status}},
nodes: labeledNodes,
expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10},
{"machine21", 10}, {"machine22", 10},
{"machine01", 0}, {"machine02", 0}},
test: "no services",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10},
{"machine21", 10}, {"machine22", 10},
{"machine01", 0}, {"machine02", 0}},
test: "different services",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10},
{"machine21", 0}, {"machine22", 0},
{"machine01", 0}, {"machine02", 0}},
test: "three pods, one service pod",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine11", 5}, {"machine12", 5},
{"machine21", 5}, {"machine22", 5},
{"machine01", 0}, {"machine02", 0}},
test: "three pods, two service pods on different machines",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine11", 6}, {"machine12", 6},
{"machine21", 3}, {"machine22", 3},
{"machine01", 0}, {"machine02", 0}},
test: "four pods, three service pods",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []HostPriority{{"machine11", 3}, {"machine12", 3},
{"machine21", 6}, {"machine22", 6},
{"machine01", 0}, {"machine02", 0}},
test: "service with partial pod label matches",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine11", 7}, {"machine12", 7},
{"machine21", 5}, {"machine22", 5},
{"machine01", 0}, {"machine02", 0}},
test: "service pod on non-zoned minion",
},
}
for _, test := range tests {
zoneSpread := ZoneSpread{serviceLister: FakeServiceLister(test.services), zoneLabel: "zone"}
list, err := zoneSpread.ZoneSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeLabeledMinionList(test.nodes)))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// sort the two lists to avoid failures on account of different ordering
sort.Sort(test.expectedList)
sort.Sort(list)
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list)
}
}
}
func makeLabeledMinionList(nodeMap map[string]map[string]string) (result api.NodeList) {
nodes := []api.Node{}
for nodeName, labels := range nodeMap {
nodes = append(nodes, api.Node{ObjectMeta: api.ObjectMeta{Name: nodeName, Labels: labels}})
}
return api.NodeList{Items: nodes}
}
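One subtlety in the last zone case above ("service pod on non-zoned minion"): a service pod on an unzoned minion still counts toward numServicePods even though no zone is credited with it, which is where the 7/5 split comes from. A quick arithmetic check (standalone sketch, not commit code):

    package main

    import "fmt"

    func main() {
        // 4 service pods total: one on an unzoned minion, one in zone1, two in zone2.
        total := 4
        fmt.Println(int(10 * float32(total-1) / float32(total))) // zone1 minions: 7
        fmt.Println(int(10 * float32(total-2) / float32(total))) // zone2 minions: 5
        // The unzoned minions themselves are appended with a fixed score of 0.
    }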


@@ -31,11 +31,11 @@ func defaultPredicates() util.StringSet {
return util.NewStringSet(
// Fit is defined based on the absence of port conflicts.
factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts),
// Fit is determined by resource availability.
factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)),
// Fit is determined by non-conflicting disk volumes.
factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict),
// Fit is determined by node selector query.
factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)),
// Fit is determined by the presence of the Host parameter and a string match
factory.RegisterFitPredicate("HostName", algorithm.PodFitsHost),
@@ -46,8 +46,11 @@ func defaultPriorities() util.StringSet {
return util.NewStringSet(
// Prioritize nodes by least requested utilization.
factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1),
// spreads pods by minimizing the number of pods (belonging to the same service) on the same minion.
factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1),
// spreads pods belonging to the same service across minions in different zones
// TODO: remove the hardcoding of the "zone" label and move it to a constant
factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewZoneSpreadPriority(factory.ServiceLister, "zone"), 1),
// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0),
)
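Each registered priority reports on the same 0-10 scale, here with weight 1 (EqualPriority keeps weight 0). As a rough illustration of how weighted scores could fold into one ranking per minion (the summation rule is my paraphrase of the generic scheduler, not code from this commit; the numbers are made up):

    package main

    import "fmt"

    func main() {
        // Hypothetical per-priority scores for a single minion.
        scores := map[string]int{
            "LeastRequestedPriority":   4,
            "ServiceSpreadingPriority": 10,
            "ZoneSpreadingPriority":    6,
        }
        weights := map[string]int{
            "LeastRequestedPriority":   1,
            "ServiceSpreadingPriority": 1,
            "ZoneSpreadingPriority":    1,
        }
        total := 0
        for name, s := range scores {
            total += weights[name] * s
        }
        fmt.Println(total) // 20; minions with higher combined totals are preferred
    }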


@@ -35,8 +35,9 @@ import (
)
var (
PodLister = &cache.StoreToPodLister{cache.NewStore()}
MinionLister = &cache.StoreToNodeLister{cache.NewStore()}
ServiceLister = &cache.StoreToServiceLister{cache.NewStore()}
)
// ConfigFactory knows how to fill out a scheduler config with its support functions.
@@ -48,15 +49,18 @@ type ConfigFactory struct {
PodLister *cache.StoreToPodLister
// a means to list all minions
MinionLister *cache.StoreToNodeLister
// a means to list all services
ServiceLister *cache.StoreToServiceLister
}
// NewConfigFactory initializes the factory.
func NewConfigFactory(client *client.Client) *ConfigFactory {
return &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(),
PodLister: PodLister,
MinionLister: MinionLister,
ServiceLister: ServiceLister,
}
}
@@ -105,6 +109,11 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run()
}
// Watch and cache all service objects. The scheduler needs to find all pods
// selected by the same service, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store).Run()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r)
@@ -178,6 +187,15 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
return &nodeEnumerator{list}, nil
}
// createServiceLW returns a listWatch that gets all changes to services.
func (factory *ConfigFactory) createServiceLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie(""),
resource: "services",
}
}
func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
return func(pod *api.Pod, err error) {
glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err)
@@ -208,6 +226,35 @@ type nodeEnumerator struct {
*api.NodeList
}
// storeToServiceLister turns a store into a service lister. The store must contain (only) services.
type storeToServiceLister struct {
cache.Store
}
func (s *storeToServiceLister) ListServices() (services api.ServiceList, err error) {
for _, m := range s.List() {
services.Items = append(services.Items, *(m.(*api.Service)))
}
return services, nil
}
func (s *storeToServiceLister) GetPodService(pod api.Pod) (service api.Service, err error) {
var selector labels.Selector
for _, m := range s.List() {
service = *m.(*api.Service)
// consider only services that are in the same namespace as the pod
if service.Namespace != pod.Namespace {
continue
}
selector = labels.Set(service.Spec.Selector).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
return service, nil
}
}
return service, fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
// Len returns the number of items in the node list.
func (ne *nodeEnumerator) Len() int {
if ne.NodeList == nil {