diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
index 721531f7e32..940813f2cb9 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
@@ -17,12 +17,10 @@ limitations under the License.
 package priorities
 
 import (
-	"sync"
+	"fmt"
 
 	"k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/client-go/util/workqueue"
 	utilnode "k8s.io/kubernetes/pkg/util/node"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@@ -46,149 +44,135 @@ func NewSelectorSpreadPriority(
 	serviceLister algorithm.ServiceLister,
 	controllerLister algorithm.ControllerLister,
 	replicaSetLister algorithm.ReplicaSetLister,
-	statefulSetLister algorithm.StatefulSetLister) algorithm.PriorityFunction {
+	statefulSetLister algorithm.StatefulSetLister) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
 	selectorSpread := &SelectorSpread{
 		serviceLister:     serviceLister,
 		controllerLister:  controllerLister,
 		replicaSetLister:  replicaSetLister,
 		statefulSetLister: statefulSetLister,
 	}
-	return selectorSpread.CalculateSpreadPriority
+	return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce
 }
 
-// Returns selectors of services, RCs and RSs matching the given pod.
-func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector {
-	var selectors []labels.Selector
-	if services, err := sl.GetPodServices(pod); err == nil {
-		for _, service := range services {
-			selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
-		}
-	}
-	if rcs, err := cl.GetPodControllers(pod); err == nil {
-		for _, rc := range rcs {
-			selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
-		}
-	}
-	if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
-		for _, rs := range rss {
-			if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
-				selectors = append(selectors, selector)
-			}
-		}
-	}
-	if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
-		for _, ss := range sss {
-			if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
-				selectors = append(selectors, selector)
-			}
-		}
-	}
-	return selectors
-}
-
-func (s *SelectorSpread) getSelectors(pod *v1.Pod) []labels.Selector {
-	return getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
-}
-
-// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
+// CalculateSpreadPriorityMap spreads pods across hosts, considering pods belonging to the same service or replication controller.
 // When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors.
 // It favors nodes that have fewer existing matching pods.
 // i.e. it pushes the scheduler towards a node where there's the smallest number of
 // pods which match the same service, RC or RS selectors as the pod being scheduled.
-// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
-func (s *SelectorSpread) CalculateSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
-	selectors := s.getSelectors(pod)
+func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	var selectors []labels.Selector
+	node := nodeInfo.Node()
+	if node == nil {
+		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+	}
 
-	// Count similar pods by node
-	countsByNodeName := make(map[string]float64, len(nodes))
-	countsByZone := make(map[string]float64, 10)
-	maxCountByNodeName := float64(0)
-	countsByNodeNameLock := sync.Mutex{}
+	priorityMeta, ok := meta.(*priorityMetadata)
+	if ok {
+		selectors = priorityMeta.podSelectors
+	} else {
+		selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
+	}
+
+	if len(selectors) == 0 {
+		return schedulerapi.HostPriority{
+			Host:  node.Name,
+			Score: int(0),
+		}, nil
+	}
+
+	count := float64(0)
+	for _, nodePod := range nodeInfo.Pods() {
+		if pod.Namespace != nodePod.Namespace {
+			continue
+		}
+		// When we are replacing a failed pod, we often see the previous
+		// deleted version while scheduling the replacement.
+		// Ignore the previous deleted version for spreading purposes
+		// (it can still be considered for resource restrictions etc.)
+		if nodePod.DeletionTimestamp != nil {
+			glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
+			continue
+		}
+		matches := false
+		for _, selector := range selectors {
+			if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
+				matches = true
+				break
+			}
+		}
+		if matches {
+			count++
+		}
+	}
+	return schedulerapi.HostPriority{
+		Host:  node.Name,
+		Score: int(count),
+	}, nil
+}
+
+// CalculateSpreadPriorityReduce calculates the score of each node based on the number of existing matching pods on the node.
+// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
+func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
+	var selectors []labels.Selector
+	countsByZone := make(map[string]int, 10)
+	maxCountByZone := int(0)
+	maxCountByNodeName := int(0)
+
+	priorityMeta, ok := meta.(*priorityMetadata)
+	if ok {
+		selectors = priorityMeta.podSelectors
+	} else {
+		selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
+	}
 
 	if len(selectors) > 0 {
-		processNodeFunc := func(i int) {
-			nodeName := nodes[i].Name
-			count := float64(0)
-			for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
-				if pod.Namespace != nodePod.Namespace {
-					continue
-				}
-				// When we are replacing a failed pod, we often see the previous
-				// deleted version while scheduling the replacement.
-				// Ignore the previous deleted version for spreading purposes
-				// (it can still be considered for resource restrictions etc.)
-				if nodePod.DeletionTimestamp != nil {
-					glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
-					continue
-				}
-				matches := false
-				for _, selector := range selectors {
-					if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
-						matches = true
-						break
-					}
-				}
-				if matches {
-					count++
-				}
+		for i := range result {
+			if result[i].Score > maxCountByNodeName {
+				maxCountByNodeName = result[i].Score
 			}
-			zoneId := utilnode.GetZoneKey(nodes[i])
-
-			countsByNodeNameLock.Lock()
-			defer countsByNodeNameLock.Unlock()
-			countsByNodeName[nodeName] = count
-			if count > maxCountByNodeName {
-				maxCountByNodeName = count
-			}
-			if zoneId != "" {
-				countsByZone[zoneId] += count
+			zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
+			if zoneId == "" {
+				continue
 			}
+			countsByZone[zoneId] += result[i].Score
+		}
+	}
+
+	for zoneId := range countsByZone {
+		if countsByZone[zoneId] > maxCountByZone {
+			maxCountByZone = countsByZone[zoneId]
 		}
-		workqueue.Parallelize(16, len(nodes), processNodeFunc)
 	}
 
-	// Aggregate by-zone information
-	// Compute the maximum number of pods hosted in any zone
 	haveZones := len(countsByZone) != 0
-	maxCountByZone := float64(0)
-	for _, count := range countsByZone {
-		if count > maxCountByZone {
-			maxCountByZone = count
-		}
-	}
 
-	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-	//score int - scale of 0-maxPriority
-	// 0 being the lowest priority and maxPriority being the highest
-	for _, node := range nodes {
+	for i := range result {
 		// initializing to the default/max node score of maxPriority
 		fScore := float64(schedulerapi.MaxPriority)
 		if maxCountByNodeName > 0 {
-			fScore = float64(schedulerapi.MaxPriority) * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName)
+			fScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByNodeName-result[i].Score) / float64(maxCountByNodeName))
 		}
-		// If there is zone information present, incorporate it
 		if haveZones {
-			zoneId := utilnode.GetZoneKey(node)
+			zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
 			if zoneId != "" {
 				zoneScore := float64(schedulerapi.MaxPriority)
 				if maxCountByZone > 0 {
-					zoneScore = float64(schedulerapi.MaxPriority) * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone)
+					zoneScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByZone-countsByZone[zoneId]) / float64(maxCountByZone))
 				}
 				fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
 			}
 		}
-
-		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
+		result[i].Score = int(fScore)
 		if glog.V(10) {
 			// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
 			// not logged. There is visible performance gain from it.
 			glog.V(10).Infof(
-				"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
+				"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore),
 			)
 		}
 	}
-	return result, nil
+	return nil
 }
 
 type ServiceAntiAffinity struct {
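Reviewer note: the change splits the old monolithic priority into two phases. The map function now returns a raw per-node count of pods matching the service/RC/RS/StatefulSet selectors, and the reduce function normalizes those counts into final 0..MaxPriority scores, blending in a per-zone score where zone labels are present. The sketch below is illustrative only, not the scheduler's actual API: `hostScore`, `reduceScores`, `maxPriority`, and `zoneWeight` are simplified stand-ins for `schedulerapi.HostPriority`, `CalculateSpreadPriorityReduce`, `schedulerapi.MaxPriority`, and `zoneWeighting`.

```go
// Illustrative sketch of the reduce-side normalization; all names here are
// stand-ins, not the real scheduler types.
package main

import "fmt"

const (
	maxPriority = 10
	// zoneWeight stands in for the package's zoneWeighting constant
	// (assumed here to weight zone spreading more heavily than node spreading).
	zoneWeight = 2.0 / 3.0
)

type hostScore struct {
	Host  string
	Zone  string
	Score int // raw count of matching pods, as produced by the map step
}

// reduceScores converts raw per-node match counts into 0..maxPriority scores:
// fewer matching pods on a node (and in its zone) yields a higher score.
func reduceScores(result []hostScore) {
	maxCountByNode := 0
	countsByZone := map[string]int{}
	for _, r := range result {
		if r.Score > maxCountByNode {
			maxCountByNode = r.Score
		}
		if r.Zone != "" {
			countsByZone[r.Zone] += r.Score
		}
	}
	maxCountByZone := 0
	for _, c := range countsByZone {
		if c > maxCountByZone {
			maxCountByZone = c
		}
	}
	for i := range result {
		fScore := float64(maxPriority)
		if maxCountByNode > 0 {
			fScore = float64(maxPriority) * float64(maxCountByNode-result[i].Score) / float64(maxCountByNode)
		}
		if len(countsByZone) > 0 && result[i].Zone != "" {
			zoneScore := float64(maxPriority)
			if maxCountByZone > 0 {
				zoneScore = float64(maxPriority) * float64(maxCountByZone-countsByZone[result[i].Zone]) / float64(maxCountByZone)
			}
			fScore = fScore*(1.0-zoneWeight) + zoneWeight*zoneScore
		}
		result[i].Score = int(fScore)
	}
}

func main() {
	scores := []hostScore{
		{Host: "node-a", Zone: "zone-1", Score: 3}, // 3 matching pods already
		{Host: "node-b", Zone: "zone-1", Score: 1},
		{Host: "node-c", Zone: "zone-2", Score: 0}, // empty node in a quieter zone
	}
	reduceScores(scores)
	for _, s := range scores {
		fmt.Printf("%s -> %d\n", s.Host, s.Score)
	}
}
```

Running the sketch prints `node-a -> 0`, `node-b -> 2`, `node-c -> 10`: the node (and zone) with the fewest matching pods ends up with the highest score, mirroring the spreading behavior of the normalization code in the diff above.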