Merge pull request #91062 from ahg-g/ahg-affinity1

Added pre-processed required affinity terms to scheduler's PodInfo type.
This commit is contained in:
Kubernetes Prow Robot 2020-05-15 02:49:07 -07:00 committed by GitHub
commit c453be845a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 146 additions and 220 deletions

View File

@ -16,9 +16,7 @@ go_library(
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -22,9 +22,6 @@ import (
"sync"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
@ -54,6 +51,8 @@ type preFilterState struct {
topologyToMatchedAffinityTerms topologyToMatchedTermCount
// A map of topology pairs to the number of existing pods that match the anti-affinity terms of the "pod".
topologyToMatchedAntiAffinityTerms topologyToMatchedTermCount
// podInfo of the incoming pod.
podInfo *framework.PodInfo
}
// Clone the prefilter state.
@ -66,45 +65,27 @@ func (s *preFilterState) Clone() framework.StateData {
copy.topologyToMatchedAffinityTerms = s.topologyToMatchedAffinityTerms.clone()
copy.topologyToMatchedAntiAffinityTerms = s.topologyToMatchedAntiAffinityTerms.clone()
copy.topologyToMatchedExistingAntiAffinityTerms = s.topologyToMatchedExistingAntiAffinityTerms.clone()
// No need to deep copy the podInfo because it shouldn't change.
copy.podInfo = s.podInfo
return &copy
}
// updateWithPod updates the preFilterState counters with the (anti)affinity matches for the given pod.
func (s *preFilterState) updateWithPod(updatedPod, pod *v1.Pod, node *v1.Node, multiplier int64) error {
func (s *preFilterState) updateWithPod(updatedPod *v1.Pod, node *v1.Node, multiplier int64) error {
if s == nil {
return nil
}
// Update matching existing anti-affinity terms.
updatedPodAffinity := updatedPod.Spec.Affinity
if updatedPodAffinity != nil && updatedPodAffinity.PodAntiAffinity != nil {
antiAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(updatedPodAffinity.PodAntiAffinity))
if err != nil {
return fmt.Errorf("error in getting anti-affinity terms of Pod %v: %v", updatedPod.Name, err)
}
s.topologyToMatchedExistingAntiAffinityTerms.updateWithAntiAffinityTerms(pod, node, antiAffinityTerms, multiplier)
}
// TODO(#91058): AddPod/RemovePod should pass a *framework.PodInfo type instead of *v1.Pod.
updatedPodInfo := framework.NewPodInfo(updatedPod)
s.topologyToMatchedExistingAntiAffinityTerms.updateWithAntiAffinityTerms(s.podInfo.Pod, node, updatedPodInfo.RequiredAntiAffinityTerms, multiplier)
// Update matching incoming pod (anti)affinity terms.
affinity := pod.Spec.Affinity
podNodeName := updatedPod.Spec.NodeName
if affinity != nil && len(podNodeName) > 0 {
if affinity.PodAffinity != nil {
affinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(affinity.PodAffinity))
if err != nil {
return fmt.Errorf("error in getting affinity terms of Pod %v: %v", pod.Name, err)
}
s.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPod, node, affinityTerms, multiplier)
}
if affinity.PodAntiAffinity != nil {
antiAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
if err != nil {
klog.Errorf("error in getting anti-affinity terms of Pod %v: %v", pod.Name, err)
}
s.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPod, node, antiAffinityTerms, multiplier)
}
}
s.topologyToMatchedAffinityTerms.updateWithAffinityTerms(updatedPod, node, s.podInfo.RequiredAffinityTerms, multiplier)
s.topologyToMatchedAntiAffinityTerms.updateWithAntiAffinityTerms(updatedPod, node, s.podInfo.RequiredAntiAffinityTerms, multiplier)
return nil
}
@ -131,11 +112,11 @@ func (m topologyToMatchedTermCount) clone() topologyToMatchedTermCount {
// updateWithAffinityTerms updates the topologyToMatchedTermCount map with the specified value
// for each affinity term if "targetPod" matches ALL terms.
func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, affinityTerms []*affinityTerm, value int64) {
func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, affinityTerms []framework.AffinityTerm, value int64) {
if podMatchesAllAffinityTerms(targetPod, affinityTerms) {
for _, t := range affinityTerms {
if topologyValue, ok := targetPodNode.Labels[t.topologyKey]; ok {
pair := topologyPair{key: t.topologyKey, value: topologyValue}
if topologyValue, ok := targetPodNode.Labels[t.TopologyKey]; ok {
pair := topologyPair{key: t.TopologyKey, value: topologyValue}
m[pair] += value
// value could be a negative value, hence we delete the entry if
// the entry is down to zero.
@ -149,12 +130,12 @@ func (m topologyToMatchedTermCount) updateWithAffinityTerms(targetPod *v1.Pod, t
// updateAntiAffinityTerms updates the topologyToMatchedTermCount map with the specified value
// for each anti-affinity term matched the target pod.
func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []*affinityTerm, value int64) {
func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Pod, targetPodNode *v1.Node, antiAffinityTerms []framework.AffinityTerm, value int64) {
// Check anti-affinity terms.
for _, a := range antiAffinityTerms {
if schedutil.PodMatchesTermsNamespaceAndSelector(targetPod, a.namespaces, a.selector) {
if topologyValue, ok := targetPodNode.Labels[a.topologyKey]; ok {
pair := topologyPair{key: a.topologyKey, value: topologyValue}
if schedutil.PodMatchesTermsNamespaceAndSelector(targetPod, a.Namespaces, a.Selector) {
if topologyValue, ok := targetPodNode.Labels[a.TopologyKey]; ok {
pair := topologyPair{key: a.TopologyKey, value: topologyValue}
m[pair] += value
// value could be a negative value, hence we delete the entry if
// the entry is down to zero.
@ -166,39 +147,13 @@ func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(targetPod *v1.Po
}
}
// A processed version of v1.PodAffinityTerm.
type affinityTerm struct {
namespaces sets.String
selector labels.Selector
topologyKey string
}
// getAffinityTerms receives a Pod and affinity terms and returns the namespaces and
// selectors of the terms.
func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) ([]*affinityTerm, error) {
if v1Terms == nil {
return nil, nil
}
var terms []*affinityTerm
for _, term := range v1Terms {
namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
terms = append(terms, &affinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey})
}
return terms, nil
}
// podMatchesAllAffinityTerms returns true IFF the given pod matches all the given terms.
func podMatchesAllAffinityTerms(pod *v1.Pod, terms []*affinityTerm) bool {
func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) bool {
if len(terms) == 0 {
return false
}
for _, term := range terms {
if !schedutil.PodMatchesTermsNamespaceAndSelector(pod, term.namespaces, term.selector) {
if !schedutil.PodMatchesTermsNamespaceAndSelector(pod, term.Namespaces, term.Selector) {
return false
}
}
@ -208,8 +163,7 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []*affinityTerm) bool {
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
// (1) Whether it has PodAntiAffinity
// (2) Whether any AffinityTerm matches the incoming pod
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, error) {
errCh := parallelize.NewErrorChannel()
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount {
var lock sync.Mutex
topologyMap := make(topologyToMatchedTermCount)
@ -219,8 +173,6 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod
topologyMap.append(toAppend)
}
ctx, cancel := context.WithCancel(context.Background())
processNode := func(i int) {
nodeInfo := allNodes[i]
node := nodeInfo.Node()
@ -229,35 +181,26 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.Nod
return
}
for _, existingPod := range nodeInfo.PodsWithAffinity {
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod.Pod, node)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
if existingPodTopologyMaps != nil {
existingPodTopologyMaps := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, node)
if len(existingPodTopologyMaps) != 0 {
appendResult(existingPodTopologyMaps)
}
}
}
parallelize.Until(ctx, len(allNodes), processNode)
parallelize.Until(context.Background(), len(allNodes), processNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
}
return topologyMap, nil
return topologyMap
}
// getTPMapMatchingIncomingAffinityAntiAffinity finds existing Pods that match affinity terms of the given "pod".
// It returns a topologyToMatchedTermCount that are checked later by the affinity
// predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
// need to check all the pods in the cluster.
func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount, error) {
func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
topologyPairsAffinityPodsMap := make(topologyToMatchedTermCount)
topologyToMatchedExistingAntiAffinityTerms := make(topologyToMatchedTermCount)
affinity := pod.Spec.Affinity
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 {
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms
}
var lock sync.Mutex
@ -272,16 +215,6 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*frame
}
}
affinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(affinity.PodAffinity))
if err != nil {
return nil, nil, err
}
antiAffinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
if err != nil {
return nil, nil, err
}
processNode := func(i int) {
nodeInfo := allNodes[i]
node := nodeInfo.Node()
@ -293,10 +226,10 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*frame
nodeTopologyPairsAntiAffinityPodsMap := make(topologyToMatchedTermCount)
for _, existingPod := range nodeInfo.Pods {
// Check affinity terms.
nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod.Pod, node, affinityTerms, 1)
nodeTopologyPairsAffinityPodsMap.updateWithAffinityTerms(existingPod.Pod, node, podInfo.RequiredAffinityTerms, 1)
// Check anti-affinity terms.
nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod.Pod, node, antiAffinityTerms, 1)
nodeTopologyPairsAntiAffinityPodsMap.updateWithAntiAffinityTerms(existingPod.Pod, node, podInfo.RequiredAntiAffinityTerms, 1)
}
if len(nodeTopologyPairsAffinityPodsMap) > 0 || len(nodeTopologyPairsAntiAffinityPodsMap) > 0 {
@ -305,24 +238,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, allNodes []*frame
}
parallelize.Until(context.Background(), len(allNodes), processNode)
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms, nil
}
// targetPodMatchesAffinityOfPod returns true if "targetPod" matches ALL affinity terms of
// "pod". This function does not check topology.
// So, whether the targetPod actually matches or not needs further checks for a specific
// node.
func targetPodMatchesAffinityOfPod(pod, targetPod *v1.Pod) bool {
affinity := pod.Spec.Affinity
if affinity == nil || affinity.PodAffinity == nil {
return false
}
affinityTerms, err := getAffinityTerms(pod, schedutil.GetPodAffinityTerms(affinity.PodAffinity))
if err != nil {
klog.Errorf("error in getting affinity terms of Pod %v", pod.Name)
return false
}
return podMatchesAllAffinityTerms(targetPod, affinityTerms)
return topologyPairsAffinityPodsMap, topologyToMatchedExistingAntiAffinityTerms
}
// PreFilter invoked at the prefilter extension point.
@ -337,22 +253,20 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos with pods with affinity: %v", err))
}
podInfo := framework.NewPodInfo(pod)
// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes)
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("calculating preFilterState: %v", err))
}
existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes)
// incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
// incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity
incomingPodAffinityMap, incomingPodAntiAffinityMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(pod, allNodes)
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("calculating preFilterState: %v", err))
}
incomingPodAffinityMap, incomingPodAntiAffinityMap := getTPMapMatchingIncomingAffinityAntiAffinity(podInfo, allNodes)
s := &preFilterState{
topologyToMatchedAffinityTerms: incomingPodAffinityMap,
topologyToMatchedAntiAffinityTerms: incomingPodAntiAffinityMap,
topologyToMatchedExistingAntiAffinityTerms: existingPodAntiAffinityMap,
podInfo: podInfo,
}
cycleState.Write(preFilterStateKey, s)
@ -370,7 +284,7 @@ func (pl *InterPodAffinity) AddPod(ctx context.Context, cycleState *framework.Cy
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
state.updateWithPod(podToAdd, podToSchedule, nodeInfo.Node(), 1)
state.updateWithPod(podToAdd, nodeInfo.Node(), 1)
return nil
}
@ -380,7 +294,7 @@ func (pl *InterPodAffinity) RemovePod(ctx context.Context, cycleState *framework
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
state.updateWithPod(podToRemove, podToSchedule, nodeInfo.Node(), -1)
state.updateWithPod(podToRemove, nodeInfo.Node(), -1)
return nil
}
@ -415,13 +329,13 @@ func (pl *InterPodAffinity) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, state
return true, nil
}
// nodeMatchesAllTopologyTerms checks whether "nodeInfo" matches topology of all the "terms" for the given "pod".
func nodeMatchesAllTopologyTerms(pod *v1.Pod, topologyPairs topologyToMatchedTermCount, nodeInfo *framework.NodeInfo, terms []v1.PodAffinityTerm) bool {
// nodeMatchesAllAffinityTerms checks whether "nodeInfo" matches all affinity terms of the incoming pod.
func nodeMatchesAllAffinityTerms(nodeInfo *framework.NodeInfo, state *preFilterState) bool {
node := nodeInfo.Node()
for _, term := range terms {
for _, term := range state.podInfo.RequiredAffinityTerms {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
if topologyPairs[pair] <= 0 {
if state.topologyToMatchedAffinityTerms[pair] <= 0 {
return false
}
} else {
@ -431,14 +345,13 @@ func nodeMatchesAllTopologyTerms(pod *v1.Pod, topologyPairs topologyToMatchedTer
return true
}
// nodeMatchesAnyTopologyTerm checks whether "nodeInfo" matches
// topology of any "term" for the given "pod".
func nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs topologyToMatchedTermCount, nodeInfo *framework.NodeInfo, terms []v1.PodAffinityTerm) bool {
// nodeMatchesAnyTopologyTerm checks whether "nodeInfo" matches any of the pod's anti affinity terms.
func nodeMatchesAnyAntiAffinityTerm(nodeInfo *framework.NodeInfo, state *preFilterState) bool {
node := nodeInfo.Node()
for _, term := range terms {
for _, term := range state.podInfo.RequiredAntiAffinityTerms {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
if topologyPairs[pair] > 0 {
if state.topologyToMatchedAntiAffinityTerms[pair] > 0 {
return true
}
}
@ -449,62 +362,38 @@ func nodeMatchesAnyTopologyTerm(pod *v1.Pod, topologyPairs topologyToMatchedTerm
// getMatchingAntiAffinityTopologyPairs calculates the following for "existingPod" on given node:
// (1) Whether it has PodAntiAffinity
// (2) Whether ANY AffinityTerm matches the incoming pod
func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (topologyToMatchedTermCount, error) {
affinity := existingPod.Spec.Affinity
if affinity == nil || affinity.PodAntiAffinity == nil {
return nil, nil
}
func getMatchingAntiAffinityTopologyPairsOfPod(newPod *v1.Pod, existingPod *framework.PodInfo, node *v1.Node) topologyToMatchedTermCount {
topologyMap := make(topologyToMatchedTermCount)
for _, term := range schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
namespaces := schedutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
if schedutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
for _, term := range existingPod.RequiredAntiAffinityTerms {
if schedutil.PodMatchesTermsNamespaceAndSelector(newPod, term.Namespaces, term.Selector) {
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
topologyMap[pair]++
}
}
}
return topologyMap, nil
return topologyMap
}
// satisfiesPodsAffinityAntiAffinity checks if scheduling the pod onto this node would break any term of this pod.
// This function returns two boolean flags. The first boolean flag indicates whether the pod matches affinity rules
// or not. The second boolean flag indicates if the pod matches anti-affinity rules.
func (pl *InterPodAffinity) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
state *preFilterState, nodeInfo *framework.NodeInfo,
affinity *v1.Affinity) (bool, bool, error) {
node := nodeInfo.Node()
if node == nil {
return false, false, fmt.Errorf("node not found")
}
func (pl *InterPodAffinity) satisfiesPodsAffinityAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) (bool, bool, error) {
// Check all affinity terms.
topologyToMatchedAffinityTerms := state.topologyToMatchedAffinityTerms
if affinityTerms := schedutil.GetPodAffinityTerms(affinity.PodAffinity); len(affinityTerms) > 0 {
matchExists := nodeMatchesAllTopologyTerms(pod, topologyToMatchedAffinityTerms, nodeInfo, affinityTerms)
if !matchExists {
// This pod may the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
if len(topologyToMatchedAffinityTerms) != 0 || !targetPodMatchesAffinityOfPod(pod, pod) {
return false, false, nil
}
if !nodeMatchesAllAffinityTerms(nodeInfo, state) {
// This pod may be the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
podInfo := state.podInfo
if len(state.topologyToMatchedAffinityTerms) != 0 || !podMatchesAllAffinityTerms(podInfo.Pod, podInfo.RequiredAffinityTerms) {
return false, false, nil
}
}
// Check all anti-affinity terms.
topologyToMatchedAntiAffinityTerms := state.topologyToMatchedAntiAffinityTerms
if antiAffinityTerms := schedutil.GetPodAntiAffinityTerms(affinity.PodAntiAffinity); len(antiAffinityTerms) > 0 {
matchExists := nodeMatchesAnyTopologyTerm(pod, topologyToMatchedAntiAffinityTerms, nodeInfo, antiAffinityTerms)
if matchExists {
return true, false, nil
}
if nodeMatchesAnyAntiAffinityTerm(nodeInfo, state) {
return true, false, nil
}
return true, true, nil
@ -513,6 +402,10 @@ func (pl *InterPodAffinity) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
// Filter invoked at the filter extension point.
// It checks if a pod can be scheduled on the specified node with pod affinity/anti-affinity configuration.
func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
if nodeInfo.Node() == nil {
return framework.NewStatus(framework.Error, "node not found")
}
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
@ -526,11 +419,7 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy
}
// Now check if <pod> requirements will be satisfied on this node.
affinity := pod.Spec.Affinity
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return nil
}
if satisfiesAffinity, satisfiesAntiAffinity, err := pl.satisfiesPodsAffinityAntiAffinity(pod, state, nodeInfo, affinity); err != nil || !satisfiesAffinity || !satisfiesAntiAffinity {
if satisfiesAffinity, satisfiesAntiAffinity, err := pl.satisfiesPodsAffinityAntiAffinity(state, nodeInfo); err != nil || !satisfiesAffinity || !satisfiesAntiAffinity {
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}

View File

@ -2035,7 +2035,6 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
pod *v1.Pod
wantAffinityPodsMap topologyToMatchedTermCount
wantAntiAffinityPodsMap topologyToMatchedTermCount
wantErr bool
}{
{
name: "nil test",
@ -2195,11 +2194,7 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := cache.NewSnapshot(tt.existingPods, tt.nodes)
l, _ := s.NodeInfos().List()
gotAffinityPodsMap, gotAntiAffinityPodsMap, err := getTPMapMatchingIncomingAffinityAntiAffinity(tt.pod, l)
if (err != nil) != tt.wantErr {
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() error = %v, wantErr %v", err, tt.wantErr)
return
}
gotAffinityPodsMap, gotAntiAffinityPodsMap := getTPMapMatchingIncomingAffinityAntiAffinity(framework.NewPodInfo(tt.pod), l)
if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) {
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap)
}

View File

@ -48,7 +48,7 @@ func (s *preScoreState) Clone() framework.StateData {
// A "processed" representation of v1.WeightedAffinityTerm.
type weightedAffinityTerm struct {
affinityTerm
framework.AffinityTerm
weight int32
}
@ -58,7 +58,7 @@ func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32
if err != nil {
return nil, err
}
return &weightedAffinityTerm{affinityTerm: affinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey}, weight: weight}, nil
return &weightedAffinityTerm{AffinityTerm: framework.AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}, weight: weight}, nil
}
func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) {
@ -87,13 +87,13 @@ func (m scoreMap) processTerm(
return
}
match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector)
tpValue, tpValueExist := fixedNode.Labels[term.topologyKey]
match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.Namespaces, term.Selector)
tpValue, tpValueExist := fixedNode.Labels[term.TopologyKey]
if match && tpValueExist {
if m[term.topologyKey] == nil {
m[term.topologyKey] = make(map[string]int64)
if m[term.TopologyKey] == nil {
m[term.TopologyKey] = make(map[string]int64)
}
m[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
m[term.TopologyKey][tpValue] += int64(term.weight * int32(multiplier))
}
return
}

View File

@ -26,6 +26,8 @@ go_library(
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",

View File

@ -25,6 +25,9 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@ -65,13 +68,54 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
// accelerate processing. This information is typically immutable (e.g., pre-processed
// inter-pod affinity selectors).
type PodInfo struct {
Pod *v1.Pod
Pod *v1.Pod
RequiredAffinityTerms []AffinityTerm
RequiredAntiAffinityTerms []AffinityTerm
}
// AffinityTerm is a processed version of v1.PodAffinityTerm.
type AffinityTerm struct {
Namespaces sets.String
Selector labels.Selector
TopologyKey string
}
func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) *AffinityTerm {
namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
klog.Errorf("Cannot process label selector: %v", err)
return nil
}
return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}
}
// getAffinityTerms receives a Pod and affinity terms and returns the namespaces and
// selectors of the terms.
func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) []AffinityTerm {
if v1Terms == nil {
return nil
}
var terms []AffinityTerm
for _, term := range v1Terms {
t := newAffinityTerm(pod, &term)
if t == nil {
// We get here if the label selector failed to process, this is not supposed
// to happen because the pod should have been validated by the api server.
return nil
}
terms = append(terms, *t)
}
return terms
}
// NewPodInfo return a new PodInfo
func NewPodInfo(pod *v1.Pod) *PodInfo {
return &PodInfo{
Pod: pod,
Pod: pod,
RequiredAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)),
RequiredAntiAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)),
}
}

