diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index f98dff6e4a2..a8eb8f57700 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -75,17 +75,41 @@ func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) { return nil, fmt.Errorf("minion '%v' is not in cache", id) } -// StoreToServiceLister makes a Store have the List method of the client.ServiceInterface +// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface // The Store must contain (only) Services. type StoreToServiceLister struct { Store } -func (s *StoreToServiceLister) List() (svcs api.ServiceList, err error) { +func (s *StoreToServiceLister) List() (services api.ServiceList, err error) { for _, m := range s.Store.List() { - svcs.Items = append(svcs.Items, *(m.(*api.Service))) + services.Items = append(services.Items, *(m.(*api.Service))) } - return svcs, nil + return services, nil +} + +// TODO: Move this back to scheduler as a helper function that takes a Store, +// rather than a method of StoreToServiceLister. +func (s *StoreToServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) { + var selector labels.Selector + var service api.Service + + for _, m := range s.Store.List() { + service = *m.(*api.Service) + // consider only services that are in the same namespace as the pod + if service.Namespace != pod.Namespace { + continue + } + selector = labels.Set(service.Spec.Selector).AsSelector() + if selector.Matches(labels.Set(pod.Labels)) { + services = append(services, service) + } + } + if len(services) == 0 { + err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + + return } // TODO: add StoreToEndpointsLister for use in kube-proxy. diff --git a/pkg/scheduler/listers.go b/pkg/scheduler/listers.go index 6c893dcce93..10d887c9417 100644 --- a/pkg/scheduler/listers.go +++ b/pkg/scheduler/listers.go @@ -17,6 +17,8 @@ limitations under the License. package scheduler import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) @@ -52,3 +54,40 @@ func (f FakePodLister) List(s labels.Selector) (selected []api.Pod, err error) { } return selected, nil } + +// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler. +type ServiceLister interface { + // Lists all the services + List() (api.ServiceList, error) + // Gets the services for the given pod + GetPodServices(api.Pod) ([]api.Service, error) +} + +// FakeServiceLister implements ServiceLister on []api.Service for test purposes. +type FakeServiceLister []api.Service + +// FakeServiceLister returns api.ServiceList, the list of all services. 
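+// In tests a fake lister can be built directly from a slice, for example
+// FakeServiceLister([]api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"foo": "bar"}}}}).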
+func (f FakeServiceLister) List() (api.ServiceList, error) { + return api.ServiceList{Items: f}, nil +} + +// GetPodServices gets the services that have the selector that match the labels on the given pod +func (f FakeServiceLister) GetPodServices(pod api.Pod) (services []api.Service, err error) { + var selector labels.Selector + + for _, service := range f { + // consider only services that are in the same namespace as the pod + if service.Namespace != pod.Namespace { + continue + } + selector = labels.Set(service.Spec.Selector).AsSelector() + if selector.Matches(labels.Set(pod.Labels)) { + services = append(services, service) + } + } + if len(services) == 0 { + err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + + return +} diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index fe27f8ceb73..65698c6ced6 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -167,6 +167,137 @@ func PodFitsHost(pod api.Pod, existingPods []api.Pod, node string) (bool, error) return pod.Spec.Host == node, nil } +type NodeLabelChecker struct { + info NodeInfo + labels []string + presence bool +} + +func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) FitPredicate { + labelChecker := &NodeLabelChecker{ + info: info, + labels: labels, + presence: presence, + } + return labelChecker.CheckNodeLabelPresence +} + +// CheckNodeLabelPresence checks whether all of the specified labels exists on a minion or not, regardless of their value +// If "presence" is false, then returns false if any of the requested labels matches any of the minion's labels, +// otherwise returns true. +// If "presence" is true, then returns false if any of the requested labels does not match any of the minion's labels, +// otherwise returns true. +// +// Consider the cases where the minions are placed in regions/zones/racks and these are identified by labels +// In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected +// +// Alternately, eliminating minions that have a certain label, regardless of value, is also useful +// A minion may have a label with "retiring" as key and the date as the value +// and it may be desirable to avoid scheduling new pods on this minion +func (n *NodeLabelChecker) CheckNodeLabelPresence(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + var exists bool + minion, err := n.info.GetNodeInfo(node) + if err != nil { + return false, err + } + minionLabels := labels.Set(minion.Labels) + for _, label := range n.labels { + exists = minionLabels.Has(label) + if (exists && !n.presence) || (!exists && n.presence) { + return false, nil + } + } + return true, nil +} + +type ServiceAffinity struct { + podLister PodLister + serviceLister ServiceLister + nodeInfo NodeInfo + labels []string +} + +func NewServiceAffinityPredicate(podLister PodLister, serviceLister ServiceLister, nodeInfo NodeInfo, labels []string) FitPredicate { + affinity := &ServiceAffinity{ + podLister: podLister, + serviceLister: serviceLister, + nodeInfo: nodeInfo, + labels: labels, + } + return affinity.CheckServiceAffinity +} + +// CheckServiceAffinity ensures that only the minions that match the specified labels are considered for scheduling. +// The set of labels to be considered are provided to the struct (ServiceAffinity). 
+// The pod is checked for the labels and any missing labels are then checked in the minion +// that hosts the service pods (peers) for the given pod. +// +// We add an implicit selector requiring some particular value V for label L to a pod, if: +// - L is listed in the ServiceAffinity object that is passed into the function +// - the pod does not have any NodeSelector for L +// - some other pod from the same service is already scheduled onto a minion that has value V for label L +func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { + var affinitySelector labels.Selector + + // check if the pod being scheduled has the affinity labels specified in its NodeSelector + affinityLabels := map[string]string{} + nodeSelector := labels.Set(pod.Spec.NodeSelector) + labelsExist := true + for _, l := range s.labels { + if nodeSelector.Has(l) { + affinityLabels[l] = nodeSelector.Get(l) + } else { + // the current pod does not specify all the labels, look in the existing service pods + labelsExist = false + } + } + + // skip looking at other pods in the service if the current pod defines all the required affinity labels + if !labelsExist { + services, err := s.serviceLister.GetPodServices(pod) + if err == nil { + // just use the first service and get the other pods within the service + // TODO: a separate predicate can be created that tries to handle all services for the pod + selector := labels.SelectorFromSet(services[0].Spec.Selector) + servicePods, err := s.podLister.List(selector) + if err != nil { + return false, err + } + if len(servicePods) > 0 { + // consider any service pod and fetch the minion its hosted on + otherMinion, err := s.nodeInfo.GetNodeInfo(servicePods[0].Status.Host) + if err != nil { + return false, err + } + for _, l := range s.labels { + // If the pod being scheduled has the label value specified, do not override it + if _, exists := affinityLabels[l]; exists { + continue + } + if labels.Set(otherMinion.Labels).Has(l) { + affinityLabels[l] = labels.Set(otherMinion.Labels).Get(l) + } + } + } + } + } + + // if there are no existing pods in the service, consider all minions + if len(affinityLabels) == 0 { + affinitySelector = labels.Everything() + } else { + affinitySelector = labels.Set(affinityLabels).AsSelector() + } + + minion, err := s.nodeInfo.GetNodeInfo(node) + if err != nil { + return false, err + } + + // check if the minion matches the selector + return affinitySelector.Matches(labels.Set(minion.Labels)), nil +} + func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { existingPorts := getUsedPorts(existingPods...) wantPorts := getUsedPorts(pod) diff --git a/pkg/scheduler/predicates_test.go b/pkg/scheduler/predicates_test.go index 5fe556207f1..5f91ba248f8 100644 --- a/pkg/scheduler/predicates_test.go +++ b/pkg/scheduler/predicates_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package scheduler import ( + "fmt" "reflect" "testing" @@ -31,6 +32,17 @@ func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*api.Node, error) { return &node, nil } +type FakeNodeListInfo []api.Node + +func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) { + for _, node := range nodes { + if node.Name == nodeName { + return &node, nil + } + } + return nil, fmt.Errorf("Unable to find node: %s", nodeName) +} + func makeResources(milliCPU int64, memory int64) api.NodeResources { return api.NodeResources{ Capacity: api.ResourceList{ @@ -386,3 +398,175 @@ func TestPodFitsSelector(t *testing.T) { } } } + +func TestNodeLabelPresence(t *testing.T) { + label := map[string]string{"foo": "bar", "bar": "foo"} + tests := []struct { + pod api.Pod + existingPods []api.Pod + labels []string + presence bool + fits bool + test string + }{ + { + labels: []string{"baz"}, + presence: true, + fits: false, + test: "label does not match, presence true", + }, + { + labels: []string{"baz"}, + presence: false, + fits: true, + test: "label does not match, presence false", + }, + { + labels: []string{"foo", "baz"}, + presence: true, + fits: false, + test: "one label matches, presence true", + }, + { + labels: []string{"foo", "baz"}, + presence: false, + fits: false, + test: "one label matches, presence false", + }, + { + labels: []string{"foo", "bar"}, + presence: true, + fits: true, + test: "all labels match, presence true", + }, + { + labels: []string{"foo", "bar"}, + presence: false, + fits: false, + test: "all labels match, presence false", + }, + } + for _, test := range tests { + node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}} + labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence} + fits, err := labelChecker.CheckNodeLabelPresence(test.pod, test.existingPods, "machine") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if fits != test.fits { + t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits) + } + } +} + +func TestServiceAffinity(t *testing.T) { + selector := map[string]string{"foo": "bar"} + labels1 := map[string]string{ + "region": "r1", + "zone": "z11", + } + labels2 := map[string]string{ + "region": "r1", + "zone": "z12", + } + labels3 := map[string]string{ + "region": "r2", + "zone": "z21", + } + labels4 := map[string]string{ + "region": "r2", + "zone": "z22", + } + node1 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: labels1}} + node2 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: labels2}} + node3 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: labels3}} + node4 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine4", Labels: labels4}} + node5 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine5", Labels: labels4}} + tests := []struct { + pod api.Pod + pods []api.Pod + services []api.Service + node string + labels []string + fits bool + test string + }{ + { + node: "machine1", + fits: true, + labels: []string{"region"}, + test: "nothing scheduled", + }, + { + pod: api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"region": "r1"}}}, + node: "machine1", + fits: true, + labels: []string{"region"}, + test: "pod with region label match", + }, + { + pod: api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"region": "r2"}}}, + node: "machine1", + fits: false, + labels: []string{"region"}, + test: "pod with region label mismatch", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: 
"machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: true, + labels: []string{"region"}, + test: "service pod on same minion", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: true, + labels: []string{"region"}, + test: "service pod on different minion, region match", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: false, + labels: []string{"region"}, + test: "service pod on different minion, region mismatch", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: false, + labels: []string{"region", "zone"}, + test: "service pod on different minion, multiple labels, not all match", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine5"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine4", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: true, + labels: []string{"region", "zone"}, + test: "service pod on different minion, multiple labels, all match", + }, + } + + for _, test := range tests { + nodes := []api.Node{node1, node2, node3, node4, node5} + serviceAffinity := ServiceAffinity{FakePodLister(test.pods), FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels} + fits, err := serviceAffinity.CheckServiceAffinity(test.pod, []api.Pod{}, test.node) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if fits != test.fits { + t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits) + } + } +} diff --git a/pkg/scheduler/priorities.go b/pkg/scheduler/priorities.go index dc3250cb8a2..6685358c949 100644 --- a/pkg/scheduler/priorities.go +++ b/pkg/scheduler/priorities.go @@ -18,6 +18,7 @@ package scheduler import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/golang/glog" ) @@ -88,3 +89,46 @@ func LeastRequestedPriority(pod api.Pod, podLister PodLister, minionLister Minio } return list, nil } + +type NodeLabelPrioritizer struct { + label string + presence bool +} + +func NewNodeLabelPriority(label string, presence bool) PriorityFunction { + labelPrioritizer := &NodeLabelPrioritizer{ + label: label, + presence: presence, + } + return labelPrioritizer.CalculateNodeLabelPriority +} + +// CalculateNodeLabelPriority checks whether a particular label exists on a minion or not, regardless of its value. +// If presence is true, prioritizes minions that have the specified label, regardless of value. +// If presence is false, prioritizes minions that do not have the specified label. 
+func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var score int + minions, err := minionLister.List() + if err != nil { + return nil, err + } + + labeledMinions := map[string]bool{} + for _, minion := range minions.Items { + exists := labels.Set(minion.Labels).Has(n.label) + labeledMinions[minion.Name] = (exists && n.presence) || (!exists && !n.presence) + } + + result := []HostPriority{} + //score int - scale of 0-10 + // 0 being the lowest priority and 10 being the highest + for minionName, success := range labeledMinions { + if success { + score = 10 + } else { + score = 0 + } + result = append(result, HostPriority{host: minionName, score: score}) + } + return result, nil +} diff --git a/pkg/scheduler/priorities_test.go b/pkg/scheduler/priorities_test.go index 23f932e0a0d..8aa878586b4 100644 --- a/pkg/scheduler/priorities_test.go +++ b/pkg/scheduler/priorities_test.go @@ -18,6 +18,7 @@ package scheduler import ( "reflect" + "sort" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -238,3 +239,102 @@ func TestLeastRequested(t *testing.T) { } } } + +func TestNewNodeLabelPriority(t *testing.T) { + label1 := map[string]string{"foo": "bar"} + label2 := map[string]string{"bar": "foo"} + label3 := map[string]string{"bar": "baz"} + tests := []struct { + pod api.Pod + pods []api.Pod + nodes []api.Node + label string + presence bool + expectedList HostPriorityList + test string + }{ + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}, {"machine3", 0}}, + label: "baz", + presence: true, + test: "no match found, presence true", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}, {"machine3", 10}}, + label: "baz", + presence: false, + test: "no match found, presence false", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}}, + label: "foo", + presence: true, + test: "one match found, presence true", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}}, + label: "foo", + presence: false, + test: "one match found, presence false", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}}, + label: "bar", + presence: true, + test: "two matches found, presence true", + }, + { + nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: 
"machine1", Labels: label1}}, + {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, + {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, + }, + expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}}, + label: "bar", + presence: false, + test: "two matches found, presence false", + }, + } + + for _, test := range tests { + prioritizer := NodeLabelPrioritizer{ + label: test.label, + presence: test.presence, + } + list, err := prioritizer.CalculateNodeLabelPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(api.NodeList{Items: test.nodes})) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort the two lists to avoid failures on account of different ordering + sort.Sort(test.expectedList) + sort.Sort(list) + if !reflect.DeepEqual(test.expectedList, list) { + t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) + } + } +} diff --git a/pkg/scheduler/spreading.go b/pkg/scheduler/spreading.go index 0c831484104..f72f19c93ff 100644 --- a/pkg/scheduler/spreading.go +++ b/pkg/scheduler/spreading.go @@ -17,28 +17,44 @@ limitations under the License. package scheduler import ( - "math/rand" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" ) -// CalculateSpreadPriority spreads pods by minimizing the number of pods on the same machine with the same labels. -// Importantly, if there are services in the system that span multiple heterogenous sets of pods, this spreading priority -// may not provide optimal spreading for the members of that Service. -// TODO: consider if we want to include Service label sets in the scheduling priority. -func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { - pods, err := podLister.List(labels.SelectorFromSet(pod.Labels)) - if err != nil { - return nil, err +type ServiceSpread struct { + serviceLister ServiceLister +} + +func NewServiceSpreadPriority(serviceLister ServiceLister) PriorityFunction { + serviceSpread := &ServiceSpread{ + serviceLister: serviceLister, } + return serviceSpread.CalculateSpreadPriority +} + +// CalculateSpreadPriority spreads pods by minimizing the number of pods belonging to the same service +// on the same machine. 
+func (s *ServiceSpread) CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var maxCount int + var pods []api.Pod + var err error + + services, err := s.serviceLister.GetPodServices(pod) + if err == nil { + // just use the first service and get the other pods within the service + // TODO: a separate predicate can be created that tries to handle all services for the pod + selector := labels.SelectorFromSet(services[0].Spec.Selector) + pods, err = podLister.List(selector) + if err != nil { + return nil, err + } + } + minions, err := minionLister.List() if err != nil { return nil, err } - var maxCount int - var fScore float32 = 10.0 counts := map[string]int{} if len(pods) > 0 { for _, pod := range pods { @@ -54,6 +70,8 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini //score int - scale of 0-10 // 0 being the lowest priority and 10 being the highest for _, minion := range minions.Items { + // initializing to the default/max minion score of 10 + fScore := float32(10) if maxCount > 0 { fScore = 10 * (float32(maxCount-counts[minion.Name]) / float32(maxCount)) } @@ -62,6 +80,78 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini return result, nil } -func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler { - return NewGenericScheduler(predicates, []PriorityConfig{{Function: CalculateSpreadPriority, Weight: 1}}, podLister, random) +type ServiceAntiAffinity struct { + serviceLister ServiceLister + label string +} + +func NewServiceAntiAffinityPriority(serviceLister ServiceLister, label string) PriorityFunction { + antiAffinity := &ServiceAntiAffinity{ + serviceLister: serviceLister, + label: label, + } + return antiAffinity.CalculateAntiAffinityPriority +} + +// CalculateAntiAffinityPriority spreads pods by minimizing the number of pods belonging to the same service +// on machines with the same value for a particular label. +// The label to be considered is provided to the struct (ServiceAntiAffinity). 
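+// Minions are scored per label value as 10 * (numServicePods - podsWithThatValue) / numServicePods.
+// For example, three service pods with one in zone1 and two in zone2 give zone1 minions a score
+// of 6, zone2 minions a score of 3, and minions without the label a score of 0.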
+func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) { + var pods []api.Pod + + services, err := s.serviceLister.GetPodServices(pod) + if err == nil { + // just use the first service and get the other pods within the service + // TODO: a separate predicate can be created that tries to handle all services for the pod + selector := labels.SelectorFromSet(services[0].Spec.Selector) + pods, err = podLister.List(selector) + if err != nil { + return nil, err + } + } + + minions, err := minionLister.List() + if err != nil { + return nil, err + } + + // separate out the minions that have the label from the ones that don't + otherMinions := []string{} + labeledMinions := map[string]string{} + for _, minion := range minions.Items { + if labels.Set(minion.Labels).Has(s.label) { + label := labels.Set(minion.Labels).Get(s.label) + labeledMinions[minion.Name] = label + } else { + otherMinions = append(otherMinions, minion.Name) + } + } + + podCounts := map[string]int{} + for _, pod := range pods { + label, exists := labeledMinions[pod.Status.Host] + if !exists { + continue + } + podCounts[label]++ + } + + numServicePods := len(pods) + result := []HostPriority{} + //score int - scale of 0-10 + // 0 being the lowest priority and 10 being the highest + for minion := range labeledMinions { + // initializing to the default/max minion score of 10 + fScore := float32(10) + if numServicePods > 0 { + fScore = 10 * (float32(numServicePods-podCounts[labeledMinions[minion]]) / float32(numServicePods)) + } + result = append(result, HostPriority{host: minion, score: int(fScore)}) + } + // add the open minions with a score of 0 + for _, minion := range otherMinions { + result = append(result, HostPriority{host: minion, score: 0}) + } + + return result, nil } diff --git a/pkg/scheduler/spreading_test.go b/pkg/scheduler/spreading_test.go index 0e9c9093464..fe891cf2504 100644 --- a/pkg/scheduler/spreading_test.go +++ b/pkg/scheduler/spreading_test.go @@ -18,12 +18,13 @@ package scheduler import ( "reflect" + "sort" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -func TestSpreadPriority(t *testing.T) { +func TestServiceSpreadPriority(t *testing.T) { labels1 := map[string]string{ "foo": "bar", "baz": "blah", @@ -32,16 +33,17 @@ func TestSpreadPriority(t *testing.T) { "bar": "foo", "baz": "blah", } - machine1Status := api.PodStatus{ + zone1Status := api.PodStatus{ Host: "machine1", } - machine2Status := api.PodStatus{ + zone2Status := api.PodStatus{ Host: "machine2", } tests := []struct { pod api.Pod pods []api.Pod nodes []string + services []api.Service expectedList HostPriorityList test string }{ @@ -52,55 +54,72 @@ func TestSpreadPriority(t *testing.T) { }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, - pods: []api.Pod{{Status: machine1Status}}, + pods: []api.Pod{{Status: zone1Status}}, nodes: []string{"machine1", "machine2"}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, - test: "no labels", + test: "no services", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, - pods: []api.Pod{{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, + pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}}, - test: "different 
labels", + test: "different services", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{ - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}}, - test: "one label match", + test: "two pods, one service pod", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{ - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}}, - test: "two label matches on different machines", + test: "three pods, two service pods on different machines", }, { pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []api.Pod{ - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, - {Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, - {Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, }, nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, expectedList: []HostPriority{{"machine1", 5}, {"machine2", 0}}, - test: "three label matches", + test: "four pods, three service pods", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + expectedList: []HostPriority{{"machine1", 0}, {"machine2", 5}}, + test: "service with partial pod label matches", }, } for _, test := range tests { - list, err := CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeMinionList(test.nodes))) + serviceSpread := ServiceSpread{serviceLister: FakeServiceLister(test.services)} + list, err := serviceSpread.CalculateSpreadPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeMinionList(test.nodes))) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -109,3 +128,166 @@ func TestSpreadPriority(t *testing.T) { } } } + +func TestZoneSpreadPriority(t *testing.T) { + labels1 := map[string]string{ + "foo": "bar", + "baz": "blah", + } + labels2 := 
map[string]string{ + "bar": "foo", + "baz": "blah", + } + zone1 := map[string]string{ + "zone": "zone1", + } + zone2 := map[string]string{ + "zone": "zone2", + } + nozone := map[string]string{ + "name": "value", + } + zone0Status := api.PodStatus{ + Host: "machine01", + } + zone1Status := api.PodStatus{ + Host: "machine11", + } + zone2Status := api.PodStatus{ + Host: "machine21", + } + labeledNodes := map[string]map[string]string{ + "machine01": nozone, "machine02": nozone, + "machine11": zone1, "machine12": zone1, + "machine21": zone2, "machine22": zone2, + } + tests := []struct { + pod api.Pod + pods []api.Pod + nodes map[string]map[string]string + services []api.Service + expectedList HostPriorityList + test string + }{ + { + nodes: labeledNodes, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 10}, {"machine22", 10}, + {"machine01", 0}, {"machine02", 0}}, + test: "nothing scheduled", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{{Status: zone1Status}}, + nodes: labeledNodes, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 10}, {"machine22", 10}, + {"machine01", 0}, {"machine02", 0}}, + test: "no services", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{{Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 10}, {"machine22", 10}, + {"machine01", 0}, {"machine02", 0}}, + test: "different services", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 10}, {"machine12", 10}, + {"machine21", 0}, {"machine22", 0}, + {"machine01", 0}, {"machine02", 0}}, + test: "three pods, one service pod", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 5}, {"machine12", 5}, + {"machine21", 5}, {"machine22", 5}, + {"machine01", 0}, {"machine02", 0}}, + test: "three pods, two service pods on different machines", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 6}, {"machine12", 6}, + {"machine21", 3}, {"machine22", 3}, + {"machine01", 0}, {"machine02", 0}}, + test: "four pods, three service pods", + }, + { + pod: 
api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + expectedList: []HostPriority{{"machine11", 3}, {"machine12", 3}, + {"machine21", 6}, {"machine22", 6}, + {"machine01", 0}, {"machine02", 0}}, + test: "service with partial pod label matches", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []api.Pod{ + {Status: zone0Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone1Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Status: zone2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: labeledNodes, + services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, + expectedList: []HostPriority{{"machine11", 7}, {"machine12", 7}, + {"machine21", 5}, {"machine22", 5}, + {"machine01", 0}, {"machine02", 0}}, + test: "service pod on non-zoned minion", + }, + } + + for _, test := range tests { + zoneSpread := ServiceAntiAffinity{serviceLister: FakeServiceLister(test.services), label: "zone"} + list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, FakePodLister(test.pods), FakeMinionLister(makeLabeledMinionList(test.nodes))) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort the two lists to avoid failures on account of different ordering + sort.Sort(test.expectedList) + sort.Sort(list) + if !reflect.DeepEqual(test.expectedList, list) { + t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) + } + } +} + +func makeLabeledMinionList(nodeMap map[string]map[string]string) (result api.NodeList) { + nodes := []api.Node{} + for nodeName, labels := range nodeMap { + nodes = append(nodes, api.Node{ObjectMeta: api.ObjectMeta{Name: nodeName, Labels: labels}}) + } + return api.NodeList{Items: nodes} +} diff --git a/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go new file mode 100644 index 00000000000..ee023355d47 --- /dev/null +++ b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go @@ -0,0 +1,55 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This algorithm provider has predicates and priorities related to affinity/anti-affinity for the scheduler. 
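+// It is registered with the scheduler factory under the name "AffinityProvider"; the label keys
+// used below ("region" for affinity, "zone" for anti-affinity) are illustrative and would be
+// adjusted to match the labels actually applied to minions in a given deployment.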
+package affinity + +import ( + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" +) + +const AffinityProvider string = "AffinityProvider" + +func init() { + factory.RegisterAlgorithmProvider(AffinityProvider, affinityPredicates(), affinityPriorities()) +} + +func affinityPredicates() util.StringSet { + return util.NewStringSet( + "HostName", + "MatchNodeSelector", + "PodFitsPorts", + "PodFitsResources", + "NoDiskConflict", + // Ensures that all pods within the same service are hosted on minions within the same region as defined by the "region" label + factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})), + // Fit is defined based on the presence of the "region" label on a minion, regardless of value. + factory.RegisterFitPredicate("NodeLabelPredicate", algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)), + ) +} + +func affinityPriorities() util.StringSet { + return util.NewStringSet( + "LeastRequestedPriority", + "ServiceSpreadingPriority", + // spreads pods belonging to the same service across minions in different zones + factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 2), + // Prioritize nodes based on the presence of the "zone" label on a minion, regardless of value. + factory.RegisterPriorityFunction("NodeLabelPriority", algorithm.NewNodeLabelPriority("zone", true), 1), + ) +} diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index e93af9a4e32..c97f933884b 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -31,11 +31,11 @@ func defaultPredicates() util.StringSet { return util.NewStringSet( // Fit is defined based on the absence of port conflicts. factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts), - // Fit is determined by resource availability + // Fit is determined by resource availability. factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)), - // Fit is determined by non-conflicting disk volumes + // Fit is determined by non-conflicting disk volumes. factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict), - // Fit is determined by node selector query + // Fit is determined by node selector query. factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)), // Fit is determined by the presence of the Host parameter and a string match factory.RegisterFitPredicate("HostName", algorithm.PodFitsHost), @@ -46,8 +46,8 @@ func defaultPriorities() util.StringSet { return util.NewStringSet( // Prioritize nodes by least requested utilization. factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1), - // spreads pods by minimizing the number of pods on the same minion with the same labels. - factory.RegisterPriorityFunction("SpreadingPriority", algorithm.CalculateSpreadPriority, 1), + // spreads pods by minimizing the number of pods (belonging to the same service) on the same minion. 
+ factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1), // EqualPriority is a prioritizer function that gives an equal weight of one to all minions factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0), ) diff --git a/plugin/pkg/scheduler/algorithmprovider/plugins.go b/plugin/pkg/scheduler/algorithmprovider/plugins.go index d534b05c0bf..ac7123efe26 100644 --- a/plugin/pkg/scheduler/algorithmprovider/plugins.go +++ b/plugin/pkg/scheduler/algorithmprovider/plugins.go @@ -18,5 +18,6 @@ limitations under the License. package algorithmprovider import ( + _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/affinity" _ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults" ) diff --git a/plugin/pkg/scheduler/algorithmprovider/plugins_test.go b/plugin/pkg/scheduler/algorithmprovider/plugins_test.go index 965635d8e7f..8a205961a72 100644 --- a/plugin/pkg/scheduler/algorithmprovider/plugins_test.go +++ b/plugin/pkg/scheduler/algorithmprovider/plugins_test.go @@ -19,12 +19,14 @@ package algorithmprovider import ( "testing" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/affinity" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" ) var ( algorithmProviderNames = []string{ factory.DefaultProvider, + affinity.AffinityProvider, } ) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 14fcb5c2201..d8410c4a2f4 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -35,8 +35,9 @@ import ( ) var ( - PodLister = &cache.StoreToPodLister{cache.NewStore()} - MinionLister = &cache.StoreToNodeLister{cache.NewStore()} + PodLister = &cache.StoreToPodLister{cache.NewStore()} + MinionLister = &cache.StoreToNodeLister{cache.NewStore()} + ServiceLister = &cache.StoreToServiceLister{cache.NewStore()} ) // ConfigFactory knows how to fill out a scheduler config with its support functions. @@ -48,15 +49,18 @@ type ConfigFactory struct { PodLister *cache.StoreToPodLister // a means to list all minions MinionLister *cache.StoreToNodeLister + // a means to list all services + ServiceLister *cache.StoreToServiceLister } // NewConfigFactory initializes the factory. func NewConfigFactory(client *client.Client) *ConfigFactory { return &ConfigFactory{ - Client: client, - PodQueue: cache.NewFIFO(), - PodLister: PodLister, - MinionLister: MinionLister, + Client: client, + PodQueue: cache.NewFIFO(), + PodLister: PodLister, + MinionLister: MinionLister, + ServiceLister: ServiceLister, } } @@ -106,6 +110,11 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run() } + // Watch and cache all service objects. Scheduler needs to find all pods + // created by the same service, so that it can spread them correctly. + // Cache this locally. + cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store).Run() + r := rand.New(rand.NewSource(time.Now().UnixNano())) algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r) @@ -205,6 +214,15 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { return &nodeEnumerator{nodes}, nil } +// createServiceLW returns a cache.ListWatch that gets all changes to services. 
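+// An empty field selector is used, so every service is listed and watched and the scheduler's
+// local cache is populated with the full set of services.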
+func (factory *ConfigFactory) createServiceLW() *cache.ListWatch { + return &cache.ListWatch{ + Client: factory.Client, + FieldSelector: parseSelectorOrDie(""), + Resource: "services", + } +} + func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { return func(pod *api.Pod, err error) { glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err)