mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #86046 from ahg-g/ahg1-affinity
Optimize required pod affinity (2)
This commit is contained in:
commit
9bf52c2aa6
@ -51,16 +51,10 @@ type topologyPair struct {
|
||||
value string
|
||||
}
|
||||
|
||||
type podSet map[*v1.Pod]struct{}
|
||||
|
||||
type topologyPairSet map[topologyPair]struct{}
|
||||
|
||||
// topologyPairsMaps keeps topologyPairToAntiAffinityPods and antiAffinityPodToTopologyPairs in sync
|
||||
// as they are the inverse of each others.
|
||||
type topologyPairsMaps struct {
|
||||
topologyPairToPods map[topologyPair]podSet
|
||||
podToTopologyPairs map[string]topologyPairSet
|
||||
}
|
||||
// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int64)" so that
|
||||
// we can do atomic additions instead of using a global mutext, however we need to consider
|
||||
// how to init each topologyToMatchedTermCount.
|
||||
type topologyToMatchedTermCount map[topologyPair]int64
|
||||
|
||||
type criticalPath struct {
|
||||
// topologyValue denotes the topology value mapping to topology key.
|
||||
@ -186,61 +180,89 @@ func (m *serviceAffinityMetadata) clone() *serviceAffinityMetadata {
|
||||
}
|
||||
|
||||
type podAffinityMetadata struct {
|
||||
topologyPairsAntiAffinityPodsMap *topologyPairsMaps
|
||||
// A map of topology pairs to a list of Pods that can potentially match
|
||||
// the affinity terms of the "pod" and its inverse.
|
||||
topologyPairsPotentialAffinityPods *topologyPairsMaps
|
||||
// A map of topology pairs to a list of Pods that can potentially match
|
||||
// the anti-affinity terms of the "pod" and its inverse.
|
||||
topologyPairsPotentialAntiAffinityPods *topologyPairsMaps
|
||||
// A map of topology pairs to the number of existing pods that has anti-affinity terms that match the "pod".
|
||||
topologyToMatchedExistingAntiAffinityTerms topologyToMatchedTermCount
|
||||
// A map of topology pairs to the number of existing pods that match the affinity terms of the "pod".
|
||||
topologyToMatchedAffinityTerms topologyToMatchedTermCount
|
||||
// A map of topology pairs to the number of existing pods that match the anti-affinity terms of the "pod".
|
||||
topologyToMatchedAntiAffinityTerms topologyToMatchedTermCount
|
||||
}
|
||||
|
||||
func (m *podAffinityMetadata) addPod(addedPod *v1.Pod, pod *v1.Pod, node *v1.Node) error {
|
||||
// Add matching anti-affinity terms of the addedPod to the map.
|
||||
topologyPairsMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, addedPod, node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.topologyPairsAntiAffinityPodsMap.appendMaps(topologyPairsMaps)
|
||||
// Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed.
|
||||
affinity := pod.Spec.Affinity
|
||||
podNodeName := addedPod.Spec.NodeName
|
||||
if affinity != nil && len(podNodeName) > 0 {
|
||||
// It is assumed that when the added pod matches affinity of the pod, all the terms must match,
|
||||
// this should be changed when the implementation of targetPodMatchesAffinityOfPod/podMatchesAffinityTermProperties
|
||||
// is changed
|
||||
if targetPodMatchesAffinityOfPod(pod, addedPod) {
|
||||
affinityTerms := GetPodAffinityTerms(affinity.PodAffinity)
|
||||
for _, term := range affinityTerms {
|
||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||
m.topologyPairsPotentialAffinityPods.addTopologyPair(pair, addedPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
if targetPodMatchesAntiAffinityOfPod(pod, addedPod) {
|
||||
antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity)
|
||||
for _, term := range antiAffinityTerms {
|
||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||
m.topologyPairsPotentialAntiAffinityPods.addTopologyPair(pair, addedPod)
|
||||
// updateWithAffinityTerms updates the topologyToMatchedTermCount map with the specified value
|
||||
// for each affinity term if "targetPod" matches ALL terms.
|
||||
func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, affinityTerms []*affinityTermProperties, value int64) {
|
||||
if podMatchesAllAffinityTermProperties(targetPod, affinityTerms) {
|
||||
for _, t := range affinityTerms {
|
||||
if topologyValue, ok := targetPodNode.Labels[t.topologyKey]; ok {
|
||||
pair := topologyPair{key: t.topologyKey, value: topologyValue}
|
||||
m[pair] += value
|
||||
// value could be a negative value, hence we delete the entry if
|
||||
// the entry is down to zero.
|
||||
if m[pair] == 0 {
|
||||
delete(m, pair)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *podAffinityMetadata) removePod(deletedPod *v1.Pod) {
|
||||
// updateAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value
|
||||
// for each anti-affinity term matched the target pod.
|
||||
func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []*affinityTermProperties, value int64) {
|
||||
// Check anti-affinity properties.
|
||||
for _, a := range antiAffinityTerms {
|
||||
if priorityutil.PodMatchesTermsNamespaceAndSelector(targetPod, a.namespaces, a.selector) {
|
||||
if topologyValue, ok := targetPodNode.Labels[a.topologyKey]; ok {
|
||||
pair := topologyPair{key: a.topologyKey, value: topologyValue}
|
||||
m[pair] += value
|
||||
// value could be a negative value, hence we delete the entry if
|
||||
// the entry is down to zero.
|
||||
if m[pair] == 0 {
|
||||
delete(m, pair)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *podAffinityMetadata) updatePod(updatedPod, pod *v1.Pod, node *v1.Node, multiplier int64) error {
|
||||
if m == nil {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
m.topologyPairsAntiAffinityPodsMap.removePod(deletedPod)
|
||||
// Delete pod from the matching affinity or anti-affinity topology pairs maps.
|
||||
m.topologyPairsPotentialAffinityPods.removePod(deletedPod)
|
||||
m.topologyPairsPotentialAntiAffinityPods.removePod(deletedPod)
|
||||
// Update matching existing anti-affinity terms.
|
||||
updatedPodAffinity := updatedPod.Spec.Affinity
|
||||
if updatedPodAffinity != nil && updatedPodAffinity.PodAntiAffinity != nil {
|
||||
antiAffinityProperties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(updatedPodAffinity.PodAntiAffinity))
|
||||
if err != nil {
|
||||
klog.Errorf("error in getting anti-affinity properties of Pod %v", updatedPod.Name)
|
||||
return err
|
||||
}
|
||||
m.topologyToMatchedExistingAntiAffinityTerms.updateWithAntiAffinityTerms(pod, node, antiAffinityProperties, multiplier)
|
||||
}
|
||||
|
||||
// Update matching incoming pod (anti)affinity terms.
|
||||
affinity := pod.Spec.Affinity
|
||||
podNodeName := updatedPod.Spec.NodeName
|
||||
if affinity != nil && len(podNodeName) > 0 {
|
||||
if affinity.PodAffinity == nil {
|
||||
affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity))
|
||||
if err != nil {
|
||||
klog.Errorf("error in getting affinity properties of Pod %v", pod.Name)
|
||||
return err
|
||||
}
|
||||
m.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPod, node, affinityProperties, multiplier)
|
||||
}
|
||||
if affinity.PodAntiAffinity != nil {
|
||||
antiAffinityProperties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
|
||||
if err != nil {
|
||||
klog.Errorf("error in getting anti-affinity properties of Pod %v", pod.Name)
|
||||
return err
|
||||
}
|
||||
m.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPod, node, antiAffinityProperties, multiplier)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *podAffinityMetadata) clone() *podAffinityMetadata {
|
||||
@ -249,9 +271,9 @@ func (m *podAffinityMetadata) clone() *podAffinityMetadata {
|
||||
}
|
||||
|
||||
copy := podAffinityMetadata{}
|
||||
copy.topologyPairsPotentialAffinityPods = m.topologyPairsPotentialAffinityPods.clone()
|
||||
copy.topologyPairsPotentialAntiAffinityPods = m.topologyPairsPotentialAntiAffinityPods.clone()
|
||||
copy.topologyPairsAntiAffinityPodsMap = m.topologyPairsAntiAffinityPodsMap.clone()
|
||||
copy.topologyToMatchedAffinityTerms = m.topologyToMatchedAffinityTerms.clone()
|
||||
copy.topologyToMatchedAntiAffinityTerms = m.topologyToMatchedAntiAffinityTerms.clone()
|
||||
copy.topologyToMatchedExistingAntiAffinityTerms = m.topologyToMatchedExistingAntiAffinityTerms.clone()
|
||||
|
||||
return ©
|
||||
}
|
||||
@ -421,9 +443,9 @@ func getPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo,
|
||||
}
|
||||
|
||||
return &podAffinityMetadata{
|
||||
topologyPairsPotentialAffinityPods: incomingPodAffinityMap,
|
||||
topologyPairsPotentialAntiAffinityPods: incomingPodAntiAffinityMap,
|
||||
topologyPairsAntiAffinityPodsMap: existingPodAntiAffinityMap,
|
||||
topologyToMatchedAffinityTerms: incomingPodAffinityMap,
|
||||
topologyToMatchedAntiAffinityTerms: incomingPodAntiAffinityMap,
|
||||
topologyToMatchedExistingAntiAffinityTerms: existingPodAntiAffinityMap,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -527,50 +549,14 @@ func NodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints
|
||||
return true
|
||||
}
|
||||
|
||||
// returns a pointer to a new topologyPairsMaps
|
||||
func newTopologyPairsMaps() *topologyPairsMaps {
|
||||
return &topologyPairsMaps{
|
||||
topologyPairToPods: make(map[topologyPair]podSet),
|
||||
podToTopologyPairs: make(map[string]topologyPairSet),
|
||||
func (m topologyToMatchedTermCount) appendMaps(toAppend topologyToMatchedTermCount) {
|
||||
for pair := range toAppend {
|
||||
m[pair] += toAppend[pair]
|
||||
}
|
||||
}
|
||||
|
||||
func (m *topologyPairsMaps) addTopologyPair(pair topologyPair, pod *v1.Pod) {
|
||||
podFullName := schedutil.GetPodFullName(pod)
|
||||
if m.topologyPairToPods[pair] == nil {
|
||||
m.topologyPairToPods[pair] = make(map[*v1.Pod]struct{})
|
||||
}
|
||||
m.topologyPairToPods[pair][pod] = struct{}{}
|
||||
if m.podToTopologyPairs[podFullName] == nil {
|
||||
m.podToTopologyPairs[podFullName] = make(map[topologyPair]struct{})
|
||||
}
|
||||
m.podToTopologyPairs[podFullName][pair] = struct{}{}
|
||||
}
|
||||
|
||||
func (m *topologyPairsMaps) removePod(deletedPod *v1.Pod) {
|
||||
deletedPodFullName := schedutil.GetPodFullName(deletedPod)
|
||||
for pair := range m.podToTopologyPairs[deletedPodFullName] {
|
||||
delete(m.topologyPairToPods[pair], deletedPod)
|
||||
if len(m.topologyPairToPods[pair]) == 0 {
|
||||
delete(m.topologyPairToPods, pair)
|
||||
}
|
||||
}
|
||||
delete(m.podToTopologyPairs, deletedPodFullName)
|
||||
}
|
||||
|
||||
func (m *topologyPairsMaps) appendMaps(toAppend *topologyPairsMaps) {
|
||||
if toAppend == nil {
|
||||
return
|
||||
}
|
||||
for pair := range toAppend.topologyPairToPods {
|
||||
for pod := range toAppend.topologyPairToPods[pair] {
|
||||
m.addTopologyPair(pair, pod)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *topologyPairsMaps) clone() *topologyPairsMaps {
|
||||
copy := newTopologyPairsMaps()
|
||||
func (m topologyToMatchedTermCount) clone() topologyToMatchedTermCount {
|
||||
copy := make(topologyToMatchedTermCount, len(m))
|
||||
copy.appendMaps(m)
|
||||
return copy
|
||||
}
|
||||
@ -633,7 +619,7 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod, node *v1.Node) erro
|
||||
if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
|
||||
return fmt.Errorf("deletedPod and meta.pod must not be the same")
|
||||
}
|
||||
meta.podAffinityMetadata.removePod(deletedPod)
|
||||
meta.podAffinityMetadata.updatePod(deletedPod, meta.pod, node, -1)
|
||||
meta.evenPodsSpreadMetadata.removePod(deletedPod, meta.pod, node)
|
||||
meta.serviceAffinityMetadata.removePod(deletedPod, node)
|
||||
|
||||
@ -651,7 +637,7 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
|
||||
return fmt.Errorf("node not found")
|
||||
}
|
||||
|
||||
if err := meta.podAffinityMetadata.addPod(addedPod, meta.pod, node); err != nil {
|
||||
if err := meta.podAffinityMetadata.updatePod(addedPod, meta.pod, node, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
// Update meta.evenPodsSpreadMetadata if meta.pod has hard spread constraints
|
||||
@ -678,9 +664,11 @@ func (meta *predicateMetadata) ShallowCopy() Metadata {
|
||||
return (Metadata)(newPredMeta)
|
||||
}
|
||||
|
||||
// A processed version of v1.PodAffinityTerm.
|
||||
type affinityTermProperties struct {
|
||||
namespaces sets.String
|
||||
selector labels.Selector
|
||||
namespaces sets.String
|
||||
selector labels.Selector
|
||||
topologyKey string
|
||||
}
|
||||
|
||||
// getAffinityTermProperties receives a Pod and affinity terms and returns the namespaces and
|
||||
@ -696,7 +684,7 @@ func getAffinityTermProperties(pod *v1.Pod, terms []v1.PodAffinityTerm) (propert
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
properties = append(properties, &affinityTermProperties{namespaces: namespaces, selector: selector})
|
||||
properties = append(properties, &affinityTermProperties{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey})
|
||||
}
|
||||
return properties, nil
|
||||
}
|
||||
@ -714,31 +702,18 @@ func podMatchesAllAffinityTermProperties(pod *v1.Pod, properties []*affinityTerm
|
||||
return true
|
||||
}
|
||||
|
||||
// podMatchesAnyAffinityTermProperties returns true if the given pod matches any given property.
|
||||
func podMatchesAnyAffinityTermProperties(pod *v1.Pod, properties []*affinityTermProperties) bool {
|
||||
if len(properties) == 0 {
|
||||
return false
|
||||
}
|
||||
for _, property := range properties {
|
||||
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, property.namespaces, property.selector) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
|
||||
// (1) Whether it has PodAntiAffinity
|
||||
// (2) Whether any AffinityTerm matches the incoming pod
|
||||
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*topologyPairsMaps, error) {
|
||||
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (topologyToMatchedTermCount, error) {
|
||||
errCh := schedutil.NewErrorChannel()
|
||||
var lock sync.Mutex
|
||||
topologyMaps := newTopologyPairsMaps()
|
||||
topologyMap := make(topologyToMatchedTermCount)
|
||||
|
||||
appendTopologyPairsMaps := func(toAppend *topologyPairsMaps) {
|
||||
appendResult := func(toAppend topologyToMatchedTermCount) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
topologyMaps.appendMaps(toAppend)
|
||||
topologyMap.appendMaps(toAppend)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -757,7 +732,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulernode
|
||||
return
|
||||
}
|
||||
if existingPodTopologyMaps != nil {
|
||||
appendTopologyPairsMaps(existingPodTopologyMaps)
|
||||
appendResult(existingPodTopologyMaps)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -767,30 +742,30 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulernode
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return topologyMaps, nil
|
||||
return topologyMap, nil
|
||||
}
|
||||
|
||||
// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
|
||||
// It returns a topologyPairsMaps that are checked later by the affinity
|
||||
// predicate. With this topologyPairsMaps available, the affinity predicate does not
|
||||
// It returns a topologyToMatchedTermCount that are checked later by the affinity
|
||||
// predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
|
||||
// need to check all the pods in the cluster.
|
||||
func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (topologyPairsAffinityPodsMaps *topologyPairsMaps, topologyPairsAntiAffinityPodsMaps *topologyPairsMaps, err error) {
|
||||
func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount, error) {
|
||||
topologyPairsAffinityPodsMap := make(topologyToMatchedTermCount)
|
||||
topologyToMatchedExistingAntiAffinityTerms := make(topologyToMatchedTermCount)
|
||||
affinity := pod.Spec.Affinity
|
||||
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
|
||||
return newTopologyPairsMaps(), newTopologyPairsMaps(), nil
|
||||
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
|
||||
}
|
||||
|
||||
var lock sync.Mutex
|
||||
topologyPairsAffinityPodsMaps = newTopologyPairsMaps()
|
||||
topologyPairsAntiAffinityPodsMaps = newTopologyPairsMaps()
|
||||
appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps *topologyPairsMaps) {
|
||||
appendResult := func(nodeName string, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap topologyToMatchedTermCount) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
if len(nodeTopologyPairsAffinityPodsMaps.topologyPairToPods) > 0 {
|
||||
topologyPairsAffinityPodsMaps.appendMaps(nodeTopologyPairsAffinityPodsMaps)
|
||||
if len(nodeTopologyPairsAffinityPodsMap) > 0 {
|
||||
topologyPairsAffinityPodsMap.appendMaps(nodeTopologyPairsAffinityPodsMap)
|
||||
}
|
||||
if len(nodeTopologyPairsAntiAffinityPodsMaps.topologyPairToPods) > 0 {
|
||||
topologyPairsAntiAffinityPodsMaps.appendMaps(nodeTopologyPairsAntiAffinityPodsMaps)
|
||||
if len(nodeTopologyPairsAntiAffinityPodsMap) > 0 {
|
||||
topologyToMatchedExistingAntiAffinityTerms.appendMaps(nodeTopologyPairsAntiAffinityPodsMap)
|
||||
}
|
||||
}
|
||||
|
||||
@ -813,37 +788,23 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*sched
|
||||
klog.Error("node not found")
|
||||
return
|
||||
}
|
||||
nodeTopologyPairsAffinityPodsMaps := newTopologyPairsMaps()
|
||||
nodeTopologyPairsAntiAffinityPodsMaps := newTopologyPairsMaps()
|
||||
nodeTopologyPairsAffinityPodsMap := make(topologyToMatchedTermCount)
|
||||
nodeTopologyPairsAntiAffinityPodsMap := make(topologyToMatchedTermCount)
|
||||
for _, existingPod := range nodeInfo.Pods() {
|
||||
// Check affinity properties.
|
||||
if podMatchesAllAffinityTermProperties(existingPod, affinityProperties) {
|
||||
for _, term := range affinityTerms {
|
||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||
nodeTopologyPairsAffinityPodsMaps.addTopologyPair(pair, existingPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod, node, affinityProperties, 1)
|
||||
|
||||
// Check anti-affinity properties.
|
||||
for i, term := range antiAffinityTerms {
|
||||
p := antiAffinityProperties[i]
|
||||
if priorityutil.PodMatchesTermsNamespaceAndSelector(existingPod, p.namespaces, p.selector) {
|
||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||
nodeTopologyPairsAntiAffinityPodsMaps.addTopologyPair(pair, existingPod)
|
||||
}
|
||||
}
|
||||
}
|
||||
nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod, node, antiAffinityProperties, 1)
|
||||
}
|
||||
|
||||
if len(nodeTopologyPairsAffinityPodsMaps.topologyPairToPods) > 0 || len(nodeTopologyPairsAntiAffinityPodsMaps.topologyPairToPods) > 0 {
|
||||
appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps)
|
||||
if len(nodeTopologyPairsAffinityPodsMap) > 0 || len(nodeTopologyPairsAntiAffinityPodsMap) > 0 {
|
||||
appendResult(node.Name, nodeTopologyPairsAffinityPodsMap, nodeTopologyPairsAntiAffinityPodsMap)
|
||||
}
|
||||
}
|
||||
workqueue.ParallelizeUntil(context.Background(), 16, len(allNodes), processNode)
|
||||
|
||||
return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, nil
|
||||
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
|
||||
}
|
||||
|
||||
// targetPodMatchesAffinityOfPod returns true if "targetPod" matches ALL affinity terms of
|
||||
@ -862,20 +823,3 @@ func targetPodMatchesAffinityOfPod(pod, targetPod *v1.Pod) bool {
|
||||
}
|
||||
return podMatchesAllAffinityTermProperties(targetPod, affinityProperties)
|
||||
}
|
||||
|
||||
// targetPodMatchesAntiAffinityOfPod returns true if "targetPod" matches ANY anti-affinity
|
||||
// term of "pod". This function does not check topology.
|
||||
// So, whether the targetPod actually matches or not needs further checks for a specific
|
||||
// node.
|
||||
func targetPodMatchesAntiAffinityOfPod(pod, targetPod *v1.Pod) bool {
|
||||
affinity := pod.Spec.Affinity
|
||||
if affinity == nil || affinity.PodAntiAffinity == nil {
|
||||
return false
|
||||
}
|
||||
properties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
|
||||
if err != nil {
|
||||
klog.Errorf("error in getting anti-affinity properties of Pod %v", pod.Name)
|
||||
return false
|
||||
}
|
||||
return podMatchesAnyAffinityTermProperties(targetPod, properties)
|
||||
}
|
||||
|
@ -70,19 +70,15 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
|
||||
for !reflect.DeepEqual(meta1.podFitsHostPortsMetadata.podPorts, meta2.podFitsHostPortsMetadata.podPorts) {
|
||||
return fmt.Errorf("podPorts are not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(meta1.podAffinityMetadata.topologyPairsPotentialAffinityPods, meta2.podAffinityMetadata.topologyPairsPotentialAffinityPods) {
|
||||
return fmt.Errorf("topologyPairsPotentialAffinityPods are not equal")
|
||||
if !reflect.DeepEqual(meta1.podAffinityMetadata.topologyToMatchedAffinityTerms, meta2.podAffinityMetadata.topologyToMatchedAffinityTerms) {
|
||||
return fmt.Errorf("topologyToMatchedAffinityTerms are not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(meta1.podAffinityMetadata.topologyPairsPotentialAntiAffinityPods, meta2.podAffinityMetadata.topologyPairsPotentialAntiAffinityPods) {
|
||||
return fmt.Errorf("topologyPairsPotentialAntiAffinityPods are not equal")
|
||||
if !reflect.DeepEqual(meta1.podAffinityMetadata.topologyToMatchedAntiAffinityTerms, meta2.podAffinityMetadata.topologyToMatchedAntiAffinityTerms) {
|
||||
return fmt.Errorf("topologyToMatchedAntiAffinityTerms are not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(meta1.podAffinityMetadata.topologyPairsAntiAffinityPodsMap.podToTopologyPairs,
|
||||
meta2.podAffinityMetadata.topologyPairsAntiAffinityPodsMap.podToTopologyPairs) {
|
||||
return fmt.Errorf("topologyPairsAntiAffinityPodsMap.podToTopologyPairs are not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(meta1.podAffinityMetadata.topologyPairsAntiAffinityPodsMap.topologyPairToPods,
|
||||
meta2.podAffinityMetadata.topologyPairsAntiAffinityPodsMap.topologyPairToPods) {
|
||||
return fmt.Errorf("topologyPairsAntiAffinityPodsMap.topologyPairToPods are not equal")
|
||||
if !reflect.DeepEqual(meta1.podAffinityMetadata.topologyToMatchedExistingAntiAffinityTerms,
|
||||
meta2.podAffinityMetadata.topologyToMatchedExistingAntiAffinityTerms) {
|
||||
return fmt.Errorf("topologyToMatchedExistingAntiAffinityTerms are not equal, got: %v, want: %v", meta1.podAffinityMetadata.topologyToMatchedExistingAntiAffinityTerms, meta2.podAffinityMetadata.topologyToMatchedExistingAntiAffinityTerms)
|
||||
}
|
||||
if meta1.serviceAffinityMetadata != nil {
|
||||
sortablePods1 := sortablePods(meta1.serviceAffinityMetadata.matchingPodList)
|
||||
@ -379,7 +375,7 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
|
||||
// Remove the added pod and from existingPodsMeta1 an make sure it is equal
|
||||
// to meta generated for existing pods.
|
||||
existingPodsMeta2, _ := getMeta(fakelisters.PodLister(test.existingPods))
|
||||
if err := existingPodsMeta1.RemovePod(test.addedPod, nil); err != nil {
|
||||
if err := existingPodsMeta1.RemovePod(test.addedPod, nodeInfo.Node()); err != nil {
|
||||
t.Errorf("error removing pod from meta: %v", err)
|
||||
}
|
||||
if err := predicateMetadataEquivalent(existingPodsMeta1, existingPodsMeta2); err != nil {
|
||||
@ -393,7 +389,6 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
|
||||
// on the idea that shallow-copy should produce an object that is deep-equal to the original
|
||||
// object.
|
||||
func TestPredicateMetadata_ShallowCopy(t *testing.T) {
|
||||
selector1 := map[string]string{"foo": "bar"}
|
||||
source := predicateMetadata{
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@ -421,95 +416,17 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
|
||||
},
|
||||
},
|
||||
podAffinityMetadata: &podAffinityMetadata{
|
||||
topologyPairsAntiAffinityPodsMap: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "name", value: "machine1"}: {
|
||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2", Labels: selector1},
|
||||
Spec: v1.PodSpec{NodeName: "nodeC"},
|
||||
}: struct{}{},
|
||||
},
|
||||
{key: "name", value: "machine2"}: {
|
||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
|
||||
Spec: v1.PodSpec{NodeName: "nodeA"},
|
||||
}: struct{}{},
|
||||
},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"p2_": {
|
||||
topologyPair{key: "name", value: "machine1"}: struct{}{},
|
||||
},
|
||||
"p1_": {
|
||||
topologyPair{key: "name", value: "machine2"}: struct{}{},
|
||||
},
|
||||
},
|
||||
topologyToMatchedExistingAntiAffinityTerms: topologyToMatchedTermCount{
|
||||
{key: "name", value: "machine1"}: 1,
|
||||
{key: "name", value: "machine2"}: 1,
|
||||
},
|
||||
topologyPairsPotentialAffinityPods: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "name", value: "nodeA"}: {
|
||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
|
||||
Spec: v1.PodSpec{NodeName: "nodeA"},
|
||||
}: struct{}{},
|
||||
},
|
||||
{key: "name", value: "nodeC"}: {
|
||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "nodeC",
|
||||
},
|
||||
}: struct{}{},
|
||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p6", Labels: selector1},
|
||||
Spec: v1.PodSpec{NodeName: "nodeC"},
|
||||
}: struct{}{},
|
||||
},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"p1_": {
|
||||
topologyPair{key: "name", value: "nodeA"}: struct{}{},
|
||||
},
|
||||
"p2_": {
|
||||
topologyPair{key: "name", value: "nodeC"}: struct{}{},
|
||||
},
|
||||
"p6_": {
|
||||
topologyPair{key: "name", value: "nodeC"}: struct{}{},
|
||||
},
|
||||
},
|
||||
topologyToMatchedAffinityTerms: topologyToMatchedTermCount{
|
||||
{key: "name", value: "nodeA"}: 1,
|
||||
{key: "name", value: "nodeC"}: 2,
|
||||
},
|
||||
topologyPairsPotentialAntiAffinityPods: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "name", value: "nodeN"}: {
|
||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
|
||||
Spec: v1.PodSpec{NodeName: "nodeN"},
|
||||
}: struct{}{},
|
||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "nodeM",
|
||||
},
|
||||
}: struct{}{},
|
||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p3"},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "nodeM",
|
||||
},
|
||||
}: struct{}{},
|
||||
},
|
||||
{key: "name", value: "nodeM"}: {
|
||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p6", Labels: selector1},
|
||||
Spec: v1.PodSpec{NodeName: "nodeM"},
|
||||
}: struct{}{},
|
||||
},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"p1_": {
|
||||
topologyPair{key: "name", value: "nodeN"}: struct{}{},
|
||||
},
|
||||
"p2_": {
|
||||
topologyPair{key: "name", value: "nodeN"}: struct{}{},
|
||||
},
|
||||
"p3_": {
|
||||
topologyPair{key: "name", value: "nodeN"}: struct{}{},
|
||||
},
|
||||
"p6_": {
|
||||
topologyPair{key: "name", value: "nodeM"}: struct{}{},
|
||||
},
|
||||
},
|
||||
topologyToMatchedAntiAffinityTerms: topologyToMatchedTermCount{
|
||||
{key: "name", value: "nodeN"}: 3,
|
||||
{key: "name", value: "nodeM"}: 1,
|
||||
},
|
||||
},
|
||||
evenPodsSpreadMetadata: &evenPodsSpreadMetadata{
|
||||
@ -573,13 +490,13 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
nodeA := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: map[string]string{"hostname": "nodeA"}}}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
existingPods []*v1.Pod
|
||||
nodes []*v1.Node
|
||||
pod *v1.Pod
|
||||
wantAffinityPodsMaps *topologyPairsMaps
|
||||
wantAntiAffinityPodsMaps *topologyPairsMaps
|
||||
wantErr bool
|
||||
name string
|
||||
existingPods []*v1.Pod
|
||||
nodes []*v1.Node
|
||||
pod *v1.Pod
|
||||
wantAffinityPodsMap topologyToMatchedTermCount
|
||||
wantAntiAffinityPodsMap topologyToMatchedTermCount
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "nil test",
|
||||
@ -587,8 +504,8 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "aaa-normal"},
|
||||
},
|
||||
wantAffinityPodsMaps: newTopologyPairsMaps(),
|
||||
wantAntiAffinityPodsMaps: newTopologyPairsMaps(),
|
||||
wantAffinityPodsMap: make(topologyToMatchedTermCount),
|
||||
wantAntiAffinityPodsMap: make(topologyToMatchedTermCount),
|
||||
},
|
||||
{
|
||||
name: "incoming pod without affinity/anti-affinity causes a no-op",
|
||||
@ -597,8 +514,8 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "aaa-normal"},
|
||||
},
|
||||
wantAffinityPodsMaps: newTopologyPairsMaps(),
|
||||
wantAntiAffinityPodsMaps: newTopologyPairsMaps(),
|
||||
wantAffinityPodsMap: make(topologyToMatchedTermCount),
|
||||
wantAntiAffinityPodsMap: make(topologyToMatchedTermCount),
|
||||
},
|
||||
{
|
||||
name: "no pod has label that violates incoming pod's affinity and anti-affinity",
|
||||
@ -617,8 +534,8 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMaps: newTopologyPairsMaps(),
|
||||
wantAntiAffinityPodsMaps: newTopologyPairsMaps(),
|
||||
wantAffinityPodsMap: make(topologyToMatchedTermCount),
|
||||
wantAntiAffinityPodsMap: make(topologyToMatchedTermCount),
|
||||
},
|
||||
{
|
||||
name: "existing pod matches incoming pod's affinity and anti-affinity - single term case",
|
||||
@ -637,29 +554,15 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMaps: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "hostname", value: "nodeA"}: {normalPodA: struct{}{}},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"normal_": {
|
||||
topologyPair{key: "hostname", value: "nodeA"}: struct{}{},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMap: topologyToMatchedTermCount{
|
||||
{key: "hostname", value: "nodeA"}: 1,
|
||||
},
|
||||
wantAntiAffinityPodsMaps: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "hostname", value: "nodeA"}: {normalPodA: struct{}{}},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"normal_": {
|
||||
topologyPair{key: "hostname", value: "nodeA"}: struct{}{},
|
||||
},
|
||||
},
|
||||
wantAntiAffinityPodsMap: topologyToMatchedTermCount{
|
||||
{key: "hostname", value: "nodeA"}: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "existing pod matches incoming pod's affinity and anti-affinity - mutiple terms case",
|
||||
name: "existing pod matches incoming pod's affinity and anti-affinity - multiple terms case",
|
||||
existingPods: []*v1.Pod{normalPodAB},
|
||||
nodes: []*v1.Node{nodeA},
|
||||
pod: &v1.Pod{
|
||||
@ -675,25 +578,11 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMaps: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "hostname", value: "nodeA"}: {normalPodAB: struct{}{}},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"normal_": {
|
||||
topologyPair{key: "hostname", value: "nodeA"}: struct{}{},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMap: topologyToMatchedTermCount{
|
||||
{key: "hostname", value: "nodeA"}: 2, // 2 one for each term.
|
||||
},
|
||||
wantAntiAffinityPodsMaps: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "hostname", value: "nodeA"}: {normalPodAB: struct{}{}},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"normal_": {
|
||||
topologyPair{key: "hostname", value: "nodeA"}: struct{}{},
|
||||
},
|
||||
},
|
||||
wantAntiAffinityPodsMap: topologyToMatchedTermCount{
|
||||
{key: "hostname", value: "nodeA"}: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -713,16 +602,9 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMaps: newTopologyPairsMaps(),
|
||||
wantAntiAffinityPodsMaps: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "hostname", value: "nodeA"}: {normalPodA: struct{}{}},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"normal_": {
|
||||
topologyPair{key: "hostname", value: "nodeA"}: struct{}{},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMap: make(topologyToMatchedTermCount),
|
||||
wantAntiAffinityPodsMap: topologyToMatchedTermCount{
|
||||
{key: "hostname", value: "nodeA"}: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -742,16 +624,9 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMaps: newTopologyPairsMaps(),
|
||||
wantAntiAffinityPodsMaps: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "hostname", value: "nodeA"}: {normalPodAB: struct{}{}},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"normal_": {
|
||||
topologyPair{key: "hostname", value: "nodeA"}: struct{}{},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMap: make(topologyToMatchedTermCount),
|
||||
wantAntiAffinityPodsMap: topologyToMatchedTermCount{
|
||||
{key: "hostname", value: "nodeA"}: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -771,16 +646,9 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMaps: newTopologyPairsMaps(),
|
||||
wantAntiAffinityPodsMaps: &topologyPairsMaps{
|
||||
topologyPairToPods: map[topologyPair]podSet{
|
||||
{key: "hostname", value: "nodeA"}: {normalPodB: struct{}{}},
|
||||
},
|
||||
podToTopologyPairs: map[string]topologyPairSet{
|
||||
"normal_": {
|
||||
topologyPair{key: "hostname", value: "nodeA"}: struct{}{},
|
||||
},
|
||||
},
|
||||
wantAffinityPodsMap: make(topologyToMatchedTermCount),
|
||||
wantAntiAffinityPodsMap: topologyToMatchedTermCount{
|
||||
{key: "hostname", value: "nodeA"}: 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -788,16 +656,16 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(tt.existingPods, tt.nodes))
|
||||
l, _ := s.NodeInfos().List()
|
||||
gotAffinityPodsMaps, gotAntiAffinityPodsMaps, err := getTPMapMatchingIncomingAffinityAntiAffinity(tt.pod, l)
|
||||
gotAffinityPodsMap, gotAntiAffinityPodsMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(tt.pod, l)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(gotAffinityPodsMaps, tt.wantAffinityPodsMaps) {
|
||||
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMaps = %#v, want %#v", gotAffinityPodsMaps, tt.wantAffinityPodsMaps)
|
||||
if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) {
|
||||
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap)
|
||||
}
|
||||
if !reflect.DeepEqual(gotAntiAffinityPodsMaps, tt.wantAntiAffinityPodsMaps) {
|
||||
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAntiAffinityPodsMaps = %#v, want %#v", gotAntiAffinityPodsMaps, tt.wantAntiAffinityPodsMaps)
|
||||
if !reflect.DeepEqual(gotAntiAffinityPodsMap, tt.wantAntiAffinityPodsMap) {
|
||||
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAntiAffinityPodsMap = %#v, want %#v", gotAntiAffinityPodsMap, tt.wantAntiAffinityPodsMap)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -1298,13 +1298,13 @@ func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.Po
|
||||
// getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node:
|
||||
// (1) Whether it has PodAntiAffinity
|
||||
// (2) Whether ANY AffinityTerm matches the incoming pod
|
||||
func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (*topologyPairsMaps, error) {
|
||||
func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (topologyToMatchedTermCount, error) {
|
||||
affinity := existingPod.Spec.Affinity
|
||||
if affinity == nil || affinity.PodAntiAffinity == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
topologyMaps := newTopologyPairsMaps()
|
||||
topologyMap := make(topologyToMatchedTermCount)
|
||||
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
|
||||
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
|
||||
if err != nil {
|
||||
@ -1314,15 +1314,15 @@ func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *v1.P
|
||||
if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
|
||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||
topologyMaps.addTopologyPair(pair, existingPod)
|
||||
topologyMap[pair]++
|
||||
}
|
||||
}
|
||||
}
|
||||
return topologyMaps, nil
|
||||
return topologyMap, nil
|
||||
}
|
||||
|
||||
func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairsOfPods(pod *v1.Pod, existingPods []*v1.Pod) (*topologyPairsMaps, error) {
|
||||
topologyMaps := newTopologyPairsMaps()
|
||||
func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairsOfPods(pod *v1.Pod, existingPods []*v1.Pod) (topologyToMatchedTermCount, error) {
|
||||
topologyMaps := make(topologyToMatchedTermCount)
|
||||
|
||||
for _, existingPod := range existingPods {
|
||||
existingPodNodeInfo, err := c.nodeInfoLister.Get(existingPod.Spec.NodeName)
|
||||
@ -1346,9 +1346,9 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
|
||||
if node == nil {
|
||||
return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("node not found")
|
||||
}
|
||||
var topologyMaps *topologyPairsMaps
|
||||
var topologyMap topologyToMatchedTermCount
|
||||
if predicateMeta, ok := meta.(*predicateMetadata); ok {
|
||||
topologyMaps = predicateMeta.podAffinityMetadata.topologyPairsAntiAffinityPodsMap
|
||||
topologyMap = predicateMeta.podAffinityMetadata.topologyToMatchedExistingAntiAffinityTerms
|
||||
} else {
|
||||
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
|
||||
// present in nodeInfo. Pods on other nodes pass the filter.
|
||||
@ -1358,7 +1358,7 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
|
||||
klog.Error(errMessage)
|
||||
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
||||
}
|
||||
if topologyMaps, err = c.getMatchingAntiAffinityTopologyPairsOfPods(pod, filteredPods); err != nil {
|
||||
if topologyMap, err = c.getMatchingAntiAffinityTopologyPairsOfPods(pod, filteredPods); err != nil {
|
||||
errMessage := fmt.Sprintf("Failed to get all terms that match pod %s: %v", podName(pod), err)
|
||||
klog.Error(errMessage)
|
||||
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
||||
@ -1368,7 +1368,7 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
|
||||
// Iterate over topology pairs to get any of the pods being affected by
|
||||
// the scheduled pod anti-affinity terms
|
||||
for topologyKey, topologyValue := range node.Labels {
|
||||
if topologyMaps.topologyPairToPods[topologyPair{key: topologyKey, value: topologyValue}] != nil {
|
||||
if topologyMap[topologyPair{key: topologyKey, value: topologyValue}] > 0 {
|
||||
klog.V(10).Infof("Cannot schedule pod %+v onto node %v", podName(pod), node.Name)
|
||||
return ErrExistingPodsAntiAffinityRulesNotMatch, nil
|
||||
}
|
||||
@ -1384,12 +1384,12 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
|
||||
|
||||
// nodeMatchesAllTopologyTerms checks whether "nodeInfo" matches
|
||||
// topology of all the "terms" for the given "pod".
|
||||
func (c *PodAffinityChecker) nodeMatchesAllTopologyTerms(pod *v1.Pod, topologyPairs *topologyPairsMaps, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) bool {
|
||||
func (c *PodAffinityChecker) nodeMatchesAllTopologyTerms(pod *v1.Pod, topologyPairs topologyToMatchedTermCount, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) bool {
|
||||
node := nodeInfo.Node()
|
||||
for _, term := range terms {
|
||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||
if _, ok := topologyPairs.topologyPairToPods[pair]; !ok {
|
||||
if topologyPairs[pair] <= 0 {
|
||||
return false
|
||||
}
|
||||
} else {
|
||||
@ -1401,12 +1401,12 @@ func (c *PodAffinityChecker) nodeMatchesAllTopologyTerms(pod *v1.Pod, topologyPa
|
||||
|
||||
// nodeMatchesAnyTopologyTerm checks whether "nodeInfo" matches
|
||||
// topology of any "term" for the given "pod".
|
||||
func (c *PodAffinityChecker) nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs *topologyPairsMaps, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) bool {
|
||||
func (c *PodAffinityChecker) nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs topologyToMatchedTermCount, nodeInfo *schedulernodeinfo.NodeInfo, terms []v1.PodAffinityTerm) bool {
|
||||
node := nodeInfo.Node()
|
||||
for _, term := range terms {
|
||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||
if _, ok := topologyPairs.topologyPairToPods[pair]; ok {
|
||||
if topologyPairs[pair] > 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
@ -1424,15 +1424,15 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
|
||||
}
|
||||
if predicateMeta, ok := meta.(*predicateMetadata); ok {
|
||||
// Check all affinity terms.
|
||||
topologyPairsPotentialAffinityPods := predicateMeta.podAffinityMetadata.topologyPairsPotentialAffinityPods
|
||||
topologyToMatchedAffinityTerms := predicateMeta.podAffinityMetadata.topologyToMatchedAffinityTerms
|
||||
if affinityTerms := GetPodAffinityTerms(affinity.PodAffinity); len(affinityTerms) > 0 {
|
||||
matchExists := c.nodeMatchesAllTopologyTerms(pod, topologyPairsPotentialAffinityPods, nodeInfo, affinityTerms)
|
||||
matchExists := c.nodeMatchesAllTopologyTerms(pod, topologyToMatchedAffinityTerms, nodeInfo, affinityTerms)
|
||||
if !matchExists {
|
||||
// This pod may the first pod in a series that have affinity to themselves. In order
|
||||
// to not leave such pods in pending state forever, we check that if no other pod
|
||||
// in the cluster matches the namespace and selector of this pod and the pod matches
|
||||
// its own terms, then we allow the pod to pass the affinity check.
|
||||
if !(len(topologyPairsPotentialAffinityPods.topologyPairToPods) == 0 && targetPodMatchesAffinityOfPod(pod, pod)) {
|
||||
if len(topologyToMatchedAffinityTerms) != 0 || !targetPodMatchesAffinityOfPod(pod, pod) {
|
||||
klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinity",
|
||||
podName(pod), node.Name)
|
||||
return ErrPodAffinityRulesNotMatch, nil
|
||||
@ -1441,9 +1441,9 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
|
||||
}
|
||||
|
||||
// Check all anti-affinity terms.
|
||||
topologyPairsPotentialAntiAffinityPods := predicateMeta.podAffinityMetadata.topologyPairsPotentialAntiAffinityPods
|
||||
topologyToMatchedAntiAffinityTerms := predicateMeta.podAffinityMetadata.topologyToMatchedAntiAffinityTerms
|
||||
if antiAffinityTerms := GetPodAntiAffinityTerms(affinity.PodAntiAffinity); len(antiAffinityTerms) > 0 {
|
||||
matchExists := c.nodeMatchesAnyTopologyTerm(pod, topologyPairsPotentialAntiAffinityPods, nodeInfo, antiAffinityTerms)
|
||||
matchExists := c.nodeMatchesAnyTopologyTerm(pod, topologyToMatchedAntiAffinityTerms, nodeInfo, antiAffinityTerms)
|
||||
if matchExists {
|
||||
klog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinity",
|
||||
podName(pod), node.Name)
|
||||
|
Loading…
Reference in New Issue
Block a user