Merge pull request #18413 from xiang90/p_schedule

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-12-21 11:32:53 -08:00
commit 29754318ad

View File

@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@ -159,7 +160,7 @@ func findNodesThatFit(pod *api.Pod, machineToPods map[string][]*api.Pod, predica
return api.NodeList{Items: filtered}, failedPredicateMap, nil
}
// Prioritizes the nodes by running the individual priority functions sequentially.
// Prioritizes the nodes by running the individual priority functions in parallel.
// Each priority function is expected to set a score of 0-10
// 0 is the lowest priority score (least preferred node) and 10 is the highest
// Each priority function can also have its own weight
@ -174,40 +175,72 @@ func PrioritizeNodes(pod *api.Pod, machinesToPods map[string][]*api.Pod, podList
return EqualPriority(pod, machinesToPods, podLister, nodeLister)
}
combinedScores := map[string]int{}
var (
mu = sync.Mutex{}
wg = sync.WaitGroup{}
combinedScores = map[string]int{}
errs []error
)
for _, priorityConfig := range priorityConfigs {
weight := priorityConfig.Weight
// skip the priority function if the weight is specified as 0
if weight == 0 {
if priorityConfig.Weight == 0 {
continue
}
priorityFunc := priorityConfig.Function
prioritizedList, err := priorityFunc(pod, machinesToPods, podLister, nodeLister)
if err != nil {
return schedulerapi.HostPriorityList{}, err
}
for _, hostEntry := range prioritizedList {
combinedScores[hostEntry.Host] += hostEntry.Score * weight
}
wg.Add(1)
go func(config algorithm.PriorityConfig) {
defer wg.Done()
weight := config.Weight
priorityFunc := config.Function
prioritizedList, err := priorityFunc(pod, machinesToPods, podLister, nodeLister)
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
return
}
mu.Lock()
for i := range prioritizedList {
host, score := prioritizedList[i].Host, prioritizedList[i].Score
combinedScores[host] += score * weight
}
mu.Unlock()
}(priorityConfig)
}
if len(errs) != 0 {
return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
}
// wait for all go routines to finish
wg.Wait()
if len(extenders) != 0 && nodeLister != nil {
nodes, err := nodeLister.List()
if err != nil {
return schedulerapi.HostPriorityList{}, err
}
for _, extender := range extenders {
prioritizedList, weight, err := extender.Prioritize(pod, &nodes)
if err != nil {
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
continue
}
for _, hostEntry := range *prioritizedList {
combinedScores[hostEntry.Host] += hostEntry.Score * weight
}
wg.Add(1)
go func(ext algorithm.SchedulerExtender) {
defer wg.Done()
prioritizedList, weight, err := ext.Prioritize(pod, &nodes)
if err != nil {
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
return
}
mu.Lock()
for i := range *prioritizedList {
host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
combinedScores[host] += score * weight
}
mu.Unlock()
}(extender)
}
}
// wait for all go routines to finish
wg.Wait()
for host, score := range combinedScores {
glog.V(10).Infof("Host %s Score %d", host, score)
result = append(result, schedulerapi.HostPriority{Host: host, Score: score})