diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index 05088d8df25..60694ac5551 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -238,8 +238,6 @@ func PrioritizeNodes( nodes []*api.Node, extenders []algorithm.SchedulerExtender, ) (schedulerapi.HostPriorityList, error) { - result := make(schedulerapi.HostPriorityList, 0, len(nodes)) - // If no priority configs are provided, then the EqualPriority function is applied // This is required to generate the priority list in the required format if len(priorityConfigs) == 0 && len(extenders) == 0 { @@ -247,63 +245,82 @@ func PrioritizeNodes( } var ( - mu = sync.Mutex{} - wg = sync.WaitGroup{} - combinedScores = make(map[string]int, len(nodeNameToInfo)) - errs []error + mu = sync.Mutex{} + wg = sync.WaitGroup{} + errs []error ) + appendError := func(err error) { + mu.Lock() + defer mu.Unlock() + errs = append(errs, err) + } meta := priorities.PriorityMetadata(pod, nodes) - for _, priorityConfig := range priorityConfigs { - // skip the priority function if the weight is specified as 0 - if priorityConfig.Weight == 0 { - continue + results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs)) + for range priorityConfigs { + results = append(results, nil) + } + for i, priorityConfig := range priorityConfigs { + if priorityConfig.Function != nil { + // DEPRECATED + wg.Add(1) + go func(index int, config algorithm.PriorityConfig) { + defer wg.Done() + var err error + results[index], err = config.Function(pod, nodeNameToInfo, nodes) + if err != nil { + appendError(err) + } + }(i, priorityConfig) + } else { + results[i] = make(schedulerapi.HostPriorityList, len(nodes)) } - - wg.Add(1) - go func(config algorithm.PriorityConfig) { - defer wg.Done() - weight := config.Weight - - prioritizedList, err := func() (schedulerapi.HostPriorityList, error) { - if config.Function != nil { - return config.Function(pod, nodeNameToInfo, nodes) - } - prioritizedList := make(schedulerapi.HostPriorityList, 0, len(nodes)) - for i := range nodes { - hostResult, err := config.Map(pod, meta, nodeNameToInfo[nodes[i].Name]) - if err != nil { - return nil, err - } - prioritizedList = append(prioritizedList, hostResult) - } - if config.Reduce != nil { - if err := config.Reduce(prioritizedList); err != nil { - return nil, err - } - } - return prioritizedList, nil - }() - - mu.Lock() - defer mu.Unlock() + } + processNode := func(index int) { + nodeInfo := nodeNameToInfo[nodes[index].Name] + var err error + for i := range priorityConfigs { + if priorityConfigs[i].Function != nil { + continue + } + results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo) if err != nil { - errs = append(errs, err) + appendError(err) return } - for i := range prioritizedList { - host, score := prioritizedList[i].Host, prioritizedList[i].Score - combinedScores[host] += score * weight - } - }(priorityConfig) + } } - // wait for all go routines to finish + workqueue.Parallelize(16, len(nodes), processNode) + for i, priorityConfig := range priorityConfigs { + if priorityConfig.Reduce == nil { + continue + } + wg.Add(1) + go func(index int, config algorithm.PriorityConfig) { + defer wg.Done() + if err := config.Reduce(results[index]); err != nil { + appendError(err) + } + }(i, priorityConfig) + } + // Wait for all computations to be finished. wg.Wait() if len(errs) != 0 { return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs) } + // Summarize all scores. + result := make(schedulerapi.HostPriorityList, 0, len(nodes)) + // TODO: Consider parallelizing it. + for i := range nodes { + result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0}) + for j := range priorityConfigs { + result[i].Score += results[j][i].Score * priorityConfigs[j].Weight + } + } + if len(extenders) != 0 && nodes != nil { + combinedScores := make(map[string]int, len(nodeNameToInfo)) for _, extender := range extenders { wg.Add(1) go func(ext algorithm.SchedulerExtender) { @@ -321,13 +338,17 @@ func PrioritizeNodes( mu.Unlock() }(extender) } + // wait for all go routines to finish + wg.Wait() + for i := range result { + result[i].Score += combinedScores[result[i].Host] + } } - // wait for all go routines to finish - wg.Wait() - for host, score := range combinedScores { - glog.V(10).Infof("Host %s Score %d", host, score) - result = append(result, schedulerapi.HostPriority{Host: host, Score: score}) + if glog.V(10) { + for i := range result { + glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score) + } } return result, nil }