Merge pull request #2906 from abhgupta/abhgupta-dev

Enhancements to scheduler priority functions
davidopp 2015-01-14 21:47:28 -08:00
commit 2675cfa16b
13 changed files with 919 additions and 49 deletions


@ -75,17 +75,41 @@ func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) {
return nil, fmt.Errorf("minion '%v' is not in cache", id)
}
// StoreToServiceLister makes a Store have the List method of the client.ServiceInterface
// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface
// The Store must contain (only) Services.
type StoreToServiceLister struct {
Store
}
func (s *StoreToServiceLister) List() (svcs api.ServiceList, err error) {
func (s *StoreToServiceLister) List() (services api.ServiceList, err error) {
for _, m := range s.Store.List() {
svcs.Items = append(svcs.Items, *(m.(*api.Service)))
services.Items = append(services.Items, *(m.(*api.Service)))
}
return svcs, nil
return services, nil
}
// TODO: Move this back to scheduler as a helper function that takes a Store,
// rather than a method of StoreToServiceLister.
func (s *StoreToServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) {
var selector labels.Selector
var service api.Service
for _, m := range s.Store.List() {
service = *m.(*api.Service)
// consider only services that are in the same namespace as the pod
if service.Namespace != pod.Namespace {
continue
}
selector = labels.Set(service.Spec.Selector).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
services = append(services, service)
}
}
if len(services) == 0 {
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// TODO: add StoreToEndpointsLister for use in kube-proxy.
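To make the new lister concrete, here is a hedged usage sketch of GetPodServices. The Store.Add(id, obj) form and all of the service/pod literals below are assumptions for illustration, not code from this commit:

```go
// Sketch only: store contents are hypothetical.
store := cache.NewStore()
store.Add("frontend", &api.Service{
	ObjectMeta: api.ObjectMeta{Name: "frontend", Namespace: "default"},
	Spec:       api.ServiceSpec{Selector: map[string]string{"app": "frontend"}},
})
serviceLister := &cache.StoreToServiceLister{store}

pod := api.Pod{ObjectMeta: api.ObjectMeta{
	Name:      "frontend-abc",
	Namespace: "default",
	Labels:    map[string]string{"app": "frontend"},
}}
services, err := serviceLister.GetPodServices(pod)
// services contains the "frontend" service; err is non-nil only when no
// service in the pod's namespace selects the pod's labels.
```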


@ -17,6 +17,8 @@ limitations under the License.
package scheduler
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
@ -52,3 +54,40 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) {
}
return selected, nil
}
// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.
type ServiceLister interface {
// Lists all the services
List() (api.ServiceList, error)
// Gets the services for the given pod
GetPodServices(api.Pod) ([]api.Service, error)
}
// FakeServiceLister implements ServiceLister on []api.Service for test purposes.
type FakeServiceLister []api.Service
// List returns api.ServiceList, the list of all services.
func (f FakeServiceLister) List() (api.ServiceList, error) {
return api.ServiceList{Items: f}, nil
}
// GetPodServices gets the services whose selectors match the labels on the given pod
func (f FakeServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) {
var selector labels.Selector
for _, service := range f {
// consider only services that are in the same namespace as the pod
if service.Namespace != pod.Namespace {
continue
}
selector = labels.Set(service.Spec.Selector).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
services = append(services, service)
}
}
if len(services) == 0 {
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}


@ -167,6 +167,137 @@ func PodFitsHost(pod api.Pod, existingPods []api.Pod, node string) (bool, error)
return pod.Spec.Host == node, nil
}
type NodeLabelChecker struct {
info NodeInfo
labels []string
presence bool
}
func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) FitPredicate {
labelChecker := &NodeLabelChecker{
info: info,
labels: labels,
presence: presence,
}
return labelChecker.CheckNodeLabelPresence
}
// CheckNodeLabelPresence checks whether all of the specified labels exist on a minion, regardless of their value
// If "presence" is false, then returns false if any of the requested labels matches any of the minion's labels,
// otherwise returns true.
// If "presence" is true, then returns false if any of the requested labels does not match any of the minion's labels,
// otherwise returns true.
//
// Consider the cases where the minions are placed in regions/zones/racks and these are identified by labels
// In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected
//
// Alternately, eliminating minions that have a certain label, regardless of value, is also useful
// A minion may have a label with "retiring" as key and the date as the value
// and it may be desirable to avoid scheduling new pods on this minion
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
var exists bool
minion, err := n.info.GetNodeInfo(node)
if err != nil {
return false, err
}
minionLabels := labels.Set(minion.Labels)
for _, label := range n.labels {
exists = minionLabels.Has(label)
if (exists && !n.presence) || (!exists && n.presence) {
return false, nil
}
}
return true, nil
}
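As a quick illustration of the presence semantics, here is a minimal sketch; FakeNodeInfo is the test helper that appears later in this diff, and the label data is made up:

```go
node := api.Node{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r1"}}}

// presence=true: only minions that carry every listed label fit.
fit := NewNodeLabelPredicate(FakeNodeInfo(node), []string{"region"}, true)
fits, _ := fit(api.Pod{}, []api.Pod{}, "machine1") // true: "region" is present

// presence=false: minions carrying any listed label are rejected.
avoid := NewNodeLabelPredicate(FakeNodeInfo(node), []string{"retiring"}, false)
fits, _ = avoid(api.Pod{}, []api.Pod{}, "machine1") // true: "retiring" is absent
```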
type ServiceAffinity struct {
podLister PodLister
serviceLister ServiceLister
nodeInfo NodeInfo
labels []string
}
func NewServiceAffinityPredicate(podLister PodLister, serviceLister ServiceLister, nodeInfo NodeInfo, labels []string) FitPredicate {
affinity := &ServiceAffinity{
podLister: podLister,
serviceLister: serviceLister,
nodeInfo: nodeInfo,
labels: labels,
}
return affinity.CheckServiceAffinity
}
// CheckServiceAffinity ensures that only the minions that match the specified labels are considered for scheduling.
// The set of labels to be considered is provided to the struct (ServiceAffinity).
// The pod is checked for the labels and any missing labels are then checked in the minion
// that hosts the service pods (peers) for the given pod.
//
// We add an implicit selector requiring some particular value V for label L to a pod, if:
// - L is listed in the ServiceAffinity object that is passed into the function
// - the pod does not have any NodeSelector for L
// - some other pod from the same service is already scheduled onto a minion that has value V for label L
func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
var affinitySelector labels.Selector
// check if the pod being scheduled has the affinity labels specified in its NodeSelector
affinityLabels := map[string]string{}
nodeSelector := labels.Set(pod.Spec.NodeSelector)
labelsExist := true
for _, l := range s.labels {
if nodeSelector.Has(l) {
affinityLabels[l] = nodeSelector.Get(l)
} else {
// the current pod does not specify all the labels, look in the existing service pods
labelsExist = false
}
}
// skip looking at other pods in the service if the current pod defines all the required affinity labels
if !labelsExist {
services, err := s.serviceLister.GetPodServices(pod)
if err == nil {
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
servicePods, err := s.podLister.List(selector)
if err != nil {
return false, err
}
if len(servicePods) > 0 {
// consider any service pod and fetch the minion its hosted on
otherMinion, err := s.nodeInfo.GetNodeInfo(servicePods[0].Status.Host)
if err != nil {
return false, err
}
for _, l := range s.labels {
// If the pod being scheduled has the label value specified, do not override it
if _, exists := affinityLabels[l]; exists {
continue
}
if labels.Set(otherMinion.Labels).Has(l) {
affinityLabels[l] = labels.Set(otherMinion.Labels).Get(l)
}
}
}
}
}
// if there are no existing pods in the service, consider all minions
if len(affinityLabels) == 0 {
affinitySelector = labels.Everything()
} else {
affinitySelector = labels.Set(affinityLabels).AsSelector()
}
minion, err := s.nodeInfo.GetNodeInfo(node)
if err != nil {
return false, err
}
// check if the minion matches the selector
return affinitySelector.Matches(labels.Set(minion.Labels)), nil
}
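A hedged end-to-end sketch of the affinity check, built from the fake listers used in the tests later in this diff (all names and labels here are illustrative): a peer pod from the same service already runs on a minion in region r1, so a new pod without its own region selector only fits minions whose region label is also r1.

```go
selector := map[string]string{"app": "db"} // hypothetical service selector
services := []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}
pods := []api.Pod{{
	Status:     api.PodStatus{Host: "machine1"},
	ObjectMeta: api.ObjectMeta{Labels: selector},
}}
nodes := []api.Node{
	{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: map[string]string{"region": "r1"}}},
	{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: map[string]string{"region": "r2"}}},
}
pred := NewServiceAffinityPredicate(FakePodLister(pods), FakeServiceLister(services), FakeNodeListInfo(nodes), []string{"region"})

newPod := api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}
fits, _ := pred(newPod, []api.Pod{}, "machine2")
// fits == false: the peer pod implicitly pinned region=r1, and machine2 is in r2.
```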
func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
existingPorts := getUsedPorts(existingPods...)
wantPorts := getUsedPorts(pod)


@ -17,6 +17,7 @@ limitations under the License.
package scheduler
import (
"fmt"
"reflect"
"testing"
@ -31,6 +32,17 @@ func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*api.Node, error) {
return &node, nil
}
type FakeNodeListInfo []api.Node
func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) {
for _, node := range nodes {
if node.Name == nodeName {
return &node, nil
}
}
return nil, fmt.Errorf("Unable to find node: %s", nodeName)
}
func makeResources(milliCPU int64, memory int64) api.NodeResources {
return api.NodeResources{
Capacity: api.ResourceList{
@ -386,3 +398,175 @@ func TestPodFitsSelector(t *testing.T) {
}
}
}
func TestNodeLabelPresence(t *testing.T) {
label := map[string]string{"foo": "bar", "bar": "foo"}
tests := []struct {
pod api.Pod
existingPods []api.Pod
labels []string
presence bool
fits bool
test string
}{
{
labels: []string{"baz"},
presence: true,
fits: false,
test: "label does not match, presence true",
},
{
labels: []string{"baz"},
presence: false,
fits: true,
test: "label does not match, presence false",
},
{
labels: []string{"foo", "baz"},
presence: true,
fits: false,
test: "one label matches, presence true",
},
{
labels: []string{"foo", "baz"},
presence: false,
fits: false,
test: "one label matches, presence false",
},
{
labels: []string{"foo", "bar"},
presence: true,
fits: true,
test: "all labels match, presence true",
},
{
labels: []string{"foo", "bar"},
presence: false,
fits: false,
test: "all labels match, presence false",
},
}
for _, test := range tests {
node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}}
labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence}
fits, err := labelChecker.CheckNodeLabelPresence(test.pod, test.existingPods, "machine")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
}
}
}
func TestServiceAffinity(t *testing.T) {
selector := map[string]string{"foo": "bar"}
labels1 := map[string]string{
"region": "r1",
"zone": "z11",
}
labels2 := map[string]string{
"region": "r1",
"zone": "z12",
}
labels3 := map[string]string{
"region": "r2",
"zone": "z21",
}
labels4 := map[string]string{
"region": "r2",
"zone": "z22",
}
node1 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: labels1}}
node2 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: labels2}}
node3 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: labels3}}
node4 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine4", Labels: labels4}}
node5 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine5", Labels: labels4}}
tests := []struct {
pod api.Pod
pods []api.Pod
services []api.Service
node string
labels []string
fits bool
test string
}{
{
node: "machine1",
fits: true,
labels: []string{"region"},
test: "nothing scheduled",
},
{
pod: api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"region": "r1"}}},
node: "machine1",
fits: true,
labels: []string{"region"},
test: "pod with region label match",
},
{
pod: api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"region": "r2"}}},
node: "machine1",
fits: false,
labels: []string{"region"},
test: "pod with region label mismatch",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []api.Pod{{Status: api.PodStatus{Host: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: "machine1",
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: true,
labels: []string{"region"},
test: "service pod on same minion",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: "machine1",
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: true,
labels: []string{"region"},
test: "service pod on different minion, region match",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []api.Pod{{Status: api.PodStatus{Host: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: "machine1",
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: false,
labels: []string{"region"},
test: "service pod on different minion, region mismatch",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: "machine1",
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: false,
labels: []string{"region", "zone"},
test: "service pod on different minion, multiple labels, not all match",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
pods: []api.Pod{{Status: api.PodStatus{Host: "machine5"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
node: "machine4",
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
fits: true,
labels: []string{"region", "zone"},
test: "service pod on different minion, multiple labels, all match",
},
}
for _, test := range tests {
nodes := []api.Node{node1, node2, node3, node4, node5}
serviceAffinity := ServiceAffinity{FakePodLister(test.pods), FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels}
fits, err := serviceAffinity.CheckServiceAffinity(test.pod, []api.Pod{}, test.node)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
}
}
}


@ -18,6 +18,7 @@ package scheduler
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/golang/glog"
)
@ -88,3 +89,46 @@ func LeastRequestedPriority(pod api.Pod, podLister PodLister, minionLister Minio
}
return list, nil
}
type NodeLabelPrioritizer struct {
label string
presence bool
}
func NewNodeLabelPriority(label string, presence bool) PriorityFunction {
labelPrioritizer := &NodeLabelPrioritizer{
label: label,
presence: presence,
}
return labelPrioritizer.CalculateNodeLabelPriority
}
// CalculateNodeLabelPriority checks whether a particular label exists on a minion or not, regardless of its value.
// If presence is true, prioritizes minions that have the specified label, regardless of value.
// If presence is false, prioritizes minions that do not have the specified label.
func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
var score int
minions, err := minionLister.List()
if err != nil {
return nil, err
}
labeledMinions := map[string]bool{}
for _, minion := range minions.Items {
exists := labels.Set(minion.Labels).Has(n.label)
labeledMinions[minion.Name] = (exists && n.presence) || (!exists && !n.presence)
}
result := []HostPriority{}
//score int - scale of 0-10
// 0 being the lowest priority and 10 being the highest
for minionName, success := range labeledMinions {
if success {
score = 10
} else {
score = 0
}
result = append(result, HostPriority{host: minionName, score: score})
}
return result, nil
}
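A small usage sketch (FakePodLister and FakeMinionLister are the existing test helpers from this tree; node names and labels are invented): minions carrying the label score 10, the rest score 0.

```go
prioritize := NewNodeLabelPriority("zone", true)
list, _ := prioritize(api.Pod{}, FakePodLister(nil), FakeMinionLister(api.NodeList{Items: []api.Node{
	{ObjectMeta: api.ObjectMeta{Name: "m1", Labels: map[string]string{"zone": "z1"}}},
	{ObjectMeta: api.ObjectMeta{Name: "m2"}},
}}))
// list contains {m1 10} and {m2 0}.
```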


@ -18,6 +18,7 @@ package scheduler
import (
"reflect"
"sort"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -238,3 +239,102 @@ func TestLeastRequested(t *testing.T) {
}
}
}
func TestNewNodeLabelPriority(t *testing.T) {
label1 := map[string]string{"foo": "bar"}
label2 := map[string]string{"bar": "foo"}
label3 := map[string]string{"bar": "baz"}
tests := []struct {
pod api.Pod
pods []api.Pod
nodes []api.Node
label string
presence bool
expectedList HostPriorityList
test string
}{
{
nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}},
{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}},
{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}},
},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}, {"machine3", 0}},
label: "baz",
presence: true,
test: "no match found, presence true",
},
{
nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}},
{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}},
{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}},
},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}, {"machine3", 10}},
label: "baz",
presence: false,
test: "no match found, presence false",
},
{
nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}},
{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}},
{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}},
},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}},
label: "foo",
presence: true,
test: "one match found, presence true",
},
{
nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}},
{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}},
{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}},
},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}},
label: "foo",
presence: false,
test: "one match found, presence false",
},
{
nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}},
{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}},
{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}},
},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}},
label: "bar",
presence: true,
test: "two matches found, presence true",
},
{
nodes: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}},
{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}},
{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}},
},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}},
label: "bar",
presence: false,
test: "two matches found, presence false",
},
}
for _, test := range tests {
prioritizer := NodeLabelPrioritizer{
label: test.label,
presence: test.presence,
}
list, err := prioritizer.CalculateNodeLabelPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(api.NodeList{Items: test.nodes}))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// sort the two lists to avoid failures on account of different ordering
sort.Sort(test.expectedList)
sort.Sort(list)
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list)
}
}
}


@ -17,28 +17,44 @@ limitations under the License.
package scheduler
import (
"math/rand"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
// CalculateSpreadPriority spreads pods by minimizing the number of pods on the same machine with the same labels.
// Importantly, if there are services in the system that span multiple heterogenous sets of pods, this spreading priority
// may not provide optimal spreading for the members of that Service.
// TODO: consider if we want to include Service label sets in the scheduling priority.
func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
pods, err := podLister.List(labels.SelectorFromSet(pod.Labels))
if err != nil {
return nil, err
type ServiceSpread struct {
serviceLister ServiceLister
}
func NewServiceSpreadPriority(serviceLister ServiceLister) PriorityFunction {
serviceSpread := &ServiceSpread{
serviceLister: serviceLister,
}
return serviceSpread.CalculateSpreadPriority
}
// CalculateSpreadPriority spreads pods by minimizing the number of pods belonging to the same service
// on the same machine.
func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
var maxCount int
var pods []api.Pod
var err error
services, err := s.serviceLister.GetPodServices(pod)
if err == nil {
// just use the first service and get the other pods within the service
// TODO: a separate priority function can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
pods, err = podLister.List(selector)
if err != nil {
return nil, err
}
}
minions, err := minionLister.List()
if err != nil {
return nil, err
}
var maxCount int
var fScore float32 = 10.0
counts := map[string]int{}
if len(pods) > 0 {
for _, pod := range pods {
@ -54,6 +70,8 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini
//score int - scale of 0-10
// 0 being the lowest priority and 10 being the highest
for _, minion := range minions.Items {
// initializing to the default/max minion score of 10
fScore := float32(10)
if maxCount > 0 {
fScore = 10 * (float32(maxCount-counts[minion.Name]) / float32(maxCount))
}
@ -62,6 +80,78 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini
return result, nil
}
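Worked example of the scoring, mirroring the "four pods, three service pods" test case later in this diff: with three pods selected by the service, one on machine1 and two on machine2, maxCount is 2, so machine1 scores 10 * (2-1)/2 = 5 and machine2 scores 10 * (2-2)/2 = 0; a minion hosting no service pods keeps the default score of 10.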
func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler {
return NewGenericScheduler(predicates, []PriorityConfig{{Function: CalculateSpreadPriority, Weight: 1}}, podLister, random)
type ServiceAntiAffinity struct {
serviceLister ServiceLister
label string
}
func NewServiceAntiAffinityPriority(serviceLister ServiceLister, label string) PriorityFunction {
antiAffinity := &ServiceAntiAffinity{
serviceLister: serviceLister,
label: label,
}
return antiAffinity.CalculateAntiAffinityPriority
}
// CalculateAntiAffinityPriority spreads pods by minimizing the number of pods belonging to the same service
// on machines with the same value for a particular label.
// The label to be considered is provided to the struct (ServiceAntiAffinity).
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
var pods []api.Pod
services, err := s.serviceLister.GetPodServices(pod)
if err == nil {
// just use the first service and get the other pods within the service
// TODO: a separate priority function can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
pods, err = podLister.List(selector)
if err != nil {
return nil, err
}
}
minions, err := minionLister.List()
if err != nil {
return nil, err
}
// separate out the minions that have the label from the ones that don't
otherMinions := []string{}
labeledMinions := map[string]string{}
for _, minion := range minions.Items {
if labels.Set(minion.Labels).Has(s.label) {
label := labels.Set(minion.Labels).Get(s.label)
labeledMinions[minion.Name] = label
} else {
otherMinions = append(otherMinions, minion.Name)
}
}
podCounts := map[string]int{}
for _, pod := range pods {
label, exists := labeledMinions[pod.Status.Host]
if !exists {
continue
}
podCounts[label]++
}
numServicePods := len(pods)
result := []HostPriority{}
//score int - scale of 0-10
// 0 being the lowest priority and 10 being the highest
for minion := range labeledMinions {
// initializing to the default/max minion score of 10
fScore := float32(10)
if numServicePods > 0 {
fScore = 10 * (float32(numServicePods-podCounts[labeledMinions[minion]]) / float32(numServicePods))
}
result = append(result, HostPriority{host: minion, score: int(fScore)})
}
// add the open minions with a score of 0
for _, minion := range otherMinions {
result = append(result, HostPriority{host: minion, score: 0})
}
return result, nil
}
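Worked example, mirroring the "four pods, three service pods" case in TestZoneSpreadPriority below: with three service pods, one in zone1 and two in zone2, numServicePods is 3, so zone1 minions score int(10 * (3-1)/3) = 6, zone2 minions score int(10 * (3-2)/3) = 3, and minions without the zone label score 0.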


@ -18,12 +18,13 @@ package scheduler
import (
"reflect"
"sort"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func TestSpreadPriority(t *testing.T) {
func TestServiceSpreadPriority(t *testing.T) {
labels1 := map[string]string{
"foo": "bar",
"baz": "blah",
@ -32,16 +33,17 @@ func TestSpreadPriority(t *testing.T) {
"bar": "foo",
"baz": "blah",
}
machine1Status := api.PodStatus{
zone1Status := api.PodStatus{
Host: "machine1",
}
machine2Status := api.PodStatus{
zone2Status := api.PodStatus{
Host: "machine2",
}
tests := []struct {
pod api.Pod
pods []api.Pod
nodes []string
services []api.Service
expectedList HostPriorityList
test string
}{
@ -52,55 +54,72 @@ func TestSpreadPriority(t *testing.T) {
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: machine1Status}},
pods: []api.Pod{{Status: zone1Status}},
nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}},
test: "no labels",
test: "no services",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}},
test: "different labels",
test: "different services",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}},
test: "one label match",
test: "two pods, one service pod",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}},
test: "two label matches on different machines",
test: "three pods, two service pods on different machines",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine1", 5}, {"machine2", 0}},
test: "three label matches",
test: "four pods, three service pods",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 5}},
test: "service with partial pod label matches",
},
}
for _, test := range tests {
list, err := CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeMinionList(test.nodes)))
serviceSpread := ServiceSpread{serviceLister: FakeServiceLister(test.services)}
list, err := serviceSpread.CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeMinionList(test.nodes)))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -109,3 +128,166 @@ func TestSpreadPriority(t *testing.T) {
}
}
}
func TestZoneSpreadPriority(t *testing.T) {
labels1 := map[string]string{
"foo": "bar",
"baz": "blah",
}
labels2 := map[string]string{
"bar": "foo",
"baz": "blah",
}
zone1 := map[string]string{
"zone": "zone1",
}
zone2 := map[string]string{
"zone": "zone2",
}
nozone := map[string]string{
"name": "value",
}
zone0Status := api.PodStatus{
Host: "machine01",
}
zone1Status := api.PodStatus{
Host: "machine11",
}
zone2Status := api.PodStatus{
Host: "machine21",
}
labeledNodes := map[string]map[string]string{
"machine01": nozone, "machine02": nozone,
"machine11": zone1, "machine12": zone1,
"machine21": zone2, "machine22": zone2,
}
tests := []struct {
pod api.Pod
pods []api.Pod
nodes map[string]map[string]string
services []api.Service
expectedList HostPriorityList
test string
}{
{
nodes: labeledNodes,
expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10},
{"machine21", 10}, {"machine22", 10},
{"machine01", 0}, {"machine02", 0}},
test: "nothing scheduled",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: zone1Status}},
nodes: labeledNodes,
expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10},
{"machine21", 10}, {"machine22", 10},
{"machine01", 0}, {"machine02", 0}},
test: "no services",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10},
{"machine21", 10}, {"machine22", 10},
{"machine01", 0}, {"machine02", 0}},
test: "different services",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10},
{"machine21", 0}, {"machine22", 0},
{"machine01", 0}, {"machine02", 0}},
test: "three pods, one service pod",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine11", 5}, {"machine12", 5},
{"machine21", 5}, {"machine22", 5},
{"machine01", 0}, {"machine02", 0}},
test: "three pods, two service pods on different machines",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine11", 6}, {"machine12", 6},
{"machine21", 3}, {"machine22", 3},
{"machine01", 0}, {"machine02", 0}},
test: "four pods, three service pods",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []HostPriority{{"machine11", 3}, {"machine12", 3},
{"machine21", 6}, {"machine22", 6},
{"machine01", 0}, {"machine02", 0}},
test: "service with partial pod label matches",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{
{Status: zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
{Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: labeledNodes,
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
expectedList: []HostPriority{{"machine11", 7}, {"machine12", 7},
{"machine21", 5}, {"machine22", 5},
{"machine01", 0}, {"machine02", 0}},
test: "service pod on non-zoned minion",
},
}
for _, test := range tests {
zoneSpread := ServiceAntiAffinity{serviceLister: FakeServiceLister(test.services), label: "zone"}
list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeLabeledMinionList(test.nodes)))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// sort the two lists to avoid failures on account of different ordering
sort.Sort(test.expectedList)
sort.Sort(list)
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list)
}
}
}
func makeLabeledMinionList(nodeMap map[string]map[string]string) (result api.NodeList) {
nodes := []api.Node{}
for nodeName, labels := range nodeMap {
nodes = append(nodes, api.Node{ObjectMeta: api.ObjectMeta{Name: nodeName, Labels: labels}})
}
return api.NodeList{Items: nodes}
}


@ -0,0 +1,55 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This algorithm provider has predicates and priorities related to affinity/anti-affinity for the scheduler.
package affinity
import (
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)
const AffinityProvider string = "AffinityProvider"
func init() {
factory.RegisterAlgorithmProvider(AffinityProvider, affinityPredicates(), affinityPriorities())
}
func affinityPredicates() util.StringSet {
return util.NewStringSet(
"HostName",
"MatchNodeSelector",
"PodFitsPorts",
"PodFitsResources",
"NoDiskConflict",
// Ensures that all pods within the same service are hosted on minions within the same region as defined by the "region" label
factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})),
// Fit is defined based on the presence of the "region" label on a minion, regardless of value.
factory.RegisterFitPredicate("NodeLabelPredicate", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)),
)
}
func affinityPriorities() util.StringSet {
return util.NewStringSet(
"LeastRequestedPriority",
"ServiceSpreadingPriority",
// spreads pods belonging to the same service across minions in different zones
factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 2),
// Prioritize nodes based on the presence of the "zone" label on a minion, regardless of value.
factory.RegisterPriorityFunction("NodeLabelPriority", algorithm.NewNodeLabelPriority("zone", true), 1),
)
}
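For context, a hedged sketch of how a scheduler config could be assembled from this provider's keys using the ConfigFactory changes later in this diff; the kubeClient value is elided, the key lists simply mirror the registrations above plus the defaults, and the two-value return of CreateFromKeys is an assumption.

```go
// Sketch only: client construction and error handling are illustrative.
configFactory := factory.NewConfigFactory(kubeClient)
predicateKeys := util.NewStringSet("HostName", "MatchNodeSelector", "PodFitsPorts",
	"PodFitsResources", "NoDiskConflict", "ServiceAffinity", "NodeLabelPredicate")
priorityKeys := util.NewStringSet("LeastRequestedPriority", "ServiceSpreadingPriority",
	"ZoneSpreadingPriority", "NodeLabelPriority")
config, err := configFactory.CreateFromKeys(predicateKeys, priorityKeys)
```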


@ -31,11 +31,11 @@ func defaultPredicates() util.StringSet {
return util.NewStringSet(
// Fit is defined based on the absence of port conflicts.
factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts),
// Fit is determined by resource availability
// Fit is determined by resource availability.
factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)),
// Fit is determined by non-conflicting disk volumes
// Fit is determined by non-conflicting disk volumes.
factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict),
// Fit is determined by node selector query
// Fit is determined by node selector query.
factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)),
// Fit is determined by the presence of the Host parameter and a string match
factory.RegisterFitPredicate("HostName", algorithm.PodFitsHost),
@ -46,8 +46,8 @@ func defaultPriorities() util.StringSet {
return util.NewStringSet(
// Prioritize nodes by least requested utilization.
factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1),
// spreads pods by minimizing the number of pods on the same minion with the same labels.
factory.RegisterPriorityFunction("SpreadingPriority", algorithm.CalculateSpreadPriority, 1),
// spreads pods by minimizing the number of pods (belonging to the same service) on the same minion.
factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1),
// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0),
)


@ -18,5 +18,6 @@ limitations under the License.
package algorithmprovider
import (
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/affinity"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults"
)


@ -19,12 +19,14 @@ package algorithmprovider
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/affinity"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)
var (
algorithmProviderNames = []string{
factory.DefaultProvider,
affinity.AffinityProvider,
}
)


@ -35,8 +35,9 @@ import (
)
var (
PodLister = &cache.StoreToPodLister{cache.NewStore()}
MinionLister = &cache.StoreToNodeLister{cache.NewStore()}
PodLister = &cache.StoreToPodLister{cache.NewStore()}
MinionLister = &cache.StoreToNodeLister{cache.NewStore()}
ServiceLister = &cache.StoreToServiceLister{cache.NewStore()}
)
// ConfigFactory knows how to fill out a scheduler config with its support functions.
@ -48,15 +49,18 @@ type ConfigFactory struct {
PodLister *cache.StoreToPodLister
// a means to list all minions
MinionLister *cache.StoreToNodeLister
// a means to list all services
ServiceLister *cache.StoreToServiceLister
}
// NewConfigFactory initializes the factory.
func NewConfigFactory(client *client.Client) *ConfigFactory {
return &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(),
PodLister: PodLister,
MinionLister: MinionLister,
Client: client,
PodQueue: cache.NewFIFO(),
PodLister: PodLister,
MinionLister: MinionLister,
ServiceLister: ServiceLister,
}
}
@ -106,6 +110,11 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run()
}
// Watch and cache all service objects. Scheduler needs to find all pods
// selected by the same service, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store).Run()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r)
@ -205,6 +214,15 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
return &nodeEnumerator{nodes}, nil
}
// createServiceLW returns a cache.ListWatch that gets all changes to services.
func (factory *ConfigFactory) createServiceLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: parseSelectorOrDie(""),
Resource: "services",
}
}
func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
return func(pod *api.Pod, err error) {
glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err)