Merge pull request #32575 from wojtek-t/concurrent_priorities

Automatic merge from submit-queue

Compute priorities in parallel

Ref #24246
This commit is contained in:
Kubernetes Submit Queue 2016-09-26 09:31:47 -07:00 committed by GitHub
commit 234be5a1d0

View File

@ -238,8 +238,6 @@ func PrioritizeNodes(
nodes []*api.Node,
extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
// If no priority configs are provided, then the EqualPriority function is applied
// This is required to generate the priority list in the required format
if len(priorityConfigs) == 0 && len(extenders) == 0 {
@ -247,63 +245,82 @@ func PrioritizeNodes(
}
var (
mu = sync.Mutex{}
wg = sync.WaitGroup{}
combinedScores = make(map[string]int, len(nodeNameToInfo))
errs []error
mu = sync.Mutex{}
wg = sync.WaitGroup{}
errs []error
)
appendError := func(err error) {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
meta := priorities.PriorityMetadata(pod, nodes)
for _, priorityConfig := range priorityConfigs {
// skip the priority function if the weight is specified as 0
if priorityConfig.Weight == 0 {
continue
results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs))
for range priorityConfigs {
results = append(results, nil)
}
for i, priorityConfig := range priorityConfigs {
if priorityConfig.Function != nil {
// DEPRECATED
wg.Add(1)
go func(index int, config algorithm.PriorityConfig) {
defer wg.Done()
var err error
results[index], err = config.Function(pod, nodeNameToInfo, nodes)
if err != nil {
appendError(err)
}
}(i, priorityConfig)
} else {
results[i] = make(schedulerapi.HostPriorityList, len(nodes))
}
wg.Add(1)
go func(config algorithm.PriorityConfig) {
defer wg.Done()
weight := config.Weight
prioritizedList, err := func() (schedulerapi.HostPriorityList, error) {
if config.Function != nil {
return config.Function(pod, nodeNameToInfo, nodes)
}
prioritizedList := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
hostResult, err := config.Map(pod, meta, nodeNameToInfo[nodes[i].Name])
if err != nil {
return nil, err
}
prioritizedList = append(prioritizedList, hostResult)
}
if config.Reduce != nil {
if err := config.Reduce(prioritizedList); err != nil {
return nil, err
}
}
return prioritizedList, nil
}()
mu.Lock()
defer mu.Unlock()
}
processNode := func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name]
var err error
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
continue
}
results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
if err != nil {
errs = append(errs, err)
appendError(err)
return
}
for i := range prioritizedList {
host, score := prioritizedList[i].Host, prioritizedList[i].Score
combinedScores[host] += score * weight
}
}(priorityConfig)
}
}
// wait for all go routines to finish
workqueue.Parallelize(16, len(nodes), processNode)
for i, priorityConfig := range priorityConfigs {
if priorityConfig.Reduce == nil {
continue
}
wg.Add(1)
go func(index int, config algorithm.PriorityConfig) {
defer wg.Done()
if err := config.Reduce(results[index]); err != nil {
appendError(err)
}
}(i, priorityConfig)
}
// Wait for all computations to be finished.
wg.Wait()
if len(errs) != 0 {
return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
}
// Summarize all scores.
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
// TODO: Consider parallelizing it.
for i := range nodes {
result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
for j := range priorityConfigs {
result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
}
}
if len(extenders) != 0 && nodes != nil {
combinedScores := make(map[string]int, len(nodeNameToInfo))
for _, extender := range extenders {
wg.Add(1)
go func(ext algorithm.SchedulerExtender) {
@ -321,13 +338,17 @@ func PrioritizeNodes(
mu.Unlock()
}(extender)
}
// wait for all go routines to finish
wg.Wait()
for i := range result {
result[i].Score += combinedScores[result[i].Host]
}
}
// wait for all go routines to finish
wg.Wait()
for host, score := range combinedScores {
glog.V(10).Infof("Host %s Score %d", host, score)
result = append(result, schedulerapi.HostPriority{Host: host, Score: score})
if glog.V(10) {
for i := range result {
glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score)
}
}
return result, nil
}