From 4ccb27202c2e356437cf1c0b53173dcf34492a8d Mon Sep 17 00:00:00 2001
From: Wojciech Tyczynski
Date: Mon, 12 Sep 2016 13:48:17 +0200
Subject: [PATCH] Migrate priority functions with non-trivial reduce function.

---
 .../algorithm/priorities/node_affinity.go     | 88 ++++++++++++-------
 .../priorities/node_affinity_test.go          |  4 +-
 .../algorithm/priorities/priorities.go        | 14 ++-
 .../algorithm/priorities/priorities_test.go   |  2 +-
 .../algorithm/priorities/taint_toleration.go  | 76 +++++++++-------
 .../priorities/taint_toleration_test.go       |  5 +-
 plugin/pkg/scheduler/algorithm/types.go       |  5 +-
 .../algorithmprovider/defaults/defaults.go    |  4 +-
 plugin/pkg/scheduler/generic_scheduler.go     |  4 +-
 9 files changed, 128 insertions(+), 74 deletions(-)

diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go b/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go
index 97a2c9b6544..4c843a62ba7 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/node_affinity.go
@@ -17,6 +17,8 @@ limitations under the License.
 package priorities
 
 import (
+	"fmt"
+
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/labels"
@@ -29,55 +31,75 @@ import (
 // it will a get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
 // the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher
 // score the node gets.
-func CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-	var maxCount float64
-	counts := make(map[string]float64, len(nodes))
-
-	affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
-	if err != nil {
-		return nil, err
+func CalculateNodeAffinityPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
 	}
 
+	var affinity *api.Affinity
+	if priorityMeta, ok := meta.(*priorityMetadata); ok {
+		affinity = priorityMeta.affinity
+	} else {
+		// We couldn't parse metadata - fallback to computing it.
+		var err error
+		affinity, err = api.GetAffinityFromPodAnnotations(pod.Annotations)
+		if err != nil {
+			return schedulerapi.HostPriority{}, err
+		}
+	}
+
+	var count int32
 	// A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
 	// An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
 	// empty PreferredSchedulingTerm matches all objects.
 	if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
 		// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
-		for _, preferredSchedulingTerm := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
+		for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
+			preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
 			if preferredSchedulingTerm.Weight == 0 {
 				continue
 			}
 
+			// TODO: Avoid computing it for all nodes if this becomes a performance problem.
 			nodeSelector, err := api.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
 			if err != nil {
-				return nil, err
+				return schedulerapi.HostPriority{}, err
 			}
-
-			for _, node := range nodes {
-				if nodeSelector.Matches(labels.Set(node.Labels)) {
-					counts[node.Name] += float64(preferredSchedulingTerm.Weight)
-				}
-
-				if counts[node.Name] > maxCount {
-					maxCount = counts[node.Name]
-				}
+			if nodeSelector.Matches(labels.Set(node.Labels)) {
+				count += preferredSchedulingTerm.Weight
 			}
 		}
 	}

-	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-	for _, node := range nodes {
-		if maxCount > 0 {
-			fScore := 10 * (counts[node.Name] / maxCount)
-			result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
-			if glog.V(10) {
-				// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
-				// not logged. There is visible performance gain from it.
-				glog.Infof("%v -> %v: NodeAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
-			}
-		} else {
-			result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0})
-		}
-	}
-	return result, nil
+	return schedulerapi.HostPriority{
+		Host:  node.Name,
+		Score: int(count),
+	}, nil
+}
+
+func CalculateNodeAffinityPriorityReduce(pod *api.Pod, result schedulerapi.HostPriorityList) error {
+	var maxCount int
+	for i := range result {
+		if result[i].Score > maxCount {
+			maxCount = result[i].Score
+		}
+	}
+	maxCountFloat := float64(maxCount)
+
+	var fScore float64
+	for i := range result {
+		if maxCount > 0 {
+			fScore = 10 * (float64(result[i].Score) / maxCountFloat)
+		} else {
+			fScore = 0
+		}
+		if glog.V(10) {
+			// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+			// not logged. There is visible performance gain from it.
+			glog.Infof("%v -> %v: NodeAffinityPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore))
+		}
+		result[i].Score = int(fScore)
+	}
+	return nil
 }
diff --git a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go
index be327078904..85c390c903e 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/node_affinity_test.go
@@ -155,7 +155,9 @@ func TestNodeAffinityPriority(t *testing.T) {
 	}

 	for _, test := range tests {
-		list, err := CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil, test.nodes), test.nodes)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
+		nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce)
+		list, err := nap(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities.go b/plugin/pkg/scheduler/algorithm/priorities/priorities.go
index fd720d4cdc6..f798dd4d5ee 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/priorities.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/priorities.go
@@ -32,15 +32,27 @@ import (
 // priorityMetadata is a type that is passed as metadata for priority functions
 type priorityMetadata struct {
 	nonZeroRequest *schedulercache.Resource
+	podTolerations []api.Toleration
+	affinity       *api.Affinity
 }

-func PriorityMetadata(pod *api.Pod, nodes []*api.Node) interface{} {
+func PriorityMetadata(pod *api.Pod) interface{} {
 	// If we cannot compute metadata, just return nil
 	if pod == nil {
 		return nil
 	}
+	tolerations, err := getTolerationListFromPod(pod)
+	if err != nil {
+		return nil
+	}
+	affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
+	if err != nil {
+		return nil
+	}
 	return &priorityMetadata{
 		nonZeroRequest: getNonZeroRequests(pod),
+		podTolerations: tolerations,
+		affinity:       affinity,
 	}
 }
diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
index bd967932576..7ec0f6f211e 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
@@ -61,7 +61,7 @@ func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.Pr
 		result = append(result, hostResult)
 	}
 	if reduceFn != nil {
-		if err := reduceFn(result); err != nil {
+		if err := reduceFn(pod, result); err != nil {
 			return nil, err
 		}
 	}
diff --git a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration.go b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration.go
index b299044d212..36fa0312fdb 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration.go
@@ -17,6 +17,8 @@ limitations under the License.
 package priorities
 
 import (
+	"fmt"
+
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@@ -24,7 +26,7 @@ import (
 )

 // CountIntolerableTaintsPreferNoSchedule gives the count of intolerable taints of a pod with effect PreferNoSchedule
-func countIntolerableTaintsPreferNoSchedule(taints []api.Taint, tolerations []api.Toleration) (intolerableTaints float64) {
+func countIntolerableTaintsPreferNoSchedule(taints []api.Taint, tolerations []api.Toleration) (intolerableTaints int) {
 	for i := range taints {
 		taint := &taints[i]
 		// check only on taints that have effect PreferNoSchedule
@@ -50,53 +52,65 @@ func getAllTolerationPreferNoSchedule(tolerations []api.Toleration) (tolerationL
 	return
 }

-// ComputeTaintTolerationPriority prepares the priority list for all the nodes based on the number of intolerable taints on the node
-func ComputeTaintTolerationPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-	// the max value of counts
-	var maxCount float64
-	// counts hold the count of intolerable taints of a pod for a given node
-	counts := make(map[string]float64, len(nodes))
-
+func getTolerationListFromPod(pod *api.Pod) ([]api.Toleration, error) {
 	tolerations, err := api.GetTolerationsFromPodAnnotations(pod.Annotations)
 	if err != nil {
 		return nil, err
 	}
-	// Fetch a list of all toleration with effect PreferNoSchedule
-	tolerationList := getAllTolerationPreferNoSchedule(tolerations)
+	return getAllTolerationPreferNoSchedule(tolerations), nil
+}

-	// calculate the intolerable taints for all the nodes
-	for _, node := range nodes {
-		taints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
+// ComputeTaintTolerationPriority prepares the priority list for all the nodes based on the number of intolerable taints on the node
+func ComputeTaintTolerationPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+	}
+
+	var tolerationList []api.Toleration
+	if priorityMeta, ok := meta.(*priorityMetadata); ok {
+		tolerationList = priorityMeta.podTolerations
+	} else {
+		var err error
+		tolerationList, err = getTolerationListFromPod(pod)
 		if err != nil {
-			return nil, err
-		}
-
-		count := countIntolerableTaintsPreferNoSchedule(taints, tolerationList)
-		if count > 0 {
-			// 0 is default value, so avoid unnecessary map operations.
-			counts[node.Name] = count
-			if count > maxCount {
-				maxCount = count
-			}
+			return schedulerapi.HostPriority{}, err
 		}
 	}

+	taints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
+	if err != nil {
+		return schedulerapi.HostPriority{}, err
+	}
+	return schedulerapi.HostPriority{
+		Host:  node.Name,
+		Score: countIntolerableTaintsPreferNoSchedule(taints, tolerationList),
+	}, nil
+}
+
+func ComputeTaintTolerationPriorityReduce(pod *api.Pod, result schedulerapi.HostPriorityList) error {
+	var maxCount int
+	for i := range result {
+		if result[i].Score > maxCount {
+			maxCount = result[i].Score
+		}
+	}
+	maxCountFloat := float64(maxCount)
+
 	// The maximum priority value to give to a node
 	// Priority values range from 0 - maxPriority
 	const maxPriority = float64(10)
-	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-	for _, node := range nodes {
+	for i := range result {
 		fScore := maxPriority
-		if maxCount > 0 {
-			fScore = (1.0 - counts[node.Name]/maxCount) * 10
+		if maxCountFloat > 0 {
+			fScore = (1.0 - float64(result[i].Score)/maxCountFloat) * 10
 		}
 		if glog.V(10) {
 			// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
 			// not logged. There is visible performance gain from it.
-			glog.Infof("%v -> %v: Taint Toleration Priority, Score: (%d)", pod.Name, node.Name, int(fScore))
+			glog.Infof("%v -> %v: Taint Toleration Priority, Score: (%d)", pod.Name, result[i].Host, int(fScore))
 		}
-
-		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
+		result[i].Score = int(fScore)
 	}
-	return result, nil
+	return nil
 }
diff --git a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go
index f0b19a71204..93277ea59b3 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/taint_toleration_test.go
@@ -210,8 +210,9 @@ func TestTaintAndToleration(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
-		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, nil)
-		list, err := ComputeTaintTolerationPriority(test.pod, nodeNameToInfo, test.nodes)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
+		ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce)
+		list, err := ttp(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("%s, unexpected error: %v", test.test, err)
 		}
diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go
index 3ddb5a68ff9..9e552083990 100644
--- a/plugin/pkg/scheduler/algorithm/types.go
+++ b/plugin/pkg/scheduler/algorithm/types.go
@@ -24,16 +24,19 @@ import (
 // FitPredicate is a function that indicates if a pod fits into an existing node.
 // The failure information is given by the error.
+// TODO: Change interface{} to a specific type.
 type FitPredicate func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)

 // PriorityMapFunction is a function that computes per-node results for a given node.
 // TODO: Figure out the exact API of this method.
+// TODO: Change interface{} to a specific type.
 type PriorityMapFunction func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error)

 // PriorityReduceFunction is a function that aggregated per-node results and computes
 // final scores for all nodes.
 // TODO: Figure out the exact API of this method.
-type PriorityReduceFunction func(result schedulerapi.HostPriorityList) error
+// TODO: Change interface{} to a specific type.
+type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error

 // DEPRECATED
 // Use Map-Reduce pattern for priority functions.
diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
index 7c5707ed0aa..ecb53bf9125 100644
--- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
+++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -181,8 +181,8 @@ func defaultPriorities() sets.String {
 		// Set this weight large enough to override all other priority functions.
 		// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
 		factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
-		factory.RegisterPriorityFunction("NodeAffinityPriority", priorities.CalculateNodeAffinityPriority, 1),
-		factory.RegisterPriorityFunction("TaintTolerationPriority", priorities.ComputeTaintTolerationPriority, 1),
+		factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),
+		factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
 		// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
 		// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
 		factory.RegisterPriorityConfigFactory(
diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go
index 60694ac5551..0e3d53713a4 100644
--- a/plugin/pkg/scheduler/generic_scheduler.go
+++ b/plugin/pkg/scheduler/generic_scheduler.go
@@ -255,7 +255,7 @@ func PrioritizeNodes(
 		errs = append(errs, err)
 	}

-	meta := priorities.PriorityMetadata(pod, nodes)
+	meta := priorities.PriorityMetadata(pod)
 	results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs))
 	for range priorityConfigs {
 		results = append(results, nil)
@@ -298,7 +298,7 @@ func PrioritizeNodes(
 		wg.Add(1)
 		go func(index int, config algorithm.PriorityConfig) {
 			defer wg.Done()
-			if err := config.Reduce(results[index]); err != nil {
+			if err := config.Reduce(pod, results[index]); err != nil {
 				appendError(err)
 			}
 		}(i, priorityConfig)
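
Note on the pattern: the patch splits each whole-list priority function into two halves, a PriorityMapFunction that computes a raw score for a single node in isolation, and a PriorityReduceFunction that rewrites the whole HostPriorityList once every raw score is known. The reduce step exists because scaling to the 0-10 range needs the global maximum, which is only available after all nodes have been mapped. The following is a minimal, self-contained sketch of how such a pair composes; it uses simplified stand-in types and hypothetical helper names (node, hostPriority, runPriority, scoreNodeByLabel) rather than the scheduler's real api.Pod, schedulercache.NodeInfo and schedulerapi.HostPriority, and mirrors what the priorityFunction test helper in priorities_test.go does.

// Editorial sketch, not part of the patch: simplified stand-ins for the
// scheduler types; runPriority and scoreNodeByLabel are hypothetical helpers.
package main

import "fmt"

type node struct {
	name   string
	labels map[string]string
}

type hostPriority struct {
	host  string
	score int
}

// mapFn computes a raw, per-node score in isolation (the role of
// CalculateNodeAffinityPriorityMap in the patch).
type mapFn func(n node) hostPriority

// reduceFn rewrites the whole list in place once all raw scores are known
// (the role of CalculateNodeAffinityPriorityReduce in the patch).
type reduceFn func(result []hostPriority)

// scoreNodeByLabel is a toy map function: one point per matching label.
func scoreNodeByLabel(want map[string]string) mapFn {
	return func(n node) hostPriority {
		count := 0
		for k, v := range want {
			if n.labels[k] == v {
				count++
			}
		}
		return hostPriority{host: n.name, score: count}
	}
}

// normalizeTo10 mirrors the node-affinity reduce: scale raw counts to 0-10.
func normalizeTo10(result []hostPriority) {
	maxCount := 0
	for i := range result {
		if result[i].score > maxCount {
			maxCount = result[i].score
		}
	}
	if maxCount == 0 {
		return // leave all scores at 0, as the patch does
	}
	for i := range result {
		result[i].score = int(10 * float64(result[i].score) / float64(maxCount))
	}
}

// runPriority composes the two halves the same way the priorityFunction test
// helper in the patch does: map over every node, then reduce once.
func runPriority(m mapFn, r reduceFn, nodes []node) []hostPriority {
	result := make([]hostPriority, 0, len(nodes))
	for _, n := range nodes {
		result = append(result, m(n))
	}
	if r != nil {
		r(result)
	}
	return result
}

func main() {
	nodes := []node{
		{name: "node-1", labels: map[string]string{"zone": "a", "disk": "ssd"}},
		{name: "node-2", labels: map[string]string{"zone": "b"}},
	}
	scores := runPriority(scoreNodeByLabel(map[string]string{"zone": "a", "disk": "ssd"}), normalizeTo10, nodes)
	fmt.Println(scores) // [{node-1 10} {node-2 0}]
}

The two reduce functions in the patch differ only in their normalization: CalculateNodeAffinityPriorityReduce scales proportionally (fScore = 10 * score/maxCount, so the best-matching node gets 10), while ComputeTaintTolerationPriorityReduce inverts the scale (fScore = (1.0 - score/maxCount) * 10), because a node with more intolerable PreferNoSchedule taints should end up with a lower score.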
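
The other pattern the patch extends is priorityMetadata: per-pod inputs (the PreferNoSchedule tolerations and the parsed affinity) are computed once in PriorityMetadata and handed to every map call through the meta interface{} argument; each map function type-asserts it and falls back to recomputing from the pod annotations when the assertion fails, for example when PriorityMetadata returned nil. A rough stand-alone sketch of that fast-path/fallback shape, again using simplified hypothetical types (pod, podMetadata, parseTolerations) rather than the real scheduler API:

// Editorial sketch, not part of the patch: the precompute-once / fall-back
// shape used by CalculateNodeAffinityPriorityMap and ComputeTaintTolerationPriorityMap.
package main

import "fmt"

type pod struct{ annotations map[string]string }

// podMetadata stands in for priorityMetadata: data derived once per pod.
type podMetadata struct {
	tolerations []string
}

// parseTolerations stands in for api.GetTolerationsFromPodAnnotations.
func parseTolerations(p *pod) []string {
	return []string{p.annotations["tolerations"]}
}

// buildMetadata plays the role of priorities.PriorityMetadata: computed once
// per scheduling cycle and passed to every map call as interface{}.
func buildMetadata(p *pod) interface{} {
	if p == nil {
		return nil
	}
	return &podMetadata{tolerations: parseTolerations(p)}
}

// tolerationsFor uses the precomputed metadata when present and recomputes
// otherwise, the same branch structure the map functions in the patch use.
func tolerationsFor(p *pod, meta interface{}) []string {
	if m, ok := meta.(*podMetadata); ok {
		return m.tolerations // fast path: no per-node parsing
	}
	return parseTolerations(p) // fallback: metadata was nil or of another type
}

func main() {
	p := &pod{annotations: map[string]string{"tolerations": "prefer-no-schedule"}}
	meta := buildMetadata(p)
	fmt.Println(tolerationsFor(p, meta)) // [prefer-no-schedule]
	fmt.Println(tolerationsFor(p, nil))  // fallback path, same result
}

The fallback branch keeps the map functions usable even when no metadata was built for the cycle, which is presumably also what allows tests to drive them directly without going through PriorityMetadata.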