diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index ba5654afe82..8c6101be6bb 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -25,6 +25,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" @@ -159,7 +160,7 @@ func findNodesThatFit(pod *api.Pod, machineToPods map[string][]*api.Pod, predica return api.NodeList{Items: filtered}, failedPredicateMap, nil } -// Prioritizes the nodes by running the individual priority functions sequentially. +// Prioritizes the nodes by running the individual priority functions in parallel. // Each priority function is expected to set a score of 0-10 // 0 is the lowest priority score (least preferred node) and 10 is the highest // Each priority function can also have its own weight @@ -174,40 +175,72 @@ func PrioritizeNodes(pod *api.Pod, machinesToPods map[string][]*api.Pod, podList return EqualPriority(pod, machinesToPods, podLister, nodeLister) } - combinedScores := map[string]int{} + var ( + mu = sync.Mutex{} + wg = sync.WaitGroup{} + combinedScores = map[string]int{} + errs []error + ) + for _, priorityConfig := range priorityConfigs { - weight := priorityConfig.Weight // skip the priority function if the weight is specified as 0 - if weight == 0 { + if priorityConfig.Weight == 0 { continue } - priorityFunc := priorityConfig.Function - prioritizedList, err := priorityFunc(pod, machinesToPods, podLister, nodeLister) - if err != nil { - return schedulerapi.HostPriorityList{}, err - } - for _, hostEntry := range prioritizedList { - combinedScores[hostEntry.Host] += hostEntry.Score * weight - } + + wg.Add(1) + go func(config algorithm.PriorityConfig) { + defer wg.Done() + weight := config.Weight + priorityFunc := config.Function + prioritizedList, err := priorityFunc(pod, machinesToPods, podLister, nodeLister) + if err != nil { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + return + } + mu.Lock() + for i := range prioritizedList { + host, score := prioritizedList[i].Host, prioritizedList[i].Score + combinedScores[host] += score * weight + } + mu.Unlock() + }(priorityConfig) } + if len(errs) != 0 { + return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs) + } + + // wait for all go routines to finish + wg.Wait() + if len(extenders) != 0 && nodeLister != nil { nodes, err := nodeLister.List() if err != nil { return schedulerapi.HostPriorityList{}, err } - for _, extender := range extenders { - prioritizedList, weight, err := extender.Prioritize(pod, &nodes) - if err != nil { - // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities - continue - } - - for _, hostEntry := range *prioritizedList { - combinedScores[hostEntry.Host] += hostEntry.Score * weight - } + wg.Add(1) + go func(ext algorithm.SchedulerExtender) { + defer wg.Done() + prioritizedList, weight, err := ext.Prioritize(pod, &nodes) + if err != nil { + // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities + return + } + mu.Lock() + for i := range *prioritizedList { + host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score + combinedScores[host] += score * weight + } + mu.Unlock() + }(extender) } } + // wait for all go routines to finish + wg.Wait() + for host, score := range combinedScores { glog.V(10).Infof("Host %s Score %d", host, score) result = append(result, schedulerapi.HostPriority{Host: host, Score: score})