From 3f722a3d8ec5855fa8e7b7f674793f991950cbcb Mon Sep 17 00:00:00 2001
From: Abhishek Gupta
Date: Mon, 22 Dec 2014 15:55:31 -0800
Subject: [PATCH] Adding service affinity predicate

---
 pkg/scheduler/predicates.go                        |  78 ++++++++++-
 pkg/scheduler/predicates_test.go                   | 124 ++++++++++++++++
 .../algorithmprovider/affinity/affinity.go         |  46 +++++++
 .../algorithmprovider/defaults/defaults.go         |   3 -
 4 files changed, 247 insertions(+), 4 deletions(-)
 create mode 100644 plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go

diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go
index 5d721b58943..e31a7f44205 100644
--- a/pkg/scheduler/predicates.go
+++ b/pkg/scheduler/predicates.go
@@ -183,7 +183,7 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) FitPre
 }
 
 // CheckNodeLabelPresence checks whether a particular label exists on a minion or not, regardless of its value
-// Consider the cases where the minions are places in regions/zones/racks and these are identified by labels
+// Consider the cases where the minions are placed in regions/zones/racks and these are identified by labels
 // In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected
 //
 // Alternately, eliminating minions that have a certain label, regardless of value, is also useful
@@ -204,6 +204,82 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod api.Pod, existingPods []ap
 	return true, nil
 }
 
+type ServiceAffinity struct {
+	podLister     PodLister
+	serviceLister ServiceLister
+	nodeInfo      NodeInfo
+	labels        []string
+}
+
+func NewServiceAffinityPredicate(podLister PodLister, serviceLister ServiceLister, nodeInfo NodeInfo, labels []string) FitPredicate {
+	affinity := &ServiceAffinity{
+		podLister:     podLister,
+		serviceLister: serviceLister,
+		nodeInfo:      nodeInfo,
+		labels:        labels,
+	}
+	return affinity.CheckServiceAffinity
+}
+
+func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
+	var affinitySelector labels.Selector
+
+	// check if the pod being scheduled has the affinity labels specified
+	affinityLabels := map[string]string{}
+	labelsExist := true
+	for _, l := range s.labels {
+		if labels.Set(pod.Labels).Has(l) {
+			affinityLabels[l] = labels.Set(pod.Labels).Get(l)
+		} else {
+			// the current pod does not specify all the labels, look in the existing service pods
+			labelsExist = false
+		}
+	}
+
+	// skip looking at other pods in the service if the current pod defines all the required affinity labels
+	if !labelsExist {
+		service, err := s.serviceLister.GetPodService(pod)
+		if err == nil {
+			selector := labels.SelectorFromSet(service.Spec.Selector)
+			servicePods, err := s.podLister.ListPods(selector)
+			if err != nil {
+				return false, err
+			}
+			if len(servicePods) > 0 {
+				// consider any service pod and fetch the minion it is hosted on
+				otherMinion, err := s.nodeInfo.GetNodeInfo(servicePods[0].Status.Host)
+				if err != nil {
+					return false, err
+				}
+				for _, l := range s.labels {
+					// If the pod being scheduled has the label value specified, do not override it
+					if _, exists := affinityLabels[l]; exists {
+						continue
+					}
+					if labels.Set(otherMinion.Labels).Has(l) {
+						affinityLabels[l] = labels.Set(otherMinion.Labels).Get(l)
+					}
+				}
+			}
+		}
+	}
+
+	// if no affinity labels could be determined (neither from the pod nor from existing service pods), consider all minions
+	if len(affinityLabels) == 0 {
+		affinitySelector = labels.Everything()
+	} else {
+		affinitySelector = labels.Set(affinityLabels).AsSelector()
+	}
+
+	minion, err := s.nodeInfo.GetNodeInfo(node)
+	if err != nil {
+		return false, err
+	}
+
+	// check if the minion matches the selector
+	return affinitySelector.Matches(labels.Set(minion.Labels)), nil
+}
+
 func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
 	existingPorts := getUsedPorts(existingPods...)
 	wantPorts := getUsedPorts(pod)
diff --git a/pkg/scheduler/predicates_test.go b/pkg/scheduler/predicates_test.go
index 9903a94ba2e..521c82f99aa 100644
--- a/pkg/scheduler/predicates_test.go
+++ b/pkg/scheduler/predicates_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package scheduler
 
 import (
+	"fmt"
 	"reflect"
 	"testing"
 
@@ -31,7 +32,18 @@ func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*api.Node, error) {
 	return &node, nil
 }
 
+type FakeNodeListInfo []api.Node
+
+func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) {
+	for _, node := range nodes {
+		if node.Name == nodeName {
+			return &node, nil
+		}
+	}
+	return nil, fmt.Errorf("unable to find node: %s", nodeName)
+}
+
 func makeResources(milliCPU int64, memory int64) api.NodeResources {
 	return api.NodeResources{
 		Capacity: api.ResourceList{
 			api.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
@@ -446,3 +458,115 @@ func TestNodeLabelPresence(t *testing.T) {
 		}
 	}
 }
+
+func TestServiceAffinity(t *testing.T) {
+	selector := map[string]string{"foo": "bar"}
+	labels1 := map[string]string{
+		"region": "r1",
+		"zone":   "z11",
+	}
+	labels2 := map[string]string{
+		"region": "r1",
+		"zone":   "z12",
+	}
+	labels3 := map[string]string{
+		"region": "r2",
+		"zone":   "z21",
+	}
+	labels4 := map[string]string{
+		"region": "r2",
+		"zone":   "z22",
+	}
+	node1 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: labels1}}
+	node2 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: labels2}}
+	node3 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: labels3}}
+	node4 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine4", Labels: labels4}}
+	node5 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine5", Labels: labels4}}
+	tests := []struct {
+		pod      api.Pod
+		pods     []api.Pod
+		services []api.Service
+		node     string
+		labels   []string
+		fits     bool
+		test     string
+	}{
+		{
+			node:   "machine1",
+			fits:   true,
+			labels: []string{"region"},
+			test:   "nothing scheduled",
+		},
+		{
+			pod:    api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r1"}}},
+			node:   "machine1",
+			fits:   true,
+			labels: []string{"region"},
+			test:   "pod with region label match",
+		},
+		{
+			pod:    api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r2"}}},
+			node:   "machine1",
+			fits:   false,
+			labels: []string{"region"},
+			test:   "pod with region label mismatch",
+		},
+		{
+			pod:      api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
+			pods:     []api.Pod{{Status: api.PodStatus{Host: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
+			node:     "machine1",
+			services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
+			fits:     true,
+			labels:   []string{"region"},
+			test:     "service pod on same minion",
+		},
+		{
+			pod:      api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
+			pods:     []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
+			node:     "machine1",
+			services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
+			fits:     true,
+			labels:   []string{"region"},
+			test:     "service pod on different minion, region match",
"service pod on different minion, region match", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: false, + labels: []string{"region"}, + test: "service pod on different minion, region mismatch", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine1", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: false, + labels: []string{"region", "zone"}, + test: "service pod on different minion, multiple labels, not all match", + }, + { + pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, + pods: []api.Pod{{Status: api.PodStatus{Host: "machine5"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}, + node: "machine4", + services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}, + fits: true, + labels: []string{"region", "zone"}, + test: "service pod on different minion, multiple labels, all match", + }, + } + + for _, test := range tests { + nodes := []api.Node{node1, node2, node3, node4, node5} + serviceAffinity := ServiceAffinity{FakePodLister(test.pods), FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels} + fits, err := serviceAffinity.CheckServiceAffinity(test.pod, []api.Pod{}, test.node) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if fits != test.fits { + t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits) + } + } +} diff --git a/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go new file mode 100644 index 00000000000..719e4795175 --- /dev/null +++ b/plugin/pkg/scheduler/algorithmprovider/affinity/affinity.go @@ -0,0 +1,46 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This algorithm provider has predicates and priorities related to affinity/anti-affinity for the scheduler. 
+package affinity
+
+import (
+	algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+	"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
+)
+
+const Provider string = "AffinityProvider"
+
+func init() {
+	factory.RegisterAlgorithmProvider(Provider, defaultPredicates(), defaultPriorities())
+}
+
+func defaultPredicates() util.StringSet {
+	return util.NewStringSet(
+		// Fit is determined by whether the minion has the same label values as the pod being scheduled
+		// Alternatively, if the pod does not specify some or all of the labels, the other pods in the service are looked at
+		factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})),
+	)
+}
+
+func defaultPriorities() util.StringSet {
+	return util.NewStringSet(
+		// spreads pods belonging to the same service across minions in different zones
+		// region and zone can be nested infrastructure topology levels and are defined by labels on minions
+		factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 1),
+	)
+}
diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
index 4ebe7f7ccc5..c97f933884b 100644
--- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
+++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -48,9 +48,6 @@ func defaultPriorities() util.StringSet {
 		factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1),
 		// spreads pods by minimizing the number of pods (belonging to the same service) on the same minion.
 		factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1),
-		// spreads pods belonging to the same service across minions in different zones
-		// TODO: remove the hardcoding of the "zone" label and move it to a constant
-		factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 1),
 		// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
 		factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0),
 	)
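--

For reviewers, a minimal sketch of the selector mechanics CheckServiceAffinity relies on: the affinity labels collected from the pod (or from an existing pod of the same service) are turned into a label selector, and a candidate minion fits only if its labels match. Only the labels package calls that appear in the patch are used (Set, AsSelector, Matches); the package main wrapper and the literal label values are illustrative, not part of the change.

	package main

	import (
		"fmt"

		"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
	)

	func main() {
		// affinity labels as CheckServiceAffinity would collect them, either from
		// the pod being scheduled or from an existing pod of the same service
		affinityLabels := map[string]string{"region": "r1"}
		affinitySelector := labels.Set(affinityLabels).AsSelector()

		// a minion in the same region fits; its extra "zone" label is ignored
		// because the selector only constrains "region"
		sameRegion := map[string]string{"region": "r1", "zone": "z11"}
		fmt.Println(affinitySelector.Matches(labels.Set(sameRegion))) // true

		// a minion in a different region is rejected
		otherRegion := map[string]string{"region": "r2", "zone": "z21"}
		fmt.Println(affinitySelector.Matches(labels.Set(otherRegion))) // false
	}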
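A second sketch, showing how the predicate generalizes beyond the "region"-only AffinityProvider registered in affinity.go: keying it on both "region" and "zone" pins all pods of a service to one zone. The package, provider, predicate, and priority names below are hypothetical, as is the "rack" minion label used for spreading; the factory and algorithm calls mirror affinity.go above.

	// hypothetical provider, parallel in structure to the affinity provider in this patch
	package zoneaffinity

	import (
		algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
		"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
		"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
	)

	const Provider string = "ZoneAffinityProvider" // hypothetical provider name

	func init() {
		factory.RegisterAlgorithmProvider(Provider, zonePredicates(), zonePriorities())
	}

	func zonePredicates() util.StringSet {
		return util.NewStringSet(
			// keyed on both labels, so all pods of a service are kept within a
			// single zone rather than just a single region
			factory.RegisterFitPredicate("ZoneServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region", "zone"})),
		)
	}

	func zonePriorities() util.StringSet {
		return util.NewStringSet(
			// spread the service's pods across racks within the chosen zone;
			// assumes minions carry a "rack" label, as with region/zone
			factory.RegisterPriorityFunction("RackSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "rack"), 1),
		)
	}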