From 9663c337475768f50d999c8f9ff58a0481b6a56e Mon Sep 17 00:00:00 2001 From: Cong Liu Date: Mon, 29 Jul 2019 20:44:50 -0400 Subject: [PATCH] Use ErrorChannel to communicate errors during parallel execution in interpod_affinity. --- .../algorithm/priorities/interpod_affinity.go | 60 ++++++++++--------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/pkg/scheduler/algorithm/priorities/interpod_affinity.go index 2f570630381..6e5ac48c26a 100644 --- a/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ b/pkg/scheduler/algorithm/priorities/interpod_affinity.go @@ -18,7 +18,6 @@ package priorities import ( "context" - "sync" "sync/atomic" "k8s.io/api/core/v1" @@ -30,6 +29,7 @@ import ( priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/klog" ) @@ -58,15 +58,11 @@ func NewInterPodAffinityPriority( } type podAffinityPriorityMap struct { - sync.Mutex - // nodes contain all nodes that should be considered nodes []*v1.Node // counts store the mapping from node name to so-far computed score of // the node. counts map[string]*int64 - // The first error that we faced. 
- firstError error } func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap { @@ -76,20 +72,11 @@ func newPodAffinityPriorityMap(nodes []*v1.Node) *podAffinityPriorityMap { } } -func (p *podAffinityPriorityMap) setError(err error) { - p.Lock() - defer p.Unlock() - if p.firstError == nil { - p.firstError = err - } -} - -func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) { +func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, weight int64) error { namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(podDefiningAffinityTerm, term) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) if err != nil { - p.setError(err) - return + return err } match := priorityutil.PodMatchesTermsNamespaceAndSelector(podToCheck, namespaces, selector) if match { @@ -99,13 +86,17 @@ func (p *podAffinityPriorityMap) processTerm(term *v1.PodAffinityTerm, podDefini } } } + return nil } -func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) { +func (p *podAffinityPriorityMap) processTerms(terms []v1.WeightedPodAffinityTerm, podDefiningAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error { for i := range terms { term := &terms[i] - p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))) + if err := p.processTerm(&term.PodAffinityTerm, podDefiningAffinityTerm, podToCheck, fixedNode, int64(term.Weight*int32(multiplier))); err != nil { + return err + } } + return nil } // CalculateInterPodAffinityPriority compute a sum by iterating through the elements of weightedPodAffinityTerm and adding @@ -152,14 +143,18 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, 
node // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey> // value as that of <existingPods>`s node by the term`s weight. terms := affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution - pm.processTerms(terms, pod, existingPod, existingPodNode, 1) + if err := pm.processTerms(terms, pod, existingPod, existingPodNode, 1); err != nil { + return err + } } if hasAntiAffinityConstraints { // For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term, // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey> // value as that of <existingPod>`s node by the term`s weight. terms := affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution - pm.processTerms(terms, pod, existingPod, existingPodNode, -1) + if err := pm.processTerms(terms, pod, existingPod, existingPodNode, -1); err != nil { + return err + } } if existingHasAffinityConstraints { @@ -173,24 +168,33 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) //} for _, term := range terms { - pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight)) + if err := pm.processTerm(&term, existingPod, pod, existingPodNode, int64(ipa.hardPodAffinityWeight)); err != nil { + return err + } } } // For every soft pod affinity term of <existingPod>, if <pod> matches the term, // increment <pm.counts> for every node in the cluster with the same <term.TopologyKey> // value as that of <existingPod>'s node by the term's weight. terms := existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution - pm.processTerms(terms, existingPod, pod, existingPodNode, 1) + if err := pm.processTerms(terms, existingPod, pod, existingPodNode, 1); err != nil { + return err + } } if existingHasAntiAffinityConstraints { // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term, // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey> // value as that of <existingPod>'s node by the term's weight. 
terms := existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution - pm.processTerms(terms, existingPod, pod, existingPodNode, -1) + if err := pm.processTerms(terms, existingPod, pod, existingPodNode, -1); err != nil { + return err + } } return nil } + + errCh := schedutil.NewErrorChannel() + ctx, cancel := context.WithCancel(context.Background()) processNode := func(i int) { nodeInfo := nodeNameToInfo[allNodeNames[i]] if nodeInfo.Node() != nil { @@ -198,7 +202,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node // We need to process all the pods. for _, existingPod := range nodeInfo.Pods() { if err := processPod(existingPod); err != nil { - pm.setError(err) + errCh.SendErrorWithCancel(err, cancel) } } } else { @@ -206,15 +210,15 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node // ones that have some. for _, existingPod := range nodeInfo.PodsWithAffinity() { if err := processPod(existingPod); err != nil { - pm.setError(err) + errCh.SendErrorWithCancel(err, cancel) } } } } } - workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode) - if pm.firstError != nil { - return nil, pm.firstError + workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode) + if err := errCh.ReceiveError(); err != nil { + return nil, err } for _, node := range nodes {