Merge pull request #80588 from liu-cong/errch

Use ErrorChannel to communicate errors during parallel execution in interpod_afiinity
This commit is contained in:
Kubernetes Prow Robot 2019-07-30 08:53:03 -07:00 committed by GitHub
commit 1f1c1e2ad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -18,7 +18,6 @@ package priorities
import (
"context"
"sync"
"sync/atomic"
"k8s.io/api/core/v1"
@ -30,6 +29,7 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog"
)
@ -58,15 +58,11 @@ func NewInterPodAffinityPriority(
}
type podAffinityPriorityMap struct {
sync.Mutex
// nodes contain all nodes that should be considered
nodes []*v1.Node
// counts store the mapping from node name to so-far computed score of
// the node.
counts map[string]*int64
// The first error that we faced.
firstError error
}
func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap {
@ -76,20 +72,11 @@ func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap {
}
}
func (p *podAffinityPriorityMap) setError(err error) {
p.Lock()
defer p.Unlock()
if p.firstError == nil {
p.firstError = err
}
}
func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) {
func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) error {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
p.setError(err)
return
return err
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector)
if match {
@ -99,14 +86,18 @@ func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefini
}
}
}
return nil
}
func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
for i := range terms {
term := &terms[i]
p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier)))
if err := p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))); err != nil {
return err
}
}
return nil
}
// CalculateInterPodAffinityPriority compute a sum by iterating through the elements of weightedPodAffinityTerm and adding
// "weight" to the sum if the corresponding PodAffinityTerm is satisfied for
@ -152,14 +143,18 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPods>`s node by the term`s weight.
terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, pod, existingPod, existingPodNode, 1)
if err := pm.processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil {
return err
}
}
if hasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>`s node by the term`s weight.
terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, pod, existingPod, existingPodNode, -1)
if err := pm.processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil {
return err
}
}
if existingHasAffinityConstraints {
@ -173,24 +168,33 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for _, term := range terms {
pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight))
if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight)); err != nil {
return err
}
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, existingPod, pod, existingPodNode, 1)
if err := pm.processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil {
return err
}
}
if existingHasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution
pm.processTerms(terms, existingPod, pod, existingPodNode, -1)
if err := pm.processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil {
return err
}
}
return nil
}
errCh := schedutil.NewErrorChannel()
ctx, cancel := context.WithCancel(context.Background())
processNode := func(i int) {
nodeInfo := nodeNameToInfo[allNodeNames[i]]
if nodeInfo.Node() != nil {
@ -198,7 +202,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// We need to process all the pods.
for _, existingPod := range nodeInfo.Pods() {
if err := processPod(existingPod); err != nil {
pm.setError(err)
errCh.SendErrorWithCancel(err, cancel)
}
}
} else {
@ -206,15 +210,15 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
// ones that have some.
for _, existingPod := range nodeInfo.PodsWithAffinity() {
if err := processPod(existingPod); err != nil {
pm.setError(err)
errCh.SendErrorWithCancel(err, cancel)
}
}
}
}
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
if pm.firstError != nil {
return nil, pm.firstError
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
if err := errCh.ReceiveError(); err != nil {
return nil, err
}
for _, node := range nodes {