Migrate priority functions with non-trivial reduce function.

Wojciech Tyczynski 2016-09-12 13:48:17 +02:00
parent e02b73ff67
commit 4ccb27202c
9 changed files with 128 additions and 74 deletions
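
For orientation, the shape of the pattern this commit migrates to is sketched below. The PriorityMapFunction and PriorityReduceFunction signatures follow the types changed later in this diff; the example priority itself ("emptier nodes score higher"), its names, and the schedulercache import path are illustrative assumptions, not part of the commit. nodeInfo.Pods() is used only to have something to count.

package priorities

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// exampleMap scores a single node in isolation and returns a raw, unnormalized value.
func exampleMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
	node := nodeInfo.Node()
	if node == nil {
		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
	}
	// Raw per-node value: the number of pods already on the node.
	return schedulerapi.HostPriority{Host: node.Name, Score: len(nodeInfo.Pods())}, nil
}

// exampleReduce sees all per-node results at once and rewrites them in place,
// here normalizing to the scheduler's 0-10 range with emptier nodes scoring higher.
func exampleReduce(pod *api.Pod, result schedulerapi.HostPriorityList) error {
	maxCount := 0
	for i := range result {
		if result[i].Score > maxCount {
			maxCount = result[i].Score
		}
	}
	for i := range result {
		if maxCount > 0 {
			result[i].Score = int(10 * (1.0 - float64(result[i].Score)/float64(maxCount)))
		} else {
			result[i].Score = 10
		}
	}
	return nil
}

A map function never looks at other nodes, which is what lets the scheduler run it per node and in parallel; anything that needs a global view, such as normalizing against the maximum, moves into the reduce step.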


@@ -17,6 +17,8 @@ limitations under the License.
package priorities
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
@@ -29,55 +31,75 @@ import (
// it will get an addition of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies, and the higher the weights of the terms it satisfies, the higher the
// score the node gets.
func CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
var maxCount float64
counts := make(map[string]float64, len(nodes))
affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
if err != nil {
return nil, err
func CalculateNodeAffinityPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
node := nodeInfo.Node()
if node == nil {
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
}
var affinity *api.Affinity
if priorityMeta, ok := meta.(*priorityMetadata); ok {
affinity = priorityMeta.affinity
} else {
// We couldn't parse metadata - fallback to computing it.
var err error
affinity, err = api.GetAffinityFromPodAnnotations(pod.Annotations)
if err != nil {
return schedulerapi.HostPriority{}, err
}
}
var count int32
// A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
// An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
// empty PreferredSchedulingTerm matches all objects.
if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
for _, preferredSchedulingTerm := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
if preferredSchedulingTerm.Weight == 0 {
continue
}
// TODO: Avoid computing it for all nodes if this becomes a performance problem.
nodeSelector, err := api.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
if err != nil {
return nil, err
return schedulerapi.HostPriority{}, err
}
for _, node := range nodes {
if nodeSelector.Matches(labels.Set(node.Labels)) {
counts[node.Name] += float64(preferredSchedulingTerm.Weight)
}
if counts[node.Name] > maxCount {
maxCount = counts[node.Name]
}
if nodeSelector.Matches(labels.Set(node.Labels)) {
count += preferredSchedulingTerm.Weight
}
}
}
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for _, node := range nodes {
if maxCount > 0 {
fScore := 10 * (counts[node.Name] / maxCount)
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is a visible performance gain from it.
glog.Infof("%v -> %v: NodeAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
}
} else {
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0})
}
}
return result, nil
return schedulerapi.HostPriority{
Host: node.Name,
Score: int(count),
}, nil
}
func CalculateNodeAffinityPriorityReduce(pod *api.Pod, result schedulerapi.HostPriorityList) error {
var maxCount int
for i := range result {
if result[i].Score > maxCount {
maxCount = result[i].Score
}
}
maxCountFloat := float64(maxCount)
var fScore float64
for i := range result {
if maxCount > 0 {
fScore = 10 * (float64(result[i].Score) / maxCountFloat)
} else {
fScore = 0
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is a visible performance gain from it.
glog.Infof("%v -> %v: NodeAffinityPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore))
}
result[i].Score = int(fScore)
}
return nil
}
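
As a quick sanity check of the new reduce step, with made-up node names and raw counts (the fragment assumes a pod and a *testing.T in scope, in the style of the updated test):

// Hypothetical raw output of the map phase.
result := schedulerapi.HostPriorityList{
	{Host: "node-a", Score: 2},
	{Host: "node-b", Score: 5},
	{Host: "node-c", Score: 0},
}
if err := CalculateNodeAffinityPriorityReduce(pod, result); err != nil {
	t.Errorf("unexpected error: %v", err)
}
// maxCount is 5, so the scores become 10*2/5 = 4, 10*5/5 = 10 and 0.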


@@ -155,7 +155,9 @@ func TestNodeAffinityPriority(t *testing.T) {
}
for _, test := range tests {
list, err := CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil, test.nodes), test.nodes)
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce)
list, err := nap(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@@ -32,15 +32,27 @@ import (
// priorityMetadata is a type that is passed as metadata for priority functions
type priorityMetadata struct {
nonZeroRequest *schedulercache.Resource
podTolerations []api.Toleration
affinity *api.Affinity
}
func PriorityMetadata(pod *api.Pod, nodes []*api.Node) interface{} {
func PriorityMetadata(pod *api.Pod) interface{} {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
}
tolerations, err := getTolerationListFromPod(pod)
if err != nil {
return nil
}
affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
if err != nil {
return nil
}
return &priorityMetadata{
nonZeroRequest: getNonZeroRequests(pod),
podTolerations: tolerations,
affinity: affinity,
}
}
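
PriorityMetadata is computed once per pod and handed to every per-node map call, so the map functions don't each re-parse the pod's annotations; a nil return (e.g. on malformed annotations) simply sends them down the fallback path shown earlier. A minimal usage sketch, with nodeInfo and result assumed to be in scope inside a loop over nodes:

meta := PriorityMetadata(pod)
hostPriority, err := CalculateNodeAffinityPriorityMap(pod, meta, nodeInfo)
if err != nil {
	return nil, err
}
result = append(result, hostPriority)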


@@ -61,7 +61,7 @@ func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.Pr
result = append(result, hostResult)
}
if reduceFn != nil {
if err := reduceFn(result); err != nil {
if err := reduceFn(pod, result); err != nil {
return nil, err
}
}
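
Only the tail of this test helper is visible in the hunk above. For readability, here is a sketch of how such an adapter typically reads after this change; it is illustrative rather than the verbatim helper, and the nil metadata argument deliberately exercises the fallback paths shown earlier:

func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction) algorithm.PriorityFunction {
	return func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
		result := make(schedulerapi.HostPriorityList, 0, len(nodes))
		for i := range nodes {
			hostResult, err := mapFn(pod, nil, nodeNameToInfo[nodes[i].Name])
			if err != nil {
				return nil, err
			}
			result = append(result, hostResult)
		}
		if reduceFn != nil {
			if err := reduceFn(pod, result); err != nil {
				return nil, err
			}
		}
		return result, nil
	}
}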


@@ -17,6 +17,8 @@ limitations under the License.
package priorities
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@@ -24,7 +26,7 @@ import (
)
// countIntolerableTaintsPreferNoSchedule gives the count of taints with effect PreferNoSchedule that the pod cannot tolerate
func countIntolerableTaintsPreferNoSchedule(taints []api.Taint, tolerations []api.Toleration) (intolerableTaints float64) {
func countIntolerableTaintsPreferNoSchedule(taints []api.Taint, tolerations []api.Toleration) (intolerableTaints int) {
for i := range taints {
taint := &taints[i]
// check only on taints that have effect PreferNoSchedule
@@ -50,53 +52,65 @@ func getAllTolerationPreferNoSchedule(tolerations []api.Toleration) (tolerationL
return
}
// ComputeTaintTolerationPriority prepares the priority list for all the nodes based on the number of intolerable taints on the node
func ComputeTaintTolerationPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
// the max value of counts
var maxCount float64
// counts hold the count of intolerable taints of a pod for a given node
counts := make(map[string]float64, len(nodes))
func getTolerationListFromPod(pod *api.Pod) ([]api.Toleration, error) {
tolerations, err := api.GetTolerationsFromPodAnnotations(pod.Annotations)
if err != nil {
return nil, err
}
// Fetch a list of all tolerations with effect PreferNoSchedule
tolerationList := getAllTolerationPreferNoSchedule(tolerations)
return getAllTolerationPreferNoSchedule(tolerations), nil
}
// calculate the intolerable taints for all the nodes
for _, node := range nodes {
taints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
// ComputeTaintTolerationPriority prepares the priority list for all the nodes based on the number of intolerable taints on the node
func ComputeTaintTolerationPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
node := nodeInfo.Node()
if node == nil {
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
}
var tolerationList []api.Toleration
if priorityMeta, ok := meta.(*priorityMetadata); ok {
tolerationList = priorityMeta.podTolerations
} else {
var err error
tolerationList, err = getTolerationListFromPod(pod)
if err != nil {
return nil, err
}
count := countIntolerableTaintsPreferNoSchedule(taints, tolerationList)
if count > 0 {
// 0 is default value, so avoid unnecessary map operations.
counts[node.Name] = count
if count > maxCount {
maxCount = count
}
return schedulerapi.HostPriority{}, err
}
}
taints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
if err != nil {
return schedulerapi.HostPriority{}, err
}
return schedulerapi.HostPriority{
Host: node.Name,
Score: countIntolerableTaintsPreferNoSchedule(taints, tolerationList),
}, nil
}
func ComputeTaintTolerationPriorityReduce(pod *api.Pod, result schedulerapi.HostPriorityList) error {
var maxCount int
for i := range result {
if result[i].Score > maxCount {
maxCount = result[i].Score
}
}
maxCountFloat := float64(maxCount)
// The maximum priority value to give to a node
// Priority values range from 0 - maxPriority
const maxPriority = float64(10)
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for _, node := range nodes {
for i := range result {
fScore := maxPriority
if maxCount > 0 {
fScore = (1.0 - counts[node.Name]/maxCount) * 10
if maxCountFloat > 0 {
fScore = (1.0 - float64(result[i].Score)/maxCountFloat) * 10
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is a visible performance gain from it.
glog.Infof("%v -> %v: Taint Toleration Priority, Score: (%d)", pod.Name, node.Name, int(fScore))
glog.Infof("%v -> %v: Taint Toleration Priority, Score: (%d)", pod.Name, result[i].Host, int(fScore))
}
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
result[i].Score = int(fScore)
}
return result, nil
return nil
}
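
The reduce step here inverts the scale, since a higher raw count means more taints the pod cannot tolerate. With made-up counts:

// Hypothetical per-node intolerable-taint counts from the map phase:
//   node-a: 0, node-b: 1, node-c: 3, so maxCount = 3.
// ComputeTaintTolerationPriorityReduce then rewrites the scores as
//   node-a: (1 - 0/3) * 10 = 10
//   node-b: int((1 - 1/3) * 10) = 6
//   node-c: (1 - 3/3) * 10 = 0
// and when maxCount is 0 every node keeps the default maxPriority of 10.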


@@ -210,8 +210,9 @@ func TestTaintAndToleration(t *testing.T) {
},
}
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, nil)
list, err := ComputeTaintTolerationPriority(test.pod, nodeNameToInfo, test.nodes)
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce)
list, err := ttp(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("%s, unexpected error: %v", test.test, err)
}


@@ -24,16 +24,19 @@ import (
// FitPredicate is a function that indicates if a pod fits into an existing node.
// The failure information is given by the error.
// TODO: Change interface{} to a specific type.
type FitPredicate func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)
// PriorityMapFunction is a function that computes per-node results for a given node.
// TODO: Figure out the exact API of this method.
// TODO: Change interface{} to a specific type.
type PriorityMapFunction func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error)
// PriorityReduceFunction is a function that aggregates per-node results and computes
// final scores for all nodes.
// TODO: Figure out the exact API of this method.
type PriorityReduceFunction func(result schedulerapi.HostPriorityList) error
// TODO: Change interface{} to a specific type.
type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error
// DEPRECATED
// Use Map-Reduce pattern for priority functions.


@@ -181,8 +181,8 @@ func defaultPriorities() sets.String {
// Set this weight large enough to override all other priority functions.
// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
factory.RegisterPriorityFunction("NodeAffinityPriority", priorities.CalculateNodeAffinityPriority, 1),
factory.RegisterPriorityFunction("TaintTolerationPriority", priorities.ComputeTaintTolerationPriority, 1),
factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),
factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
factory.RegisterPriorityConfigFactory(


@@ -255,7 +255,7 @@ func PrioritizeNodes(
errs = append(errs, err)
}
meta := priorities.PriorityMetadata(pod, nodes)
meta := priorities.PriorityMetadata(pod)
results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs))
for range priorityConfigs {
results = append(results, nil)
@@ -298,7 +298,7 @@ func PrioritizeNodes(
wg.Add(1)
go func(index int, config algorithm.PriorityConfig) {
defer wg.Done()
if err := config.Reduce(results[index]); err != nil {
if err := config.Reduce(pod, results[index]); err != nil {
appendError(err)
}
}(i, priorityConfig)