Merge pull request #28952 from wojtek-t/optimize_pod_affinity

Automatic merge from submit-queue

First step of optimizing PodAffinity priority function

Ref #26144 

This is obviously only a first step - I will continue working on this code. However, it changes the general scheme of computation to the one described in: https://github.com/kubernetes/kubernetes/issues/26144#issuecomment-232612384
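The linked comment proposes inverting the loops: instead of re-evaluating every affinity term for every (node, existing pod) pair, the new scheme walks the existing pods once and, for each matching term, credits the term's weight to every node in the same topology domain as that pod's node. A minimal standalone sketch of the idea (placeholder types and a made-up "zone" key, not the real scheduler API):

package main

import "fmt"

type Node struct {
	Name   string
	Labels map[string]string
}

type Pod struct {
	Name     string
	NodeName string
}

// sameTopologyKey reports whether two nodes carry the same non-empty value
// for the given topology label key.
func sameTopologyKey(a, b *Node, key string) bool {
	return a.Labels[key] != "" && a.Labels[key] == b.Labels[key]
}

func main() {
	nodes := []*Node{
		{Name: "n1", Labels: map[string]string{"zone": "us-east-1a"}},
		{Name: "n2", Labels: map[string]string{"zone": "us-east-1a"}},
		{Name: "n3", Labels: map[string]string{"zone": "us-east-1b"}},
	}
	nodeByName := map[string]*Node{}
	for _, n := range nodes {
		nodeByName[n.Name] = n
	}
	existingPods := []*Pod{{Name: "web-1", NodeName: "n1"}}

	counts := make(map[string]float64, len(nodes))
	// One pass over existing pods; a matching term contributes its weight to
	// every node in the matching topology domain, not just the pod's node.
	for _, ep := range existingPods {
		epNode := nodeByName[ep.NodeName]
		const weight = 1.0 // stand-in for a matching soft term's weight
		for _, node := range nodes {
			if sameTopologyKey(node, epNode, "zone") {
				counts[node.Name] += weight
			}
		}
	}
	fmt.Println(counts) // e.g. map[n1:1 n2:1]
}

With one existing pod on n1 and a topology key of "zone", both n1 and n2 are credited, because scoring is per topology domain rather than per node.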
k8s-merge-robot 2016-07-15 08:59:17 -07:00 committed by GitHub
commit 8b2d994153
4 changed files with 212 additions and 208 deletions


@@ -853,10 +853,11 @@ func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta in
// AnyPodMatchesPodAffinityTerm checks if any of given pods can match the specific podAffinityTerm.
func (checker *PodAffinityChecker) AnyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (bool, error) {
for _, ep := range allPods {
match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, pod, podAffinityTerm,
func(ep *api.Pod) (*api.Node, error) { return checker.info.GetNodeInfo(ep.Spec.NodeName) },
func(pod *api.Pod) (*api.Node, error) { return node, nil },
)
epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
if err != nil {
return false, err
}
match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, epNode, node, pod, podAffinityTerm)
if err != nil || match {
return match, err
}
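Read without the diff interleaving, the new loop body resolves each existing pod's node once and passes both nodes directly, replacing the old getNodeFunc closures (a sketch assembled from the added lines above; the enclosing function is elided):

for _, ep := range allPods {
	// Resolve the existing pod's node once, up front.
	epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
	if err != nil {
		return false, err
	}
	// Both nodes are now passed by value; no per-call closures.
	match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, epNode, node, pod, podAffinityTerm)
	if err != nil || match {
		return match, err
	}
}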

View File

@@ -19,6 +19,7 @@ package priorities
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@@ -35,7 +36,12 @@ type InterPodAffinity struct {
failureDomains priorityutil.Topologies
}
func NewInterPodAffinityPriority(info predicates.NodeInfo, nodeLister algorithm.NodeLister, podLister algorithm.PodLister, hardPodAffinityWeight int, failureDomains []string) algorithm.PriorityFunction {
func NewInterPodAffinityPriority(
info predicates.NodeInfo,
nodeLister algorithm.NodeLister,
podLister algorithm.PodLister,
hardPodAffinityWeight int,
failureDomains []string) algorithm.PriorityFunction {
interPodAffinity := &InterPodAffinity{
info: info,
nodeLister: nodeLister,
@@ -46,36 +52,19 @@ func NewInterPodAffinityPriority(info predicates.NodeInfo, nodeLister algorithm.
return interPodAffinity.CalculateInterPodAffinityPriority
}
// CountPodsThatMatchPodAffinityTerm counts the number of given pods that match the podAffinityTerm.
func (ipa *InterPodAffinity) CountPodsThatMatchPodAffinityTerm(pod *api.Pod, podsForMatching []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (int, error) {
matchedCount := 0
for _, ep := range podsForMatching {
match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, pod, podAffinityTerm,
func(ep *api.Pod) (*api.Node, error) {
return ipa.info.GetNodeInfo(ep.Spec.NodeName)
},
func(pod *api.Pod) (*api.Node, error) {
return node, nil
},
)
if err != nil {
return 0, err
}
if match {
matchedCount++
}
// TODO: Share it with predicates by moving it to a better location.
// TODO: Can we avoid error handling here? It only arises from a non-parsable selector.
func podMatchesNamespaceAndSelector(pod *api.Pod, affinityPod *api.Pod, term *api.PodAffinityTerm) (bool, error) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(affinityPod, *term)
if len(namespaces) != 0 && !namespaces.Has(pod.Namespace) {
return false, nil
}
return matchedCount, nil
}
// CountWeightByPodMatchAffinityTerm returns the given weight multiplied by the number of given pods that match the podAffinityTerm.
func (ipa *InterPodAffinity) CountWeightByPodMatchAffinityTerm(pod *api.Pod, podsForMatching []*api.Pod, weight int, podAffinityTerm api.PodAffinityTerm, node *api.Node) (int, error) {
if weight == 0 {
return 0, nil
selector, err := unversioned.LabelSelectorAsSelector(term.LabelSelector)
if err != nil || !selector.Matches(labels.Set(pod.Labels)) {
return false, err
}
// count how many of the given pods match the podAffinityTerm on this node
podsMatchedCount, err := ipa.CountPodsThatMatchPodAffinityTerm(pod, podsForMatching, node, podAffinityTerm)
return weight * podsMatchedCount, err
return true, nil
}
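Since the removed counting helpers interleave with the added lines above, here is the new helper read out as a whole (assembled from the added lines): it checks only the namespace and label selector of a term, leaving topology matching to the caller.

func podMatchesNamespaceAndSelector(pod *api.Pod, affinityPod *api.Pod, term *api.PodAffinityTerm) (bool, error) {
	// A term with an explicit, non-empty namespace list must include the
	// candidate pod's namespace.
	namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(affinityPod, *term)
	if len(namespaces) != 0 && !namespaces.Has(pod.Namespace) {
		return false, nil
	}
	// The candidate pod's labels must satisfy the term's label selector.
	selector, err := unversioned.LabelSelectorAsSelector(term.LabelSelector)
	if err != nil || !selector.Matches(labels.Set(pod.Labels)) {
		return false, err
	}
	return true, nil
}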
// compute a sum by iterating through the elements of weightedPodAffinityTerm and adding
@@ -98,99 +87,100 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod
}
// convert the topology key based weights to the node name based weights
var maxCount int
var minCount int
counts := map[string]int{}
for _, node := range nodes {
totalCount := 0
// count weights for the weighted pod affinity
var maxCount float64
var minCount float64
// counts stores the mapping from node name to the so-far computed score
// of that node.
counts := make(map[string]float64, len(nodes))
processTerm := func(term *api.PodAffinityTerm, affinityPod, podToCheck *api.Pod, fixedNode *api.Node, weight float64) error {
match, err := podMatchesNamespaceAndSelector(podToCheck, affinityPod, term)
if err != nil {
return err
}
if match {
for _, node := range nodes {
if ipa.failureDomains.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) {
counts[node.Name] += weight
}
}
}
return nil
}
processTerms := func(terms []api.WeightedPodAffinityTerm, affinityPod, podToCheck *api.Pod, fixedNode *api.Node, multiplier int) error {
for _, weightedTerm := range terms {
if err := processTerm(&weightedTerm.PodAffinityTerm, affinityPod, podToCheck, fixedNode, float64(weightedTerm.Weight*multiplier)); err != nil {
return err
}
}
return nil
}
for _, existingPod := range allPods {
existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName)
if err != nil {
return nil, err
}
existingPodAffinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
if err != nil {
return nil, err
}
if affinity.PodAffinity != nil {
for _, weightedTerm := range affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
weightedCount, err := ipa.CountWeightByPodMatchAffinityTerm(pod, allPods, weightedTerm.Weight, weightedTerm.PodAffinityTerm, node)
if err != nil {
return nil, err
}
totalCount += weightedCount
}
}
// count weights for the weighted pod anti-affinity
if affinity.PodAntiAffinity != nil {
for _, weightedTerm := range affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
weightedCount, err := ipa.CountWeightByPodMatchAffinityTerm(pod, allPods, (0 - weightedTerm.Weight), weightedTerm.PodAffinityTerm, node)
if err != nil {
return nil, err
}
totalCount += weightedCount
}
}
// reverse direction checking: count weights for the inter-pod affinity/anti-affinity rules
// that are indicated by existing pods on the node.
for _, ep := range allPods {
epAffinity, err := api.GetAffinityFromPodAnnotations(ep.Annotations)
if err != nil {
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
// increment <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil {
return nil, err
}
if epAffinity.PodAffinity != nil {
// count the implicit weight for the hard pod affinity indicated by the existing pod.
if ipa.hardPodAffinityWeight > 0 {
var podAffinityTerms []api.PodAffinityTerm
if len(epAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
podAffinityTerms = epAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when we implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// podAffinityTerms = append(podAffinityTerms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, epAffinityTerm := range podAffinityTerms {
match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epAffinityTerm,
func(pod *api.Pod) (*api.Node, error) { return node, nil },
func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) },
)
if err != nil {
return nil, err
}
if match {
totalCount += ipa.hardPodAffinityWeight
}
}
}
// count weight for the weighted pod affinity indicated by the existing pod.
for _, epWeightedTerm := range epAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epWeightedTerm.PodAffinityTerm,
func(pod *api.Pod) (*api.Node, error) { return node, nil },
func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) },
)
if err != nil {
return nil, err
}
if match {
totalCount += epWeightedTerm.Weight
}
}
}
// count weight for the weighted pod anti-affinity indicated by the existing pod.
if epAffinity.PodAntiAffinity != nil {
for _, epWeightedTerm := range epAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epWeightedTerm.PodAffinityTerm,
func(pod *api.Pod) (*api.Node, error) { return node, nil },
func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) },
)
if err != nil {
return nil, err
}
if match {
totalCount -= epWeightedTerm.Weight
}
}
}
if affinity.PodAntiAffinity != nil {
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil {
return nil, err
}
}
counts[node.Name] = totalCount
if existingPodAffinity.PodAffinity != nil {
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
if ipa.hardPodAffinityWeight > 0 {
terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
// TODO: Uncomment this block when we implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, term := range terms {
if err := processTerm(&term, existingPod, pod, existingPodNode, float64(ipa.hardPodAffinityWeight)); err != nil {
return nil, err
}
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil {
return nil, err
}
}
if existingPodAffinity.PodAntiAffinity != nil {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil {
return nil, err
}
}
}
for _, node := range nodes {
if counts[node.Name] > maxCount {
maxCount = counts[node.Name]
}
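Stripped of error handling and nil checks, the body of the new per-existing-pod loop reduces to four symmetric processTerms calls, with the hardPodAffinityWeight branch additionally feeding <existingPod>'s required affinity terms through processTerm (a condensed sketch of the added lines, not the verbatim code):

for _, existingPod := range allPods {
	existingPodNode, _ := ipa.info.GetNodeInfo(existingPod.Spec.NodeName)
	existingPodAffinity, _ := api.GetAffinityFromPodAnnotations(existingPod.Annotations)

	// <pod>'s soft affinity/anti-affinity terms, matched against <existingPod>:
	processTerms(affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution, pod, existingPod, existingPodNode, 1)
	processTerms(affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution, pod, existingPod, existingPodNode, -1)

	// <existingPod>'s soft affinity/anti-affinity terms, matched against <pod>:
	processTerms(existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution, existingPod, pod, existingPodNode, 1)
	processTerms(existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution, existingPod, pod, existingPodNode, -1)
}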
@@ -200,17 +190,18 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod
}
// calculate final priority score for each node
result := []schedulerapi.HostPriority{}
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for _, node := range nodes {
fScore := float64(0)
if (maxCount - minCount) > 0 {
fScore = 10 * (float64(counts[node.Name]-minCount) / float64(maxCount-minCount))
fScore = 10 * ((counts[node.Name] - minCount) / (maxCount - minCount))
}
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
glog.V(10).Infof(
"%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
)
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is a visible performance gain from it.
glog.V(10).Infof("%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
}
}
return result, nil
}
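As a worked example of the final normalization (a standalone sketch with made-up values): the raw per-node sums, which can be negative because anti-affinity subtracts weight, are mapped linearly onto the scheduler's 0-10 score range.

package main

import "fmt"

func main() {
	// Hypothetical raw sums from the counts map, including a negative
	// anti-affinity contribution.
	counts := map[string]float64{"node-a": 30, "node-b": 10, "node-c": -10}
	minCount, maxCount := -10.0, 30.0

	for _, name := range []string{"node-a", "node-b", "node-c"} {
		fScore := float64(0)
		if maxCount-minCount > 0 {
			fScore = 10 * ((counts[name] - minCount) / (maxCount - minCount))
		}
		fmt.Printf("%s: %d\n", name, int(fScore)) // node-a: 10, node-b: 5, node-c: 0
	}
}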


@@ -18,9 +18,6 @@ package util
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
)
// For each of these resources, a pod that doesn't request the resource explicitly
@@ -53,83 +50,3 @@ func GetNonzeroRequests(requests *api.ResourceList) (int64, int64) {
}
return outMilliCPU, outMemory
}
// FilterPodsByNameSpaces filters the pods based on the given set of namespaces;
// an empty set of namespaces means all namespaces.
func FilterPodsByNameSpaces(names sets.String, pods []*api.Pod) []*api.Pod {
if len(pods) == 0 || len(names) == 0 {
return pods
}
result := []*api.Pod{}
for _, pod := range pods {
if names.Has(pod.Namespace) {
result = append(result, pod)
}
}
return result
}
// GetNamespacesFromPodAffinityTerm returns a set of names
// according to the namespaces indicated in podAffinityTerm.
// If Namespaces is nil, it considers the given pod's namespace;
// if Namespaces is an empty list, it considers all namespaces.
func GetNamespacesFromPodAffinityTerm(pod *api.Pod, podAffinityTerm api.PodAffinityTerm) sets.String {
names := sets.String{}
if podAffinityTerm.Namespaces == nil {
names.Insert(pod.Namespace)
} else if len(podAffinityTerm.Namespaces) != 0 {
names.Insert(podAffinityTerm.Namespaces...)
}
return names
}
// NodesHaveSameTopologyKeyInternal checks if nodeA and nodeB have the same label value for the given topologyKey.
func NodesHaveSameTopologyKeyInternal(nodeA, nodeB *api.Node, topologyKey string) bool {
return nodeA.Labels != nil && nodeB.Labels != nil && len(nodeA.Labels[topologyKey]) > 0 && nodeA.Labels[topologyKey] == nodeB.Labels[topologyKey]
}
type Topologies struct {
DefaultKeys []string
}
// NodesHaveSameTopologyKey checks if nodeA and nodeB have the same label value for the given topologyKey.
// If the topologyKey is empty, it checks whether the two nodes have the same value for any of the default topologyKeys.
func (tps *Topologies) NodesHaveSameTopologyKey(nodeA *api.Node, nodeB *api.Node, topologyKey string) bool {
if len(topologyKey) == 0 {
// assumes this is allowed only for PreferredDuringScheduling pod anti-affinity (ensured by api/validation)
for _, defaultKey := range tps.DefaultKeys {
if NodesHaveSameTopologyKeyInternal(nodeA, nodeB, defaultKey) {
return true
}
}
return false
} else {
return NodesHaveSameTopologyKeyInternal(nodeA, nodeB, topologyKey)
}
}
type getNodeFunc func(*api.Pod) (*api.Node, error)
// CheckIfPodMatchPodAffinityTerm checks if podB's affinity request is compatible with podA
func (tps *Topologies) CheckIfPodMatchPodAffinityTerm(podA *api.Pod, podB *api.Pod, podBAffinityTerm api.PodAffinityTerm, getNodeA, getNodeB getNodeFunc) (bool, error) {
names := GetNamespacesFromPodAffinityTerm(podB, podBAffinityTerm)
if len(names) != 0 && !names.Has(podA.Namespace) {
return false, nil
}
labelSelector, err := unversioned.LabelSelectorAsSelector(podBAffinityTerm.LabelSelector)
if err != nil || !labelSelector.Matches(labels.Set(podA.Labels)) {
return false, err
}
podANode, err := getNodeA(podA)
if err != nil {
return false, err
}
podBNode, err := getNodeB(podB)
if err != nil {
return false, err
}
return tps.NodesHaveSameTopologyKey(podANode, podBNode, podBAffinityTerm.TopologyKey), nil
}


@@ -0,0 +1,95 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/sets"
)
// FilterPodsByNameSpaces filters the pods based on the given set of namespaces;
// an empty set of namespaces means all namespaces.
func FilterPodsByNameSpaces(names sets.String, pods []*api.Pod) []*api.Pod {
if len(pods) == 0 || len(names) == 0 {
return pods
}
result := []*api.Pod{}
for _, pod := range pods {
if names.Has(pod.Namespace) {
result = append(result, pod)
}
}
return result
}
// GetNamespacesFromPodAffinityTerm returns a set of names
// according to the namespaces indicated in podAffinityTerm.
// If Namespaces is nil, it considers the given pod's namespace;
// if Namespaces is an empty list, it considers all namespaces.
func GetNamespacesFromPodAffinityTerm(pod *api.Pod, podAffinityTerm api.PodAffinityTerm) sets.String {
names := sets.String{}
if podAffinityTerm.Namespaces == nil {
names.Insert(pod.Namespace)
} else if len(podAffinityTerm.Namespaces) != 0 {
names.Insert(podAffinityTerm.Namespaces...)
}
return names
}
// nodesHaveSameTopologyKeyInternal checks if nodeA and nodeB have the same label value for the given topologyKey.
func nodesHaveSameTopologyKeyInternal(nodeA, nodeB *api.Node, topologyKey string) bool {
return nodeA.Labels != nil && nodeB.Labels != nil && len(nodeA.Labels[topologyKey]) > 0 && nodeA.Labels[topologyKey] == nodeB.Labels[topologyKey]
}
type Topologies struct {
DefaultKeys []string
}
// NodesHaveSameTopologyKey checks if nodeA and nodeB have the same label value for the given topologyKey.
// If the topologyKey is empty, it checks whether the two nodes have the same value for any of the default topologyKeys.
func (tps *Topologies) NodesHaveSameTopologyKey(nodeA, nodeB *api.Node, topologyKey string) bool {
if len(topologyKey) == 0 {
// assumes this is allowed only for PreferredDuringScheduling pod anti-affinity (ensured by api/validation)
for _, defaultKey := range tps.DefaultKeys {
if nodesHaveSameTopologyKeyInternal(nodeA, nodeB, defaultKey) {
return true
}
}
return false
} else {
return nodesHaveSameTopologyKeyInternal(nodeA, nodeB, topologyKey)
}
}
// CheckIfPodMatchPodAffinityTerm checks if podB's affinity request is compatible with podA.
// TODO: Get rid of this method. We should avoid computing Namespaces and selectors multiple
// times, check them at higher levels, and then use the NodesHaveSameTopologyKey method.
func (tps *Topologies) CheckIfPodMatchPodAffinityTerm(podA *api.Pod, nodeA, nodeB *api.Node, podB *api.Pod, podBAffinityTerm api.PodAffinityTerm) (bool, error) {
names := GetNamespacesFromPodAffinityTerm(podB, podBAffinityTerm)
if len(names) != 0 && !names.Has(podA.Namespace) {
return false, nil
}
labelSelector, err := unversioned.LabelSelectorAsSelector(podBAffinityTerm.LabelSelector)
if err != nil || !labelSelector.Matches(labels.Set(podA.Labels)) {
return false, err
}
return tps.NodesHaveSameTopologyKey(nodeA, nodeB, podBAffinityTerm.TopologyKey), nil
}
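For reference, a small usage sketch of the moved helpers (the import path is assumed from the scheduler's package layout; node labels and keys are made up):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
)

func main() {
	tps := priorityutil.Topologies{DefaultKeys: []string{"kubernetes.io/hostname"}}
	nodeA := &api.Node{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"zone": "z1"}}}
	nodeB := &api.Node{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"zone": "z1"}}}

	// Same non-empty value for the "zone" key on both nodes -> true.
	fmt.Println(tps.NodesHaveSameTopologyKey(nodeA, nodeB, "zone"))
	// An empty topologyKey falls back to DefaultKeys; neither node carries a
	// hostname label here -> false.
	fmt.Println(tps.NodesHaveSameTopologyKey(nodeA, nodeB, ""))
}

NodesHaveSameTopologyKey answers the only question the optimized priority loop still asks per node, which is what makes the one-pass scheme above possible.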