View File

@ -532,21 +532,19 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
var podsToMove []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulableQ.podInfoMap {
up := pInfo.Pod
affinity := up.Spec.Affinity
if affinity != nil && affinity.PodAffinity != nil {
terms := util.GetPodAffinityTerms(affinity.PodAffinity)
for _, term := range terms {
namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
}
if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
podsToMove = append(podsToMove, pInfo)
break
}
terms := util.GetPodAffinityTerms(up.Spec.Affinity)
for _, term := range terms {
namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
}
if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
podsToMove = append(podsToMove, pInfo)
break
}
}
}
return podsToMove
}

View File

@ -83,28 +83,28 @@ func MoreImportantPod(pod1, pod2 *v1.Pod) bool {
}
// GetPodAffinityTerms gets pod affinity terms by a pod affinity object.
func GetPodAffinityTerms(podAffinity *v1.PodAffinity) (terms []v1.PodAffinityTerm) {
if podAffinity != nil {
if len(podAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
func GetPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
if affinity != nil && affinity.PodAffinity != nil {
if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, podAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms
}
// GetPodAntiAffinityTerms gets pod affinity terms by a pod anti-affinity.
func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.PodAffinityTerm) {
if podAntiAffinity != nil {
if len(podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = podAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
func GetPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
if affinity != nil && affinity.PodAntiAffinity != nil {
if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, podAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
}
return terms