From 4aa92bac73f33dd88aa4135fb5c4699cd452569f Mon Sep 17 00:00:00 2001
From: Gavin
Date: Thu, 2 Nov 2017 15:08:38 +0800
Subject: [PATCH] Refactor priority function CalculateSpreadPriority into the map/reduce pattern
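
CalculateSpreadPriority used to score all nodes in a single pass,
parallelized with workqueue.Parallelize. This patch splits it into a
per-node map step and a cluster-wide reduce step, the pair now returned by
NewSelectorSpreadPriority:

    // map: count the existing pods on one node that match the incoming
    // pod's service/RC/RS/StatefulSet selectors (a raw, unnormalized count)
    CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{},
        nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error)

    // reduce: normalize the raw counts to the 0..MaxPriority range and
    // blend in zone-level spreading where zone labels are present
    CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{},
        nodeNameToInfo map[string]*schedulercache.NodeInfo,
        result schedulerapi.HostPriorityList) error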
---
.../priorities/selector_spreading.go | 192 ++++++++----------
1 file changed, 88 insertions(+), 104 deletions(-)
diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
index 721531f7e32..940813f2cb9 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
@@ -17,12 +17,10 @@ limitations under the License.
package priorities
import (
- "sync"
+ "fmt"
"k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
- "k8s.io/client-go/util/workqueue"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@@ -46,149 +44,135 @@ func NewSelectorSpreadPriority(
serviceLister algorithm.ServiceLister,
controllerLister algorithm.ControllerLister,
replicaSetLister algorithm.ReplicaSetLister,
- statefulSetLister algorithm.StatefulSetLister) algorithm.PriorityFunction {
+ statefulSetLister algorithm.StatefulSetLister) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
selectorSpread := &SelectorSpread{
serviceLister: serviceLister,
controllerLister: controllerLister,
replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister,
}
- return selectorSpread.CalculateSpreadPriority
+ return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce
}
-// Returns selectors of services, RCs and RSs matching the given pod.
-func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector {
- var selectors []labels.Selector
- if services, err := sl.GetPodServices(pod); err == nil {
- for _, service := range services {
- selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
- }
- }
- if rcs, err := cl.GetPodControllers(pod); err == nil {
- for _, rc := range rcs {
- selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
- }
- }
- if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
- for _, rs := range rss {
- if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
- selectors = append(selectors, selector)
- }
- }
- }
- if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
- for _, ss := range sss {
- if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
- selectors = append(selectors, selector)
- }
- }
- }
- return selectors
-}
-
-func (s *SelectorSpread) getSelectors(pod *v1.Pod) []labels.Selector {
- return getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
-}
-
-// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
+// CalculateSpreadPriorityMap spreads pods across hosts, considering pods belonging to the same service or replication controller.
// When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods.
// i.e. it pushes the scheduler towards a node where there's the smallest number of
// pods which match the same service, RC or RS selectors as the pod being scheduled.
-// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
-func (s *SelectorSpread) CalculateSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
- selectors := s.getSelectors(pod)
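+// The map phase only produces the raw per-node count of matching pods; the
+// conversion to 0..MaxPriority scores and the zone-level spreading happen in
+// CalculateSpreadPriorityReduce.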
+func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+ var selectors []labels.Selector
+ node := nodeInfo.Node()
+ if node == nil {
+ return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+ }
- // Count similar pods by node
- countsByNodeName := make(map[string]float64, len(nodes))
- countsByZone := make(map[string]float64, 10)
- maxCountByNodeName := float64(0)
- countsByNodeNameLock := sync.Mutex{}
+ priorityMeta, ok := meta.(*priorityMetadata)
+ if ok {
+ selectors = priorityMeta.podSelectors
+ } else {
+ selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
+ }
+
+ if len(selectors) == 0 {
+ return schedulerapi.HostPriority{
+ Host: node.Name,
+ Score: int(0),
+ }, nil
+ }
+
+ count := float64(0)
+ for _, nodePod := range nodeInfo.Pods() {
+ if pod.Namespace != nodePod.Namespace {
+ continue
+ }
+ // When we are replacing a failed pod, we often see the previous
+ // deleted version while scheduling the replacement.
+ // Ignore the previous deleted version for spreading purposes
+ // (it can still be considered for resource restrictions etc.)
+ if nodePod.DeletionTimestamp != nil {
+ glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
+ continue
+ }
+ matches := false
+ for _, selector := range selectors {
+ if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
+ matches = true
+ break
+ }
+ }
+ if matches {
+ count++
+ }
+ }
+ return schedulerapi.HostPriority{
+ Host: node.Name,
+ Score: int(count),
+ }, nil
+}
+
+// CalculateSpreadPriorityReduce calculates the score of each node based on the number of existing matching pods on the node.
+// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
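+// For each node the final score is
+//   MaxPriority * (maxCountByNodeName - count) / maxCountByNodeName
+// blended, via zoneWeighting, with the analogous per-zone score when the
+// node carries zone information.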
+func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
+ var selectors []labels.Selector
+ countsByZone := make(map[string]int, 10)
+ maxCountByZone := int(0)
+ maxCountByNodeName := int(0)
+
+ priorityMeta, ok := meta.(*priorityMetadata)
+ if ok {
+ selectors = priorityMeta.podSelectors
+ } else {
+ selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
+ }
if len(selectors) > 0 {
- processNodeFunc := func(i int) {
- nodeName := nodes[i].Name
- count := float64(0)
- for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
- if pod.Namespace != nodePod.Namespace {
- continue
- }
- // When we are replacing a failed pod, we often see the previous
- // deleted version while scheduling the replacement.
- // Ignore the previous deleted version for spreading purposes
- // (it can still be considered for resource restrictions etc.)
- if nodePod.DeletionTimestamp != nil {
- glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
- continue
- }
- matches := false
- for _, selector := range selectors {
- if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
- matches = true
- break
- }
- }
- if matches {
- count++
- }
+ for i := range result {
+ if result[i].Score > maxCountByNodeName {
+ maxCountByNodeName = result[i].Score
}
- zoneId := utilnode.GetZoneKey(nodes[i])
-
- countsByNodeNameLock.Lock()
- defer countsByNodeNameLock.Unlock()
- countsByNodeName[nodeName] = count
- if count > maxCountByNodeName {
- maxCountByNodeName = count
- }
- if zoneId != "" {
- countsByZone[zoneId] += count
+ zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
+ if zoneId == "" {
+ continue
}
+ countsByZone[zoneId] += result[i].Score
+ }
+ }
+
+ for zoneId := range countsByZone {
+ if countsByZone[zoneId] > maxCountByZone {
+ maxCountByZone = countsByZone[zoneId]
}
- workqueue.Parallelize(16, len(nodes), processNodeFunc)
}
- // Aggregate by-zone information
- // Compute the maximum number of pods hosted in any zone
haveZones := len(countsByZone) != 0
- maxCountByZone := float64(0)
- for _, count := range countsByZone {
- if count > maxCountByZone {
- maxCountByZone = count
- }
- }
- result := make(schedulerapi.HostPriorityList, 0, len(nodes))
- //score int - scale of 0-maxPriority
- // 0 being the lowest priority and maxPriority being the highest
- for _, node := range nodes {
+ for i := range result {
// initializing to the default/max node score of maxPriority
fScore := float64(schedulerapi.MaxPriority)
if maxCountByNodeName > 0 {
- fScore = float64(schedulerapi.MaxPriority) * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName)
+ fScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByNodeName-result[i].Score) / float64(maxCountByNodeName))
}
-
// If there is zone information present, incorporate it
if haveZones {
- zoneId := utilnode.GetZoneKey(node)
+ zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
if zoneId != "" {
zoneScore := float64(schedulerapi.MaxPriority)
if maxCountByZone > 0 {
- zoneScore = float64(schedulerapi.MaxPriority) * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone)
+ zoneScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByZone-countsByZone[zoneId]) / float64(maxCountByZone))
}
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
}
}
-
- result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
+ result[i].Score = int(fScore)
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.V(10).Infof(
- "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
+ "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore),
)
}
}
- return result, nil
+ return nil
}
type ServiceAntiAffinity struct {