Use ErrorChannel to communicate errors during parallel execution in interpod_afiinity.

This commit is contained in:
Cong Liu 2019-07-29 20:44:50 -04:00
parent 1871f75b32
commit 9663c33747

View File

@ -18,7 +18,6 @@ package priorities
import ( import (
"context" "context"
"sync"
"sync/atomic" "sync/atomic"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
@ -30,6 +29,7 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog" "k8s.io/klog"
) )
@ -58,15 +58,11 @@ func NewInterPodAffinityPriority(
} }
type podAffinityPriorityMap struct { type podAffinityPriorityMap struct {
sync.Mutex
// nodes contain all nodes that should be considered // nodes contain all nodes that should be considered
nodes []*v1.Node nodes []*v1.Node
// counts store the mapping from node name to so-far computed score of // counts store the mapping from node name to so-far computed score of
// the node. // the node.
counts map[string]*int64 counts map[string]*int64
// The first error that we faced.
firstError error
} }
func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap { func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap {
@ -76,20 +72,11 @@ func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap {
} }
} }
func (p *podAffinityPriorityMap) setError(err error) { func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) error {
p.Lock()
defer p.Unlock()
if p.firstError == nil {
p.firstError = err
}
}
func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term) namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil { if err != nil {
p.setError(err) return err
return
} }
match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector) match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector)
if match { if match {
@ -99,13 +86,17 @@ func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefini
} }
} }
} }
return nil
} }
func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) { func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
for i := range terms { for i := range terms {
term := &terms[i] term := &terms[i]
p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))) if err := p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))); err != nil {
return err
}
} }
return nil
} }
// CalculateInterPodAffinityPriority compute a sum by iterating through the elements of weightedPodAffinityTerm and adding // CalculateInterPodAffinityPriority compute a sum by iterating through the elements of weightedPodAffinityTerm and adding
@ -152,14 +143,18 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey> // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPods>`s node by the term`s weight. // value as that of <existingPods>`s node by the term`s weight.
terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, pod, existingPod, existingPodNode, 1) if err := pm.processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil {
return err
}
} }
if hasAntiAffinityConstraints { if hasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term, // For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey> // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>`s node by the term`s weight. // value as that of <existingPod>`s node by the term`s weight.
terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, pod, existingPod, existingPodNode, -1) if err := pm.processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil {
return err
}
} }
if existingHasAffinityConstraints { if existingHasAffinityConstraints {
@ -173,24 +168,33 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//} //}
for _, term := range terms { for _, term := range terms {
pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight)) if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight)); err != nil {
return err
}
} }
} }
// For every soft pod affinity term of <existingPod>, if <pod> matches the term, // For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey> // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight. // value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, existingPod, pod, existingPodNode, 1) if err := pm.processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil {
return err
}
} }
if existingHasAntiAffinityConstraints { if existingHasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term, // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey> // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight. // value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, existingPod, pod, existingPodNode, -1) if err := pm.processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil {
return err
}
} }
return nil return nil
} }
errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(context.Background())
processNode := func(i int) { processNode := func(i int) {
nodeInfo := nodeNameToInfo[allNodeNames[i]] nodeInfo := nodeNameToInfo[allNodeNames[i]]
if nodeInfo.Node() != nil { if nodeInfo.Node() != nil {
@ -198,7 +202,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// We need to process all the pods. // We need to process all the pods.
for _, existingPod := range nodeInfo.Pods() { for _, existingPod := range nodeInfo.Pods() {
if err := processPod(existingPod); err != nil { if err := processPod(existingPod); err != nil {
pm.setError(err) errCh.SendErrorWithCancel(err, cancel)
} }
} }
} else { } else {
@ -206,15 +210,15 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// ones that have some. // ones that have some.
for _, existingPod := range nodeInfo.PodsWithAffinity() { for _, existingPod := range nodeInfo.PodsWithAffinity() {
if err := processPod(existingPod); err != nil { if err := processPod(existingPod); err != nil {
pm.setError(err) errCh.SendErrorWithCancel(err, cancel)
} }
} }
} }
} }
} }
workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode) workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
if pm.firstError != nil { if err := errCh.ReceiveError(); err != nil {
return nil, pm.firstError return nil, err
} }
for _, node := range nodes { for _, node := range nodes {