diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index a83e153e5a1..9417e645c68 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -20,14 +20,17 @@ import ( "fmt" "sync" + "github.com/golang/glog" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/scheduler/algorithm" + priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" "k8s.io/kubernetes/pkg/scheduler/schedulercache" schedutil "k8s.io/kubernetes/pkg/scheduler/util" - - "github.com/golang/glog" ) // PredicateMetadataFactory defines a factory of predicate metadata. @@ -50,7 +53,13 @@ type predicateMetadata struct { podRequest *schedulercache.Resource podPorts []*v1.ContainerPort //key is a pod full name with the anti-affinity rules. - matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm + matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm + // A map of node name to a list of Pods on the node that can potentially match + // the affinity rules of the "pod". + nodeNameToMatchingAffinityPods map[string][]*v1.Pod + // A map of node name to a list of Pods on the node that can potentially match + // the anti-affinity rules of the "pod". 
+ nodeNameToMatchingAntiAffinityPods map[string][]*v1.Pod serviceAffinityInUse bool serviceAffinityMatchingPodList []*v1.Pod serviceAffinityMatchingPodServices []*v1.Service @@ -108,12 +117,19 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf if err != nil { return nil } + affinityPods, antiAffinityPods, err := getPodsMatchingAffinity(pod, nodeNameToInfoMap) + if err != nil { + glog.Errorf("[predicate meta data generation] error finding pods that match affinity terms") + return nil + } predicateMetadata := &predicateMetadata{ - pod: pod, - podBestEffort: isPodBestEffort(pod), - podRequest: GetResourceRequest(pod), - podPorts: schedutil.GetContainerPorts(pod), - matchingAntiAffinityTerms: matchingTerms, + pod: pod, + podBestEffort: isPodBestEffort(pod), + podRequest: GetResourceRequest(pod), + podPorts: schedutil.GetContainerPorts(pod), + matchingAntiAffinityTerms: matchingTerms, + nodeNameToMatchingAffinityPods: affinityPods, + nodeNameToMatchingAntiAffinityPods: antiAffinityPods, } for predicateName, precomputeFunc := range predicateMetadataProducers { glog.V(10).Infof("Precompute: %v", predicateName) @@ -131,6 +147,33 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error { } // Delete any anti-affinity rule from the deletedPod. delete(meta.matchingAntiAffinityTerms, deletedPodFullName) + // Delete pod from the matching affinity or anti-affinity pods if exists. 
+ affinity := meta.pod.Spec.Affinity + podNodeName := deletedPod.Spec.NodeName + if affinity != nil && len(podNodeName) > 0 { + if affinity.PodAffinity != nil { + for i, p := range meta.nodeNameToMatchingAffinityPods[podNodeName] { + if p == deletedPod { + s := meta.nodeNameToMatchingAffinityPods[podNodeName] + s[i] = s[len(s)-1] + s = s[:len(s)-1] + meta.nodeNameToMatchingAffinityPods[podNodeName] = s + break + } + } + } + if affinity.PodAntiAffinity != nil { + for i, p := range meta.nodeNameToMatchingAntiAffinityPods[podNodeName] { + if p == deletedPod { + s := meta.nodeNameToMatchingAntiAffinityPods[podNodeName] + s[i] = s[len(s)-1] + s = s[:len(s)-1] + meta.nodeNameToMatchingAntiAffinityPods[podNodeName] = s + break + } + } + } + } // All pods in the serviceAffinityMatchingPodList are in the same namespace. // So, if the namespace of the first one is not the same as the namespace of the // deletedPod, we don't need to check the list, as deletedPod isn't in the list. @@ -173,6 +216,35 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms } } + // Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed. 
+ affinity := meta.pod.Spec.Affinity + podNodeName := addedPod.Spec.NodeName + if affinity != nil && len(podNodeName) > 0 { + if targetPodMatchesAffinityOfPod(meta.pod, addedPod) { + found := false + for _, p := range meta.nodeNameToMatchingAffinityPods[podNodeName] { + if p == addedPod { + found = true + break + } + } + if !found { + meta.nodeNameToMatchingAffinityPods[podNodeName] = append(meta.nodeNameToMatchingAffinityPods[podNodeName], addedPod) + } + } + if targetPodMatchesAntiAffinityOfPod(meta.pod, addedPod) { + found := false + for _, p := range meta.nodeNameToMatchingAntiAffinityPods[podNodeName] { + if p == addedPod { + found = true + break + } + } + if !found { + meta.nodeNameToMatchingAntiAffinityPods[podNodeName] = append(meta.nodeNameToMatchingAntiAffinityPods[podNodeName], addedPod) + } + } + } // If addedPod is in the same namespace as the meta.pod, update the list // of matching pods if applicable. if meta.serviceAffinityInUse && addedPod.Namespace == meta.pod.Namespace { @@ -200,9 +272,162 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata { for k, v := range meta.matchingAntiAffinityTerms { newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...) } + newPredMeta.nodeNameToMatchingAffinityPods = make(map[string][]*v1.Pod) + for k, v := range meta.nodeNameToMatchingAffinityPods { + newPredMeta.nodeNameToMatchingAffinityPods[k] = append([]*v1.Pod(nil), v...) + } + newPredMeta.nodeNameToMatchingAntiAffinityPods = make(map[string][]*v1.Pod) + for k, v := range meta.nodeNameToMatchingAntiAffinityPods { + newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...) + } newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil), meta.serviceAffinityMatchingPodServices...) newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil), meta.serviceAffinityMatchingPodList...) 
return (algorithm.PredicateMetadata)(newPredMeta) } + +type affinityTermProperties struct { + namespaces sets.String + selector labels.Selector +} + +// getAffinityTermProperties receives a Pod and affinity terms and returns the namespaces and +// selectors of the terms. +func getAffinityTermProperties(pod *v1.Pod, terms []v1.PodAffinityTerm) (properties []*affinityTermProperties, err error) { + if terms == nil { + return properties, nil + } + + for _, term := range terms { + namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term) + selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + return nil, err + } + properties = append(properties, &affinityTermProperties{namespaces: namespaces, selector: selector}) + } + return properties, nil +} + +// podMatchesAffinityTermProperties returns true iff the given pod matches all the given properties. +func podMatchesAffinityTermProperties(pod *v1.Pod, properties []*affinityTermProperties) bool { + if len(properties) == 0 { + return false + } + for _, property := range properties { + if !priorityutil.PodMatchesTermsNamespaceAndSelector(pod, property.namespaces, property.selector) { + return false + } + } + return true +} + +// getPodsMatchingAffinity finds existing Pods that match affinity terms of the given "pod". +// It ignores topology. It returns a set of Pods that are checked later by the affinity +// predicate. With this set of pods available, the affinity predicate does not +// need to check all the pods in the cluster. 
+func getPodsMatchingAffinity(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (affinityPods map[string][]*v1.Pod, antiAffinityPods map[string][]*v1.Pod, err error) { + allNodeNames := make([]string, 0, len(nodeInfoMap)) + + affinity := pod.Spec.Affinity + if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) { + return nil, nil, nil + } + + for name := range nodeInfoMap { + allNodeNames = append(allNodeNames, name) + } + + var lock sync.Mutex + var firstError error + affinityPods = make(map[string][]*v1.Pod) + antiAffinityPods = make(map[string][]*v1.Pod) + appendResult := func(nodeName string, affPods, antiAffPods []*v1.Pod) { + lock.Lock() + defer lock.Unlock() + if len(affPods) > 0 { + affinityPods[nodeName] = affPods + } + if len(antiAffPods) > 0 { + antiAffinityPods[nodeName] = antiAffPods + } + } + + catchError := func(err error) { + lock.Lock() + defer lock.Unlock() + if firstError == nil { + firstError = err + } + } + + affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity)) + if err != nil { + return nil, nil, err + } + antiAffinityProperties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity)) + if err != nil { + return nil, nil, err + } + + processNode := func(i int) { + nodeInfo := nodeInfoMap[allNodeNames[i]] + node := nodeInfo.Node() + if node == nil { + catchError(fmt.Errorf("nodeInfo.Node is nil")) + return + } + affPods := make([]*v1.Pod, 0, len(nodeInfo.Pods())) + antiAffPods := make([]*v1.Pod, 0, len(nodeInfo.Pods())) + for _, existingPod := range nodeInfo.Pods() { + // Check affinity properties. + if podMatchesAffinityTermProperties(existingPod, affinityProperties) { + affPods = append(affPods, existingPod) + } + // Check anti-affinity properties. 
+ if podMatchesAffinityTermProperties(existingPod, antiAffinityProperties) { + antiAffPods = append(antiAffPods, existingPod) + } + } + if len(antiAffPods) > 0 || len(affPods) > 0 { + appendResult(node.Name, affPods, antiAffPods) + } + } + workqueue.Parallelize(16, len(allNodeNames), processNode) + return affinityPods, antiAffinityPods, firstError +} + +// targetPodMatchesAffinityOfPod returns true if "targetPod" matches all affinity terms of +// "pod". Similar to getPodsMatchingAffinity, this function does not check topology. +// So, whether the targetPod actually matches or not needs further checks for a specific +// node. +func targetPodMatchesAffinityOfPod(pod, targetPod *v1.Pod) bool { + affinity := pod.Spec.Affinity + if affinity == nil || affinity.PodAffinity == nil { + return false + } + affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity)) + if err != nil { + glog.Errorf("error in getting affinity properties of Pod %v", pod.Name) + return false + } + return podMatchesAffinityTermProperties(targetPod, affinityProperties) +} + +// targetPodMatchesAntiAffinityOfPod returns true if "targetPod" matches any anti-affinity +// rule of "pod". Similar to getPodsMatchingAffinity, this function does not check topology. +// So, whether the targetPod actually matches or not needs further checks for a specific +// node. 
+func targetPodMatchesAntiAffinityOfPod(pod, targetPod *v1.Pod) bool { + affinity := pod.Spec.Affinity + if affinity == nil || affinity.PodAntiAffinity == nil { + return false + } + properties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity)) + if err != nil { + glog.Errorf("error in getting anti-affinity properties of Pod %v", pod.Name) + return false + } + return podMatchesAffinityTermProperties(targetPod, properties) +} diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go index fab725d4018..b5b450f5d1c 100644 --- a/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -88,6 +88,13 @@ func (s sortableServices) Swap(i, j int) { s[i], s[j] = s[j], s[i] } var _ = sort.Interface(&sortableServices{}) +func sortNodePodMap(np map[string][]*v1.Pod) { + for _, pl := range np { + sortablePods := sortablePods(pl) + sort.Sort(sortablePods) + } +} + // predicateMetadataEquivalent returns true if the two metadata are equivalent. // Note: this function does not compare podRequest. 
func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error { @@ -111,6 +118,16 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error { if !reflect.DeepEqual(meta1.matchingAntiAffinityTerms, meta2.matchingAntiAffinityTerms) { return fmt.Errorf("matchingAntiAffinityTerms are not euqal") } + sortNodePodMap(meta1.nodeNameToMatchingAffinityPods) + sortNodePodMap(meta2.nodeNameToMatchingAffinityPods) + if !reflect.DeepEqual(meta1.nodeNameToMatchingAffinityPods, meta2.nodeNameToMatchingAffinityPods) { + return fmt.Errorf("nodeNameToMatchingAffinityPods are not euqal") + } + sortNodePodMap(meta1.nodeNameToMatchingAntiAffinityPods) + sortNodePodMap(meta2.nodeNameToMatchingAntiAffinityPods) + if !reflect.DeepEqual(meta1.nodeNameToMatchingAntiAffinityPods, meta2.nodeNameToMatchingAntiAffinityPods) { + return fmt.Errorf("nodeNameToMatchingAntiAffinityPods are not euqal") + } if meta1.serviceAffinityInUse { sortablePods1 := sortablePods(meta1.serviceAffinityMatchingPodList) sort.Sort(sortablePods1) @@ -189,6 +206,34 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) { }, }, } + affinityComplex := &v1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "foo", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"bar", "buzz"}, + }, + }, + }, + TopologyKey: "region", + }, + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"bar", "security", "test"}, + }, + }, + }, + TopologyKey: "zone", + }, + }, + } tests := []struct { description string @@ -312,6 +357,41 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}}, }, }, + { + description: "metadata matching pod affinity and anti-affinity are updated 
correctly after adding and removing a pod", + pendingPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1}, + }, + existingPods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeA"}, + }, + {ObjectMeta: metav1.ObjectMeta{Name: "p2"}, + Spec: v1.PodSpec{ + NodeName: "nodeC", + Affinity: &v1.Affinity{ + PodAntiAffinity: antiAffinityFooBar, + PodAffinity: affinityComplex, + }, + }, + }, + }, + addedPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1}, + Spec: v1.PodSpec{ + NodeName: "nodeA", + Affinity: &v1.Affinity{ + PodAntiAffinity: antiAffinityComplex, + }, + }, + }, + services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}}, + nodes: []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}}, + {ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}}, + {ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}}, + }, + }, } for _, test := range tests { @@ -360,6 +440,7 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) { // on the idea that shallow-copy should produce an object that is deep-equal to the original // object. 
func TestPredicateMetadata_ShallowCopy(t *testing.T) { + selector1 := map[string]string{"foo": "bar"} source := predicateMetadata{ pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -392,6 +473,45 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) { }, }, }, + nodeNameToMatchingAffinityPods: map[string][]*v1.Pod{ + "nodeA": { + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeA"}, + }, + }, + "nodeC": { + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2"}, + Spec: v1.PodSpec{ + NodeName: "nodeC", + }, + }, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p6", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeC"}, + }, + }, + }, + nodeNameToMatchingAntiAffinityPods: map[string][]*v1.Pod{ + "nodeN": { + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeN"}, + }, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2"}, + Spec: v1.PodSpec{ + NodeName: "nodeM", + }, + }, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p3"}, + Spec: v1.PodSpec{ + NodeName: "nodeM", + }, + }, + }, + "nodeM": { + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p6", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeM"}, + }, + }, + }, serviceAffinityInUse: true, serviceAffinityMatchingPodList: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}, diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index f194b715ac5..f2ca8a407ec 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -1150,7 +1150,7 @@ func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta algorithm if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) { return true, nil, nil } - if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, nodeInfo, affinity); failedPredicates != nil { + if failedPredicates, error := 
c.satisfiesPodsAffinityAntiAffinity(pod, meta, nodeInfo, affinity); failedPredicates != nil { failedPredicates := append([]algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates) return false, failedPredicates, error } @@ -1380,60 +1380,129 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta return nil, nil } +// anyMatchingPodInTopology checks that any of the given Pods are in the +// topology specified by the affinity term. +func (c *PodAffinityChecker) anyMatchingPodInTopology(pod *v1.Pod, matchingPods map[string][]*v1.Pod, nodeInfo *schedulercache.NodeInfo, term *v1.PodAffinityTerm) (bool, error) { + if len(term.TopologyKey) == 0 { + return false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity") + } + if len(matchingPods) == 0 { + return false, nil + } + // Special case: When the topological domain is node, we can limit our + // search to pods on that node without searching the entire cluster. + if term.TopologyKey == kubeletapis.LabelHostname { + if pods, ok := matchingPods[nodeInfo.Node().Name]; ok { + // It may seem odd that we are comparing a node with itself to see if it + // has the same topology key, but it is necessary to check extra conditions + // that the function performs, such as checking that node labels are not nil. + return len(pods) > 0 && priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), nodeInfo.Node(), term.TopologyKey), nil + } + return false, nil + } + // Topology key is not "Hostname". Checking all matching pods. + for nodeName, pods := range matchingPods { + matchingPodNodeInfo, err := c.info.GetNodeInfo(nodeName) + if err != nil { + return false, err + } + if len(pods) > 0 && priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), matchingPodNodeInfo, term.TopologyKey) { + return true, nil + } + } + return false, nil +} + // Checks if scheduling the pod onto this node would break any rules of this pod. 
-func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo, affinity *v1.Affinity) (algorithm.PredicateFailureReason, error) { +func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, + meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo, + affinity *v1.Affinity) (algorithm.PredicateFailureReason, error) { node := nodeInfo.Node() if node == nil { return ErrPodAffinityRulesNotMatch, fmt.Errorf("Node is nil") } - filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything()) - if err != nil { - return ErrPodAffinityRulesNotMatch, err - } - - // Check all affinity terms. - for _, term := range GetPodAffinityTerms(affinity.PodAffinity) { - termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term) - if err != nil { - errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err) - glog.Error(errMessage) - return ErrPodAffinityRulesNotMatch, errors.New(errMessage) - } - if !termMatches { - // If the requirement matches a pod's own labels are namespace, and there are - // no other such pods, then disregard the requirement. This is necessary to - // not block forever because the first pod of the collection can't be scheduled. - if matchingPodExists { - glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v", - podName(pod), node.Name, term) - return ErrPodAffinityRulesNotMatch, nil - } - namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term) - selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) + if predicateMeta, ok := meta.(*predicateMetadata); ok { + // Check all affinity terms. 
+ matchingPods := predicateMeta.nodeNameToMatchingAffinityPods + for _, term := range GetPodAffinityTerms(affinity.PodAffinity) { + termMatches, err := c.anyMatchingPodInTopology(pod, matchingPods, nodeInfo, &term) if err != nil { - errMessage := fmt.Sprintf("Cannot parse selector on term %v for pod %v. Details %v", term, podName(pod), err) + errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err) + glog.Errorf(errMessage) + return ErrPodAffinityRulesNotMatch, errors.New(errMessage) + } + if !termMatches { + // This pod may be the first pod in a series that have affinity to themselves. In order + // to not leave such pods in pending state forever, we check that if no other pod + // in the cluster matches the namespace and selector of this pod and the pod matches + // its own terms, then we allow the pod to pass the affinity check. + if !(len(matchingPods) == 0 && targetPodMatchesAffinityOfPod(pod, pod)) { + glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v", + podName(pod), node.Name, term) + return ErrPodAffinityRulesNotMatch, nil + } + } + } + + // Check all anti-affinity terms. + matchingPods = predicateMeta.nodeNameToMatchingAntiAffinityPods + for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { + termMatches, err := c.anyMatchingPodInTopology(pod, matchingPods, nodeInfo, &term) + if err != nil || termMatches { + glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v", + podName(pod), node.Name, term, err) + return ErrPodAntiAffinityRulesNotMatch, nil + } + } + } else { // We don't have precomputed metadata. We have to follow a slow path to check affinity rules. + filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything()) + if err != nil { + return ErrPodAffinityRulesNotMatch, err + } + + // Check all affinity terms. 
+ for _, term := range GetPodAffinityTerms(affinity.PodAffinity) { + termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term) + if err != nil { + errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err) glog.Error(errMessage) return ErrPodAffinityRulesNotMatch, errors.New(errMessage) } - match := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) - if !match { - glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v", - podName(pod), node.Name, term) - return ErrPodAffinityRulesNotMatch, nil + if !termMatches { + // If the requirement matches a pod's own labels are namespace, and there are + // no other such pods, then disregard the requirement. This is necessary to + // not block forever because the first pod of the collection can't be scheduled. + if matchingPodExists { + glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v", + podName(pod), node.Name, term) + return ErrPodAffinityRulesNotMatch, nil + } + namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term) + selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + errMessage := fmt.Sprintf("Cannot parse selector on term %v for pod %v. Details %v", term, podName(pod), err) + glog.Error(errMessage) + return ErrPodAffinityRulesNotMatch, errors.New(errMessage) + } + match := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) + if !match { + glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v", + podName(pod), node.Name, term) + return ErrPodAffinityRulesNotMatch, nil + } + } + } + + // Check all anti-affinity terms. 
+ for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { + termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term) + if err != nil || termMatches { + glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v", + podName(pod), node.Name, term, err) + return ErrPodAntiAffinityRulesNotMatch, nil } } } - - // Check all anti-affinity terms. - for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { - termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term) - if err != nil || termMatches { - glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v", - podName(pod), node.Name, term, err) - return ErrPodAntiAffinityRulesNotMatch, nil - } - } - if glog.V(10) { // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is // not logged. There is visible performance gain from it. diff --git a/pkg/scheduler/algorithm/predicates/predicates_test.go b/pkg/scheduler/algorithm/predicates/predicates_test.go index 6b9e6f0a0b4..eb952428721 100644 --- a/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -2793,7 +2793,7 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) { }, }, pods: []*v1.Pod{ - {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelA}}, + {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: podLabelA}}, }, nodes: []v1.Node{ {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, @@ -3132,7 +3132,8 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) { for indexTest, test := range tests { nodeListInfo := FakeNodeListInfo(test.nodes) - for indexNode, node := range test.nodes { + nodeInfoMap := make(map[string]*schedulercache.NodeInfo) + for i, node := range test.nodes { var podsOnNode 
[]*v1.Pod for _, pod := range test.pods { if pod.Spec.NodeName == node.Name { @@ -3140,21 +3141,23 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) { } } + nodeInfo := schedulercache.NewNodeInfo(podsOnNode...) + nodeInfo.SetNode(&test.nodes[i]) + nodeInfoMap[node.Name] = nodeInfo + } + + for indexNode, node := range test.nodes { testFit := PodAffinityChecker{ info: nodeListInfo, podLister: schedulertesting.FakePodLister(test.pods), } - nodeInfo := schedulercache.NewNodeInfo(podsOnNode...) - nodeInfo.SetNode(&node) - nodeInfoMap := map[string]*schedulercache.NodeInfo{node.Name: nodeInfo} var meta algorithm.PredicateMetadata - if !test.nometa { meta = PredicateMetadata(test.pod, nodeInfoMap) } - fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, nodeInfo) + fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, nodeInfoMap[node.Name]) if !fits && !reflect.DeepEqual(reasons, test.nodesExpectAffinityFailureReasons[indexNode]) { t.Errorf("index: %d test: %s unexpected failure reasons: %v expect: %v", indexTest, test.test, reasons, test.nodesExpectAffinityFailureReasons[indexNode]) }