diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 715a6ce724c..acf7d259dce 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -853,10 +853,11 @@ func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta in // AnyPodMatchesPodAffinityTerm checks if any of given pods can match the specific podAffinityTerm. func (checker *PodAffinityChecker) AnyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (bool, error) { for _, ep := range allPods { - match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, pod, podAffinityTerm, - func(ep *api.Pod) (*api.Node, error) { return checker.info.GetNodeInfo(ep.Spec.NodeName) }, - func(pod *api.Pod) (*api.Node, error) { return node, nil }, - ) + epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName) + if err != nil { + return false, err + } + match, err := checker.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, epNode, node, pod, podAffinityTerm) if err != nil || match { return match, err } diff --git a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go index 4260d16111e..b506a1b1f0f 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ b/plugin/pkg/scheduler/algorithm/priorities/interpod_affinity.go @@ -19,6 +19,7 @@ package priorities import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" @@ -35,7 +36,12 @@ type InterPodAffinity struct { failureDomains priorityutil.Topologies } -func NewInterPodAffinityPriority(info predicates.NodeInfo, nodeLister algorithm.NodeLister, podLister 
algorithm.PodLister, hardPodAffinityWeight int, failureDomains []string) algorithm.PriorityFunction { +func NewInterPodAffinityPriority( + info predicates.NodeInfo, + nodeLister algorithm.NodeLister, + podLister algorithm.PodLister, + hardPodAffinityWeight int, + failureDomains []string) algorithm.PriorityFunction { interPodAffinity := &InterPodAffinity{ info: info, nodeLister: nodeLister, @@ -46,36 +52,19 @@ func NewInterPodAffinityPriority(info predicates.NodeInfo, nodeLister algorithm. return interPodAffinity.CalculateInterPodAffinityPriority } -// countPodsThatMatchPodAffinityTerm counts the number of given pods that match the podAffinityTerm. -func (ipa *InterPodAffinity) CountPodsThatMatchPodAffinityTerm(pod *api.Pod, podsForMatching []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (int, error) { - matchedCount := 0 - for _, ep := range podsForMatching { - match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(ep, pod, podAffinityTerm, - func(ep *api.Pod) (*api.Node, error) { - return ipa.info.GetNodeInfo(ep.Spec.NodeName) - }, - func(pod *api.Pod) (*api.Node, error) { - return node, nil - }, - ) - if err != nil { - return 0, err - } - if match { - matchedCount++ - } +// TODO: Share it with predicates by moving to better location. +// TODO: Can we avoid error handling here - this is only a matter of non-parsable selector? +func podMatchesNamespaceAndSelector(pod *api.Pod, affinityPod *api.Pod, term *api.PodAffinityTerm) (bool, error) { + namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(affinityPod, *term) + if len(namespaces) != 0 && !namespaces.Has(pod.Namespace) { + return false, nil } - return matchedCount, nil -} -// CountWeightByPodMatchAffinityTerm counts the weight to topologyCounts for all the given pods that match the podAffinityTerm. 
-func (ipa *InterPodAffinity) CountWeightByPodMatchAffinityTerm(pod *api.Pod, podsForMatching []*api.Pod, weight int, podAffinityTerm api.PodAffinityTerm, node *api.Node) (int, error) { - if weight == 0 { - return 0, nil + selector, err := unversioned.LabelSelectorAsSelector(term.LabelSelector) + if err != nil || !selector.Matches(labels.Set(pod.Labels)) { + return false, err } - // get the pods which are there in that particular node - podsMatchedCount, err := ipa.CountPodsThatMatchPodAffinityTerm(pod, podsForMatching, node, podAffinityTerm) - return weight * podsMatchedCount, err + return true, nil } // compute a sum by iterating through the elements of weightedPodAffinityTerm and adding @@ -98,99 +87,100 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod } // convert the topology key based weights to the node name based weights - var maxCount int - var minCount int - counts := map[string]int{} - for _, node := range nodes { - totalCount := 0 - // count weights for the weighted pod affinity + var maxCount float64 + var minCount float64 + // counts store the mapping from node name to so-far computed score of + // the node. 
+ counts := make(map[string]float64, len(nodes)) + + processTerm := func(term *api.PodAffinityTerm, affinityPod, podToCheck *api.Pod, fixedNode *api.Node, weight float64) error { + match, err := podMatchesNamespaceAndSelector(podToCheck, affinityPod, term) + if err != nil { + return err + } + if match { + for _, node := range nodes { + if ipa.failureDomains.NodesHaveSameTopologyKey(node, fixedNode, term.TopologyKey) { + counts[node.Name] += weight + } + } + } + return nil + } + processTerms := func(terms []api.WeightedPodAffinityTerm, affinityPod, podToCheck *api.Pod, fixedNode *api.Node, multiplier int) error { + for _, weightedTerm := range terms { + if err := processTerm(&weightedTerm.PodAffinityTerm, affinityPod, podToCheck, fixedNode, float64(weightedTerm.Weight*multiplier)); err != nil { + return err + } + } + return nil + } + + for _, existingPod := range allPods { + existingPodNode, err := ipa.info.GetNodeInfo(existingPod.Spec.NodeName) + if err != nil { + return nil, err + } + existingPodAffinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations) + if err != nil { + return nil, err + } + if affinity.PodAffinity != nil { - for _, weightedTerm := range affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution { - weightedCount, err := ipa.CountWeightByPodMatchAffinityTerm(pod, allPods, weightedTerm.Weight, weightedTerm.PodAffinityTerm, node) - if err != nil { - return nil, err - } - totalCount += weightedCount - } - } - - // count weights for the weighted pod anti-affinity - if affinity.PodAntiAffinity != nil { - for _, weightedTerm := range affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution { - weightedCount, err := ipa.CountWeightByPodMatchAffinityTerm(pod, allPods, (0 - weightedTerm.Weight), weightedTerm.PodAffinityTerm, node) - if err != nil { - return nil, err - } - totalCount += weightedCount - } - } - - // reverse direction checking: count weights for the inter-pod affinity/anti-affinity rules - // that 
are indicated by existing pods on the node. - for _, ep := range allPods { - epAffinity, err := api.GetAffinityFromPodAnnotations(ep.Annotations) - if err != nil { + // For every soft pod affinity term of pod, if existingPod matches the term, + // increment counts for every node in the cluster with the same term.TopologyKey + // value as that of existingPod's node by the term's weight. + terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution + if err := processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil { return nil, err } - - if epAffinity.PodAffinity != nil { - // count the implicit weight for the hard pod affinity indicated by the existing pod. - if ipa.hardPodAffinityWeight > 0 { - var podAffinityTerms []api.PodAffinityTerm - if len(epAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { - podAffinityTerms = epAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution - } - // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. - //if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { - // podAffinityTerms = append(podAffinityTerms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) - //} - for _, epAffinityTerm := range podAffinityTerms { - match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epAffinityTerm, - func(pod *api.Pod) (*api.Node, error) { return node, nil }, - func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) }, - ) - if err != nil { - return nil, err - } - if match { - totalCount += ipa.hardPodAffinityWeight - } - } - } - - // count weight for the weighted pod affinity indicated by the existing pod. 
- for _, epWeightedTerm := range epAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution { - match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epWeightedTerm.PodAffinityTerm, - func(pod *api.Pod) (*api.Node, error) { return node, nil }, - func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) }, - ) - if err != nil { - return nil, err - } - if match { - totalCount += epWeightedTerm.Weight - } - } - } - - // count weight for the weighted pod anti-affinity indicated by the existing pod. - if epAffinity.PodAntiAffinity != nil { - for _, epWeightedTerm := range epAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution { - match, err := ipa.failureDomains.CheckIfPodMatchPodAffinityTerm(pod, ep, epWeightedTerm.PodAffinityTerm, - func(pod *api.Pod) (*api.Node, error) { return node, nil }, - func(ep *api.Pod) (*api.Node, error) { return ipa.info.GetNodeInfo(ep.Spec.NodeName) }, - ) - if err != nil { - return nil, err - } - if match { - totalCount -= epWeightedTerm.Weight - } - } + } + if affinity.PodAntiAffinity != nil { + // For every soft pod anti-affinity term of pod, if existingPod matches the term, + // decrement counts for every node in the cluster with the same term.TopologyKey + // value as that of existingPod's node by the term's weight. + terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution + if err := processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil { + return nil, err } } - counts[node.Name] = totalCount + if existingPodAffinity.PodAffinity != nil { + // For every hard pod affinity term of existingPod, if pod matches the term, + // increment counts for every node in the cluster with the same term.TopologyKey + // value as that of existingPod's node by the constant ipa.hardPodAffinityWeight. + if ipa.hardPodAffinityWeight > 0 { + terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution + // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. 
+ //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { + // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) + //} + for _, term := range terms { + if err := processTerm(&term, existingPod, pod, existingPodNode, float64(ipa.hardPodAffinityWeight)); err != nil { + return nil, err + } + } + } + // For every soft pod affinity term of existingPod, if pod matches the term, + // increment counts for every node in the cluster with the same term.TopologyKey + // value as that of existingPod's node by the term's weight. + terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution + if err := processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil { + return nil, err + } + } + if existingPodAffinity.PodAntiAffinity != nil { + // For every soft pod anti-affinity term of existingPod, if pod matches the term, + // decrement counts for every node in the cluster with the same term.TopologyKey + // value as that of existingPod's node by the term's weight. + terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution + if err := processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil { + return nil, err + } + } + } + + for _, node := range nodes { if counts[node.Name] > maxCount { maxCount = counts[node.Name] } @@ -200,17 +190,18 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *api.Pod, nod } // calculate final priority score for each node - result := []schedulerapi.HostPriority{} + result := make(schedulerapi.HostPriorityList, 0, len(nodes)) for _, node := range nodes { fScore := float64(0) if (maxCount - minCount) > 0 { - fScore = 10 * (float64(counts[node.Name]-minCount) / float64(maxCount-minCount)) + fScore = 10 * ((counts[node.Name] - minCount) / (maxCount - minCount)) } result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)}) - glog.V(10).Infof( - "%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore), 
- ) + if glog.V(10) { + // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is + // not logged. There is visible performance gain from it. + glog.V(10).Infof("%v -> %v: InterPodAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore)) + } } - return result, nil } diff --git a/plugin/pkg/scheduler/algorithm/priorities/util/non_zero.go b/plugin/pkg/scheduler/algorithm/priorities/util/non_zero.go index c7e810c8ad2..8588471d479 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/util/non_zero.go +++ b/plugin/pkg/scheduler/algorithm/priorities/util/non_zero.go @@ -18,9 +18,6 @@ package util import ( "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/util/sets" ) // For each of these resources, a pod that doesn't request the resource explicitly @@ -53,83 +50,3 @@ func GetNonzeroRequests(requests *api.ResourceList) (int64, int64) { } return outMilliCPU, outMemory } - -// FilterPodsByNameSpaces filters the pods based the given list of namespaces, -// empty set of namespaces means all namespaces. -func FilterPodsByNameSpaces(names sets.String, pods []*api.Pod) []*api.Pod { - if len(pods) == 0 || len(names) == 0 { - return pods - } - result := []*api.Pod{} - for _, pod := range pods { - if names.Has(pod.Namespace) { - result = append(result, pod) - } - } - return result -} - -// GetNamespacesFromPodAffinityTerm returns a set of names -// according to the namespaces indicated in podAffinityTerm. -// if the NameSpaces is nil considers the given pod's namespace -// if the Namespaces is empty list then considers all the namespaces -func GetNamespacesFromPodAffinityTerm(pod *api.Pod, podAffinityTerm api.PodAffinityTerm) sets.String { - names := sets.String{} - if podAffinityTerm.Namespaces == nil { - names.Insert(pod.Namespace) - } else if len(podAffinityTerm.Namespaces) != 0 { - names.Insert(podAffinityTerm.Namespaces...) 
- } - return names -} - -// NodesHaveSameTopologyKeyInternal checks if nodeA and nodeB have same label value with given topologyKey as label key. -func NodesHaveSameTopologyKeyInternal(nodeA, nodeB *api.Node, topologyKey string) bool { - return nodeA.Labels != nil && nodeB.Labels != nil && len(nodeA.Labels[topologyKey]) > 0 && nodeA.Labels[topologyKey] == nodeB.Labels[topologyKey] -} - -type Topologies struct { - DefaultKeys []string -} - -// NodesHaveSameTopologyKey checks if nodeA and nodeB have same label value with given topologyKey as label key. -// If the topologyKey is nil/empty, check if the two nodes have any of the default topologyKeys, and have same corresponding label value. -func (tps *Topologies) NodesHaveSameTopologyKey(nodeA *api.Node, nodeB *api.Node, topologyKey string) bool { - if len(topologyKey) == 0 { - // assumes this is allowed only for PreferredDuringScheduling pod anti-affinity (ensured by api/validation) - for _, defaultKey := range tps.DefaultKeys { - if NodesHaveSameTopologyKeyInternal(nodeA, nodeB, defaultKey) { - return true - } - } - return false - } else { - return NodesHaveSameTopologyKeyInternal(nodeA, nodeB, topologyKey) - } -} - -type getNodeFunc func(*api.Pod) (*api.Node, error) - -// CheckIfPodMatchPodAffinityTerm checks if podB's affinity request is compatible with podA -func (tps *Topologies) CheckIfPodMatchPodAffinityTerm(podA *api.Pod, podB *api.Pod, podBAffinityTerm api.PodAffinityTerm, getNodeA, getNodeB getNodeFunc) (bool, error) { - names := GetNamespacesFromPodAffinityTerm(podB, podBAffinityTerm) - if len(names) != 0 && !names.Has(podA.Namespace) { - return false, nil - } - - labelSelector, err := unversioned.LabelSelectorAsSelector(podBAffinityTerm.LabelSelector) - if err != nil || !labelSelector.Matches(labels.Set(podA.Labels)) { - return false, err - } - - podANode, err := getNodeA(podA) - if err != nil { - return false, err - } - podBNode, err := getNodeB(podB) - if err != nil { - return false, err - } - - return 
tps.NodesHaveSameTopologyKey(podANode, podBNode, podBAffinityTerm.TopologyKey), nil -} diff --git a/plugin/pkg/scheduler/algorithm/priorities/util/topologies.go b/plugin/pkg/scheduler/algorithm/priorities/util/topologies.go new file mode 100644 index 00000000000..c870c7348fe --- /dev/null +++ b/plugin/pkg/scheduler/algorithm/priorities/util/topologies.go @@ -0,0 +1,95 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/sets" +) + +// FilterPodsByNameSpaces filters the pods based the given list of namespaces, +// empty set of namespaces means all namespaces. +func FilterPodsByNameSpaces(names sets.String, pods []*api.Pod) []*api.Pod { + if len(pods) == 0 || len(names) == 0 { + return pods + } + result := []*api.Pod{} + for _, pod := range pods { + if names.Has(pod.Namespace) { + result = append(result, pod) + } + } + return result +} + +// GetNamespacesFromPodAffinityTerm returns a set of names +// according to the namespaces indicated in podAffinityTerm. 
+// if the NameSpaces is nil considers the given pod's namespace +// if the Namespaces is empty list then considers all the namespaces +func GetNamespacesFromPodAffinityTerm(pod *api.Pod, podAffinityTerm api.PodAffinityTerm) sets.String { + names := sets.String{} + if podAffinityTerm.Namespaces == nil { + names.Insert(pod.Namespace) + } else if len(podAffinityTerm.Namespaces) != 0 { + names.Insert(podAffinityTerm.Namespaces...) + } + return names +} + +// nodesHaveSameTopologyKeyInternal checks if nodeA and nodeB have same label value with given topologyKey as label key. +func nodesHaveSameTopologyKeyInternal(nodeA, nodeB *api.Node, topologyKey string) bool { + return nodeA.Labels != nil && nodeB.Labels != nil && len(nodeA.Labels[topologyKey]) > 0 && nodeA.Labels[topologyKey] == nodeB.Labels[topologyKey] +} + +type Topologies struct { + DefaultKeys []string +} + +// NodesHaveSameTopologyKey checks if nodeA and nodeB have same label value with given topologyKey as label key. +// If the topologyKey is nil/empty, check if the two nodes have any of the default topologyKeys, and have same corresponding label value. +func (tps *Topologies) NodesHaveSameTopologyKey(nodeA, nodeB *api.Node, topologyKey string) bool { + if len(topologyKey) == 0 { + // assumes this is allowed only for PreferredDuringScheduling pod anti-affinity (ensured by api/validation) + for _, defaultKey := range tps.DefaultKeys { + if nodesHaveSameTopologyKeyInternal(nodeA, nodeB, defaultKey) { + return true + } + } + return false + } else { + return nodesHaveSameTopologyKeyInternal(nodeA, nodeB, topologyKey) + } +} + +// CheckIfPodMatchPodAffinityTerm checks if podB's affinity request is compatible with podA +// TODO: Get rid of this method. We should avoid computing Namespaces and selectors multiple times +// and check them on higher levels and then use NodesHaveSameTopologyKey method. 
+func (tps *Topologies) CheckIfPodMatchPodAffinityTerm(podA *api.Pod, nodeA, nodeB *api.Node, podB *api.Pod, podBAffinityTerm api.PodAffinityTerm) (bool, error) { + names := GetNamespacesFromPodAffinityTerm(podB, podBAffinityTerm) + if len(names) != 0 && !names.Has(podA.Namespace) { + return false, nil + } + + labelSelector, err := unversioned.LabelSelectorAsSelector(podBAffinityTerm.LabelSelector) + if err != nil || !labelSelector.Matches(labels.Set(podA.Labels)) { + return false, err + } + + return tps.NodesHaveSameTopologyKey(nodeA, nodeB, podBAffinityTerm.TopologyKey), nil +}