Parallelize computing selector spreading priority in scheduler.

Author: Wojciech Tyczynski
Date:   2016-03-21 13:16:06 +01:00
parent a690c2ca76
commit ebcc8f737c


@@ -17,10 +17,13 @@ limitations under the License.
 package priorities
 
 import (
+	"sync"
+
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/labels"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -79,8 +82,6 @@ func getZoneKey(node *api.Node) string {
 // pods which match the same service selectors or RC selectors as the pod being scheduled.
 // Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
 func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
-	var nsPods []*api.Pod
-
 	selectors := make([]labels.Selector, 0)
 	services, err := s.serviceLister.GetPodServices(pod)
 	if err == nil {
@@ -103,19 +104,6 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 		}
 	}
 
-	if len(selectors) > 0 {
-		pods, err := s.podLister.List(labels.Everything())
-		if err != nil {
-			return nil, err
-		}
-		// consider only the pods that belong to the same namespace
-		for _, nsPod := range pods {
-			if nsPod.Namespace == pod.Namespace {
-				nsPods = append(nsPods, nsPod)
-			}
-		}
-	}
-
 	nodes, err := nodeLister.List()
 	if err != nil {
 		return nil, err
@@ -123,26 +111,63 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 	// Count similar pods by node
 	countsByNodeName := map[string]int{}
-	for _, pod := range nsPods {
-		// When we are replacing a failed pod, we often see the previous deleted version
-		// while scheduling the replacement. Ignore the previous deleted version for spreading
-		// purposes (it can still be considered for resource restrictions etc.)
-		if pod.DeletionTimestamp != nil {
-			glog.V(2).Infof("skipping pending-deleted pod: %s/%s", pod.Namespace, pod.Name)
-			continue
-		}
-		matches := false
-		for _, selector := range selectors {
-			if selector.Matches(labels.Set(pod.ObjectMeta.Labels)) {
-				matches = true
-				break
-			}
-		}
-		if !matches {
-			continue
-		}
+	countsByNodeNameLock := sync.Mutex{}
 
-		countsByNodeName[pod.Spec.NodeName]++
-	}
+	if len(selectors) > 0 {
+		// Create a number of go-routines that will be computing number
+		// of "similar" pods for given nodes.
+		workers := 16
+		toProcess := make(chan string, len(nodes.Items))
+		for i := range nodes.Items {
+			toProcess <- nodes.Items[i].Name
+		}
+		close(toProcess)
+
+		wg := sync.WaitGroup{}
+		wg.Add(workers)
+		for i := 0; i < workers; i++ {
+			go func() {
+				defer utilruntime.HandleCrash()
+				defer wg.Done()
+				for {
+					nodeName, ok := <-toProcess
+					if !ok {
+						return
+					}
+					count := 0
+					for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
+						if pod.Namespace != nodePod.Namespace {
+							continue
+						}
+						// When we are replacing a failed pod, we often see the previous
+						// deleted version while scheduling the replacement.
+						// Ignore the previous deleted version for spreading purposes
+						// (it can still be considered for resource restrictions etc.)
+						if nodePod.DeletionTimestamp != nil {
+							glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
+							continue
+						}
+						matches := false
+						for _, selector := range selectors {
+							if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
+								matches = true
+								break
+							}
+						}
+						if matches {
+							count++
+						}
+					}
+					func() {
+						countsByNodeNameLock.Lock()
+						defer countsByNodeNameLock.Unlock()
+						countsByNodeName[nodeName] = count
+					}()
+				}
+			}()
+		}
+		wg.Wait()
+	}
 
 	// Aggregate by-node information
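
The concurrency pattern this commit introduces is self-contained enough to lift out: a channel pre-filled with all work items and then closed acts as the queue, a fixed pool of goroutines drains it, and a mutex guards the shared result map. Below is a minimal standalone sketch of that pattern, not the scheduler's code; nodeNames, countFor, counts, and countsLock are illustrative stand-ins for nodes.Items, the selector-matching loop over nodeNameToInfo[nodeName].Pods(), countsByNodeName, and countsByNodeNameLock.

package main

import (
	"fmt"
	"sync"
)

func main() {
	nodeNames := []string{"node-a", "node-b", "node-c", "node-d"}

	// Stand-in for the per-node work; in the commit this is the loop that
	// counts pods on the node matching the collected selectors.
	countFor := func(nodeName string) int { return len(nodeName) }

	const workers = 16

	// Buffer the channel to hold every item so the sends below never block,
	// then close it so workers terminate once the queue drains.
	toProcess := make(chan string, len(nodeNames))
	for _, name := range nodeNames {
		toProcess <- name
	}
	close(toProcess)

	counts := make(map[string]int)
	countsLock := sync.Mutex{}

	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			// Ranging over the channel ends when it is closed and empty;
			// this is the range form of the explicit `nodeName, ok := <-toProcess` loop.
			for nodeName := range toProcess {
				count := countFor(nodeName) // lock-free: only reads shared data
				countsLock.Lock()
				counts[nodeName] = count // each key is written by exactly one worker
				countsLock.Unlock()
			}
		}()
	}
	wg.Wait()

	fmt.Println(counts)
}

Two details carry over from the commit: the mutex is required because Go maps are not safe for concurrent writes, even though each key is written exactly once, and closing the pre-filled channel is what gives every worker a clean exit condition. The committed code additionally defers utilruntime.HandleCrash() in each worker, the Kubernetes convention for handling panics in goroutines.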