Refactor Priority Reduce functions

- Reducing the duplicated reduce function by introducing a reduce
  function generator that generates common reduce functions.
- Remove logs from reduce function, so it's purely calculating scores.
- Optimize the reduce functions by removing unnecessary conversion to
  float64.
This commit is contained in:
Yongkun Anfernee Gui 2017-11-07 11:34:20 -08:00
parent 45bdf707f0
commit 4be6a60df2
7 changed files with 75 additions and 52 deletions

View File

@ -18,6 +18,7 @@ go_library(
"node_affinity.go", "node_affinity.go",
"node_label.go", "node_label.go",
"node_prefer_avoid_pods.go", "node_prefer_avoid_pods.go",
"reduce.go",
"selector_spreading.go", "selector_spreading.go",
"taint_toleration.go", "taint_toleration.go",
"test_util.go", "test_util.go",

View File

@ -24,8 +24,6 @@ import (
v1helper "k8s.io/kubernetes/pkg/api/v1/helper" v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
) )
// CalculateNodeAffinityPriority prioritizes nodes according to node affinity scheduling preferences // CalculateNodeAffinityPriority prioritizes nodes according to node affinity scheduling preferences
@ -76,28 +74,4 @@ func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *s
}, nil }, nil
} }
func CalculateNodeAffinityPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error { var CalculateNodeAffinityPriorityReduce = NormalizeReduce(schedulerapi.MaxPriority, false)
var maxCount int
for i := range result {
if result[i].Score > maxCount {
maxCount = result[i].Score
}
}
maxCountFloat := float64(maxCount)
var fScore float64
for i := range result {
if maxCount > 0 {
fScore = float64(schedulerapi.MaxPriority) * (float64(result[i].Score) / maxCountFloat)
} else {
fScore = 0
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.Infof("%v -> %v: NodeAffinityPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore))
}
result[i].Score = int(fScore)
}
return nil
}

View File

@ -0,0 +1,64 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// NormalizeReduce generates a PriorityReduceFunction that can normalize the result
// scores to [0, maxPriority]. If reverse is set to true, it reverses the scores by
// subtracting it from maxPriority.
func NormalizeReduce(maxPriority int, reverse bool) algorithm.PriorityReduceFunction {
return func(
_ *v1.Pod,
_ interface{},
_ map[string]*schedulercache.NodeInfo,
result schedulerapi.HostPriorityList) error {
var maxCount int
for i := range result {
if result[i].Score > maxCount {
maxCount = result[i].Score
}
}
if maxCount == 0 {
if reverse {
for i := range result {
result[i].Score = maxPriority
}
}
return nil
}
for i := range result {
score := result[i].Score
score = maxPriority * score / maxCount
if reverse {
score = maxPriority - score
}
result[i].Score = score
}
return nil
}
}

View File

@ -23,8 +23,6 @@ import (
v1helper "k8s.io/kubernetes/pkg/api/v1/helper" v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
) )
// CountIntolerableTaintsPreferNoSchedule gives the count of intolerable taints of a pod with effect PreferNoSchedule // CountIntolerableTaintsPreferNoSchedule gives the count of intolerable taints of a pod with effect PreferNoSchedule
@ -75,26 +73,4 @@ func ComputeTaintTolerationPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *
} }
// ComputeTaintTolerationPriorityReduce calculates the source of each node based on the number of intolerable taints on the node // ComputeTaintTolerationPriorityReduce calculates the source of each node based on the number of intolerable taints on the node
func ComputeTaintTolerationPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error { var ComputeTaintTolerationPriorityReduce = NormalizeReduce(schedulerapi.MaxPriority, true)
var maxCount int
for i := range result {
if result[i].Score > maxCount {
maxCount = result[i].Score
}
}
maxCountFloat := float64(maxCount)
for i := range result {
fScore := float64(schedulerapi.MaxPriority)
if maxCountFloat > 0 {
fScore = (1.0 - float64(result[i].Score)/maxCountFloat) * float64(schedulerapi.MaxPriority)
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.Infof("%v -> %v: Taint Toleration Priority, Score: (%d)", pod.Name, result[i].Host, int(fScore))
}
result[i].Score = int(fScore)
}
return nil
}

View File

@ -53,6 +53,7 @@ type MetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercach
type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error)
type PriorityConfig struct { type PriorityConfig struct {
Name string
Map PriorityMapFunction Map PriorityMapFunction
Reduce PriorityReduceFunction Reduce PriorityReduceFunction
// TODO: Remove it after migrating all functions to // TODO: Remove it after migrating all functions to

View File

@ -433,6 +433,11 @@ func PrioritizeNodes(
if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil { if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
appendError(err) appendError(err)
} }
if glog.V(10) {
for _, hostPriority := range results[index] {
glog.Infof("%v -> %v: %v, Score: (%d)", pod.Name, hostPriority.Host, config.Name, hostPriority.Score)
}
}
}(i, priorityConfig) }(i, priorityConfig)
} }
// Wait for all computations to be finished. // Wait for all computations to be finished.

View File

@ -429,12 +429,14 @@ func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]al
} }
if factory.Function != nil { if factory.Function != nil {
configs = append(configs, algorithm.PriorityConfig{ configs = append(configs, algorithm.PriorityConfig{
Name: name,
Function: factory.Function(args), Function: factory.Function(args),
Weight: factory.Weight, Weight: factory.Weight,
}) })
} else { } else {
mapFunction, reduceFunction := factory.MapReduceFunction(args) mapFunction, reduceFunction := factory.MapReduceFunction(args)
configs = append(configs, algorithm.PriorityConfig{ configs = append(configs, algorithm.PriorityConfig{
Name: name,
Map: mapFunction, Map: mapFunction,
Reduce: reduceFunction, Reduce: reduceFunction,
Weight: factory.Weight, Weight: factory.Weight,