From 94d75929b68a6c65ab4ab7b28df3e356e06a2a79 Mon Sep 17 00:00:00 2001
From: Gavin
Date: Wed, 3 Jan 2018 15:22:32 +0800
Subject: [PATCH] refactor function CalculateAntiAffinityPriority by using
map/reduce pattern
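
The old CalculateAntiAffinityPriority walked all nodes inside a single
priority function. With the map/reduce pattern the work is split in two:
the map function scores one node at a time (here: count the pods on that
node that belong to the pod's first matching service), and the reduce
function post-processes the whole HostPriorityList (here: aggregate the
counts per label value and convert them into 0..MaxPriority scores).

The sketch below only illustrates how such a map/reduce pair is driven;
runMapReducePriority is a made-up name and the loop is a simplification
of the scheduler's generic evaluation code, but the PriorityMapFunction
and PriorityReduceFunction signatures match the ones used in this patch:

    func runMapReducePriority(
        pod *v1.Pod, meta interface{},
        nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node,
        mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction,
    ) (schedulerapi.HostPriorityList, error) {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        // map phase: score every node independently of the others
        for _, node := range nodes {
            hostPriority, err := mapFn(pod, meta, nodeNameToInfo[node.Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        // reduce phase: rewrite the per-node scores in place (aggregation
        // across nodes, normalization to the 0..MaxPriority scale, etc.)
        if reduceFn != nil {
            if err := reduceFn(pod, meta, nodeNameToInfo, result); err != nil {
                return nil, err
            }
        }
        return result, nil
    }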
---
.../algorithm/priorities/metadata.go | 37 ++++--
.../priorities/selector_spreading.go | 105 +++++++++++-------
.../priorities/selector_spreading_test.go | 24 +++-
plugin/pkg/scheduler/factory/plugins.go | 2 +-
4 files changed, 111 insertions(+), 57 deletions(-)
diff --git a/plugin/pkg/scheduler/algorithm/priorities/metadata.go b/plugin/pkg/scheduler/algorithm/priorities/metadata.go
index fb561241798..3a4d7831182 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/metadata.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/metadata.go
@@ -44,11 +44,12 @@ func NewPriorityMetadataFactory(serviceLister algorithm.ServiceLister, controlle
// priorityMetadata is a type that is passed as metadata for priority functions
type priorityMetadata struct {
- nonZeroRequest *schedulercache.Resource
- podTolerations []v1.Toleration
- affinity *v1.Affinity
- podSelectors []labels.Selector
- controllerRef *metav1.OwnerReference
+ nonZeroRequest *schedulercache.Resource
+ podTolerations []v1.Toleration
+ affinity *v1.Affinity
+ podSelectors []labels.Selector
+ controllerRef *metav1.OwnerReference
+ podFirstServiceSelector labels.Selector
}
// PriorityMetadata is a MetadataProducer. Node info can be nil.
@@ -57,30 +58,40 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo
if pod == nil {
return nil
}
- tolerationsPreferNoSchedule := getAllTolerationPreferNoSchedule(pod.Spec.Tolerations)
- podSelectors := getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister)
return &priorityMetadata{
- nonZeroRequest: getNonZeroRequests(pod),
- podTolerations: tolerationsPreferNoSchedule,
- affinity: pod.Spec.Affinity,
- podSelectors: podSelectors,
- controllerRef: priorityutil.GetControllerRef(pod),
+ nonZeroRequest: getNonZeroRequests(pod),
+ podTolerations: getAllTolerationPreferNoSchedule(pod.Spec.Tolerations),
+ affinity: pod.Spec.Affinity,
+ podSelectors: getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
+ controllerRef: priorityutil.GetControllerRef(pod),
+ podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
}
}
+// getFirstServiceSelector returns the selector of the first service that matches the given pod, or nil if none match.
+func getFirstServiceSelector(pod *v1.Pod, sl algorithm.ServiceLister) (firstServiceSelector labels.Selector) {
+ if services, err := sl.GetPodServices(pod); err == nil && len(services) > 0 {
+ return labels.SelectorFromSet(services[0].Spec.Selector)
+ }
+ return nil
+}
+
// getSelectors returns selectors of services, RCs and RSs matching the given pod.
func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector {
var selectors []labels.Selector
+
if services, err := sl.GetPodServices(pod); err == nil {
for _, service := range services {
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
}
}
+
if rcs, err := cl.GetPodControllers(pod); err == nil {
for _, rc := range rcs {
selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
}
}
+
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
for _, rs := range rss {
if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
@@ -88,6 +99,7 @@ func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.Controll
}
}
}
+
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
for _, ss := range sss {
if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
@@ -95,5 +107,6 @@ func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.Controll
}
}
}
+
return selectors
}
diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
index 7258d52ea7b..53c5c3719ed 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
@@ -177,13 +177,13 @@ type ServiceAntiAffinity struct {
label string
}
-func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, label string) algorithm.PriorityFunction {
+func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, label string) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
antiAffinity := &ServiceAntiAffinity{
podLister: podLister,
serviceLister: serviceLister,
label: label,
}
- return antiAffinity.CalculateAntiAffinityPriority
+ return antiAffinity.CalculateAntiAffinityPriorityMap, antiAffinity.CalculateAntiAffinityPriorityReduce
}
// Classifies nodes into ones with labels and without labels.
@@ -201,52 +201,79 @@ func (s *ServiceAntiAffinity) getNodeClassificationByLabels(nodes []*v1.Node) (m
return labeledNodes, nonLabeledNodes
}
-// CalculateAntiAffinityPriority spreads pods by minimizing the number of pods belonging to the same service
-// on machines with the same value for a particular label.
-// The label to be considered is provided to the struct (ServiceAntiAffinity).
-func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
- var nsServicePods []*v1.Pod
- if services, err := s.serviceLister.GetPodServices(pod); err == nil && len(services) > 0 {
- // just use the first service and get the other pods within the service
- // TODO: a separate predicate can be created that tries to handle all services for the pod
- selector := labels.SelectorFromSet(services[0].Spec.Selector)
- pods, err := s.podLister.List(selector)
- if err != nil {
- return nil, err
- }
- // consider only the pods that belong to the same namespace
- for _, nsPod := range pods {
- if nsPod.Namespace == pod.Namespace {
- nsServicePods = append(nsServicePods, nsPod)
- }
+// filteredPod returns the pods on the node that belong to the given namespace and match the given selector.
+func filteredPod(namespace string, selector labels.Selector, nodeInfo *schedulercache.NodeInfo) (pods []*v1.Pod) {
+	if len(nodeInfo.Pods()) == 0 || selector == nil {
+ return []*v1.Pod{}
+ }
+ for _, pod := range nodeInfo.Pods() {
+ if namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) {
+ pods = append(pods, pod)
}
}
+ return
+}
- // separate out the nodes that have the label from the ones that don't
- labeledNodes, nonLabeledNodes := s.getNodeClassificationByLabels(nodes)
+// CalculateAntiAffinityPriorityMap counts the pods on the given node that belong to the same service
+// as the given pod; the per-node counts are turned into spread scores by CalculateAntiAffinityPriorityReduce.
+func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+ var firstServiceSelector labels.Selector
+
+ node := nodeInfo.Node()
+ if node == nil {
+ return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+ }
+ priorityMeta, ok := meta.(*priorityMetadata)
+ if ok {
+ firstServiceSelector = priorityMeta.podFirstServiceSelector
+ } else {
+ firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
+ }
+	// pods on this node that match the pod's namespace and the first service's selector
+ matchedPodsOfNode := filteredPod(pod.Namespace, firstServiceSelector, nodeInfo)
+
+ return schedulerapi.HostPriority{
+ Host: node.Name,
+		Score: len(matchedPodsOfNode),
+ }, nil
+}
+
+// CalculateAntiAffinityPriorityReduce converts the per-node pod counts into scores that spread the service's pods
+// across the values of a particular node label. The label to be considered is provided to the struct (ServiceAntiAffinity).
+func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
+ var numServicePods int
+ var label string
podCounts := map[string]int{}
- for _, pod := range nsServicePods {
- label, exists := labeledNodes[pod.Spec.NodeName]
- if !exists {
+ labelNodesStatus := map[string]string{}
+ maxPriorityFloat64 := float64(schedulerapi.MaxPriority)
+
+ for _, hostPriority := range result {
+ numServicePods += hostPriority.Score
+ if !labels.Set(nodeNameToInfo[hostPriority.Host].Node().Labels).Has(s.label) {
continue
}
- podCounts[label]++
+ label = labels.Set(nodeNameToInfo[hostPriority.Host].Node().Labels).Get(s.label)
+ labelNodesStatus[hostPriority.Host] = label
+ podCounts[label] += hostPriority.Score
}
- numServicePods := len(nsServicePods)
- result := []schedulerapi.HostPriority{}
+
//score int - scale of 0-maxPriority
// 0 being the lowest priority and maxPriority being the highest
- for node := range labeledNodes {
- // initializing to the default/max node score of maxPriority
- fScore := float64(schedulerapi.MaxPriority)
- if numServicePods > 0 {
- fScore = float64(schedulerapi.MaxPriority) * (float64(numServicePods-podCounts[labeledNodes[node]]) / float64(numServicePods))
+ for i, hostPriority := range result {
+ label, ok := labelNodesStatus[hostPriority.Host]
+ if !ok {
+ result[i].Host = hostPriority.Host
+			result[i].Score = 0
+ continue
}
- result = append(result, schedulerapi.HostPriority{Host: node, Score: int(fScore)})
+ // initializing to the default/max node score of maxPriority
+ fScore := maxPriorityFloat64
+ if numServicePods > 0 {
+ fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[label]) / float64(numServicePods))
+ }
+ result[i].Host = hostPriority.Host
+ result[i].Score = int(fScore)
}
- // add the open nodes with a score of 0
- for _, node := range nonLabeledNodes {
- result = append(result, schedulerapi.HostPriority{Host: node, Score: 0})
- }
- return result, nil
+
+ return nil
}
diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
index d3cb19cb635..85e547dae4a 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go
@@ -757,19 +757,33 @@ func TestZoneSpreadPriority(t *testing.T) {
test: "service pod on non-zoned node",
},
}
+	// these local variables just make sure that controllerLister/replicaSetLister/statefulSetLister are not nil
+	// when constructing the metaDataProducer below
+ sss := []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}
+ rcs := []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}
+ rss := []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}
- for _, test := range tests {
- nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
+ for i, test := range tests {
+ nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeLabeledNodeList(test.nodes))
zoneSpread := ServiceAntiAffinity{podLister: schedulertesting.FakePodLister(test.pods), serviceLister: schedulertesting.FakeServiceLister(test.services), label: "zone"}
- list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(test.nodes))
+
+	metaDataProducer := NewPriorityMetadataFactory(
+ schedulertesting.FakeServiceLister(test.services),
+ schedulertesting.FakeControllerLister(rcs),
+ schedulertesting.FakeReplicaSetLister(rss),
+ schedulertesting.FakeStatefulSetLister(sss))
+	metaData := metaDataProducer(test.pod, nodeNameToInfo)
+	ttp := priorityFunction(zoneSpread.CalculateAntiAffinityPriorityMap, zoneSpread.CalculateAntiAffinityPriorityReduce, metaData)
+ list, err := ttp(test.pod, nodeNameToInfo, makeLabeledNodeList(test.nodes))
if err != nil {
- t.Errorf("unexpected error: %v", err)
+			t.Errorf("unexpected error: %v, test index: %d", err, i)
}
+
// sort the two lists to avoid failures on account of different ordering
sort.Sort(test.expectedList)
sort.Sort(list)
if !reflect.DeepEqual(test.expectedList, list) {
- t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list)
+ t.Errorf("test index %d (%s): expected %#v, got %#v", i, test.test, test.expectedList, list)
}
}
}
diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go
index 6c7a7ab7d5f..3bae125c351 100644
--- a/plugin/pkg/scheduler/factory/plugins.go
+++ b/plugin/pkg/scheduler/factory/plugins.go
@@ -305,7 +305,7 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
if policy.Argument != nil {
if policy.Argument.ServiceAntiAffinity != nil {
pcf = &PriorityConfigFactory{
- Function: func(args PluginFactoryArgs) algorithm.PriorityFunction {
+ MapReduceFunction: func(args PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewServiceAntiAffinityPriority(
args.PodLister,
args.ServiceLister,