Merge pull request #91084 from ahg-g/ahg-affinity2

Added pre-processed preferred affinity terms to scheduler's PodInfo type
This commit is contained in:
Kubernetes Prow Robot 2020-05-15 23:02:03 -07:00 committed by GitHub
commit 65f2803c68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 123 deletions

View File

@ -15,7 +15,6 @@ go_library(
"//pkg/scheduler/internal/parallelize:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -21,8 +21,6 @@ import (
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
@ -35,9 +33,8 @@ type scoreMap map[string]map[string]int64
// preScoreState computed at PreScore and used at Score.
type preScoreState struct {
topologyScore scoreMap
affinityTerms []*weightedAffinityTerm
antiAffinityTerms []*weightedAffinityTerm
topologyScore scoreMap
podInfo *framework.PodInfo
}
// Clone implements the mandatory Clone interface. We don't really copy the data since
@ -46,39 +43,8 @@ func (s *preScoreState) Clone() framework.StateData {
return s
}
// A "processed" representation of v1.WeightedAffinityTerm.
type weightedAffinityTerm struct {
framework.AffinityTerm
weight int32
}
func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) {
namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
return &weightedAffinityTerm{AffinityTerm: framework.AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}, weight: weight}, nil
}
func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) {
if v1Terms == nil {
return nil, nil
}
var terms []*weightedAffinityTerm
for i := range v1Terms {
p, err := newWeightedAffinityTerm(pod, &v1Terms[i].PodAffinityTerm, v1Terms[i].Weight)
if err != nil {
return nil, err
}
terms = append(terms, p)
}
return terms, nil
}
func (m scoreMap) processTerm(
term *weightedAffinityTerm,
term *framework.WeightedAffinityTerm,
podToCheck *v1.Pod,
fixedNode *v1.Node,
multiplier int,
@ -93,14 +59,14 @@ func (m scoreMap) processTerm(
if m[term.TopologyKey] == nil {
m[term.TopologyKey] = make(map[string]int64)
}
m[term.TopologyKey][tpValue] += int64(term.weight * int32(multiplier))
m[term.TopologyKey][tpValue] += int64(term.Weight * int32(multiplier))
}
return
}
func (m scoreMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
for _, term := range terms {
m.processTerm(term, podToCheck, fixedNode, multiplier)
m.processTerm(&term, podToCheck, fixedNode, multiplier)
}
}
@ -117,63 +83,44 @@ func (m scoreMap) append(other scoreMap) {
}
}
func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod *v1.Pod, existingPodNodeInfo *framework.NodeInfo, incomingPod *v1.Pod, topoScore scoreMap) error {
existingPodAffinity := existingPod.Spec.Affinity
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
func (pl *InterPodAffinity) processExistingPod(
state *preScoreState,
existingPod *framework.PodInfo,
existingPodNodeInfo *framework.NodeInfo,
incomingPod *v1.Pod,
topoScore scoreMap,
) {
existingPodNode := existingPodNodeInfo.Node()
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
topoScore.processTerms(state.affinityTerms, existingPod, existingPodNode, 1)
topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, existingPodNode, 1)
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
topoScore.processTerms(state.antiAffinityTerms, existingPod, existingPodNode, -1)
topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, existingPodNode, -1)
if existingHasAffinityConstraints {
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
if pl.args.HardPodAffinityWeight > 0 {
terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
for i := range terms {
term := &terms[i]
processedTerm, err := newWeightedAffinityTerm(existingPod, term, pl.args.HardPodAffinityWeight)
if err != nil {
return err
}
topoScore.processTerm(processedTerm, incomingPod, existingPodNode, 1)
}
}
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms, err := getWeightedAffinityTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
if err != nil {
klog.Error(err)
return nil
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the constant <args.hardPodAffinityWeight>
if pl.args.HardPodAffinityWeight > 0 {
for _, term := range existingPod.RequiredAffinityTerms {
t := framework.WeightedAffinityTerm{AffinityTerm: term, Weight: pl.args.HardPodAffinityWeight}
topoScore.processTerm(&t, incomingPod, existingPodNode, 1)
}
}
topoScore.processTerms(terms, incomingPod, existingPodNode, 1)
}
if existingHasAntiAffinityConstraints {
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
terms, err := getWeightedAffinityTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
if err != nil {
return err
}
topoScore.processTerms(terms, incomingPod, existingPodNode, -1)
}
return nil
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, existingPodNode, 1)
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
// value as that of <existingPod>'s node by the term's weight.
topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, existingPodNode, -1)
}
// PreScore builds and writes cycle state used by Score and NormalizeScore.
@ -198,40 +145,25 @@ func (pl *InterPodAffinity) PreScore(
// Unless the pod being scheduled has affinity terms, we only
// need to process nodes hosting pods with affinity.
allNodes, err := pl.sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil {
framework.NewStatus(framework.Error, fmt.Sprintf("get pods with affinity list error, err: %v", err))
}
var allNodes []*framework.NodeInfo
var err error
if hasAffinityConstraints || hasAntiAffinityConstraints {
allNodes, err = pl.sharedLister.NodeInfos().List()
if err != nil {
framework.NewStatus(framework.Error, fmt.Sprintf("get all nodes from shared lister error, err: %v", err))
}
}
var affinityTerms []*weightedAffinityTerm
var antiAffinityTerms []*weightedAffinityTerm
if hasAffinityConstraints {
if affinityTerms, err = getWeightedAffinityTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
klog.Error(err)
return nil
}
}
if hasAntiAffinityConstraints {
if antiAffinityTerms, err = getWeightedAffinityTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
klog.Error(err)
return nil
} else {
allNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil {
framework.NewStatus(framework.Error, fmt.Sprintf("get pods with affinity list error, err: %v", err))
}
}
state := &preScoreState{
topologyScore: make(map[string]map[string]int64),
affinityTerms: affinityTerms,
antiAffinityTerms: antiAffinityTerms,
topologyScore: make(map[string]map[string]int64),
podInfo: framework.NewPodInfo(pod),
}
errCh := parallelize.NewErrorChannel()
ctx, cancel := context.WithCancel(pCtx)
processNode := func(i int) {
nodeInfo := allNodes[i]
if nodeInfo.Node() == nil {
@ -247,10 +179,7 @@ func (pl *InterPodAffinity) PreScore(
topoScore := make(scoreMap)
for _, existingPod := range podsToProcess {
if err := pl.processExistingPod(state, existingPod.Pod, nodeInfo, pod, topoScore); err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore)
}
if len(topoScore) > 0 {
pl.Lock()
@ -258,10 +187,7 @@ func (pl *InterPodAffinity) PreScore(
pl.Unlock()
}
}
parallelize.Until(ctx, len(allNodes), processNode)
if err := errCh.ReceiveError(); err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
parallelize.Until(context.Background(), len(allNodes), processNode)
cycleState.Write(preScoreStateKey, state)
return nil

View File

@ -68,9 +68,11 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
// accelerate processing. This information is typically immutable (e.g., pre-processed
// inter-pod affinity selectors).
type PodInfo struct {
Pod *v1.Pod
RequiredAffinityTerms []AffinityTerm
RequiredAntiAffinityTerms []AffinityTerm
Pod *v1.Pod
RequiredAffinityTerms []AffinityTerm
RequiredAntiAffinityTerms []AffinityTerm
PreferredAffinityTerms []WeightedAffinityTerm
PreferredAntiAffinityTerms []WeightedAffinityTerm
}
// AffinityTerm is a processed version of v1.PodAffinityTerm.
@ -80,6 +82,12 @@ type AffinityTerm struct {
TopologyKey string
}
// WeightedAffinityTerm is a "processed" representation of v1.WeightedPodAffinityTerm:
// an AffinityTerm together with the weight associated with it.
type WeightedAffinityTerm struct {
// AffinityTerm is the embedded processed term.
AffinityTerm
// Weight is the weight associated with the term.
Weight int32
}
func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) *AffinityTerm {
namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
@ -110,12 +118,44 @@ func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) []AffinityTerm
return terms
}
// getWeightedAffinityTerms converts the given v1 weighted pod affinity terms
// into their processed form, carrying each term's weight over unchanged.
// It returns nil when v1Terms is empty, and also when any single term cannot
// be processed, in which case the terms converted so far are discarded.
func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) []WeightedAffinityTerm {
	if len(v1Terms) == 0 {
		return nil
	}

	var processed []WeightedAffinityTerm
	for i := range v1Terms {
		weighted := &v1Terms[i]
		affinityTerm := newAffinityTerm(pod, &weighted.PodAffinityTerm)
		if affinityTerm == nil {
			// A nil result means the label selector failed to process. This is
			// not supposed to happen because the pod should have been validated
			// by the api server.
			return nil
		}
		processed = append(processed, WeightedAffinityTerm{
			AffinityTerm: *affinityTerm,
			Weight:       weighted.Weight,
		})
	}
	return processed
}
// NewPodInfo returns a new PodInfo.
func NewPodInfo(pod *v1.Pod) *PodInfo {
var preferredAffinityTerms []v1.WeightedPodAffinityTerm
var preferredAntiAffinityTerms []v1.WeightedPodAffinityTerm
if affinity := pod.Spec.Affinity; affinity != nil {
if a := affinity.PodAffinity; a != nil {
preferredAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
}
if a := affinity.PodAntiAffinity; a != nil {
preferredAntiAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
}
}
return &PodInfo{
Pod: pod,
RequiredAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)),
RequiredAntiAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)),
Pod: pod,
RequiredAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)),
RequiredAntiAffinityTerms: getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)),
PreferredAffinityTerms: getWeightedAffinityTerms(pod, preferredAffinityTerms),
PreferredAntiAffinityTerms: getWeightedAffinityTerms(pod, preferredAntiAffinityTerms),
}
}