diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD index 2e8478b8616..64089c7e297 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD +++ b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD @@ -15,7 +15,6 @@ go_library( "//pkg/scheduler/internal/parallelize:go_default_library", "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go index 5d207eb3525..ab1474f2e6f 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go @@ -21,8 +21,6 @@ import ( "fmt" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/klog" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" schedutil "k8s.io/kubernetes/pkg/scheduler/util" @@ -35,9 +33,8 @@ type scoreMap map[string]map[string]int64 // preScoreState computed at PreScore and used at Score. type preScoreState struct { - topologyScore scoreMap - affinityTerms []*weightedAffinityTerm - antiAffinityTerms []*weightedAffinityTerm + topologyScore scoreMap + podInfo *framework.PodInfo } // Clone implements the mandatory Clone interface. We don't really copy the data since @@ -46,39 +43,8 @@ func (s *preScoreState) Clone() framework.StateData { return s } -// A "processed" representation of v1.WeightedAffinityTerm. -type weightedAffinityTerm struct { - framework.AffinityTerm - weight int32 -} - -func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) { - namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term) - selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) - if err != nil { - return nil, err - } - return &weightedAffinityTerm{AffinityTerm: framework.AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}, weight: weight}, nil -} - -func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) { - if v1Terms == nil { - return nil, nil - } - - var terms []*weightedAffinityTerm - for i := range v1Terms { - p, err := newWeightedAffinityTerm(pod, &v1Terms[i].PodAffinityTerm, v1Terms[i].Weight) - if err != nil { - return nil, err - } - terms = append(terms, p) - } - return terms, nil -} - func (m scoreMap) processTerm( - term *weightedAffinityTerm, + term *framework.WeightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int, @@ -93,14 +59,14 @@ func (m scoreMap) processTerm( if m[term.TopologyKey] == nil { m[term.TopologyKey] = make(map[string]int64) } - m[term.TopologyKey][tpValue] += int64(term.weight * int32(multiplier)) + m[term.TopologyKey][tpValue] += int64(term.Weight * int32(multiplier)) } return } -func (m scoreMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) { +func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) { for _, term := range terms { - m.processTerm(term, podToCheck, fixedNode, multiplier) + m.processTerm(&term, podToCheck, fixedNode, multiplier) } } @@ -117,63 +83,44 @@ func (m scoreMap) append(other scoreMap) { } } -func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod *v1.Pod, existingPodNodeInfo *framework.NodeInfo, incomingPod *v1.Pod, topoScore scoreMap) error { - existingPodAffinity := existingPod.Spec.Affinity - existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil - existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil +func (pl *InterPodAffinity) processExistingPod( + state *preScoreState, + existingPod *framework.PodInfo, + existingPodNodeInfo *framework.NodeInfo, + incomingPod *v1.Pod, + topoScore scoreMap, +) { existingPodNode := existingPodNodeInfo.Node() // For every soft pod affinity term of , if matches the term, // increment for every node in the cluster with the same // value as that of `s node by the term`s weight. - topoScore.processTerms(state.affinityTerms, existingPod, existingPodNode, 1) + topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, existingPodNode, 1) // For every soft pod anti-affinity term of , if matches the term, // decrement for every node in the cluster with the same // value as that of `s node by the term`s weight. - topoScore.processTerms(state.antiAffinityTerms, existingPod, existingPodNode, -1) + topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, existingPodNode, -1) - if existingHasAffinityConstraints { - // For every hard pod affinity term of , if matches the term, - // increment for every node in the cluster with the same - // value as that of 's node by the constant - if pl.args.HardPodAffinityWeight > 0 { - terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution - // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution. - //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 { - // terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...) - //} - for i := range terms { - term := &terms[i] - processedTerm, err := newWeightedAffinityTerm(existingPod, term, pl.args.HardPodAffinityWeight) - if err != nil { - return err - } - topoScore.processTerm(processedTerm, incomingPod, existingPodNode, 1) - } - } - // For every soft pod affinity term of , if matches the term, - // increment for every node in the cluster with the same - // value as that of 's node by the term's weight. - terms, err := getWeightedAffinityTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) - if err != nil { - klog.Error(err) - return nil + // For every hard pod affinity term of , if matches the term, + // increment for every node in the cluster with the same + // value as that of 's node by the constant + if pl.args.HardPodAffinityWeight > 0 { + for _, term := range existingPod.RequiredAffinityTerms { + t := framework.WeightedAffinityTerm{AffinityTerm: term, Weight: pl.args.HardPodAffinityWeight} + topoScore.processTerm(&t, incomingPod, existingPodNode, 1) } + } - topoScore.processTerms(terms, incomingPod, existingPodNode, 1) - } - if existingHasAntiAffinityConstraints { - // For every soft pod anti-affinity term of , if matches the term, - // decrement for every node in the cluster with the same - // value as that of 's node by the term's weight. - terms, err := getWeightedAffinityTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution) - if err != nil { - return err - } - topoScore.processTerms(terms, incomingPod, existingPodNode, -1) - } - return nil + // For every soft pod affinity term of , if matches the term, + // increment for every node in the cluster with the same + // value as that of 's node by the term's weight. + topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, existingPodNode, 1) + + // For every soft pod anti-affinity term of , if matches the term, + // decrement for every node in the cluster with the same + // value as that of 's node by the term's weight. + topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, existingPodNode, -1) } // PreScore builds and writes cycle state used by Score and NormalizeScore. @@ -198,40 +145,25 @@ func (pl *InterPodAffinity) PreScore( // Unless the pod being scheduled has affinity terms, we only // need to process nodes hosting pods with affinity. - allNodes, err := pl.sharedLister.NodeInfos().HavePodsWithAffinityList() - if err != nil { - framework.NewStatus(framework.Error, fmt.Sprintf("get pods with affinity list error, err: %v", err)) - } + var allNodes []*framework.NodeInfo + var err error if hasAffinityConstraints || hasAntiAffinityConstraints { allNodes, err = pl.sharedLister.NodeInfos().List() if err != nil { framework.NewStatus(framework.Error, fmt.Sprintf("get all nodes from shared lister error, err: %v", err)) } - } - - var affinityTerms []*weightedAffinityTerm - var antiAffinityTerms []*weightedAffinityTerm - if hasAffinityConstraints { - if affinityTerms, err = getWeightedAffinityTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil { - klog.Error(err) - return nil - } - } - if hasAntiAffinityConstraints { - if antiAffinityTerms, err = getWeightedAffinityTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil { - klog.Error(err) - return nil + } else { + allNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList() + if err != nil { + framework.NewStatus(framework.Error, fmt.Sprintf("get pods with affinity list error, err: %v", err)) } } state := &preScoreState{ - topologyScore: make(map[string]map[string]int64), - affinityTerms: affinityTerms, - antiAffinityTerms: antiAffinityTerms, + topologyScore: make(map[string]map[string]int64), + podInfo: framework.NewPodInfo(pod), } - errCh := parallelize.NewErrorChannel() - ctx, cancel := context.WithCancel(pCtx) processNode := func(i int) { nodeInfo := allNodes[i] if nodeInfo.Node() == nil { @@ -247,10 +179,7 @@ func (pl *InterPodAffinity) PreScore( topoScore := make(scoreMap) for _, existingPod := range podsToProcess { - if err := pl.processExistingPod(state, existingPod.Pod, nodeInfo, pod, topoScore); err != nil { - errCh.SendErrorWithCancel(err, cancel) - return - } + pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore) } if len(topoScore) > 0 { pl.Lock() @@ -258,10 +187,7 @@ func (pl *InterPodAffinity) PreScore( pl.Unlock() } } - parallelize.Until(ctx, len(allNodes), processNode) - if err := errCh.ReceiveError(); err != nil { - return framework.NewStatus(framework.Error, err.Error()) - } + parallelize.Until(context.Background(), len(allNodes), processNode) cycleState.Write(preScoreStateKey, state) return nil diff --git a/pkg/scheduler/framework/v1alpha1/types.go b/pkg/scheduler/framework/v1alpha1/types.go index 2cedc25bd4f..60c41d42fba 100644 --- a/pkg/scheduler/framework/v1alpha1/types.go +++ b/pkg/scheduler/framework/v1alpha1/types.go @@ -68,9 +68,11 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo { // accelerate processing. This information is typically immutable (e.g., pre-processed // inter-pod affinity selectors). type PodInfo struct { - Pod *v1.Pod - RequiredAffinityTerms []AffinityTerm - RequiredAntiAffinityTerms []AffinityTerm + Pod *v1.Pod + RequiredAffinityTerms []AffinityTerm + RequiredAntiAffinityTerms []AffinityTerm + PreferredAffinityTerms []WeightedAffinityTerm + PreferredAntiAffinityTerms []WeightedAffinityTerm } // AffinityTerm is a processed version of v1.PodAffinityTerm. @@ -80,6 +82,12 @@ type AffinityTerm struct { TopologyKey string } +// WeightedAffinityTerm is a "processed" representation of v1.WeightedAffinityTerm. +type WeightedAffinityTerm struct { + AffinityTerm + Weight int32 +} + func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) *AffinityTerm { namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) @@ -110,12 +118,44 @@ func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) []AffinityTerm return terms } +// getWeightedAffinityTerms returns the list of processed affinity terms. +func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) []WeightedAffinityTerm { + if v1Terms == nil { + return nil + } + + var terms []WeightedAffinityTerm + for _, term := range v1Terms { + t := newAffinityTerm(pod, &term.PodAffinityTerm) + if t == nil { + // We get here if the label selector failed to process, this is not supposed + // to happen because the pod should have been validated by the api server. + return nil + } + terms = append(terms, WeightedAffinityTerm{AffinityTerm: *t, Weight: term.Weight}) + } + return terms +} + // NewPodInfo return a new PodInfo func NewPodInfo(pod *v1.Pod) *PodInfo { + var preferredAffinityTerms []v1.WeightedPodAffinityTerm + var preferredAntiAffinityTerms []v1.WeightedPodAffinityTerm + if affinity := pod.Spec.Affinity; affinity != nil { + if a := affinity.PodAffinity; a != nil { + preferredAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution + } + if a := affinity.PodAntiAffinity; a != nil { + preferredAntiAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution + } + } + return &PodInfo{ - Pod: pod, - RequiredAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)), - RequiredAntiAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)), + Pod: pod, + RequiredAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)), + RequiredAntiAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)), + PreferredAffinityTerms: getWeightedAffinityTerms(pod, preferredAffinityTerms), + PreferredAntiAffinityTerms: getWeightedAffinityTerms(pod, preferredAntiAffinityTerms), } }