From ea943d825ebf5710030b82d75e8ba6dd0596ddaa Mon Sep 17 00:00:00 2001
From: Wojciech Tyczynski
Date: Fri, 9 Sep 2016 14:42:10 +0200
Subject: [PATCH] Migrate a bunch of priority functions to map-reduce framework

---
 .../algorithm/priorities/priorities.go      | 167 ++++++++----------
 .../algorithm/priorities/priorities_test.go |  19 +-
 .../algorithmprovider/defaults/defaults.go  |   8 +-
 plugin/pkg/scheduler/factory/plugins.go     |   2 +-
 .../pkg/scheduler/generic_scheduler_test.go |   2 +-
 5 files changed, 87 insertions(+), 111 deletions(-)

diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities.go b/plugin/pkg/scheduler/algorithm/priorities/priorities.go
index 6d28c868178..fd720d4cdc6 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/priorities.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/priorities.go
@@ -125,8 +125,12 @@ func calculateUnusedPriority(pod *api.Pod, podRequests *schedulercache.Resource,
 
 // Calculate the resource used on a node. 'node' has information about the resources on the node.
 // 'pods' is a list of pods currently scheduled on the node.
-// TODO: Use Node() from nodeInfo instead of passing it.
-func calculateUsedPriority(pod *api.Pod, podRequests *schedulercache.Resource, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+func calculateUsedPriority(pod *api.Pod, podRequests *schedulercache.Resource, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+    node := nodeInfo.Node()
+    if node == nil {
+        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+    }
+
     allocatableResources := nodeInfo.AllocatableResource()
     totalResources := *podRequests
     totalResources.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
@@ -149,7 +153,7 @@ func calculateUsedPriority(pod *api.Pod, podRequests *schedulercache.Resource, n
     return schedulerapi.HostPriority{
         Host:  node.Name,
         Score: int((cpuScore + memoryScore) / 2),
-    }
+    }, nil
 }
 
 // LeastRequestedPriority is a priority function that favors nodes with fewer requested resources.
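For reference, LeastRequestedPriorityMap above and the functions migrated below all share the per-node "map" signature, with an optional "reduce" pass over the full result list. The type definitions are not part of this patch; the sketch below is their assumed shape, inferred from the call sites in this diff (the reduce signature in particular is an assumption):

// Assumed shape of the map-reduce priority types (not shown in this diff).
// A map function scores exactly one node; a reduce function may post-process the complete list.
type PriorityMapFunction func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error)
type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error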
@@ -171,13 +175,15 @@ func LeastRequestedPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedul
 // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes
 // based on the maximum of the average of the fraction of requested to capacity.
 // Details: (cpu(10 * sum(requested) / capacity) + memory(10 * sum(requested) / capacity)) / 2
-func MostRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-    podResources := getNonZeroRequests(pod)
-    list := make(schedulerapi.HostPriorityList, 0, len(nodes))
-    for _, node := range nodes {
-        list = append(list, calculateUsedPriority(pod, podResources, node, nodeNameToInfo[node.Name]))
+func MostRequestedPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+    var nonZeroRequest *schedulercache.Resource
+    if priorityMeta, ok := meta.(*priorityMetadata); ok {
+        nonZeroRequest = priorityMeta.nonZeroRequest
+    } else {
+        // We couldn't parse metadata - fallback to computing it.
+        nonZeroRequest = getNonZeroRequests(pod)
     }
-    return list, nil
+    return calculateUsedPriority(pod, nonZeroRequest, nodeInfo)
 }
 
 type NodeLabelPrioritizer struct {
@@ -185,37 +191,32 @@ type NodeLabelPrioritizer struct {
     presence bool
 }
 
-func NewNodeLabelPriority(label string, presence bool) algorithm.PriorityFunction {
+func NewNodeLabelPriority(label string, presence bool) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
     labelPrioritizer := &NodeLabelPrioritizer{
         label:    label,
         presence: presence,
     }
-    return labelPrioritizer.CalculateNodeLabelPriority
+    return labelPrioritizer.CalculateNodeLabelPriorityMap, nil
 }
 
 // CalculateNodeLabelPriority checks whether a particular label exists on a node or not, regardless of its value.
 // If presence is true, prioritizes nodes that have the specified label, regardless of value.
 // If presence is false, prioritizes nodes that do not have the specified label.
-func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-    var score int
-    labeledNodes := map[string]bool{}
-    for _, node := range nodes {
-        exists := labels.Set(node.Labels).Has(n.label)
-        labeledNodes[node.Name] = (exists && n.presence) || (!exists && !n.presence)
+func (n *NodeLabelPrioritizer) CalculateNodeLabelPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+    node := nodeInfo.Node()
+    if node == nil {
+        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
     }
-    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-    //score int - scale of 0-10
-    // 0 being the lowest priority and 10 being the highest
-    for nodeName, success := range labeledNodes {
-        if success {
-            score = 10
-        } else {
-            score = 0
-        }
-        result = append(result, schedulerapi.HostPriority{Host: nodeName, Score: score})
+    exists := labels.Set(node.Labels).Has(n.label)
+    score := 0
+    if (exists && n.presence) || (!exists && !n.presence) {
+        score = 10
     }
-    return result, nil
+    return schedulerapi.HostPriority{
+        Host:  node.Name,
+        Score: score,
+    }, nil
 }
 
 // This is a reasonable size range of all container images. 90%ile of images on dockerhub drops into this range.
@@ -230,25 +231,20 @@ const (
 // based on the total size of those images.
 // - If none of the images are present, this node will be given the lowest priority.
 // - If some of the images are present on a node, the larger their sizes' sum, the higher the node's priority.
-func ImageLocalityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-    sumSizeMap := make(map[string]int64)
-    for i := range pod.Spec.Containers {
-        for _, node := range nodes {
-            // Check if this container's image is present and get its size.
-            imageSize := checkContainerImageOnNode(node, &pod.Spec.Containers[i])
-            // Add this size to the total result of this node.
-            sumSizeMap[node.Name] += imageSize
-        }
+func ImageLocalityPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+    node := nodeInfo.Node()
+    if node == nil {
+        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
     }
-    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-    // score int - scale of 0-10
-    // 0 being the lowest priority and 10 being the highest.
-    for nodeName, sumSize := range sumSizeMap {
-        result = append(result, schedulerapi.HostPriority{Host: nodeName,
-            Score: calculateScoreFromSize(sumSize)})
+    var sumSize int64
+    for i := range pod.Spec.Containers {
+        sumSize += checkContainerImageOnNode(node, &pod.Spec.Containers[i])
     }
-    return result, nil
+    return schedulerapi.HostPriority{
+        Host:  node.Name,
+        Score: calculateScoreFromSize(sumSize),
+    }, nil
 }
 
 // checkContainerImageOnNode checks if a container image is present on a node and returns its size.
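The next hunk migrates BalancedResourceAllocation to the same per-node form. Its scoring rule, documented in the comment below as score = 10 - abs(cpuFraction-memoryFraction)*10, rewards nodes whose CPU and memory utilization would end up close to each other. A small illustration with hypothetical numbers (the helper name and inputs are invented for this example only, using the standard math package):

// Illustration only - same arithmetic as the documented formula, hypothetical inputs.
func balancedScore(cpuFraction, memoryFraction float64) int {
    diff := math.Abs(cpuFraction - memoryFraction)
    return int(10 - diff*10)
}

// balancedScore(0.5, 0.3) == 8  (fairly balanced node)
// balancedScore(0.9, 0.1) == 2  (heavily unbalanced node)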
@@ -290,17 +286,23 @@
 // close the two metrics are to each other.
 // Detail: score = 10 - abs(cpuFraction-memoryFraction)*10. The algorithm is partly inspired by:
 // "Wei Huang et al. An Energy Efficient Virtual Machine Placement Algorithm with Balanced Resource Utilization"
-func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-    podResources := getNonZeroRequests(pod)
-    list := make(schedulerapi.HostPriorityList, 0, len(nodes))
-    for _, node := range nodes {
-        list = append(list, calculateBalancedResourceAllocation(pod, podResources, node, nodeNameToInfo[node.Name]))
+func BalancedResourceAllocationMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+    var nonZeroRequest *schedulercache.Resource
+    if priorityMeta, ok := meta.(*priorityMetadata); ok {
+        nonZeroRequest = priorityMeta.nonZeroRequest
+    } else {
+        // We couldn't parse metadata - fallback to computing it.
+        nonZeroRequest = getNonZeroRequests(pod)
     }
-    return list, nil
+    return calculateBalancedResourceAllocation(pod, nonZeroRequest, nodeInfo)
 }
 
-// TODO: Use Node() from nodeInfo instead of passing it.
-func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *schedulercache.Resource, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *schedulercache.Resource, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+    node := nodeInfo.Node()
+    if node == nil {
+        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+    }
+
     allocatableResources := nodeInfo.AllocatableResource()
     totalResources := *podRequests
     totalResources.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
@@ -335,7 +337,7 @@
     return schedulerapi.HostPriority{
         Host:  node.Name,
         Score: score,
-    }
+    }, nil
 }
 
 func fractionOfCapacity(requested, capacity int64) float64 {
@@ -345,7 +347,12 @@
     return float64(requested) / float64(capacity)
 }
 
-func CalculateNodePreferAvoidPodsPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
+func CalculateNodePreferAvoidPodsPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+    node := nodeInfo.Node()
+    if node == nil {
+        return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+    }
+
     controllerRef := priorityutil.GetControllerRef(pod)
     if controllerRef != nil {
         // Ignore pods that are owned by other controller than ReplicationController
@@ -355,49 +362,21 @@
         }
     }
     if controllerRef == nil {
-        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-        for _, node := range nodes {
-            result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 10})
-        }
-        return result, nil
+        return schedulerapi.HostPriority{Host: node.Name, Score: 10}, nil
     }
-    avoidNodes := make(map[string]bool, len(nodes))
-    avoidNode := false
-    for _, node := range nodes {
-        avoids, err := api.GetAvoidPodsFromNodeAnnotations(node.Annotations)
-        if err != nil {
-            continue
-        }
-
-        avoidNode = false
-        for i := range avoids.PreferAvoidPods {
-            avoid := &avoids.PreferAvoidPods[i]
-            if controllerRef != nil {
-                if avoid.PodSignature.PodController.Kind == controllerRef.Kind && avoid.PodSignature.PodController.UID == controllerRef.UID {
-                    avoidNode = true
-                }
-            }
-            if avoidNode {
-                // false is default value, so we don't even need to set it
-                // to avoid unnecessary map operations.
-                avoidNodes[node.Name] = true
-                break
+    avoids, err := api.GetAvoidPodsFromNodeAnnotations(node.Annotations)
+    if err != nil {
+        // If we cannot get annotation, assume it's schedulable there.
+        return schedulerapi.HostPriority{Host: node.Name, Score: 10}, nil
+    }
+    for i := range avoids.PreferAvoidPods {
+        avoid := &avoids.PreferAvoidPods[i]
+        if controllerRef != nil {
+            if avoid.PodSignature.PodController.Kind == controllerRef.Kind && avoid.PodSignature.PodController.UID == controllerRef.UID {
+                return schedulerapi.HostPriority{Host: node.Name, Score: 0}, nil
+            }
+        }
     }
-
-    var score int
-    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-    //score int - scale of 0-10
-    // 0 being the lowest priority and 10 being the highest
-    for _, node := range nodes {
-        if avoidNodes[node.Name] {
-            score = 0
-        } else {
-            score = 10
-        }
-        result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: score})
-    }
-    return result, nil
+    return schedulerapi.HostPriority{Host: node.Name, Score: 10}, nil
 }
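The tests below drive each migrated function through a small priorityFunction adapter. That helper is not included in this patch; the following is a minimal sketch of what it presumably does, assuming only the signature implied by its call sites (wrap a map/reduce pair back into the legacy whole-list PriorityFunction):

// Assumed test helper (not part of this diff): adapts a map/reduce pair to the old
// PriorityFunction signature so the existing table-driven tests keep working unchanged.
func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction) algorithm.PriorityFunction {
    return func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostResult, err := mapFn(pod, nil, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostResult)
        }
        if reduceFn != nil {
            if err := reduceFn(pod, result); err != nil {
                return nil, err
            }
        }
        return result, nil
    }
}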
diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
index 0b6f0b5702a..e534246952a 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go
@@ -295,8 +295,7 @@ func TestLeastRequested(t *testing.T) {
 
     for _, test := range tests {
         nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
-        lrp := priorityFunction(LeastRequestedPriorityMap, nil)
-        list, err := lrp(test.pod, nodeNameToInfo, test.nodes)
+        list, err := priorityFunction(LeastRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
         if err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -451,7 +450,7 @@ func TestMostRequested(t *testing.T) {
 
     for _, test := range tests {
         nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
-        list, err := MostRequestedPriority(test.pod, nodeNameToInfo, test.nodes)
+        list, err := priorityFunction(MostRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
         if err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -541,11 +540,8 @@ func TestNewNodeLabelPriority(t *testing.T) {
     }
 
     for _, test := range tests {
-        prioritizer := NodeLabelPrioritizer{
-            label:    test.label,
-            presence: test.presence,
-        }
-        list, err := prioritizer.CalculateNodeLabelPriority(nil, map[string]*schedulercache.NodeInfo{}, test.nodes)
+        nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
+        list, err := priorityFunction(NewNodeLabelPriority(test.label, test.presence))(nil, nodeNameToInfo, test.nodes)
         if err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -784,7 +780,7 @@ func TestBalancedResourceAllocation(t *testing.T) {
 
     for _, test := range tests {
         nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
-        list, err := BalancedResourceAllocation(test.pod, nodeNameToInfo, test.nodes)
+        list, err := priorityFunction(BalancedResourceAllocationMap, nil)(test.pod, nodeNameToInfo, test.nodes)
         if err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -928,7 +924,7 @@ func TestImageLocalityPriority(t *testing.T) {
 
     for _, test := range tests {
         nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
-        list, err := ImageLocalityPriority(test.pod, nodeNameToInfo, test.nodes)
+        list, err := priorityFunction(ImageLocalityPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
         if err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -1152,7 +1148,8 @@ func TestNodePreferAvoidPriority(t *testing.T) {
     }
 
     for _, test := range tests {
-        list, err := CalculateNodePreferAvoidPodsPriority(test.pod, map[string]*schedulercache.NodeInfo{}, test.nodes)
+        nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
+        list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
         if err != nil {
             t.Errorf("unexpected error: %v", err)
         }
diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
index daa20248501..7c5707ed0aa 100644
--- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
+++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -82,7 +82,7 @@ func init() {
     // ImageLocalityPriority prioritizes nodes based on locality of images requested by a pod. Nodes with larger size
     // of already-installed packages required by the pod will be preferred over nodes with no already-installed
     // packages required by the pod or a small total size of already-installed packages required by the pod.
-    factory.RegisterPriorityFunction("ImageLocalityPriority", priorities.ImageLocalityPriority, 1)
+    factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1)
     // Fit is defined based on the absence of port conflicts.
     // This predicate is actually a default predicate, because it is invoked from
     // predicates.GeneralPredicates()
@@ -98,7 +98,7 @@
     // Fit is determined by node selector query.
     factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodSelectorMatches)
     // Optional, cluster-autoscaler friendly priority function - give used nodes higher priority.
-    factory.RegisterPriorityFunction("MostRequestedPriority", priorities.MostRequestedPriority, 1)
+    factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1)
 }
 
 func replace(set sets.String, replaceWhat, replaceWith string) sets.String {
@@ -167,7 +167,7 @@ func defaultPriorities() sets.String {
         // Prioritize nodes by least requested utilization.
         factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),
         // Prioritizes nodes to help achieve balanced resource usage
-        factory.RegisterPriorityFunction("BalancedResourceAllocation", priorities.BalancedResourceAllocation, 1),
+        factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),
         // spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
         factory.RegisterPriorityConfigFactory(
             "SelectorSpreadPriority",
@@ -180,7 +180,7 @@
         ),
         // Set this weight large enough to override all other priority functions.
         // TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
-        factory.RegisterPriorityFunction("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriority, 10000),
+        factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
         factory.RegisterPriorityFunction("NodeAffinityPriority", priorities.CalculateNodeAffinityPriority, 1),
         factory.RegisterPriorityFunction("TaintTolerationPriority", priorities.ComputeTaintTolerationPriority, 1),
         // pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
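defaults.go now registers the migrated priorities through RegisterPriorityFunction2, whose definition is not shown in this patch. Judging from its call sites above and from the MapReduceFunction field used in plugins.go below, it presumably wraps a fixed map/reduce pair the same way RegisterPriorityFunction wraps a whole-list function; a sketch under those assumptions (the Weight field and the exact body are guesses):

// Assumed shape of the new registration helper in plugin/pkg/scheduler/factory/plugins.go.
func RegisterPriorityFunction2(name string, mapFunction algorithm.PriorityMapFunction, reduceFunction algorithm.PriorityReduceFunction, weight int) string {
    return RegisterPriorityConfigFactory(name, PriorityConfigFactory{
        MapReduceFunction: func(args PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
            return mapFunction, reduceFunction
        },
        Weight: weight,
    })
}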
diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go
index fa100e8448a..71ecd0f156f 100644
--- a/plugin/pkg/scheduler/factory/plugins.go
+++ b/plugin/pkg/scheduler/factory/plugins.go
@@ -206,7 +206,7 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
         }
     } else if policy.Argument.LabelPreference != nil {
         pcf = &PriorityConfigFactory{
-            Function: func(args PluginFactoryArgs) algorithm.PriorityFunction {
+            MapReduceFunction: func(args PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
                 return priorities.NewNodeLabelPriority(
                     policy.Argument.LabelPreference.Label,
                     policy.Argument.LabelPreference.Presence,
diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go
index 6bae311e546..46d34176f94 100644
--- a/plugin/pkg/scheduler/generic_scheduler_test.go
+++ b/plugin/pkg/scheduler/generic_scheduler_test.go
@@ -489,7 +489,7 @@ func TestZeroRequest(t *testing.T) {
         // to test what's actually in production.
         priorityConfigs := []algorithm.PriorityConfig{
             {Map: algorithmpriorities.LeastRequestedPriorityMap, Weight: 1},
-            {Function: algorithmpriorities.BalancedResourceAllocation, Weight: 1},
+            {Map: algorithmpriorities.BalancedResourceAllocationMap, Weight: 1},
             {
                 Function: algorithmpriorities.NewSelectorSpreadPriority(
                     algorithm.FakePodLister(test.pods),
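The patch is cut off here, in the middle of the TestZeroRequest fixture. For orientation, the way the generic scheduler presumably evaluates one PriorityConfig entry after this migration is sketched below: legacy Function entries still score all nodes in a single call, while Map entries are invoked once per node and optionally followed by a Reduce pass. This is a simplified, assumed reconstruction, not code taken from this patch; in particular the Reduce field and its signature are assumptions.

// Simplified sketch (assumed): how one PriorityConfig entry produces a score list.
func scoreWithConfig(config algorithm.PriorityConfig, pod *api.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
    if config.Function != nil {
        // Legacy path: a single call scores every node.
        return config.Function(pod, nodeNameToInfo, nodes)
    }
    // Map-reduce path: score each node independently, then optionally post-process the list.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    for _, node := range nodes {
        hostPriority, err := config.Map(pod, meta, nodeNameToInfo[node.Name])
        if err != nil {
            return nil, err
        }
        result = append(result, hostPriority)
    }
    if config.Reduce != nil {
        if err := config.Reduce(pod, result); err != nil {
            return nil, err
        }
    }
    return result, nil
}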