Migrate a bunch of priority functions to map-reduce framework

Wojciech Tyczynski 2016-09-09 14:42:10 +02:00
parent 6ffd30c2df
commit ea943d825e
5 changed files with 87 additions and 111 deletions
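For orientation before the diffs: the priority functions touched here move from the old whole-list signature (pod, nodeNameToInfo, nodes) -> HostPriorityList to a per-node "map" signature (pod, meta, nodeInfo) -> HostPriority, with an optional reduce step that is passed as nil throughout this commit. The sketch below shows that shape change only; it uses trimmed-down stand-in types, not the real schedulercache/schedulerapi ones.

```go
// Sketch of the shape change this commit makes. Pod, NodeInfo and
// HostPriority here are simplified stand-ins, not the scheduler's types.
package main

import "fmt"

type Pod struct{ Name string }
type NodeInfo struct{ NodeName string }
type HostPriority struct {
	Host  string
	Score int
}

// Old style: one call scores every node and returns the whole list.
func oldStylePriority(pod *Pod, nodeInfos []*NodeInfo) ([]HostPriority, error) {
	result := make([]HostPriority, 0, len(nodeInfos))
	for _, ni := range nodeInfos {
		result = append(result, HostPriority{Host: ni.NodeName, Score: 10})
	}
	return result, nil
}

// New "map" style: one call scores a single node; the framework fans it out
// over all nodes, and an optional "reduce" step may post-process the list.
func mapStylePriority(pod *Pod, meta interface{}, nodeInfo *NodeInfo) (HostPriority, error) {
	if nodeInfo == nil {
		return HostPriority{}, fmt.Errorf("node not found")
	}
	return HostPriority{Host: nodeInfo.NodeName, Score: 10}, nil
}

func main() {
	nodes := []*NodeInfo{{NodeName: "node-1"}, {NodeName: "node-2"}}
	pod := &Pod{Name: "demo"}

	// The per-node loop the map function no longer has to write itself.
	var list []HostPriority
	for _, ni := range nodes {
		hp, err := mapStylePriority(pod, nil, ni)
		if err != nil {
			panic(err)
		}
		list = append(list, hp)
	}
	fmt.Println(list)

	old, _ := oldStylePriority(pod, nodes)
	fmt.Println(old)
}
```

The per-node shape is what lets the scheduler compute shared metadata once and score each node independently, which is exactly what the converted functions below rely on.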

View File

@@ -125,8 +125,12 @@ func calculateUnusedPriority(pod *api.Pod, podRequests *schedulercache.Resource,
 // Calculate the resource used on a node. 'node' has information about the resources on the node.
 // 'pods' is a list of pods currently scheduled on the node.
-// TODO: Use Node() from nodeInfo instead of passing it.
-func calculateUsedPriority(pod *api.Pod, podRequests *schedulercache.Resource, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+func calculateUsedPriority(pod *api.Pod, podRequests *schedulercache.Resource, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+	}
 	allocatableResources := nodeInfo.AllocatableResource()
 	totalResources := *podRequests
 	totalResources.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
@@ -149,7 +153,7 @@ func calculateUsedPriority(pod *api.Pod, podRequests *schedulercache.Resource, n
 	return schedulerapi.HostPriority{
 		Host:  node.Name,
 		Score: int((cpuScore + memoryScore) / 2),
-	}
+	}, nil
 }
 
 // LeastRequestedPriority is a priority function that favors nodes with fewer requested resources.
@@ -171,13 +175,15 @@ func LeastRequestedPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedul
 // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes
 // based on the maximum of the average of the fraction of requested to capacity.
 // Details: (cpu(10 * sum(requested) / capacity) + memory(10 * sum(requested) / capacity)) / 2
-func MostRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-	podResources := getNonZeroRequests(pod)
-	list := make(schedulerapi.HostPriorityList, 0, len(nodes))
-	for _, node := range nodes {
-		list = append(list, calculateUsedPriority(pod, podResources, node, nodeNameToInfo[node.Name]))
+func MostRequestedPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	var nonZeroRequest *schedulercache.Resource
+	if priorityMeta, ok := meta.(*priorityMetadata); ok {
+		nonZeroRequest = priorityMeta.nonZeroRequest
+	} else {
+		// We couldn't parse metadata - fall back to computing it.
+		nonZeroRequest = getNonZeroRequests(pod)
 	}
-	return list, nil
+	return calculateUsedPriority(pod, nonZeroRequest, nodeInfo)
 }
 
 type NodeLabelPrioritizer struct {
@@ -185,37 +191,32 @@ type NodeLabelPrioritizer struct {
 	presence bool
 }
 
-func NewNodeLabelPriority(label string, presence bool) algorithm.PriorityFunction {
+func NewNodeLabelPriority(label string, presence bool) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
 	labelPrioritizer := &NodeLabelPrioritizer{
 		label:    label,
 		presence: presence,
 	}
-	return labelPrioritizer.CalculateNodeLabelPriority
+	return labelPrioritizer.CalculateNodeLabelPriorityMap, nil
 }
 
 // CalculateNodeLabelPriority checks whether a particular label exists on a node or not, regardless of its value.
 // If presence is true, prioritizes nodes that have the specified label, regardless of value.
 // If presence is false, prioritizes nodes that do not have the specified label.
-func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-	var score int
-	labeledNodes := map[string]bool{}
-	for _, node := range nodes {
-		exists := labels.Set(node.Labels).Has(n.label)
-		labeledNodes[node.Name] = (exists && n.presence) || (!exists && !n.presence)
+func (n *NodeLabelPrioritizer) CalculateNodeLabelPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
 	}
 
-	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-	//score int - scale of 0-10
-	// 0 being the lowest priority and 10 being the highest
-	for nodeName, success := range labeledNodes {
-		if success {
-			score = 10
-		} else {
-			score = 0
-		}
-		result = append(result, schedulerapi.HostPriority{Host: nodeName, Score: score})
-	}
-	return result, nil
+	exists := labels.Set(node.Labels).Has(n.label)
+	score := 0
+	if (exists && n.presence) || (!exists && !n.presence) {
+		score = 10
+	}
+	return schedulerapi.HostPriority{
+		Host:  node.Name,
+		Score: score,
+	}, nil
 }
 
 // This is a reasonable size range of all container images. 90%ile of images on dockerhub drops into this range.
@@ -230,25 +231,20 @@ const (
 // based on the total size of those images.
 // - If none of the images are present, this node will be given the lowest priority.
 // - If some of the images are present on a node, the larger their sizes' sum, the higher the node's priority.
-func ImageLocalityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-	sumSizeMap := make(map[string]int64)
-	for i := range pod.Spec.Containers {
-		for _, node := range nodes {
-			// Check if this container's image is present and get its size.
-			imageSize := checkContainerImageOnNode(node, &pod.Spec.Containers[i])
-			// Add this size to the total result of this node.
-			sumSizeMap[node.Name] += imageSize
-		}
+func ImageLocalityPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
 	}
 
-	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-	// score int - scale of 0-10
-	// 0 being the lowest priority and 10 being the highest.
-	for nodeName, sumSize := range sumSizeMap {
-		result = append(result, schedulerapi.HostPriority{Host: nodeName,
-			Score: calculateScoreFromSize(sumSize)})
-	}
-	return result, nil
+	var sumSize int64
+	for i := range pod.Spec.Containers {
+		sumSize += checkContainerImageOnNode(node, &pod.Spec.Containers[i])
+	}
+	return schedulerapi.HostPriority{
+		Host:  node.Name,
+		Score: calculateScoreFromSize(sumSize),
+	}, nil
 }
 
 // checkContainerImageOnNode checks if a container image is present on a node and returns its size.
@@ -290,17 +286,23 @@ func calculateScoreFromSize(sumSize int64) int {
 // close the two metrics are to each other.
 // Detail: score = 10 - abs(cpuFraction-memoryFraction)*10. The algorithm is partly inspired by:
 // "Wei Huang et al. An Energy Efficient Virtual Machine Placement Algorithm with Balanced Resource Utilization"
-func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
-	podResources := getNonZeroRequests(pod)
-	list := make(schedulerapi.HostPriorityList, 0, len(nodes))
-	for _, node := range nodes {
-		list = append(list, calculateBalancedResourceAllocation(pod, podResources, node, nodeNameToInfo[node.Name]))
+func BalancedResourceAllocationMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	var nonZeroRequest *schedulercache.Resource
+	if priorityMeta, ok := meta.(*priorityMetadata); ok {
+		nonZeroRequest = priorityMeta.nonZeroRequest
+	} else {
+		// We couldn't parse metadata - fall back to computing it.
+		nonZeroRequest = getNonZeroRequests(pod)
 	}
-	return list, nil
+	return calculateBalancedResourceAllocation(pod, nonZeroRequest, nodeInfo)
 }
 
-// TODO: Use Node() from nodeInfo instead of passing it.
-func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *schedulercache.Resource, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
+func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *schedulercache.Resource, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+	}
 	allocatableResources := nodeInfo.AllocatableResource()
 	totalResources := *podRequests
 	totalResources.MilliCPU += nodeInfo.NonZeroRequest().MilliCPU
@@ -335,7 +337,7 @@ func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *schedulercac
 	return schedulerapi.HostPriority{
 		Host:  node.Name,
 		Score: score,
-	}
+	}, nil
 }
 
 func fractionOfCapacity(requested, capacity int64) float64 {
@@ -345,7 +347,12 @@ func fractionOfCapacity(requested, capacity int64) float64 {
 	return float64(requested) / float64(capacity)
 }
 
-func CalculateNodePreferAvoidPodsPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
+func CalculateNodePreferAvoidPodsPriorityMap(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
+	}
 	controllerRef := priorityutil.GetControllerRef(pod)
 	if controllerRef != nil {
 		// Ignore pods that are owned by other controller than ReplicationController
@@ -355,49 +362,21 @@ func CalculateNodePreferAvoidPodsPriority(pod *api.Pod, nodeNameToInfo map[strin
 		}
 	}
 	if controllerRef == nil {
-		result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-		for _, node := range nodes {
-			result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 10})
-		}
-		return result, nil
+		return schedulerapi.HostPriority{Host: node.Name, Score: 10}, nil
 	}
-	avoidNodes := make(map[string]bool, len(nodes))
-	avoidNode := false
-	for _, node := range nodes {
-		avoids, err := api.GetAvoidPodsFromNodeAnnotations(node.Annotations)
-		if err != nil {
-			continue
-		}
-		avoidNode = false
-		for i := range avoids.PreferAvoidPods {
-			avoid := &avoids.PreferAvoidPods[i]
-			if controllerRef != nil {
-				if avoid.PodSignature.PodController.Kind == controllerRef.Kind && avoid.PodSignature.PodController.UID == controllerRef.UID {
-					avoidNode = true
-				}
-			}
-			if avoidNode {
-				// false is default value, so we don't even need to set it
-				// to avoid unnecessary map operations.
-				avoidNodes[node.Name] = true
-				break
-			}
-		}
-	}
-	var score int
-	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-	//score int - scale of 0-10
-	// 0 being the lowest priority and 10 being the highest
-	for _, node := range nodes {
-		if avoidNodes[node.Name] {
-			score = 0
-		} else {
-			score = 10
-		}
-		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: score})
-	}
-	return result, nil
+
+	avoids, err := api.GetAvoidPodsFromNodeAnnotations(node.Annotations)
+	if err != nil {
+		// If we cannot get annotation, assume it's schedulable there.
+		return schedulerapi.HostPriority{Host: node.Name, Score: 10}, nil
+	}
+	for i := range avoids.PreferAvoidPods {
+		avoid := &avoids.PreferAvoidPods[i]
+		if controllerRef != nil {
+			if avoid.PodSignature.PodController.Kind == controllerRef.Kind && avoid.PodSignature.PodController.UID == controllerRef.UID {
+				return schedulerapi.HostPriority{Host: node.Name, Score: 0}, nil
+			}
+		}
+	}
+	return schedulerapi.HostPriority{Host: node.Name, Score: 10}, nil
 }
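The two resource-based map functions above (MostRequestedPriorityMap and BalancedResourceAllocationMap) share one pattern worth noting: they accept precomputed per-pod data through the opaque meta argument and fall back to recomputing it when the type assertion fails. A minimal, self-contained sketch of that pattern follows; priorityMetadata, Resource and getNonZeroRequests are simplified stand-ins for the scheduler's own.

```go
package main

import "fmt"

// Simplified stand-ins for the scheduler's priorityMetadata and Resource.
type Resource struct{ MilliCPU, Memory int64 }
type priorityMetadata struct{ nonZeroRequest *Resource }

type Pod struct{ Name string }

// Stand-in for getNonZeroRequests: the (more expensive) recomputation path.
func getNonZeroRequests(pod *Pod) *Resource {
	return &Resource{MilliCPU: 100, Memory: 200 << 20}
}

// The pattern: trust precomputed metadata when the type assertion succeeds,
// otherwise recompute so the function still works when called standalone
// (for example from tests that pass meta == nil).
func nonZeroRequestFor(pod *Pod, meta interface{}) *Resource {
	if pm, ok := meta.(*priorityMetadata); ok {
		return pm.nonZeroRequest
	}
	// We couldn't parse metadata - fall back to computing it.
	return getNonZeroRequests(pod)
}

func main() {
	pod := &Pod{Name: "demo"}

	precomputed := &priorityMetadata{nonZeroRequest: &Resource{MilliCPU: 250, Memory: 512 << 20}}
	fmt.Println(nonZeroRequestFor(pod, precomputed)) // uses the cached value
	fmt.Println(nonZeroRequestFor(pod, nil))         // falls back to recomputing
}
```

The fallback keeps each map function correct on its own, while letting the framework pay the metadata cost once per pod instead of once per node.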

View File

@@ -295,8 +295,7 @@ func TestLeastRequested(t *testing.T) {
 	for _, test := range tests {
 		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
-		lrp := priorityFunction(LeastRequestedPriorityMap, nil)
-		list, err := lrp(test.pod, nodeNameToInfo, test.nodes)
+		list, err := priorityFunction(LeastRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -451,7 +450,7 @@ func TestMostRequested(t *testing.T) {
 	for _, test := range tests {
 		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
-		list, err := MostRequestedPriority(test.pod, nodeNameToInfo, test.nodes)
+		list, err := priorityFunction(MostRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -541,11 +540,8 @@ func TestNewNodeLabelPriority(t *testing.T) {
 	}
 	for _, test := range tests {
-		prioritizer := NodeLabelPrioritizer{
-			label:    test.label,
-			presence: test.presence,
-		}
-		list, err := prioritizer.CalculateNodeLabelPriority(nil, map[string]*schedulercache.NodeInfo{}, test.nodes)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
+		list, err := priorityFunction(NewNodeLabelPriority(test.label, test.presence))(nil, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -784,7 +780,7 @@ func TestBalancedResourceAllocation(t *testing.T) {
 	for _, test := range tests {
 		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
-		list, err := BalancedResourceAllocation(test.pod, nodeNameToInfo, test.nodes)
+		list, err := priorityFunction(BalancedResourceAllocationMap, nil)(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -928,7 +924,7 @@ func TestImageLocalityPriority(t *testing.T) {
 	for _, test := range tests {
 		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
-		list, err := ImageLocalityPriority(test.pod, nodeNameToInfo, test.nodes)
+		list, err := priorityFunction(ImageLocalityPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -1152,7 +1148,8 @@ func TestNodePreferAvoidPriority(t *testing.T) {
 	}
 	for _, test := range tests {
-		list, err := CalculateNodePreferAvoidPodsPriority(test.pod, map[string]*schedulercache.NodeInfo{}, test.nodes)
+		nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
+		list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
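All of the test updates above funnel through a priorityFunction helper that wraps a map/reduce pair back into the old (pod, nodeNameToInfo, nodes) -> HostPriorityList shape, so the table-driven tests only change at the call site. The helper itself is not part of this diff; the sketch below is one plausible shape for such an adapter, inferred from how the tests call it, and again uses trimmed-down stand-in types rather than the real ones.

```go
package main

import "fmt"

// Trimmed-down stand-ins; the real tests use *api.Pod, schedulercache.NodeInfo, etc.
type Pod struct{ Name string }
type Node struct{ Name string }
type NodeInfo struct{ node *Node }
type HostPriority struct {
	Host  string
	Score int
}
type HostPriorityList []HostPriority

type PriorityMapFunction func(pod *Pod, meta interface{}, nodeInfo *NodeInfo) (HostPriority, error)
type PriorityReduceFunction func(pod *Pod, result HostPriorityList) error
type PriorityFunction func(pod *Pod, nodeNameToInfo map[string]*NodeInfo, nodes []*Node) (HostPriorityList, error)

// priorityFunction adapts a (map, reduce) pair back to the old list-returning
// signature. This is a guess at the helper's shape, not the real test code.
func priorityFunction(mapFn PriorityMapFunction, reduceFn PriorityReduceFunction) PriorityFunction {
	return func(pod *Pod, nodeNameToInfo map[string]*NodeInfo, nodes []*Node) (HostPriorityList, error) {
		result := make(HostPriorityList, 0, len(nodes))
		for _, n := range nodes {
			hp, err := mapFn(pod, nil, nodeNameToInfo[n.Name])
			if err != nil {
				return nil, err
			}
			result = append(result, hp)
		}
		if reduceFn != nil {
			if err := reduceFn(pod, result); err != nil {
				return nil, err
			}
		}
		return result, nil
	}
}

func main() {
	nodes := []*Node{{Name: "node-1"}, {Name: "node-2"}}
	infos := map[string]*NodeInfo{
		"node-1": {node: nodes[0]},
		"node-2": {node: nodes[1]},
	}
	scoreAll := func(pod *Pod, meta interface{}, ni *NodeInfo) (HostPriority, error) {
		return HostPriority{Host: ni.node.Name, Score: 10}, nil
	}
	list, err := priorityFunction(scoreAll, nil)(&Pod{Name: "demo"}, infos, nodes)
	fmt.Println(list, err)
}
```

Taking the map and reduce functions as two arguments also explains the NewNodeLabelPriority call site, which passes both of that constructor's return values straight through.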

View File

@@ -82,7 +82,7 @@ func init() {
 	// ImageLocalityPriority prioritizes nodes based on locality of images requested by a pod. Nodes with larger size
 	// of already-installed packages required by the pod will be preferred over nodes with no already-installed
 	// packages required by the pod or a small total size of already-installed packages required by the pod.
-	factory.RegisterPriorityFunction("ImageLocalityPriority", priorities.ImageLocalityPriority, 1)
+	factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1)
 	// Fit is defined based on the absence of port conflicts.
 	// This predicate is actually a default predicate, because it is invoked from
 	// predicates.GeneralPredicates()
@@ -98,7 +98,7 @@ func init() {
 	// Fit is determined by node selector query.
 	factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodSelectorMatches)
 	// Optional, cluster-autoscaler friendly priority function - give used nodes higher priority.
-	factory.RegisterPriorityFunction("MostRequestedPriority", priorities.MostRequestedPriority, 1)
+	factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1)
 }
 
 func replace(set sets.String, replaceWhat, replaceWith string) sets.String {
@@ -167,7 +167,7 @@ func defaultPriorities() sets.String {
 		// Prioritize nodes by least requested utilization.
 		factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),
 		// Prioritizes nodes to help achieve balanced resource usage
-		factory.RegisterPriorityFunction("BalancedResourceAllocation", priorities.BalancedResourceAllocation, 1),
+		factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),
 		// spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
 		factory.RegisterPriorityConfigFactory(
 			"SelectorSpreadPriority",
@@ -180,7 +180,7 @@ func defaultPriorities() sets.String {
 		),
 		// Set this weight large enough to override all other priority functions.
 		// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
-		factory.RegisterPriorityFunction("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriority, 10000),
+		factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
 		factory.RegisterPriorityFunction("NodeAffinityPriority", priorities.CalculateNodeAffinityPriority, 1),
 		factory.RegisterPriorityFunction("TaintTolerationPriority", priorities.ComputeTaintTolerationPriority, 1),
 		// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
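In the default provider, every converted priority moves from RegisterPriorityFunction to RegisterPriorityFunction2, which, judging by the call sites in this diff, takes a name, a map function, an optional reduce function (nil everywhere in this commit), and a weight, and returns the name. The toy registry below only illustrates that association; its names and types are stand-ins, not the scheduler factory's API.

```go
package main

import "fmt"

// Illustrative-only stand-ins for per-node map and list-wide reduce functions.
type PriorityMapFunction func(nodeName string) (int, error)
type PriorityReduceFunction func(scores map[string]int) error

type priorityEntry struct {
	mapFn    PriorityMapFunction
	reduceFn PriorityReduceFunction // nil means "no reduce step"
	weight   int
}

var registry = map[string]priorityEntry{}

// registerPriorityFunction2 mirrors what the real registration conceptually
// does: remember the map/reduce pair under a name, with its weight.
func registerPriorityFunction2(name string, m PriorityMapFunction, r PriorityReduceFunction, weight int) string {
	registry[name] = priorityEntry{mapFn: m, reduceFn: r, weight: weight}
	return name
}

func main() {
	registerPriorityFunction2("ImageLocalityPriority", func(nodeName string) (int, error) { return 5, nil }, nil, 1)
	registerPriorityFunction2("NodePreferAvoidPodsPriority", func(nodeName string) (int, error) { return 10, nil }, nil, 10000)
	for name, e := range registry {
		fmt.Printf("%s: weight=%d hasReduce=%v\n", name, e.weight, e.reduceFn != nil)
	}
}
```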

View File

@@ -206,7 +206,7 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
 		}
 	} else if policy.Argument.LabelPreference != nil {
 		pcf = &PriorityConfigFactory{
-			Function: func(args PluginFactoryArgs) algorithm.PriorityFunction {
+			MapReduceFunction: func(args PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
 				return priorities.NewNodeLabelPriority(
 					policy.Argument.LabelPreference.Label,
 					policy.Argument.LabelPreference.Presence,

View File

@@ -489,7 +489,7 @@ func TestZeroRequest(t *testing.T) {
 		// to test what's actually in production.
 		priorityConfigs := []algorithm.PriorityConfig{
 			{Map: algorithmpriorities.LeastRequestedPriorityMap, Weight: 1},
-			{Function: algorithmpriorities.BalancedResourceAllocation, Weight: 1},
+			{Map: algorithmpriorities.BalancedResourceAllocationMap, Weight: 1},
 			{
 				Function: algorithmpriorities.NewSelectorSpreadPriority(
 					algorithm.FakePodLister(test.pods),