diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
index ddcc05198ae..a8c1ce6b71d 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
@@ -24,7 +24,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/labels"
 	utilnode "k8s.io/kubernetes/pkg/util/node"
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -32,7 +32,7 @@ import (
 
 // The maximum priority value to give to a node
 // Prioritiy values range from 0-maxPriority
-const maxPriority = 10
+const maxPriority float32 = 10
 
 // When zone information is present, give 2/3 of the weighting to zone spreading, 1/3 to node spreading
 // TODO: Any way to justify this weighting?
@@ -62,21 +62,18 @@ func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algo
 // pods which match the same service selectors or RC selectors as the pod being scheduled.
 // Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
 func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
-	selectors := make([]labels.Selector, 0)
-	services, err := s.serviceLister.GetPodServices(pod)
-	if err == nil {
+	selectors := make([]labels.Selector, 0, 3)
+	if services, err := s.serviceLister.GetPodServices(pod); err == nil {
 		for _, service := range services {
 			selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
 		}
 	}
-	rcs, err := s.controllerLister.GetPodControllers(pod)
-	if err == nil {
+	if rcs, err := s.controllerLister.GetPodControllers(pod); err == nil {
 		for _, rc := range rcs {
 			selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
 		}
 	}
-	rss, err := s.replicaSetLister.GetPodReplicaSets(pod)
-	if err == nil {
+	if rss, err := s.replicaSetLister.GetPodReplicaSets(pod); err == nil {
 		for _, rs := range rss {
 			if selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
 				selectors = append(selectors, selector)
@@ -90,96 +87,57 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 	}
 
 	// Count similar pods by node
-	countsByNodeName := map[string]int{}
+	countsByNodeName := make(map[string]float32, len(nodes))
+	countsByZone := make(map[string]float32, 10)
+	maxCountByNodeName := float32(0)
 	countsByNodeNameLock := sync.Mutex{}
 
 	if len(selectors) > 0 {
-		// Create a number of go-routines that will be computing number
-		// of "similar" pods for given nodes.
-		workers := 16
-		toProcess := make(chan string, len(nodes))
-		for i := range nodes {
-			toProcess <- nodes[i].Name
-		}
-		close(toProcess)
-
-		// TODO: Use Parallelize.
-		wg := sync.WaitGroup{}
-		wg.Add(workers)
-		for i := 0; i < workers; i++ {
-			go func() {
-				defer utilruntime.HandleCrash()
-				defer wg.Done()
-				for {
-					nodeName, ok := <-toProcess
-					if !ok {
-						return
-					}
-					count := 0
-					for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
-						if pod.Namespace != nodePod.Namespace {
-							continue
-						}
-						// When we are replacing a failed pod, we often see the previous
-						// deleted version while scheduling the replacement.
-						// Ignore the previous deleted version for spreading purposes
-						// (it can still be considered for resource restrictions etc.)
-						if nodePod.DeletionTimestamp != nil {
-							glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
-							continue
-						}
-						matches := false
-						for _, selector := range selectors {
-							if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
-								matches = true
-								break
-							}
-						}
-						if matches {
-							count++
-						}
-					}
-
-					func() {
-						countsByNodeNameLock.Lock()
-						defer countsByNodeNameLock.Unlock()
-						countsByNodeName[nodeName] = count
-					}()
+		processNodeFunc := func(i int) {
+			nodeName := nodes[i].Name
+			count := float32(0)
+			for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
+				if pod.Namespace != nodePod.Namespace {
+					continue
 				}
-			}()
-		}
-		wg.Wait()
-	}
+				// When we are replacing a failed pod, we often see the previous
+				// deleted version while scheduling the replacement.
+				// Ignore the previous deleted version for spreading purposes
+				// (it can still be considered for resource restrictions etc.)
+				if nodePod.DeletionTimestamp != nil {
+					glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
+					continue
+				}
+				matches := false
+				for _, selector := range selectors {
+					if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
+						matches = true
+						break
+					}
+				}
+				if matches {
+					count++
+				}
+			}
+			zoneId := utilnode.GetZoneKey(nodes[i])
 
-	// Aggregate by-node information
-	// Compute the maximum number of pods hosted on any node
-	maxCountByNodeName := 0
-	for _, count := range countsByNodeName {
-		if count > maxCountByNodeName {
-			maxCountByNodeName = count
+			countsByNodeNameLock.Lock()
+			defer countsByNodeNameLock.Unlock()
+			countsByNodeName[nodeName] = count
+			if count > maxCountByNodeName {
+				maxCountByNodeName = count
+			}
+			if zoneId != "" {
+				countsByZone[zoneId] += count
+			}
 		}
-	}
-
-	// Count similar pods by zone, if zone information is present
-	countsByZone := map[string]int{}
-	for _, node := range nodes {
-		count, found := countsByNodeName[node.Name]
-		if !found {
-			continue
-		}
-
-		zoneId := utilnode.GetZoneKey(node)
-		if zoneId == "" {
-			continue
-		}
-
-		countsByZone[zoneId] += count
+		workqueue.Parallelize(16, len(nodes), processNodeFunc)
 	}
 
 	// Aggregate by-zone information
 	// Compute the maximum number of pods hosted in any zone
 	haveZones := len(countsByZone) != 0
-	maxCountByZone := 0
+	maxCountByZone := float32(0)
 	for _, count := range countsByZone {
 		if count > maxCountByZone {
 			maxCountByZone = count
@@ -191,24 +149,28 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 	// 0 being the lowest priority and maxPriority being the highest
 	for _, node := range nodes {
 		// initializing to the default/max node score of maxPriority
-		fScore := float32(maxPriority)
+		fScore := maxPriority
 		if maxCountByNodeName > 0 {
-			fScore = maxPriority * (float32(maxCountByNodeName-countsByNodeName[node.Name]) / float32(maxCountByNodeName))
+			fScore = maxPriority * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName)
 		}
 
 		// If there is zone information present, incorporate it
 		if haveZones {
 			zoneId := utilnode.GetZoneKey(node)
 			if zoneId != "" {
-				zoneScore := maxPriority * (float32(maxCountByZone-countsByZone[zoneId]) / float32(maxCountByZone))
+				zoneScore := maxPriority * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone)
 				fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
 			}
 		}
 
 		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
-		glog.V(10).Infof(
-			"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
-		)
+		if glog.V(10) {
+			// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+			// not logged. There is visible performance gain from it.
+			glog.V(10).Infof(
+				"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
+			)
+		}
 	}
 	return result, nil
 }
@@ -234,8 +196,7 @@ func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister
 
 func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
 	var nsServicePods []*api.Pod
-	services, err := s.serviceLister.GetPodServices(pod)
-	if err == nil {
+	if services, err := s.serviceLister.GetPodServices(pod); err == nil {
 		// just use the first service and get the other pods within the service
 		// TODO: a separate predicate can be created that tries to handle all services for the pod
 		selector := labels.SelectorFromSet(services[0].Spec.Selector)
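
Note (illustrative, not part of the patch above): workqueue.Parallelize(workers, pieces, doWorkPiece), used in the new code, fans the index range [0, pieces) out across the requested number of workers and returns only after every piece has been processed, which is what lets the patch drop the hand-rolled channel/WaitGroup worker pool. A minimal self-contained sketch of the same call pattern, using hypothetical item names rather than the scheduler's node list:

package main

import (
	"fmt"
	"sync"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	// Hypothetical work items standing in for the scheduler's node list.
	items := []string{"node-a", "node-b", "node-c", "node-d"}

	// Shared result guarded by a mutex, mirroring how countsByNodeName and
	// maxCountByNodeName are updated under countsByNodeNameLock in the patch.
	maxLen := 0
	var lock sync.Mutex

	processItem := func(i int) {
		n := len(items[i]) // per-piece work happens outside the lock
		lock.Lock()
		defer lock.Unlock()
		if n > maxLen {
			maxLen = n
		}
	}

	// 16 workers process len(items) pieces; Parallelize blocks until all are done.
	workqueue.Parallelize(16, len(items), processItem)
	fmt.Println("longest name length:", maxLen)
}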