diff --git a/pkg/scheduler/algorithm/priorities/BUILD b/pkg/scheduler/algorithm/priorities/BUILD index dd610b299bc..e47ce9355b0 100644 --- a/pkg/scheduler/algorithm/priorities/BUILD +++ b/pkg/scheduler/algorithm/priorities/BUILD @@ -45,6 +45,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/pkg/scheduler/algorithm/priorities/interpod_affinity.go index 0e45781ce57..107bbf8181a 100644 --- a/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ b/pkg/scheduler/algorithm/priorities/interpod_affinity.go @@ -23,6 +23,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" @@ -36,47 +38,67 @@ import ( type topologyPairToScore map[string]map[string]int64 type podAffinityPriorityMap struct { - // nodes contain all nodes that should be considered. - nodes []*v1.Node - // tracks a topology pair score so far. 
- topologyScore topologyPairToScore + topologyScore topologyPairToScore + affinityTerms []*weightedAffinityTerm + antiAffinityTerms []*weightedAffinityTerm + hardPodAffinityWeight int32 sync.Mutex } -func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap { - return &podAffinityPriorityMap{ - nodes: nodes, - topologyScore: make(topologyPairToScore), - } +// A "processed" representation of v1.WeightedAffinityTerm. +type weightedAffinityTerm struct { + namespaces sets.String + selector labels.Selector + weight int32 + topologyKey string } -func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) error { - namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term) +func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) { + namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, term) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) if err != nil { - return err + return nil, err } + return &weightedAffinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey, weight: weight}, nil +} + +func getProcessedTerms(pod *v1.Pod, terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) { + if terms == nil { + return nil, nil + } + + var processedTerms []*weightedAffinityTerm + for i := range terms { + p, err := newWeightedAffinityTerm(pod, &terms[i].PodAffinityTerm, terms[i].Weight) + if err != nil { + return nil, err + } + processedTerms = append(processedTerms, p) + } + return processedTerms, nil +} + +func (p *podAffinityPriorityMap) processTerm(term *weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error { if len(fixedNode.Labels) == 0 { return nil } - match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector) - tpValue, tpValueExist := 
fixedNode.Labels[term.TopologyKey] + match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector) + tpValue, tpValueExist := fixedNode.Labels[term.topologyKey] if match && tpValueExist { p.Lock() - if p.topologyScore[term.TopologyKey] == nil { - p.topologyScore[term.TopologyKey] = make(map[string]int64) + if p.topologyScore[term.topologyKey] == nil { + p.topologyScore[term.topologyKey] = make(map[string]int64) } - p.topologyScore[term.TopologyKey][tpValue] += weight + p.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier)) p.Unlock() } return nil } -func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error { - for i := range terms { - term := &terms[i] - if err := p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))); err != nil { +func (p *podAffinityPriorityMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error { + for _, term := range terms { + if err := p.processTerm(term, podToCheck, fixedNode, multiplier); err != nil { return err } } @@ -143,6 +165,75 @@ func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, shar return nil } +func (p *podAffinityPriorityMap) processExistingPod(existingPod *v1.Pod, existingPodNodeInfo *schedulernodeinfo.NodeInfo, incomingPod *v1.Pod) error { + existingPodAffinity := existingPod.Spec.Affinity + existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil + existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil + existingPodNode := existingPodNodeInfo.Node() + + // For every soft pod affinity term of incomingPod, if existingPod matches the term, + // increment p.topologyScore for every node in the cluster with the same term.topologyKey + // value as that of existingPod's node
by the term's weight. + if err := p.processTerms(p.affinityTerms, existingPod, existingPodNode, 1); err != nil { + return err + } + + // For every soft pod anti-affinity term of incomingPod, if existingPod matches the term, + // decrement p.topologyScore for every node in the cluster with the same term.topologyKey + // value as that of existingPod's node by the term's weight. + if err := p.processTerms(p.antiAffinityTerms, existingPod, existingPodNode, -1); err != nil { + return err + } + + if existingHasAffinityConstraints { + // For every hard pod affinity term of existingPod, if incomingPod matches the term, + // increment p.topologyScore for every node in the cluster with the same term.topologyKey + // value as that of existingPod's node by the constant p.hardPodAffinityWeight + if p.hardPodAffinityWeight > 0 { + terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution + // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. + //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { + // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) + //} + for i := range terms { + term := &terms[i] + processedTerm, err := newWeightedAffinityTerm(existingPod, term, p.hardPodAffinityWeight) + if err != nil { + return err + } + if err := p.processTerm(processedTerm, incomingPod, existingPodNode, 1); err != nil { + return err + } + } + } + // For every soft pod affinity term of existingPod, if incomingPod matches the term, + // increment p.topologyScore for every node in the cluster with the same term.topologyKey + // value as that of existingPod's node by the term's weight.
+ terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if err != nil { + klog.Error(err) + return nil + } + + if err := p.processTerms(terms, incomingPod, existingPodNode, 1); err != nil { + return err + } + } + if existingHasAntiAffinityConstraints { + // For every soft pod anti-affinity term of existingPod, if incomingPod matches the term, + // decrement p.topologyScore for every node in the cluster with the same term.topologyKey + // value as that of existingPod's node by the term's weight. + terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if err != nil { + return err + } + if err := p.processTerms(terms, incomingPod, existingPodNode, -1); err != nil { + return err + } + } + return nil +} + func buildTopologyPairToScore( pod *v1.Pod, sharedLister schedulerlisters.SharedLister, @@ -158,9 +249,8 @@ func buildTopologyPairToScore( hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil - // pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node. - pm := newPodAffinityPriorityMap(filteredNodes) - + // Unless the pod being scheduled has affinity terms, we only + // need to process nodes hosting pods with affinity.
allNodes, err := sharedLister.NodeInfos().HavePodsWithAffinityList() if err != nil { klog.Errorf("get pods with affinity list error, err: %v", err) @@ -174,70 +264,26 @@ func buildTopologyPairToScore( } } - processPod := func(existingPod *v1.Pod) error { - existingPodNodeInfo, err := sharedLister.NodeInfos().Get(existingPod.Spec.NodeName) - if err != nil { - klog.Errorf("Node not found, %v", existingPod.Spec.NodeName) + var affinityTerms []*weightedAffinityTerm + var antiAffinityTerms []*weightedAffinityTerm + if hasAffinityConstraints { + if affinityTerms, err = getProcessedTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil { + klog.Error(err) return nil } - existingPodAffinity := existingPod.Spec.Affinity - existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil - existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil - existingPodNode := existingPodNodeInfo.Node() + } + if hasAntiAffinityConstraints { + if antiAffinityTerms, err = getProcessedTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil { + klog.Error(err) + return nil + } + } - if hasAffinityConstraints { - // For every soft pod affinity term of pod, if existingPod matches the term, - // increment pm.topologyScore for every node in the cluster with the same term.TopologyKey - // value as that of existingPod's node by the term's weight. - terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution - if err := pm.processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil { - return err - } - } - if hasAntiAffinityConstraints { - // For every soft pod anti-affinity term of pod, if existingPod matches the term, - // decrement pm.topologyScore for every node in the cluster with the same term.TopologyKey - // value as that of existingPod's node by the term's weight.
- terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution - if err := pm.processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil { - return err - } - } - - if existingHasAffinityConstraints { - // For every hard pod affinity term of existingPod, if pod matches the term, - // increment pm.topologyScore for every node in the cluster with the same term.TopologyKey - // value as that of existingPod's node by the constant hardPodAffinityWeight - if hardPodAffinityWeight > 0 { - terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution - // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. - //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { - // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) - //} - for _, term := range terms { - if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(hardPodAffinityWeight)); err != nil { - return err - } - } - } - // For every soft pod affinity term of existingPod, if pod matches the term, - // increment pm.topologyScore for every node in the cluster with the same term.TopologyKey - // value as that of existingPod's node by the term's weight. - terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution - if err := pm.processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil { - return err - } - } - if existingHasAntiAffinityConstraints { - // For every soft pod anti-affinity term of existingPod, if pod matches the term, - // decrement pm.topologyScore for every node in the cluster with the same term.TopologyKey - // value as that of existingPod's node by the term's weight.
- terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution - if err := pm.processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil { - return err - } - } - return nil + pm := podAffinityPriorityMap{ + topologyScore: make(topologyPairToScore), + affinityTerms: affinityTerms, + antiAffinityTerms: antiAffinityTerms, + hardPodAffinityWeight: hardPodAffinityWeight, } errCh := schedutil.NewErrorChannel() @@ -245,22 +291,18 @@ func buildTopologyPairToScore( processNode := func(i int) { nodeInfo := allNodes[i] if nodeInfo.Node() != nil { + // Unless the pod being scheduled has affinity terms, we only + // need to process pods with affinity in the node. + podsToProcess := nodeInfo.PodsWithAffinity() if hasAffinityConstraints || hasAntiAffinityConstraints { // We need to process all the pods. - for _, existingPod := range nodeInfo.Pods() { - if err := processPod(existingPod); err != nil { - errCh.SendErrorWithCancel(err, cancel) - return - } - } - } else { - // The pod doesn't have any constraints - we need to check only existing - // ones that have some. - for _, existingPod := range nodeInfo.PodsWithAffinity() { - if err := processPod(existingPod); err != nil { - errCh.SendErrorWithCancel(err, cancel) - return - } + podsToProcess = nodeInfo.Pods() + } + + for _, existingPod := range podsToProcess { + if err := pm.processExistingPod(existingPod, nodeInfo, pod); err != nil { + errCh.SendErrorWithCancel(err, cancel) + return } } }