Added pre-processed preferred affinity terms to scheduler's PodInfo type.

Abdullah Gharaibeh 2020-05-15 09:00:56 -04:00
parent 71277de4d6
commit 087839daf7
3 changed files with 88 additions and 123 deletions
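
A condensed sketch of what the change amounts to, based on the hunks below: the label selectors behind a pod's preferred (soft) affinity and anti-affinity terms are now parsed once, when the scheduler builds its PodInfo wrapper, rather than being re-parsed by the inter-pod affinity scoring plugin for every existing pod it examines. The following dependency-free Go sketch illustrates that "parse once, match many times" shape; the type and function names are simplified stand-ins, not the scheduler's actual API.

    package main

    import "fmt"

    // weightedTerm is a simplified stand-in for the scheduler's WeightedAffinityTerm:
    // the selector has already been "compiled" into a matching function when the term is built.
    type weightedTerm struct {
        topologyKey string
        weight      int32
        matches     func(podLabels map[string]string) bool // pre-processed selector
    }

    // preprocess builds a weighted term once per incoming pod (roughly what NewPodInfo now does).
    func preprocess(selectorKey, selectorValue, topologyKey string, weight int32) weightedTerm {
        return weightedTerm{
            topologyKey: topologyKey,
            weight:      weight,
            matches: func(podLabels map[string]string) bool {
                return podLabels[selectorKey] == selectorValue
            },
        }
    }

    func main() {
        // Pre-processed once for the pod being scheduled.
        terms := []weightedTerm{preprocess("app", "web", "zone", 10)}

        // Re-used for every existing pod during scoring: no selector parsing happens here.
        existingPods := []map[string]string{
            {"app": "web"},
            {"app": "db"},
        }
        var score int64
        for _, podLabels := range existingPods {
            for _, t := range terms {
                if t.matches(podLabels) {
                    score += int64(t.weight)
                }
            }
        }
        fmt.Println("accumulated weight:", score) // 10: only the "app=web" pod matches
    }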

View File

@@ -15,7 +15,6 @@ go_library(
         "//pkg/scheduler/internal/parallelize:go_default_library",
         "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",

View File

@@ -21,8 +21,6 @@ import (
     "fmt"
 
     v1 "k8s.io/api/core/v1"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/klog"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     "k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
     schedutil "k8s.io/kubernetes/pkg/scheduler/util"
@@ -35,9 +33,8 @@ type scoreMap map[string]map[string]int64
 // preScoreState computed at PreScore and used at Score.
 type preScoreState struct {
     topologyScore scoreMap
-    affinityTerms     []*weightedAffinityTerm
-    antiAffinityTerms []*weightedAffinityTerm
+    podInfo       *framework.PodInfo
 }
 
 // Clone implements the mandatory Clone interface. We don't really copy the data since
 // there is no need for that.
@@ -46,39 +43,8 @@ func (s *preScoreState) Clone() framework.StateData {
     return s
 }
 
-// A "processed" representation of v1.WeightedAffinityTerm.
-type weightedAffinityTerm struct {
-    framework.AffinityTerm
-    weight int32
-}
-
-func newWeightedAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm, weight int32) (*weightedAffinityTerm, error) {
-    namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
-    selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
-    if err != nil {
-        return nil, err
-    }
-    return &weightedAffinityTerm{AffinityTerm: framework.AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey}, weight: weight}, nil
-}
-
-func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) ([]*weightedAffinityTerm, error) {
-    if v1Terms == nil {
-        return nil, nil
-    }
-
-    var terms []*weightedAffinityTerm
-    for i := range v1Terms {
-        p, err := newWeightedAffinityTerm(pod, &v1Terms[i].PodAffinityTerm, v1Terms[i].Weight)
-        if err != nil {
-            return nil, err
-        }
-        terms = append(terms, p)
-    }
-    return terms, nil
-}
-
 func (m scoreMap) processTerm(
-    term *weightedAffinityTerm,
+    term *framework.WeightedAffinityTerm,
     podToCheck *v1.Pod,
     fixedNode *v1.Node,
     multiplier int,
@@ -93,14 +59,14 @@ func (m scoreMap) processTerm(
         if m[term.TopologyKey] == nil {
             m[term.TopologyKey] = make(map[string]int64)
         }
-        m[term.TopologyKey][tpValue] += int64(term.weight * int32(multiplier))
+        m[term.TopologyKey][tpValue] += int64(term.Weight * int32(multiplier))
     }
     return
 }
 
-func (m scoreMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
+func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
     for _, term := range terms {
-        m.processTerm(term, podToCheck, fixedNode, multiplier)
+        m.processTerm(&term, podToCheck, fixedNode, multiplier)
     }
 }
 
@@ -117,63 +83,44 @@ func (m scoreMap) append(other scoreMap) {
     }
 }
 
-func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod *v1.Pod, existingPodNodeInfo *framework.NodeInfo, incomingPod *v1.Pod, topoScore scoreMap) error {
-    existingPodAffinity := existingPod.Spec.Affinity
-    existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
-    existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
+func (pl *InterPodAffinity) processExistingPod(
+    state *preScoreState,
+    existingPod *framework.PodInfo,
+    existingPodNodeInfo *framework.NodeInfo,
+    incomingPod *v1.Pod,
+    topoScore scoreMap,
+) {
     existingPodNode := existingPodNodeInfo.Node()
 
     // For every soft pod affinity term of <pod>, if <existingPod> matches the term,
     // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
     // value as that of <existingPods>`s node by the term`s weight.
-    topoScore.processTerms(state.affinityTerms, existingPod, existingPodNode, 1)
+    topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, existingPodNode, 1)
 
     // For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
     // decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
     // value as that of <existingPod>`s node by the term`s weight.
-    topoScore.processTerms(state.antiAffinityTerms, existingPod, existingPodNode, -1)
+    topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, existingPodNode, -1)
 
-    if existingHasAffinityConstraints {
-        // For every hard pod affinity term of <existingPod>, if <pod> matches the term,
-        // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
-        // value as that of <existingPod>'s node by the constant <ipa.hardPodAffinityWeight>
-        if pl.args.HardPodAffinityWeight > 0 {
-            terms := existingPodAffinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
-            // TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
-            //if len(existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
-            //    terms = append(terms, existingPodAffinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
-            //}
-            for i := range terms {
-                term := &terms[i]
-                processedTerm, err := newWeightedAffinityTerm(existingPod, term, pl.args.HardPodAffinityWeight)
-                if err != nil {
-                    return err
-                }
-                topoScore.processTerm(processedTerm, incomingPod, existingPodNode, 1)
-            }
-        }
-        // For every soft pod affinity term of <existingPod>, if <pod> matches the term,
-        // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
-        // value as that of <existingPod>'s node by the term's weight.
-        terms, err := getWeightedAffinityTerms(existingPod, existingPodAffinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
-        if err != nil {
-            klog.Error(err)
-            return nil
-        }
-
-        topoScore.processTerms(terms, incomingPod, existingPodNode, 1)
-    }
-    if existingHasAntiAffinityConstraints {
-        // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
-        // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
-        // value as that of <existingPod>'s node by the term's weight.
-        terms, err := getWeightedAffinityTerms(existingPod, existingPodAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
-        if err != nil {
-            return err
-        }
-        topoScore.processTerms(terms, incomingPod, existingPodNode, -1)
-    }
-    return nil
+    // For every hard pod affinity term of <existingPod>, if <pod> matches the term,
+    // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
+    // value as that of <existingPod>'s node by the constant <args.hardPodAffinityWeight>
+    if pl.args.HardPodAffinityWeight > 0 {
+        for _, term := range existingPod.RequiredAffinityTerms {
+            t := framework.WeightedAffinityTerm{AffinityTerm: term, Weight: pl.args.HardPodAffinityWeight}
+            topoScore.processTerm(&t, incomingPod, existingPodNode, 1)
+        }
+    }
+
+    // For every soft pod affinity term of <existingPod>, if <pod> matches the term,
+    // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
+    // value as that of <existingPod>'s node by the term's weight.
+    topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, existingPodNode, 1)
+
+    // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
+    // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
+    // value as that of <existingPod>'s node by the term's weight.
+    topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, existingPodNode, -1)
 }
 
 // PreScore builds and writes cycle state used by Score and NormalizeScore.
@@ -198,40 +145,25 @@ func (pl *InterPodAffinity) PreScore(
 
     // Unless the pod being scheduled has affinity terms, we only
     // need to process nodes hosting pods with affinity.
-    allNodes, err := pl.sharedLister.NodeInfos().HavePodsWithAffinityList()
-    if err != nil {
-        framework.NewStatus(framework.Error, fmt.Sprintf("get pods with affinity list error, err: %v", err))
-    }
+    var allNodes []*framework.NodeInfo
+    var err error
     if hasAffinityConstraints || hasAntiAffinityConstraints {
         allNodes, err = pl.sharedLister.NodeInfos().List()
         if err != nil {
             framework.NewStatus(framework.Error, fmt.Sprintf("get all nodes from shared lister error, err: %v", err))
         }
-    }
+    } else {
+        allNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList()
+        if err != nil {
+            framework.NewStatus(framework.Error, fmt.Sprintf("get pods with affinity list error, err: %v", err))
+        }
+    }
 
-    var affinityTerms []*weightedAffinityTerm
-    var antiAffinityTerms []*weightedAffinityTerm
-    if hasAffinityConstraints {
-        if affinityTerms, err = getWeightedAffinityTerms(pod, affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
-            klog.Error(err)
-            return nil
-        }
-    }
-    if hasAntiAffinityConstraints {
-        if antiAffinityTerms, err = getWeightedAffinityTerms(pod, affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution); err != nil {
-            klog.Error(err)
-            return nil
-        }
-    }
-
     state := &preScoreState{
         topologyScore: make(map[string]map[string]int64),
-        affinityTerms:     affinityTerms,
-        antiAffinityTerms: antiAffinityTerms,
+        podInfo:       framework.NewPodInfo(pod),
     }
 
-    errCh := parallelize.NewErrorChannel()
-    ctx, cancel := context.WithCancel(pCtx)
     processNode := func(i int) {
         nodeInfo := allNodes[i]
         if nodeInfo.Node() == nil {
@@ -247,10 +179,7 @@ func (pl *InterPodAffinity) PreScore(
 
         topoScore := make(scoreMap)
         for _, existingPod := range podsToProcess {
-            if err := pl.processExistingPod(state, existingPod.Pod, nodeInfo, pod, topoScore); err != nil {
-                errCh.SendErrorWithCancel(err, cancel)
-                return
-            }
+            pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore)
         }
         if len(topoScore) > 0 {
             pl.Lock()
@@ -258,10 +187,7 @@ func (pl *InterPodAffinity) PreScore(
             pl.Unlock()
         }
     }
-    parallelize.Until(ctx, len(allNodes), processNode)
-    if err := errCh.ReceiveError(); err != nil {
-        return framework.NewStatus(framework.Error, err.Error())
-    }
+    parallelize.Until(context.Background(), len(allNodes), processNode)
 
     cycleState.Write(preScoreStateKey, state)
     return nil
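
An aside for readers of the scoring hunks above: the per-topology accumulation performed by processTerm/processTerms is unchanged by this commit; only the term types feeding it are. The following minimal standalone illustration shows that bookkeeping, with plain values standing in for label selectors and *v1.Node (it is not the plugin's actual code).

    package main

    import "fmt"

    // scoreMap mirrors the plugin's bookkeeping: topology key -> topology value -> score.
    type scoreMap map[string]map[string]int64

    // processTerm is a simplified version of the accumulation the diff's comments describe:
    // when the checked pod matches a term, the topology value of the node hosting it gains
    // (or loses) the term's weight. Multiplier is +1 for affinity, -1 for anti-affinity.
    func (m scoreMap) processTerm(topologyKey, nodeTopologyValue string, matches bool, weight int32, multiplier int) {
        if !matches {
            return
        }
        if m[topologyKey] == nil {
            m[topologyKey] = make(map[string]int64)
        }
        m[topologyKey][nodeTopologyValue] += int64(weight) * int64(multiplier)
    }

    func main() {
        topoScore := make(scoreMap)

        // An existing pod in zone "us-east-1a" matches a preferred affinity term (weight 10)
        // and a preferred anti-affinity term (weight 3) of the pod being scheduled.
        topoScore.processTerm("zone", "us-east-1a", true, 10, 1) // affinity: +10
        topoScore.processTerm("zone", "us-east-1a", true, 3, -1) // anti-affinity: -3

        fmt.Println(topoScore["zone"]["us-east-1a"]) // 7: nodes in that zone score higher
    }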

View File

@@ -68,9 +68,11 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
 // accelerate processing. This information is typically immutable (e.g., pre-processed
 // inter-pod affinity selectors).
 type PodInfo struct {
     Pod                        *v1.Pod
     RequiredAffinityTerms      []AffinityTerm
     RequiredAntiAffinityTerms  []AffinityTerm
+    PreferredAffinityTerms     []WeightedAffinityTerm
+    PreferredAntiAffinityTerms []WeightedAffinityTerm
 }
 
 // AffinityTerm is a processed version of v1.PodAffinityTerm.
@@ -80,6 +82,12 @@ type AffinityTerm struct {
     TopologyKey string
 }
 
+// WeightedAffinityTerm is a "processed" representation of v1.WeightedAffinityTerm.
+type WeightedAffinityTerm struct {
+    AffinityTerm
+    Weight int32
+}
+
 func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) *AffinityTerm {
     namespaces := schedutil.GetNamespacesFromPodAffinityTerm(pod, term)
     selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
@@ -110,12 +118,44 @@ func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) []AffinityTerm
     return terms
 }
 
+// getWeightedAffinityTerms returns the list of processed affinity terms.
+func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) []WeightedAffinityTerm {
+    if v1Terms == nil {
+        return nil
+    }
+
+    var terms []WeightedAffinityTerm
+    for _, term := range v1Terms {
+        t := newAffinityTerm(pod, &term.PodAffinityTerm)
+        if t == nil {
+            // We get here if the label selector failed to process, this is not supposed
+            // to happen because the pod should have been validated by the api server.
+            return nil
+        }
+        terms = append(terms, WeightedAffinityTerm{AffinityTerm: *t, Weight: term.Weight})
+    }
+    return terms
+}
+
 // NewPodInfo return a new PodInfo
 func NewPodInfo(pod *v1.Pod) *PodInfo {
+    var preferredAffinityTerms []v1.WeightedPodAffinityTerm
+    var preferredAntiAffinityTerms []v1.WeightedPodAffinityTerm
+    if affinity := pod.Spec.Affinity; affinity != nil {
+        if a := affinity.PodAffinity; a != nil {
+            preferredAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
+        }
+        if a := affinity.PodAntiAffinity; a != nil {
+            preferredAntiAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
+        }
+    }
+
     return &PodInfo{
         Pod:                        pod,
         RequiredAffinityTerms:      getAffinityTerms(pod, schedutil.GetPodAffinityTerms(pod.Spec.Affinity)),
         RequiredAntiAffinityTerms:  getAffinityTerms(pod, schedutil.GetPodAntiAffinityTerms(pod.Spec.Affinity)),
+        PreferredAffinityTerms:     getWeightedAffinityTerms(pod, preferredAffinityTerms),
+        PreferredAntiAffinityTerms: getWeightedAffinityTerms(pod, preferredAntiAffinityTerms),
     }
 }
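
Putting the new types together, here is a short usage sketch based on the fields added in the last hunk. It only builds inside the Kubernetes source tree, since it imports the scheduler's internal framework package; the pod definition is an arbitrary example.

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
    )

    func main() {
        // A pod with one preferred (soft) pod-affinity term.
        pod := &v1.Pod{
            Spec: v1.PodSpec{
                Affinity: &v1.Affinity{
                    PodAffinity: &v1.PodAffinity{
                        PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{{
                            Weight: 10,
                            PodAffinityTerm: v1.PodAffinityTerm{
                                TopologyKey:   "topology.kubernetes.io/zone",
                                LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
                            },
                        }},
                    },
                },
            },
        }

        // The label selectors are parsed exactly once here; scoring code can now iterate
        // podInfo.PreferredAffinityTerms without touching the v1 API types again.
        podInfo := framework.NewPodInfo(pod)
        for _, t := range podInfo.PreferredAffinityTerms {
            fmt.Println(t.TopologyKey, t.Weight)
        }
    }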