Cleanup and prepare for optimizing PodAffinity priority function.

Wojciech Tyczynski 2016-07-14 09:51:31 +02:00
parent f27a8034fd
commit a538045d7b
4 changed files with 212 additions and 208 deletions

View File

@@ -853,10 +853,11 @@ func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta in
 // AnyPodMatchesPodAffinityTerm checks if any of given pods can match the specific podAffinityTerm.
 func (checker *PodAffinityChecker) AnyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (bool, error) {
     for _, ep := range allPods {
-        match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, pod, podAffinityTerm,
-            func(ep *api.Pod) (*api.Node, error) { return checker.info.GetNodeInfo(ep.Spec.NodeName) },
-            func(pod *api.Pod) (*api.Node, error) { return node, nil },
-        )
+        epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
+        if err != nil {
+            return false, err
+        }
+        match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, epNode, node, pod, podAffinityTerm)
         if err != nil || match {
             return match, err
         }
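
Aside (not part of the commit): the object being matched throughout this change is an api.PodAffinityTerm. Below is a minimal sketch of one, assuming the 1.3-era internal API types from pkg/api and pkg/api/unversioned; the labels, namespace handling and topology key are illustrative values only.

// Example (not part of this commit): a hypothetical PodAffinityTerm of the kind
// AnyPodMatchesPodAffinityTerm and CheckIfPodMatchPodAffinityTerm receive.
package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/unversioned"
)

func main() {
    term := api.PodAffinityTerm{
        // Match existing pods labeled app=store (illustrative labels).
        LabelSelector: &unversioned.LabelSelector{
            MatchLabels: map[string]string{"app": "store"},
        },
        // nil Namespaces defaults to the affinity pod's own namespace;
        // an explicit empty list means "all namespaces".
        Namespaces: []string{},
        // Co-location is judged per value of this node label.
        TopologyKey: "kubernetes.io/hostname",
    }
    fmt.Printf("term matches %v within %q\n", term.LabelSelector.MatchLabels, term.TopologyKey)
    // With the refactored signature above, the caller resolves both nodes first:
    //   match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, epNode, node, pod, term)
}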

View File

@@ -19,6 +19,7 @@ package priorities
 import (
     "github.com/golang/glog"
     "k8s.io/kubernetes/pkg/api"
+    "k8s.io/kubernetes/pkg/api/unversioned"
     "k8s.io/kubernetes/pkg/labels"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@@ -35,7 +36,12 @@ type InterPodAffinity struct {
     failureDomains priorityutil.Topologies
 }
 
-func NewInterPodAffinityPriority(info predicates.NodeInfo, nodeLister algorithm.NodeLister, podLister algorithm.PodLister, hardPodAffinityWeight int, failureDomains []string) algorithm.PriorityFunction {
+func NewInterPodAffinityPriority(
+    info predicates.NodeInfo,
+    nodeLister algorithm.NodeLister,
+    podLister algorithm.PodLister,
+    hardPodAffinityWeight int,
+    failureDomains []string) algorithm.PriorityFunction {
     interPodAffinity := &InterPodAffinity{
         info: info,
         nodeLister: nodeLister,
@@ -46,36 +52,19 @@ func NewInterPodAffinityPriority(info predicates.NodeInfo, nodeLister algorithm.
     return interPodAffinity.CalculateInterPodAffinityPriority
 }
 
-// countPodsThatMatchPodAffinityTerm counts the number of given pods that match the podAffinityTerm.
-func (ipa *InterPodAffinity) CountPodsThatMatchPodAffinityTerm(pod *api.Pod, podsForMatching []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (int, error) {
-    matchedCount := 0
-    for _, ep := range podsForMatching {
-        match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, pod, podAffinityTerm,
-            func(ep *api.Pod) (*api.Node, error) {
-                return ipa.info.GetNodeInfo(ep.Spec.NodeName)
-            },
-            func(pod *api.Pod) (*api.Node, error) {
-                return node, nil
-            },
-        )
-        if err != nil {
-            return 0, err
-        }
-        if match {
-            matchedCount++
-        }
-    }
-    return matchedCount, nil
-}
-
-// CountWeightByPodMatchAffinityTerm counts the weight to topologyCounts for all the given pods that match the podAffinityTerm.
-func (ipa *InterPodAffinity) CountWeightByPodMatchAffinityTerm(pod *api.Pod, podsForMatching []*api.Pod, weight int, podAffinityTerm api.PodAffinityTerm, node *api.Node) (int, error) {
-    if weight == 0 {
-        return 0, nil
-    }
-    // get the pods which are there in that particular node
-    podsMatchedCount, err := ipa.CountPodsThatMatchPodAffinityTerm(pod, podsForMatching, node, podAffinityTerm)
-    return weight * podsMatchedCount, err
-}
+// TODO: Share it with predicates by moving to better location.
+// TODO: Can we avoid error handling here - this is only a matter of non-parsable selector?
+func podMatchesNamespaceAndSelector(pod *api.Pod, affinityPod *api.Pod, term *api.PodAffinityTerm) (bool, error) {
+    namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(affinityPod, *term)
+    if len(namespaces) != 0 && !namespaces.Has(pod.Namespace) {
+        return false, nil
+    }
+
+    selector, err := unversioned.LabelSelectorAsSelector(term.LabelSelector)
+    if err != nil || !selector.Matches(labels.Set(pod.Labels)) {
+        return false, err
+    }
+    return true, nil
+}
 
 // compute a sum by iterating through the elements of weightedPodAffinityTerm and adding
@@ -98,99 +87,100 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod
     }
 
     // convert the topology key based weights to the node name based weights
-    var maxCount int
-    var minCount int
-    counts := map[string]int{}
-    for _, node := range nodes {
-        totalCount := 0
-        // count weights for the weighted pod affinity
-        if affinity.PodAffinity != nil {
-            for _, weightedTerm := range affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
-                weightedCount, err := ipa.CountWeightByPodMatchAffinityTerm(pod, allPods, weightedTerm.Weight, weightedTerm.PodAffinityTerm, node)
-                if err != nil {
-                    return nil, err
-                }
-                totalCount += weightedCount
-            }
-        }
-
-        // count weights for the weighted pod anti-affinity
-        if affinity.PodAntiAffinity != nil {
-            for _, weightedTerm := range affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
-                weightedCount, err := ipa.CountWeightByPodMatchAffinityTerm(pod, allPods, (0 - weightedTerm.Weight), weightedTerm.PodAffinityTerm, node)
-                if err != nil {
-                    return nil, err
-                }
-                totalCount += weightedCount
-            }
-        }
-
-        // reverse direction checking: count weights for the inter-pod affinity/anti-affinity rules
-        // that are indicated by existing pods on the node.
-        for _, ep := range allPods {
-            epAffinity, err := api.GetAffinityFromPodAnnotations(ep.Annotations)
-            if err != nil {
-                return nil, err
-            }
-
-            if epAffinity.PodAffinity != nil {
-                // count the implicit weight for the hard pod affinity indicated by the existing pod.
-                if ipa.hardPodAffinityWeight > 0 {
-                    var podAffinityTerms []api.PodAffinityTerm
-                    if len(epAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
-                        podAffinityTerms = epAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
-                    }
-                    // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
-                    //if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
-                    //    podAffinityTerms = append(podAffinityTerms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
-                    //}
-                    for _, epAffinityTerm := range podAffinityTerms {
-                        match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epAffinityTerm,
-                            func(pod *api.Pod) (*api.Node, error) { return node, nil },
-                            func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) },
-                        )
-                        if err != nil {
-                            return nil, err
-                        }
-                        if match {
-                            totalCount += ipa.hardPodAffinityWeight
-                        }
-                    }
-                }
-
-                // count weight for the weighted pod affinity indicated by the existing pod.
-                for _, epWeightedTerm := range epAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
-                    match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epWeightedTerm.PodAffinityTerm,
-                        func(pod *api.Pod) (*api.Node, error) { return node, nil },
-                        func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) },
-                    )
-                    if err != nil {
-                        return nil, err
-                    }
-                    if match {
-                        totalCount += epWeightedTerm.Weight
-                    }
-                }
-            }
-
-            // count weight for the weighted pod anti-affinity indicated by the existing pod.
-            if epAffinity.PodAntiAffinity != nil {
-                for _, epWeightedTerm := range epAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
-                    match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epWeightedTerm.PodAffinityTerm,
-                        func(pod *api.Pod) (*api.Node, error) { return node, nil },
-                        func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) },
-                    )
-                    if err != nil {
-                        return nil, err
-                    }
-                    if match {
-                        totalCount -= epWeightedTerm.Weight
-                    }
-                }
-            }
-        }
-        counts[node.Name] = totalCount
-
+    var maxCount float64
+    var minCount float64
+    // counts store the mapping from node name to so-far computed score of
+    // the node.
+    counts := make(map[string]float64, len(nodes))
+
+    processTerm := func(term *api.PodAffinityTerm, affinityPod, podToCheck *api.Pod, fixedNode *api.Node, weight float64) error {
+        match, err := podMatchesNamespaceAndSelector(podToCheck, affinityPod, term)
+        if err != nil {
+            return err
+        }
+        if match {
+            for _, node := range nodes {
+                if ipa.failureDomains.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) {
+                    counts[node.Name] += weight
+                }
+            }
+        }
+        return nil
+    }
+    processTerms := func(terms []api.WeightedPodAffinityTerm, affinityPod, podToCheck *api.Pod, fixedNode *api.Node, multiplier int) error {
+        for _, weightedTerm := range terms {
+            if err := processTerm(&weightedTerm.PodAffinityTerm, affinityPod, podToCheck, fixedNode, float64(weightedTerm.Weight*multiplier)); err != nil {
+                return err
+            }
+        }
+        return nil
+    }
+
+    for _, existingPod := range allPods {
+        existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName)
+        if err != nil {
+            return nil, err
+        }
+        existingPodAffinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
+        if err != nil {
+            return nil, err
+        }
+
+        if affinity.PodAffinity != nil {
+            // For every soft pod affinity term of <pod>, if <existingPod> matches the term,
+            // increment <counts> for every node in the cluster with the same <term.TopologyKey>
+            // value as that of <existingPods>`s node by the term`s weight.
+            terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
+            if err := processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil {
+                return nil, err
+            }
+        }
+        if affinity.PodAntiAffinity != nil {
+            // For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
+            // decrement <counts> for every node in the cluster with the same <term.TopologyKey>
+            // value as that of <existingPod>`s node by the term`s weight.
+            terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
+            if err := processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil {
+                return nil, err
+            }
+        }
+
+        if existingPodAffinity.PodAffinity != nil {
+            // For every hard pod affinity term of <existingPod>, if <pod> matches the term,
+            // increment <counts> for every node in the cluster with the same <term.TopologyKey>
+            // value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
+            if ipa.hardPodAffinityWeight > 0 {
+                terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
+                // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
+                //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
+                //    terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
+                //}
+                for _, term := range terms {
+                    if err := processTerm(&term, existingPod, pod, existingPodNode, float64(ipa.hardPodAffinityWeight)); err != nil {
+                        return nil, err
+                    }
+                }
+            }
+            // For every soft pod affinity term of <existingPod>, if <pod> matches the term,
+            // increment <counts> for every node in the cluster with the same <term.TopologyKey>
+            // value as that of <existingPod>'s node by the term's weight.
+            terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
+            if err := processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil {
+                return nil, err
+            }
+        }
+        if existingPodAffinity.PodAntiAffinity != nil {
+            // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
+            // decrement <counts> for every node in the cluster with the same <term.TopologyKey>
+            // value as that of <existingPod>'s node by the term's weight.
+            terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
+            if err := processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil {
+                return nil, err
+            }
+        }
+    }
+
+    for _, node := range nodes {
         if counts[node.Name] > maxCount {
             maxCount = counts[node.Name]
         }
@@ -200,17 +190,18 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod
     }
 
     // calculate final priority score for each node
-    result := []schedulerapi.HostPriority{}
+    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
     for _, node := range nodes {
         fScore := float64(0)
         if (maxCount - minCount) > 0 {
-            fScore = 10 * (float64(counts[node.Name]-minCount) / float64(maxCount-minCount))
+            fScore = 10 * ((counts[node.Name] - minCount) / (maxCount - minCount))
         }
         result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
-        glog.V(10).Infof(
-            "%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
-        )
+        if glog.V(10) {
+            // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+            // not logged. There is visible performance gain from it.
+            glog.V(10).Infof("%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
+        }
     }
     return result, nil
 }
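
Aside (not part of the commit): the closing loop above maps the accumulated per-node sums onto the scheduler's 0-10 score range. A standalone sketch of that normalization with made-up counts, needing no Kubernetes packages:

// Example (not part of this commit): the normalization used at the end of
// CalculateInterPodAffinityPriority, applied to hypothetical per-node sums.
package main

import "fmt"

func main() {
    // Hypothetical raw sums as processTerm/processTerms would accumulate them.
    counts := map[string]float64{"node-a": 30, "node-b": 0, "node-c": -10}

    // Track the extremes, starting from zero just like the priority function's
    // maxCount/minCount variables.
    var maxCount, minCount float64
    for _, c := range counts {
        if c > maxCount {
            maxCount = c
        }
        if c < minCount {
            minCount = c
        }
    }

    // fScore = 10 * (count - min) / (max - min), truncated to an int score.
    for name, c := range counts {
        fScore := float64(0)
        if (maxCount - minCount) > 0 {
            fScore = 10 * ((c - minCount) / (maxCount - minCount))
        }
        fmt.Printf("%s -> score %d\n", name, int(fScore)) // node-a: 10, node-b: 2, node-c: 0
    }
}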

View File

@@ -18,9 +18,6 @@ package util
 import (
     "k8s.io/kubernetes/pkg/api"
-    "k8s.io/kubernetes/pkg/api/unversioned"
-    "k8s.io/kubernetes/pkg/labels"
-    "k8s.io/kubernetes/pkg/util/sets"
 )
 
 // For each of these resources, a pod that doesn't request the resource explicitly
@@ -53,83 +50,3 @@ func GetNonzeroRequests(requests *api.ResourceList) (int64, int64) {
     }
     return outMilliCPU, outMemory
 }
-
-// FilterPodsByNameSpaces filters the pods based the given list of namespaces,
-// empty set of namespaces means all namespaces.
-func FilterPodsByNameSpaces(names sets.String, pods []*api.Pod) []*api.Pod {
-    if len(pods) == 0 || len(names) == 0 {
-        return pods
-    }
-    result := []*api.Pod{}
-    for _, pod := range pods {
-        if names.Has(pod.Namespace) {
-            result = append(result, pod)
-        }
-    }
-    return result
-}
-
-// GetNamespacesFromPodAffinityTerm returns a set of names
-// according to the namespaces indicated in podAffinityTerm.
-// if the NameSpaces is nil considers the given pod's namespace
-// if the Namespaces is empty list then considers all the namespaces
-func GetNamespacesFromPodAffinityTerm(pod *api.Pod, podAffinityTerm api.PodAffinityTerm) sets.String {
-    names := sets.String{}
-    if podAffinityTerm.Namespaces == nil {
-        names.Insert(pod.Namespace)
-    } else if len(podAffinityTerm.Namespaces) != 0 {
-        names.Insert(podAffinityTerm.Namespaces...)
-    }
-    return names
-}
-
-// NodesHaveSameTopologyKeyInternal checks if nodeA and nodeB have same label value with given topologyKey as label key.
-func NodesHaveSameTopologyKeyInternal(nodeA, nodeB *api.Node, topologyKey string) bool {
-    return nodeA.Labels != nil && nodeB.Labels != nil && len(nodeA.Labels[topologyKey]) > 0 && nodeA.Labels[topologyKey] == nodeB.Labels[topologyKey]
-}
-
-type Topologies struct {
-    DefaultKeys []string
-}
-
-// NodesHaveSameTopologyKey checks if nodeA and nodeB have same label value with given topologyKey as label key.
-// If the topologyKey is nil/empty, check if the two nodes have any of the default topologyKeys, and have same corresponding label value.
-func (tps *Topologies) NodesHaveSameTopologyKey(nodeA *api.Node, nodeB *api.Node, topologyKey string) bool {
-    if len(topologyKey) == 0 {
-        // assumes this is allowed only for PreferredDuringScheduling pod anti-affinity (ensured by api/validation)
-        for _, defaultKey := range tps.DefaultKeys {
-            if NodesHaveSameTopologyKeyInternal(nodeA, nodeB, defaultKey) {
-                return true
-            }
-        }
-        return false
-    } else {
-        return NodesHaveSameTopologyKeyInternal(nodeA, nodeB, topologyKey)
-    }
-}
-
-type getNodeFunc func(*api.Pod) (*api.Node, error)
-
-// CheckIfPodMatchPodAffinityTerm checks if podB's affinity request is compatible with podA
-func (tps *Topologies) CheckIfPodMatchPodAffinityTerm(podA *api.Pod, podB *api.Pod, podBAffinityTerm api.PodAffinityTerm, getNodeA, getNodeB getNodeFunc) (bool, error) {
-    names := GetNamespacesFromPodAffinityTerm(podB, podBAffinityTerm)
-    if len(names) != 0 && !names.Has(podA.Namespace) {
-        return false, nil
-    }
-    labelSelector, err := unversioned.LabelSelectorAsSelector(podBAffinityTerm.LabelSelector)
-    if err != nil || !labelSelector.Matches(labels.Set(podA.Labels)) {
-        return false, err
-    }
-    podANode, err := getNodeA(podA)
-    if err != nil {
-        return false, err
-    }
-    podBNode, err := getNodeB(podB)
-    if err != nil {
-        return false, err
-    }
-    return tps.NodesHaveSameTopologyKey(podANode, podBNode, podBAffinityTerm.TopologyKey), nil
-}
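
Aside (not part of the commit): these helpers are not deleted, they move to the new priorities util file shown below. The namespace-defaulting rule in GetNamespacesFromPodAffinityTerm is easy to misread, so here is a sketch of its three cases; the pod values are hypothetical and the import path assumes the 1.3-era tree layout.

// Example (not part of this commit): the namespace-defaulting rule of
// GetNamespacesFromPodAffinityTerm. Pod and namespace values are hypothetical.
package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api"
    priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
)

func main() {
    pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "web", Namespace: "frontend"}}

    // nil Namespaces: the set contains only the affinity pod's own namespace ("frontend").
    fmt.Println(priorityutil.GetNamespacesFromPodAffinityTerm(pod, api.PodAffinityTerm{}))

    // Non-nil but empty Namespaces: empty set, which callers treat as "all namespaces".
    fmt.Println(priorityutil.GetNamespacesFromPodAffinityTerm(pod, api.PodAffinityTerm{Namespaces: []string{}}))

    // Explicit namespaces: exactly those ("backend").
    fmt.Println(priorityutil.GetNamespacesFromPodAffinityTerm(pod, api.PodAffinityTerm{Namespaces: []string{"backend"}}))
}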

View File

@@ -0,0 +1,95 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+    "k8s.io/kubernetes/pkg/api"
+    "k8s.io/kubernetes/pkg/api/unversioned"
+    "k8s.io/kubernetes/pkg/labels"
+    "k8s.io/kubernetes/pkg/util/sets"
+)
+
+// FilterPodsByNameSpaces filters the pods based the given list of namespaces,
+// empty set of namespaces means all namespaces.
+func FilterPodsByNameSpaces(names sets.String, pods []*api.Pod) []*api.Pod {
+    if len(pods) == 0 || len(names) == 0 {
+        return pods
+    }
+    result := []*api.Pod{}
+    for _, pod := range pods {
+        if names.Has(pod.Namespace) {
+            result = append(result, pod)
+        }
+    }
+    return result
+}
+
+// GetNamespacesFromPodAffinityTerm returns a set of names
+// according to the namespaces indicated in podAffinityTerm.
+// if the NameSpaces is nil considers the given pod's namespace
+// if the Namespaces is empty list then considers all the namespaces
+func GetNamespacesFromPodAffinityTerm(pod *api.Pod, podAffinityTerm api.PodAffinityTerm) sets.String {
+    names := sets.String{}
+    if podAffinityTerm.Namespaces == nil {
+        names.Insert(pod.Namespace)
+    } else if len(podAffinityTerm.Namespaces) != 0 {
+        names.Insert(podAffinityTerm.Namespaces...)
+    }
+    return names
+}
+
+// nodesHaveSameTopologyKeyInternal checks if nodeA and nodeB have same label value with given topologyKey as label key.
+func nodesHaveSameTopologyKeyInternal(nodeA, nodeB *api.Node, topologyKey string) bool {
+    return nodeA.Labels != nil && nodeB.Labels != nil && len(nodeA.Labels[topologyKey]) > 0 && nodeA.Labels[topologyKey] == nodeB.Labels[topologyKey]
+}
+
+type Topologies struct {
+    DefaultKeys []string
+}
+
+// NodesHaveSameTopologyKey checks if nodeA and nodeB have same label value with given topologyKey as label key.
+// If the topologyKey is nil/empty, check if the two nodes have any of the default topologyKeys, and have same corresponding label value.
+func (tps *Topologies) NodesHaveSameTopologyKey(nodeA, nodeB *api.Node, topologyKey string) bool {
+    if len(topologyKey) == 0 {
+        // assumes this is allowed only for PreferredDuringScheduling pod anti-affinity (ensured by api/validation)
+        for _, defaultKey := range tps.DefaultKeys {
+            if nodesHaveSameTopologyKeyInternal(nodeA, nodeB, defaultKey) {
+                return true
+            }
+        }
+        return false
+    } else {
+        return nodesHaveSameTopologyKeyInternal(nodeA, nodeB, topologyKey)
+    }
+}
+
+// CheckIfPodMatchPodAffinityTerm checks if podB's affinity request is compatible with podA
+// TODO: Get rid this method. We should avoid computing Namespaces and selectors multiple times
+// and check them on higher levels and then use NodesHaveSameTopologyKey method.
+func (tps *Topologies) CheckIfPodMatchPodAffinityTerm(podA *api.Pod, nodeA, nodeB *api.Node, podB *api.Pod, podBAffinityTerm api.PodAffinityTerm) (bool, error) {
+    names := GetNamespacesFromPodAffinityTerm(podB, podBAffinityTerm)
+    if len(names) != 0 && !names.Has(podA.Namespace) {
+        return false, nil
+    }
+
+    labelSelector, err := unversioned.LabelSelectorAsSelector(podBAffinityTerm.LabelSelector)
+    if err != nil || !labelSelector.Matches(labels.Set(podA.Labels)) {
+        return false, err
+    }
+    return tps.NodesHaveSameTopologyKey(nodeA, nodeB, podBAffinityTerm.TopologyKey), nil
+}
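
Aside (not part of the commit): a usage sketch for Topologies.NodesHaveSameTopologyKey, the helper the TODO above wants callers to rely on directly once namespace and selector checks move up the stack. Node names, labels, the DefaultKeys values and the import path are assumptions based on the 1.3-era tree.

// Example (not part of this commit): comparing two nodes by topology key.
package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/api"
    priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
)

func main() {
    nodeA := &api.Node{ObjectMeta: api.ObjectMeta{
        Name:   "node-a",
        Labels: map[string]string{"failure-domain.beta.kubernetes.io/zone": "us-east-1a"},
    }}
    nodeB := &api.Node{ObjectMeta: api.ObjectMeta{
        Name:   "node-b",
        Labels: map[string]string{"failure-domain.beta.kubernetes.io/zone": "us-east-1a"},
    }}

    tps := priorityutil.Topologies{
        // Consulted only when a term's TopologyKey is empty (preferred anti-affinity).
        DefaultKeys: []string{"kubernetes.io/hostname", "failure-domain.beta.kubernetes.io/zone"},
    }

    // Explicit key: both nodes carry the same zone label value -> true.
    fmt.Println(tps.NodesHaveSameTopologyKey(nodeA, nodeB, "failure-domain.beta.kubernetes.io/zone"))

    // Empty key: falls back to DefaultKeys; the hostname label is unset here,
    // but the zone key matches -> true.
    fmt.Println(tps.NodesHaveSameTopologyKey(nodeA, nodeB, ""))
}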