optimize preferred pod affinity

This commit is contained in:
Abdullah Gharaibeh 2019-12-05 12:47:39 -05:00
parent a3718d7653
commit 53be26e402
2 changed files with 142 additions and 99 deletions

View File

@ -45,6 +45,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",

View File

@ -23,6 +23,8 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
@ -36,47 +38,67 @@ import (
type topologyPairToScore map[string]map[string]int64 type topologyPairToScore map[string]map[string]int64
type podAffinityPriorityMap struct { type podAffinityPriorityMap struct {
// nodes contain all nodes that should be considered. topologyScore topologyPairToScore
nodes []*v1.Node affinityTerms []*weightedAffinityTerm
// tracks a topology pair score so far. antiAffinityTerms []*weightedAffinityTerm
topologyScore topologyPairToScore hardPodAffinityWeight int32
sync.Mutex sync.Mutex
} }
func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap { // A "processed" representation of v1.WeightedAffinityTerm.
return &podAffinityPriorityMap{ type weightedAffinityTerm struct {
nodes: nodes, namespaces sets.String
topologyScore: make(topologyPairToScore), selector labels.Selector
} weight int32
topologyKey string
} }
func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) error { func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term) namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil { if err != nil {
return err return nil, err
} }
return &weightedAffinityTerm{namespaces: namespaces, selector: selector, topologyKey: term.TopologyKey, weight: weight}, nil
}
// getProcessedTerms converts a slice of v1.WeightedPodAffinityTerm defined on
// pod into processed terms. A nil input yields (nil, nil); the first invalid
// term aborts processing and its error is returned.
func getProcessedTerms(pod *v1.Pod, terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) {
	if terms == nil {
		return nil, nil
	}
	// Pre-size: exactly one processed term per input term.
	processedTerms := make([]*weightedAffinityTerm, 0, len(terms))
	for i := range terms {
		p, err := newWeightedAffinityTerm(pod, &terms[i].PodAffinityTerm, terms[i].Weight)
		if err != nil {
			return nil, err
		}
		processedTerms = append(processedTerms, p)
	}
	return processedTerms, nil
}
func (p *podAffinityPriorityMap) processTerm(term *weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
if len(fixedNode.Labels) == 0 { if len(fixedNode.Labels) == 0 {
return nil return nil
} }
match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector) match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector)
tpValue, tpValueExist := fixedNode.Labels[term.TopologyKey] tpValue, tpValueExist := fixedNode.Labels[term.topologyKey]
if match && tpValueExist { if match && tpValueExist {
p.Lock() p.Lock()
if p.topologyScore[term.TopologyKey] == nil { if p.topologyScore[term.topologyKey] == nil {
p.topologyScore[term.TopologyKey] = make(map[string]int64) p.topologyScore[term.topologyKey] = make(map[string]int64)
} }
p.topologyScore[term.TopologyKey][tpValue] += weight p.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
p.Unlock() p.Unlock()
} }
return nil return nil
} }
func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error { func (p *podAffinityPriorityMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
for i := range terms { for _, term := range terms {
term := &terms[i] if err := p.processTerm(term, podToCheck, fixedNode, multiplier); err != nil {
if err := p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))); err != nil {
return err return err
} }
} }
@ -143,6 +165,75 @@ func CalculateInterPodAffinityPriorityReduce(pod *v1.Pod, meta interface{}, shar
return nil return nil
} }
// processExistingPod folds one existing pod's contribution into the topology
// scores for the incoming pod being scheduled. It applies, in order:
//  1. the incoming pod's soft affinity terms (score incremented),
//  2. the incoming pod's soft anti-affinity terms (score decremented),
//  3. the existing pod's hard affinity terms matched against the incoming
//     pod, weighted by the constant p.hardPodAffinityWeight,
//  4. the existing pod's soft affinity (incremented) and soft anti-affinity
//     (decremented) terms matched against the incoming pod.
func (p *podAffinityPriorityMap) processExistingPod(existingPod *v1.Pod, existingPodNodeInfo *schedulernodeinfo.NodeInfo, incomingPod *v1.Pod) error {
	existingPodAffinity := existingPod.Spec.Affinity
	existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
	existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
	existingPodNode := existingPodNodeInfo.Node()

	// For every soft pod affinity term of <incomingPod>, if <existingPod>
	// matches the term, increment the score for every node in the cluster
	// with the same <term.topologyKey> value as <existingPod>'s node by the
	// term's weight.
	if err := p.processTerms(p.affinityTerms, existingPod, existingPodNode, 1); err != nil {
		return err
	}

	// Symmetric for soft anti-affinity terms of <incomingPod>: decrement.
	if err := p.processTerms(p.antiAffinityTerms, existingPod, existingPodNode, -1); err != nil {
		return err
	}

	if existingHasAffinityConstraints {
		// For every hard pod affinity term of <existingPod>, if <incomingPod>
		// matches the term, increment the score for every node with the same
		// <term.topologyKey> value as <existingPod>'s node by the constant
		// <p.hardPodAffinityWeight>.
		if p.hardPodAffinityWeight > 0 {
			terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
			// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
			//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
			//	terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
			//}
			for i := range terms {
				processedTerm, err := newWeightedAffinityTerm(existingPod, &terms[i], p.hardPodAffinityWeight)
				if err != nil {
					return err
				}
				if err := p.processTerm(processedTerm, incomingPod, existingPodNode, 1); err != nil {
					return err
				}
			}
		}
		// For every soft pod affinity term of <existingPod>, if <incomingPod>
		// matches the term, increment the score by the term's weight.
		terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
		if err != nil {
			// NOTE(review): this branch logs and swallows the error while the
			// anti-affinity branch below propagates it — confirm the asymmetry
			// is intentional.
			klog.Error(err)
			return nil
		}
		if err := p.processTerms(terms, incomingPod, existingPodNode, 1); err != nil {
			return err
		}
	}

	if existingHasAntiAffinityConstraints {
		// Symmetric: soft anti-affinity terms of <existingPod> matched against
		// <incomingPod> decrement the score by the term's weight.
		terms, err := getProcessedTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
		if err != nil {
			return err
		}
		if err := p.processTerms(terms, incomingPod, existingPodNode, -1); err != nil {
			return err
		}
	}
	return nil
}
func buildTopologyPairToScore( func buildTopologyPairToScore(
pod *v1.Pod, pod *v1.Pod,
sharedLister schedulerlisters.SharedLister, sharedLister schedulerlisters.SharedLister,
@ -158,9 +249,8 @@ func buildTopologyPairToScore(
hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
// pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node. // Unless the pod being scheduled has affinity terms, we only
pm := newPodAffinityPriorityMap(filteredNodes) // need to process nodes hosting pods with affinity.
allNodes, err := sharedLister.NodeInfos().HavePodsWithAffinityList() allNodes, err := sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil { if err != nil {
klog.Errorf("get pods with affinity list error, err: %v", err) klog.Errorf("get pods with affinity list error, err: %v", err)
@ -174,70 +264,26 @@ func buildTopologyPairToScore(
} }
} }
processPod := func(existingPod *v1.Pod) error { var affinityTerms []*weightedAffinityTerm
existingPodNodeInfo, err := sharedLister.NodeInfos().Get(existingPod.Spec.NodeName) var antiAffinityTerms []*weightedAffinityTerm
if err != nil { if hasAffinityConstraints {
klog.Errorf("Node not found, %v", existingPod.Spec.NodeName) if affinityTerms, err = getProcessedTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
klog.Error(err)
return nil return nil
} }
existingPodAffinity := existingPod.Spec.Affinity }
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil if hasAntiAffinityConstraints {
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil if antiAffinityTerms, err = getProcessedTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
existingPodNode := existingPodNodeInfo.Node() klog.Error(err)
return nil
}
}
if hasAffinityConstraints { pm := podAffinityPriorityMap{
// For every soft pod affinity term of <pod>, if <existingPod> matches the term, topologyScore: make(topologyPairToScore),
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey> affinityTerms: affinityTerms,
// value as that of <existingPods>`s node by the term`s weight. antiAffinityTerms: antiAffinityTerms,
terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution hardPodAffinityWeight: hardPodAffinityWeight,
if err := pm.processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil {
return err
}
}
if hasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>`s node by the term`s weight.
terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := pm.processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil {
return err
}
}
if existingHasAffinityConstraints {
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
if hardPodAffinityWeight > 0 {
terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, term := range terms {
if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(hardPodAffinityWeight)); err != nil {
return err
}
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := pm.processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil {
return err
}
}
if existingHasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
if err := pm.processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil {
return err
}
}
return nil
} }
errCh := schedutil.NewErrorChannel() errCh := schedutil.NewErrorChannel()
@ -245,22 +291,18 @@ func buildTopologyPairToScore(
processNode := func(i int) { processNode := func(i int) {
nodeInfo := allNodes[i] nodeInfo := allNodes[i]
if nodeInfo.Node() != nil { if nodeInfo.Node() != nil {
// Unless the pod being scheduled has affinity terms, we only
// need to process pods with affinity in the node.
podsToProcess := nodeInfo.PodsWithAffinity()
if hasAffinityConstraints || hasAntiAffinityConstraints { if hasAffinityConstraints || hasAntiAffinityConstraints {
// We need to process all the pods. // We need to process all the pods.
for _, existingPod := range nodeInfo.Pods() { podsToProcess = nodeInfo.Pods()
if err := processPod(existingPod); err != nil { }
errCh.SendErrorWithCancel(err, cancel)
return for _, existingPod := range podsToProcess {
} if err := pm.processExistingPod(existingPod, nodeInfo, pod); err != nil {
} errCh.SendErrorWithCancel(err, cancel)
} else { return
// The pod doesn't have any constraints - we need to check only existing
// ones that have some.
for _, existingPod := range nodeInfo.PodsWithAffinity() {
if err := processPod(existingPod); err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
} }
} }
} }