Merge pull request #71722 from bsalamat/fix_priorityconfig

Avoid copying PriorityConfig and SchedulerExtender structs for every node while running priority functions
This commit is contained in:
Kubernetes Prow Robot 2018-12-05 20:31:51 -08:00 committed by GitHub
commit 720c10282c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -657,24 +657,26 @@ func PrioritizeNodes(
// DEPRECATED: we can remove this when all priorityConfigs implement the // DEPRECATED: we can remove this when all priorityConfigs implement the
// Map-Reduce pattern. // Map-Reduce pattern.
workqueue.ParallelizeUntil(context.TODO(), 16, len(priorityConfigs), func(i int) { for i := range priorityConfigs {
priorityConfig := priorityConfigs[i] if priorityConfigs[i].Function != nil {
if priorityConfig.Function == nil { wg.Add(1)
results[i] = make(schedulerapi.HostPriorityList, len(nodes)) go func(index int) {
return defer wg.Done()
}
var err error var err error
results[i], err = priorityConfig.Function(pod, nodeNameToInfo, nodes) results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
if err != nil { if err != nil {
appendError(err) appendError(err)
} }
}) }(i)
} else {
results[i] = make(schedulerapi.HostPriorityList, len(nodes))
}
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) { workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name] nodeInfo := nodeNameToInfo[nodes[index].Name]
for i, priorityConfig := range priorityConfigs { for i := range priorityConfigs {
if priorityConfig.Function != nil { if priorityConfigs[i].Function != nil {
continue continue
} }
@ -687,22 +689,22 @@ func PrioritizeNodes(
} }
}) })
for i, priorityConfig := range priorityConfigs { for i := range priorityConfigs {
if priorityConfig.Reduce == nil { if priorityConfigs[i].Reduce == nil {
continue continue
} }
wg.Add(1) wg.Add(1)
go func(index int, config algorithm.PriorityConfig) { go func(index int) {
defer wg.Done() defer wg.Done()
if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil { if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
appendError(err) appendError(err)
} }
if klog.V(10) { if klog.V(10) {
for _, hostPriority := range results[index] { for _, hostPriority := range results[index] {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, config.Name, hostPriority.Score) klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
} }
} }
}(i, priorityConfig) }(i)
} }
// Wait for all computations to be finished. // Wait for all computations to be finished.
wg.Wait() wg.Wait()
@ -722,14 +724,14 @@ func PrioritizeNodes(
if len(extenders) != 0 && nodes != nil { if len(extenders) != 0 && nodes != nil {
combinedScores := make(map[string]int, len(nodeNameToInfo)) combinedScores := make(map[string]int, len(nodeNameToInfo))
for _, extender := range extenders { for i := range extenders {
if !extender.IsInterested(pod) { if !extenders[i].IsInterested(pod) {
continue continue
} }
wg.Add(1) wg.Add(1)
go func(ext algorithm.SchedulerExtender) { go func(extIndex int) {
defer wg.Done() defer wg.Done()
prioritizedList, weight, err := ext.Prioritize(pod, nodes) prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
if err != nil { if err != nil {
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
return return
@ -738,12 +740,12 @@ func PrioritizeNodes(
for i := range *prioritizedList { for i := range *prioritizedList {
host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
if klog.V(10) { if klog.V(10) {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, ext.Name(), score) klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
} }
combinedScores[host] += score * weight combinedScores[host] += score * weight
} }
mu.Unlock() mu.Unlock()
}(extender) }(i)
} }
// wait for all go routines to finish // wait for all go routines to finish
wg.Wait() wg.Wait()