Compute priorities in parallel
parent 13357bd653
commit ff765ed43b
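The change below restructures PrioritizeNodes: instead of spawning one goroutine per priority config and folding weighted scores into a shared combinedScores map under a mutex, the scheduler keeps one result row per config, fans the per-node Map calls out over a bounded worker pool with workqueue.Parallelize(16, len(nodes), processNode), runs the deprecated whole-list Function priorities and the Reduce passes in their own goroutines, and sums the weighted scores per node in a single pass at the end. A minimal standalone sketch of that shape, with simplified types and a hand-rolled parallelize helper rather than the scheduler's actual API:

    // Minimal sketch of the pattern introduced by this commit (simplified
    // types, not the scheduler's real API): one score row per priority
    // function, per-node scoring fanned out over a bounded worker pool,
    // then a weighted sum per node.
    package main

    import (
        "fmt"
        "sync"
    )

    type node struct{ name string }

    type priorityConfig struct {
        weight int
        mapFn  func(n node) int // hypothetical per-node scorer
    }

    // parallelize runs doPiece(0..pieces-1) on at most `workers` goroutines,
    // mirroring what workqueue.Parallelize(16, len(nodes), processNode) does.
    func parallelize(workers, pieces int, doPiece func(i int)) {
        toProcess := make(chan int, pieces)
        for i := 0; i < pieces; i++ {
            toProcess <- i
        }
        close(toProcess)

        var wg sync.WaitGroup
        for w := 0; w < workers; w++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for i := range toProcess {
                    doPiece(i)
                }
            }()
        }
        wg.Wait()
    }

    func prioritizeNodes(nodes []node, configs []priorityConfig) []int {
        // One result row per priority config, one column per node.
        results := make([][]int, len(configs))
        for i := range configs {
            results[i] = make([]int, len(nodes))
        }

        // Score every node under every config using up to 16 workers.
        parallelize(16, len(nodes), func(index int) {
            for i := range configs {
                results[i][index] = configs[i].mapFn(nodes[index])
            }
        })

        // Summarize: weighted sum of per-config scores for each node.
        total := make([]int, len(nodes))
        for i := range nodes {
            for j := range configs {
                total[i] += results[j][i] * configs[j].weight
            }
        }
        return total
    }

    func main() {
        nodes := []node{{"node-a"}, {"node-b"}}
        configs := []priorityConfig{
            {weight: 1, mapFn: func(n node) int { return len(n.name) }},
            {weight: 2, mapFn: func(n node) int { return 10 }},
        }
        fmt.Println(prioritizeNodes(nodes, configs)) // [26 26]
    }

Capping the fan-out at 16 workers keeps the goroutine count independent of cluster size, and because each (config, node) cell of results is written by exactly one worker, the score accumulation itself no longer needs a lock.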
@@ -238,8 +238,6 @@ func PrioritizeNodes(
     nodes []*api.Node,
     extenders []algorithm.SchedulerExtender,
 ) (schedulerapi.HostPriorityList, error) {
-    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-
     // If no priority configs are provided, then the EqualPriority function is applied
     // This is required to generate the priority list in the required format
     if len(priorityConfigs) == 0 && len(extenders) == 0 {
@@ -247,63 +245,82 @@ func PrioritizeNodes(
     }

     var (
-        mu             = sync.Mutex{}
-        wg             = sync.WaitGroup{}
-        combinedScores = make(map[string]int, len(nodeNameToInfo))
-        errs           []error
+        mu   = sync.Mutex{}
+        wg   = sync.WaitGroup{}
+        errs []error
     )
+    appendError := func(err error) {
+        mu.Lock()
+        defer mu.Unlock()
+        errs = append(errs, err)
+    }

     meta := priorities.PriorityMetadata(pod, nodes)
-    for _, priorityConfig := range priorityConfigs {
-        // skip the priority function if the weight is specified as 0
-        if priorityConfig.Weight == 0 {
-            continue
-        }
-
-        wg.Add(1)
-        go func(config algorithm.PriorityConfig) {
-            defer wg.Done()
-            weight := config.Weight
-
-            prioritizedList, err := func() (schedulerapi.HostPriorityList, error) {
-                if config.Function != nil {
-                    return config.Function(pod, nodeNameToInfo, nodes)
-                }
-                prioritizedList := make(schedulerapi.HostPriorityList, 0, len(nodes))
-                for i := range nodes {
-                    hostResult, err := config.Map(pod, meta, nodeNameToInfo[nodes[i].Name])
-                    if err != nil {
-                        return nil, err
-                    }
-                    prioritizedList = append(prioritizedList, hostResult)
-                }
-                if config.Reduce != nil {
-                    if err := config.Reduce(prioritizedList); err != nil {
-                        return nil, err
-                    }
-                }
-                return prioritizedList, nil
-            }()
-
-            mu.Lock()
-            defer mu.Unlock()
-            if err != nil {
-                errs = append(errs, err)
-                return
-            }
-            for i := range prioritizedList {
-                host, score := prioritizedList[i].Host, prioritizedList[i].Score
-                combinedScores[host] += score * weight
-            }
-        }(priorityConfig)
-    }
-    // wait for all go routines to finish
-    wg.Wait()
+    results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs))
+    for range priorityConfigs {
+        results = append(results, nil)
+    }
+    for i, priorityConfig := range priorityConfigs {
+        if priorityConfig.Function != nil {
+            // DEPRECATED
+            wg.Add(1)
+            go func(index int, config algorithm.PriorityConfig) {
+                defer wg.Done()
+                var err error
+                results[index], err = config.Function(pod, nodeNameToInfo, nodes)
+                if err != nil {
+                    appendError(err)
+                }
+            }(i, priorityConfig)
+        } else {
+            results[i] = make(schedulerapi.HostPriorityList, len(nodes))
+        }
+    }
+    processNode := func(index int) {
+        nodeInfo := nodeNameToInfo[nodes[index].Name]
+        var err error
+        for i := range priorityConfigs {
+            if priorityConfigs[i].Function != nil {
+                continue
+            }
+            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
+            if err != nil {
+                appendError(err)
+                return
+            }
+        }
+    }
+    workqueue.Parallelize(16, len(nodes), processNode)
+    for i, priorityConfig := range priorityConfigs {
+        if priorityConfig.Reduce == nil {
+            continue
+        }
+        wg.Add(1)
+        go func(index int, config algorithm.PriorityConfig) {
+            defer wg.Done()
+            if err := config.Reduce(results[index]); err != nil {
+                appendError(err)
+            }
+        }(i, priorityConfig)
+    }
+    // Wait for all computations to be finished.
+    wg.Wait()
     if len(errs) != 0 {
         return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
     }

+    // Summarize all scores.
+    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
+    // TODO: Consider parallelizing it.
+    for i := range nodes {
+        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
+        for j := range priorityConfigs {
+            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
+        }
+    }
+
     if len(extenders) != 0 && nodes != nil {
+        combinedScores := make(map[string]int, len(nodeNameToInfo))
         for _, extender := range extenders {
             wg.Add(1)
             go func(ext algorithm.SchedulerExtender) {
@@ -321,13 +338,17 @@ func PrioritizeNodes(
                 mu.Unlock()
             }(extender)
         }
+        // wait for all go routines to finish
+        wg.Wait()
+        for i := range result {
+            result[i].Score += combinedScores[result[i].Host]
+        }
     }
-    // wait for all go routines to finish
-    wg.Wait()

-    for host, score := range combinedScores {
-        glog.V(10).Infof("Host %s Score %d", host, score)
-        result = append(result, schedulerapi.HostPriority{Host: host, Score: score})
+    if glog.V(10) {
+        for i := range result {
+            glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score)
+        }
     }
     return result, nil
 }
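The appendError closure introduced above exists so that the deprecated Function goroutines, the Parallelize workers running Map, and the Reduce goroutines can all record failures without racing on the shared errs slice; after wg.Wait() the slice is folded into a single aggregate error. A small self-contained sketch of that mutex-guarded collection pattern, standard library only, with errors.Join standing in for the aggregate error returned in the diff:

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    // collectErrors runs each task concurrently and gathers every failure
    // behind a mutex, mirroring the appendError/errs pattern in the diff.
    func collectErrors(tasks []func() error) error {
        var (
            mu   sync.Mutex
            wg   sync.WaitGroup
            errs []error
        )
        appendError := func(err error) {
            mu.Lock()
            defer mu.Unlock()
            errs = append(errs, err)
        }

        for _, task := range tasks {
            wg.Add(1)
            go func(t func() error) {
                defer wg.Done()
                if err := t(); err != nil {
                    appendError(err)
                }
            }(task)
        }
        wg.Wait()

        if len(errs) != 0 {
            return errors.Join(errs...) // stand-in for the aggregate error above
        }
        return nil
    }

    func main() {
        err := collectErrors([]func() error{
            func() error { return nil },
            func() error { return fmt.Errorf("map failed on node-a") },
        })
        fmt.Println(err)
    }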