From 5e0211c72de097b931296f5a08c6a94945548312 Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Tue, 12 May 2020 15:18:05 -0400 Subject: [PATCH] Added pre-processed required affinity terms to scheduler's PodInfo type. --- .../framework/plugins/interpodaffinity/BUILD | 2 - .../plugins/interpodaffinity/filtering.go | 245 +++++------------- .../interpodaffinity/filtering_test.go | 7 +- .../plugins/interpodaffinity/scoring.go | 14 +- pkg/scheduler/framework/v1alpha1/BUILD | 2 + pkg/scheduler/framework/v1alpha1/types.go | 48 +++- .../internal/queue/scheduling_queue.go | 24 +- pkg/scheduler/util/utils.go | 24 +- 8 files changed, 146 insertions(+), 220 deletions(-) diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD index 5b0a25e1202..2e8478b8616 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD +++ b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD @@ -16,9 +16,7 @@ go_library( "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go index 83d4a49a479..ca7b37d821e 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go @@ -22,9 +22,6 @@ import ( "sync" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" @@ -54,6 +51,8 @@ type preFilterState struct { topologyToMatchedAffinityTerms topologyToMatchedTermCount // A map of topology pairs to the number of existing pods that match the anti-affinity terms of the "pod". topologyToMatchedAntiAffinityTerms topologyToMatchedTermCount + // podInfo of the incoming pod. + podInfo *framework.PodInfo } // Clone the prefilter state. @@ -66,45 +65,27 @@ func (s *preFilterState) Clone() framework.StateData { copy.topologyToMatchedAffinityTerms = s.topologyToMatchedAffinityTerms.clone() copy.topologyToMatchedAntiAffinityTerms = s.topologyToMatchedAntiAffinityTerms.clone() copy.topologyToMatchedExistingAntiAffinityTerms = s.topologyToMatchedExistingAntiAffinityTerms.clone() + // No need to deep copy the podInfo because it shouldn't change. + copy.podInfo = s.podInfo return &copy } // updateWithPod updates the preFilterState counters with the (anti)affinity matches for the given pod. -func (s *preFilterState) updateWithPod(updatedPod, pod *v1.Pod, node *v1.Node, multiplier int64) error { +func (s *preFilterState) updateWithPod(updatedPod *v1.Pod, node *v1.Node, multiplier int64) error { if s == nil { return nil } // Update matching existing anti-affinity terms. - updatedPodAffinity := updatedPod.Spec.Affinity - if updatedPodAffinity != nil && updatedPodAffinity.PodAntiAffinity != nil { - antiAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(updatedPodAffinity.PodAntiAffinity)) - if err != nil { - return fmt.Errorf("error in getting anti-affinity terms of Pod %v: %v", updatedPod.Name, err) - } - s.topologyToMatchedExistingAntiAffinityTerms.updateWithAntiAffinityTerms(pod, node, antiAffinityTerms, multiplier) - } + // TODO(#91058): AddPod/RemovePod should pass a *framework.PodInfo type instead of *v1.Pod.
+ updatedPodInfo := framework.NewPodInfo(updatedPod) + s.topologyToMatchedExistingAntiAffinityTerms.updateWithAntiAffinityTerms(s.podInfo.Pod, node, updatedPodInfo.RequiredAntiAffinityTerms, multiplier) // Update matching incoming pod (anti)affinity terms. - affinity := pod.Spec.Affinity - podNodeName := updatedPod.Spec.NodeName - if affinity != nil && len(podNodeName) > 0 { - if affinity.PodAffinity != nil { - affinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(affinity.PodAffinity)) - if err != nil { - return fmt.Errorf("error in getting affinity terms of Pod %v: %v", pod.Name, err) - } - s.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPod, node, affinityTerms, multiplier) - } - if affinity.PodAntiAffinity != nil { - antiAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity)) - if err != nil { - klog.Errorf("error in getting anti-affinity terms of Pod %v: %v", pod.Name, err) - } - s.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPod, node, antiAffinityTerms, multiplier) - } - } + s.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPod, node, s.podInfo.RequiredAffinityTerms, multiplier) + s.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPod, node, s.podInfo.RequiredAntiAffinityTerms, multiplier) + return nil } @@ -131,11 +112,11 @@ func (m topologyToMatchedTermCount) clone() topologyToMatchedTermCount { // updateWithAffinityTerms updates the topologyToMatchedTermCount map with the specified value // for each affinity term if "targetPod" matches ALL terms. 
-func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, affinityTerms []*affinityTerm, value int64) { +func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, affinityTerms []framework.AffinityTerm, value int64) { if podMatchesAllAffinityTerms(targetPod, affinityTerms) { for _, t := range affinityTerms { - if topologyValue, ok := targetPodNode.Labels[t.topologyKey]; ok { - pair := topologyPair{key: t.topologyKey, value: topologyValue} + if topologyValue, ok := targetPodNode.Labels[t.TopologyKey]; ok { + pair := topologyPair{key: t.TopologyKey, value: topologyValue} m[pair] += value // value could be a negative value, hence we delete the entry if // the entry is down to zero. @@ -149,12 +130,12 @@ func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, t // updateAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value // for each anti-affinity term matched the target pod. -func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []*affinityTerm, value int64) { +func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []framework.AffinityTerm, value int64) { // Check anti-affinity terms. for _, a := range antiAffinityTerms { - if schedutil.PodMatchesTermsNamespaceAndSelector(targetPod, a.namespaces, a.selector) { - if topologyValue, ok := targetPodNode.Labels[a.topologyKey]; ok { - pair := topologyPair{key: a.topologyKey, value: topologyValue} + if schedutil.PodMatchesTermsNamespaceAndSelector(targetPod, a.Namespaces, a.Selector) { + if topologyValue, ok := targetPodNode.Labels[a.TopologyKey]; ok { + pair := topologyPair{key: a.TopologyKey, value: topologyValue} m[pair] += value // value could be a negative value, hence we delete the entry if // the entry is down to zero. 
@@ -166,39 +147,13 @@ func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Po } } -// A processed version of v1.PodAffinityTerm. -type affinityTerm struct { - namespaces sets.String - selector labels.Selector - topologyKey string -} - -// getAffinityTerms receives a Pod and affinity terms and returns the namespaces and -// selectors of the terms. -func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) ([]*affinityTerm, error) { - if v1Terms == nil { - return nil, nil - } - - var terms []*affinityTerm - for _, term := range v1Terms { - namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, &term) - selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) - if err != nil { - return nil, err - } - terms = append(terms, &affinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey}) - } - return terms, nil -} - // podMatchesAllAffinityTerms returns true IFF the given pod matches all the given terms. -func podMatchesAllAffinityTerms(pod *v1.Pod, terms []*affinityTerm) bool { +func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) bool { if len(terms) == 0 { return false } for _, term := range terms { - if !schedutil.PodMatchesTermsNamespaceAndSelector(pod, term.namespaces, term.selector) { + if !schedutil.PodMatchesTermsNamespaceAndSelector(pod, term.Namespaces, term.Selector) { return false } } @@ -208,8 +163,7 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []*affinityTerm) bool { // getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node: // (1) Whether it has PodAntiAffinity // (2) Whether any AffinityTerm matches the incoming pod -func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, error) { - errCh := parallelize.NewErrorChannel() +func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount { var lock 
sync.Mutex topologyMap := make(topologyToMatchedTermCount) @@ -219,8 +173,6 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod topologyMap.append(toAppend) } - ctx, cancel := context.WithCancel(context.Background()) - processNode := func(i int) { nodeInfo := allNodes[i] node := nodeInfo.Node() @@ -229,35 +181,26 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod return } for _, existingPod := range nodeInfo.PodsWithAffinity { - existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod.Pod, node) - if err != nil { - errCh.SendErrorWithCancel(err, cancel) - return - } - if existingPodTopologyMaps != nil { + existingPodTopologyMaps := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node) + if len(existingPodTopologyMaps) != 0 { appendResult(existingPodTopologyMaps) } } } - parallelize.Until(ctx, len(allNodes), processNode) + parallelize.Until(context.Background(), len(allNodes), processNode) - if err := errCh.ReceiveError(); err != nil { - return nil, err - } - - return topologyMap, nil + return topologyMap } // getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod". // It returns a topologyToMatchedTermCount that are checked later by the affinity // predicate. With this topologyToMatchedTermCount available, the affinity predicate does not // need to check all the pods in the cluster. 
-func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount, error) { +func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) { topologyPairsAffinityPodsMap := make(topologyToMatchedTermCount) topologyToMatchedExistingAntiAffinityTerms := make(topologyToMatchedTermCount) - affinity := pod.Spec.Affinity - if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) { - return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil + if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 { + return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms } var lock sync.Mutex @@ -272,16 +215,6 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*frame } } - affinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(affinity.PodAffinity)) - if err != nil { - return nil, nil, err - } - - antiAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity)) - if err != nil { - return nil, nil, err - } - processNode := func(i int) { nodeInfo := allNodes[i] node := nodeInfo.Node() @@ -293,10 +226,10 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*frame nodeTopologyPairsAntiAffinityPodsMap := make(topologyToMatchedTermCount) for _, existingPod := range nodeInfo.Pods { // Check affinity terms. - nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod.Pod, node, affinityTerms, 1) + nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1) // Check anti-affinity terms. 
- nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod.Pod, node, antiAffinityTerms, 1) + nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1) } if len(nodeTopologyPairsAffinityPodsMap) > 0 || len(nodeTopologyPairsAntiAffinityPodsMap) > 0 { @@ -305,24 +238,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*frame } parallelize.Until(context.Background(), len(allNodes), processNode) - return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil -} - -// targetPodMatchesAffinityOfPod returns true if "targetPod" matches ALL affinity terms of -// "pod". This function does not check topology. -// So, whether the targetPod actually matches or not needs further checks for a specific -// node. -func targetPodMatchesAffinityOfPod(pod, targetPod *v1.Pod) bool { - affinity := pod.Spec.Affinity - if affinity == nil || affinity.PodAffinity == nil { - return false - } - affinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(affinity.PodAffinity)) - if err != nil { - klog.Errorf("error in getting affinity terms of Pod %v", pod.Name) - return false - } - return podMatchesAllAffinityTerms(targetPod, affinityTerms) + return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms } // PreFilter invoked at the prefilter extension point. 
@@ -337,22 +253,20 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos with pods with affinity: %v", err)) } + podInfo := framework.NewPodInfo(pod) + // existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity - existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes) - if err != nil { - return framework.NewStatus(framework.Error, fmt.Sprintf("calculating preFilterState: %v", err)) - } + existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes) + // incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity // incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity - incomingPodAffinityMap, incomingPodAntiAffinityMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, allNodes) - if err != nil { - return framework.NewStatus(framework.Error, fmt.Sprintf("calculating preFilterState: %v", err)) - } + incomingPodAffinityMap, incomingPodAntiAffinityMap := getTPMapMatchingIncomingAffinityAntiAffinity(podInfo, allNodes) s := &preFilterState{ topologyToMatchedAffinityTerms: incomingPodAffinityMap, topologyToMatchedAntiAffinityTerms: incomingPodAntiAffinityMap, topologyToMatchedExistingAntiAffinityTerms: existingPodAntiAffinityMap, + podInfo: podInfo, } cycleState.Write(preFilterStateKey, s) @@ -370,7 +284,7 @@ func (pl *InterPodAffinity) AddPod(ctx context.Context, cycleState *framework.Cy if err != nil { return framework.NewStatus(framework.Error, err.Error()) } - state.updateWithPod(podToAdd, podToSchedule, nodeInfo.Node(), 1) + state.updateWithPod(podToAdd, nodeInfo.Node(), 1) return nil } @@ -380,7 +294,7 @@ func (pl *InterPodAffinity) RemovePod(ctx context.Context, cycleState *framework if err != nil { return framework.NewStatus(framework.Error, 
err.Error()) } - state.updateWithPod(podToRemove, podToSchedule, nodeInfo.Node(), -1) + state.updateWithPod(podToRemove, nodeInfo.Node(), -1) return nil } @@ -415,13 +329,13 @@ func (pl *InterPodAffinity) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, state return true, nil } -// nodeMatchesAllTopologyTerms checks whether "nodeInfo" matches topology of all the "terms" for the given "pod". -func nodeMatchesAllTopologyTerms(pod *v1.Pod, topologyPairs topologyToMatchedTermCount, nodeInfo *framework.NodeInfo, terms []v1.PodAffinityTerm) bool { +// nodeMatchesAllAffinityTerms checks whether "nodeInfo" matches all affinity terms of the incoming pod. +func nodeMatchesAllAffinityTerms(nodeInfo *framework.NodeInfo, state *preFilterState) bool { node := nodeInfo.Node() - for _, term := range terms { + for _, term := range state.podInfo.RequiredAffinityTerms { if topologyValue, ok := node.Labels[term.TopologyKey]; ok { pair := topologyPair{key: term.TopologyKey, value: topologyValue} - if topologyPairs[pair] <= 0 { + if state.topologyToMatchedAffinityTerms[pair] <= 0 { return false } } else { @@ -431,14 +345,13 @@ func nodeMatchesAllTopologyTerms(pod *v1.Pod, topologyPairs topologyToMatchedTer return true } -// nodeMatchesAnyTopologyTerm checks whether "nodeInfo" matches -// topology of any "term" for the given "pod". -func nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs topologyToMatchedTermCount, nodeInfo *framework.NodeInfo, terms []v1.PodAffinityTerm) bool { +// nodeMatchesAnyAntiAffinityTerm checks whether "nodeInfo" matches any of the pod's anti-affinity terms.
+func nodeMatchesAnyAntiAffinityTerm(nodeInfo *framework.NodeInfo, state *preFilterState) bool { node := nodeInfo.Node() - for _, term := range terms { + for _, term := range state.podInfo.RequiredAntiAffinityTerms { if topologyValue, ok := node.Labels[term.TopologyKey]; ok { pair := topologyPair{key: term.TopologyKey, value: topologyValue} - if topologyPairs[pair] > 0 { + if state.topologyToMatchedAntiAffinityTerms[pair] > 0 { return true } } @@ -449,62 +362,38 @@ func nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs topologyToMatchedTerm // getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node: // (1) Whether it has PodAntiAffinity // (2) Whether ANY AffinityTerm matches the incoming pod -func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (topologyToMatchedTermCount, error) { - affinity := existingPod.Spec.Affinity - if affinity == nil || affinity.PodAntiAffinity == nil { - return nil, nil - } - +func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *framework.PodInfo, node *v1.Node) topologyToMatchedTermCount { topologyMap := make(topologyToMatchedTermCount) - for _, term := range schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { - selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) - if err != nil { - return nil, err - } - namespaces := schedutil.GetNamespacesFromPodAffinityTerm(existingPod, &term) - if schedutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) { + for _, term := range existingPod.RequiredAntiAffinityTerms { + if schedutil.PodMatchesTermsNamespaceAndSelector(newPod, term.Namespaces, term.Selector) { if topologyValue, ok := node.Labels[term.TopologyKey]; ok { pair := topologyPair{key: term.TopologyKey, value: topologyValue} topologyMap[pair]++ } } } - return topologyMap, nil + return topologyMap } // satisfiesPodsAffinityAntiAffinity checks if scheduling the pod onto this node would break 
any term of this pod. // This function returns two boolean flags. The first boolean flag indicates whether the pod matches affinity rules // or not. The second boolean flag indicates if the pod matches anti-affinity rules. -func (pl *InterPodAffinity) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, - state *preFilterState, nodeInfo *framework.NodeInfo, - affinity *v1.Affinity) (bool, bool, error) { - node := nodeInfo.Node() - if node == nil { - return false, false, fmt.Errorf("node not found") - } - +func (pl *InterPodAffinity) satisfiesPodsAffinityAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) (bool, bool, error) { // Check all affinity terms. - topologyToMatchedAffinityTerms := state.topologyToMatchedAffinityTerms - if affinityTerms := schedutil.GetPodAffinityTerms(affinity.PodAffinity); len(affinityTerms) > 0 { - matchExists := nodeMatchesAllTopologyTerms(pod, topologyToMatchedAffinityTerms, nodeInfo, affinityTerms) - if !matchExists { - // This pod may the first pod in a series that have affinity to themselves. In order - // to not leave such pods in pending state forever, we check that if no other pod - // in the cluster matches the namespace and selector of this pod and the pod matches - // its own terms, then we allow the pod to pass the affinity check. - if len(topologyToMatchedAffinityTerms) != 0 || !targetPodMatchesAffinityOfPod(pod, pod) { - return false, false, nil - } + if !nodeMatchesAllAffinityTerms(nodeInfo, state) { + // This pod may be the first pod in a series that have affinity to themselves. In order + // to not leave such pods in pending state forever, we check that if no other pod + // in the cluster matches the namespace and selector of this pod and the pod matches + // its own terms, then we allow the pod to pass the affinity check. 
+ podInfo := state.podInfo + if len(state.topologyToMatchedAffinityTerms) != 0 || !podMatchesAllAffinityTerms(podInfo.Pod, podInfo.RequiredAffinityTerms) { + return false, false, nil } } // Check all anti-affinity terms. - topologyToMatchedAntiAffinityTerms := state.topologyToMatchedAntiAffinityTerms - if antiAffinityTerms := schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity); len(antiAffinityTerms) > 0 { - matchExists := nodeMatchesAnyTopologyTerm(pod, topologyToMatchedAntiAffinityTerms, nodeInfo, antiAffinityTerms) - if matchExists { - return true, false, nil - } + if nodeMatchesAnyAntiAffinityTerm(nodeInfo, state) { + return true, false, nil } return true, true, nil @@ -513,6 +402,10 @@ func (pl *InterPodAffinity) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, // Filter invoked at the filter extension point. // It checks if a pod can be scheduled on the specified node with pod affinity/anti-affinity configuration. func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + if nodeInfo.Node() == nil { + return framework.NewStatus(framework.Error, "node not found") + } + state, err := getPreFilterState(cycleState) if err != nil { return framework.NewStatus(framework.Error, err.Error()) @@ -526,11 +419,7 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy } // Now check if requirements will be satisfied on this node. 
- affinity := pod.Spec.Affinity - if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) { - return nil - } - if satisfiesAffinity, satisfiesAntiAffinity, err := pl.satisfiesPodsAffinityAntiAffinity(pod, state, nodeInfo, affinity); err != nil || !satisfiesAffinity || !satisfiesAntiAffinity { + if satisfiesAffinity, satisfiesAntiAffinity, err := pl.satisfiesPodsAffinityAntiAffinity(state, nodeInfo); err != nil || !satisfiesAffinity || !satisfiesAntiAffinity { if err != nil { return framework.NewStatus(framework.Error, err.Error()) } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go index 0e346457c0a..51b155d4266 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go @@ -2035,7 +2035,6 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) { pod *v1.Pod wantAffinityPodsMap topologyToMatchedTermCount wantAntiAffinityPodsMap topologyToMatchedTermCount - wantErr bool }{ { name: "nil test", @@ -2195,11 +2194,7 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := cache.NewSnapshot(tt.existingPods, tt.nodes) l, _ := s.NodeInfos().List() - gotAffinityPodsMap, gotAntiAffinityPodsMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(tt.pod, l) - if (err != nil) != tt.wantErr { - t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() error = %v, wantErr %v", err, tt.wantErr) - return - } + gotAffinityPodsMap, gotAntiAffinityPodsMap := getTPMapMatchingIncomingAffinityAntiAffinity(framework.NewPodInfo(tt.pod), l) if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) { t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap) } diff --git 
a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go index 0a4facb61c2..5d207eb3525 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go @@ -48,7 +48,7 @@ func (s *preScoreState) Clone() framework.StateData { // A "processed" representation of v1.WeightedAffinityTerm. type weightedAffinityTerm struct { - affinityTerm + framework.AffinityTerm weight int32 } @@ -58,7 +58,7 @@ func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32 if err != nil { return nil, err } - return &weightedAffinityTerm{affinityTerm: affinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey}, weight: weight}, nil + return &weightedAffinityTerm{AffinityTerm: framework.AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}, weight: weight}, nil } func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) { @@ -87,13 +87,13 @@ func (m scoreMap) processTerm( return } - match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector) - tpValue, tpValueExist := fixedNode.Labels[term.topologyKey] + match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.Namespaces, term.Selector) + tpValue, tpValueExist := fixedNode.Labels[term.TopologyKey] if match && tpValueExist { - if m[term.topologyKey] == nil { - m[term.topologyKey] = make(map[string]int64) + if m[term.TopologyKey] == nil { + m[term.TopologyKey] = make(map[string]int64) } - m[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier)) + m[term.TopologyKey][tpValue] += int64(term.weight * int32(multiplier)) } return } diff --git a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 341222deb9d..cfba556bcbb 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD 
+++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -25,6 +25,8 @@ go_library( "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", diff --git a/pkg/scheduler/framework/v1alpha1/types.go b/pkg/scheduler/framework/v1alpha1/types.go index 4f0767c0c63..2cedc25bd4f 100644 --- a/pkg/scheduler/framework/v1alpha1/types.go +++ b/pkg/scheduler/framework/v1alpha1/types.go @@ -25,6 +25,9 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -65,13 +68,54 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo { // accelerate processing. This information is typically immutable (e.g., pre-processed // inter-pod affinity selectors). type PodInfo struct { - Pod *v1.Pod + Pod *v1.Pod + RequiredAffinityTerms []AffinityTerm + RequiredAntiAffinityTerms []AffinityTerm +} + +// AffinityTerm is a processed version of v1.PodAffinityTerm. 
+type AffinityTerm struct { + Namespaces sets.String + Selector labels.Selector + TopologyKey string +} + +func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) *AffinityTerm { + namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term) + selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + klog.Errorf("Cannot process label selector: %v", err) + return nil + } + return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey} +} + +// getAffinityTerms receives a Pod and affinity terms and returns the namespaces and +// selectors of the terms. +func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) []AffinityTerm { + if v1Terms == nil { + return nil + } + + var terms []AffinityTerm + for _, term := range v1Terms { + t := newAffinityTerm(pod, &term) + if t == nil { + // We get here if the label selector failed to process, this is not supposed + // to happen because the pod should have been validated by the api server. 
+ return nil + } + terms = append(terms, *t) + } + return terms } // NewPodInfo return a new PodInfo func NewPodInfo(pod *v1.Pod) *PodInfo { return &PodInfo{ - Pod: pod, + Pod: pod, + RequiredAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)), + RequiredAntiAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)), } } diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 8359feb40bf..637d1ec1831 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -532,21 +532,19 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod var podsToMove []*framework.QueuedPodInfo for _, pInfo := range p.unschedulableQ.podInfoMap { up := pInfo.Pod - affinity := up.Spec.Affinity - if affinity != nil && affinity.PodAffinity != nil { - terms := util.GetPodAffinityTerms(affinity.PodAffinity) - for _, term := range terms { - namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term) - selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) - if err != nil { - klog.Errorf("Error getting label selectors for pod: %v.", up.Name) - } - if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { - podsToMove = append(podsToMove, pInfo) - break - } + terms := util.GetPodAffinityTerms(up.Spec.Affinity) + for _, term := range terms { + namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term) + selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) + if err != nil { + klog.Errorf("Error getting label selectors for pod: %v.", up.Name) + } + if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { + podsToMove = append(podsToMove, pInfo) + break } } + } return podsToMove } diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index 77cd9a0da22..36fae630a0b 100644 --- a/pkg/scheduler/util/utils.go 
+++ b/pkg/scheduler/util/utils.go @@ -83,28 +83,28 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool { } // GetPodAffinityTerms gets pod affinity terms by a pod affinity object. -func GetPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTerm) { - if podAffinity != nil { - if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { - terms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution +func GetPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { + if affinity != nil && affinity.PodAffinity != nil { + if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { + terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution } // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. - //if len(podAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { - // terms = append(terms, podAffinity.RequiredDuringSchedulingRequiredDuringExecution...) + //if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { + // terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) //} } return terms } // GetPodAntiAffinityTerms gets pod affinity terms by a pod anti-affinity. -func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.PodAffinityTerm) { - if podAntiAffinity != nil { - if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { - terms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution +func GetPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) { + if affinity != nil && affinity.PodAntiAffinity != nil { + if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 { + terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution } // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. 
- //if len(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { - // terms = append(terms, podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...) + //if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { + // terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...) //} } return terms