mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #89162 from alculquicondor/affinity-less-lock
Reduce locking when calculating affinity scores
This commit is contained in:
commit
fe2fdcd695
@ -32,9 +32,11 @@ import (
|
||||
// preScoreStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring.
|
||||
const preScoreStateKey = "PreScore" + Name
|
||||
|
||||
type scoreMap map[string]map[string]int64
|
||||
|
||||
// preScoreState computed at PreScore and used at Score.
|
||||
type preScoreState struct {
|
||||
topologyScore map[string]map[string]int64
|
||||
topologyScore scoreMap
|
||||
affinityTerms []*weightedAffinityTerm
|
||||
antiAffinityTerms []*weightedAffinityTerm
|
||||
}
|
||||
@ -76,8 +78,7 @@ func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm)
|
||||
return terms, nil
|
||||
}
|
||||
|
||||
func (pl *InterPodAffinity) processTerm(
|
||||
state *preScoreState,
|
||||
func (m scoreMap) processTerm(
|
||||
term *weightedAffinityTerm,
|
||||
podToCheck *v1.Pod,
|
||||
fixedNode *v1.Node,
|
||||
@ -90,24 +91,34 @@ func (pl *InterPodAffinity) processTerm(
|
||||
match := schedutil.PodMatchesTermsNamespaceAndSelector(podToCheck, term.namespaces, term.selector)
|
||||
tpValue, tpValueExist := fixedNode.Labels[term.topologyKey]
|
||||
if match && tpValueExist {
|
||||
pl.Lock()
|
||||
if state.topologyScore[term.topologyKey] == nil {
|
||||
state.topologyScore[term.topologyKey] = make(map[string]int64)
|
||||
if m[term.topologyKey] == nil {
|
||||
m[term.topologyKey] = make(map[string]int64)
|
||||
}
|
||||
state.topologyScore[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
|
||||
pl.Unlock()
|
||||
m[term.topologyKey][tpValue] += int64(term.weight * int32(multiplier))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (pl *InterPodAffinity) processTerms(state *preScoreState, terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) error {
|
||||
func (m scoreMap) processTerms(terms []*weightedAffinityTerm, podToCheck *v1.Pod, fixedNode *v1.Node, multiplier int) {
|
||||
for _, term := range terms {
|
||||
pl.processTerm(state, term, podToCheck, fixedNode, multiplier)
|
||||
m.processTerm(term, podToCheck, fixedNode, multiplier)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod *v1.Pod, existingPodNodeInfo *nodeinfo.NodeInfo, incomingPod *v1.Pod) error {
|
||||
func (m scoreMap) append(other scoreMap) {
|
||||
for topology, oScores := range other {
|
||||
scores := m[topology]
|
||||
if scores == nil {
|
||||
m[topology] = oScores
|
||||
continue
|
||||
}
|
||||
for k, v := range oScores {
|
||||
scores[k] += v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod *v1.Pod, existingPodNodeInfo *nodeinfo.NodeInfo, incomingPod *v1.Pod, topoScore scoreMap) error {
|
||||
existingPodAffinity := existingPod.Spec.Affinity
|
||||
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
|
||||
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
|
||||
@ -116,12 +127,12 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod
|
||||
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
|
||||
// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
|
||||
// value as that of <existingPods>`s node by the term`s weight.
|
||||
pl.processTerms(state, state.affinityTerms, existingPod, existingPodNode, 1)
|
||||
topoScore.processTerms(state.affinityTerms, existingPod, existingPodNode, 1)
|
||||
|
||||
// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
|
||||
// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
|
||||
// value as that of <existingPod>`s node by the term`s weight.
|
||||
pl.processTerms(state, state.antiAffinityTerms, existingPod, existingPodNode, -1)
|
||||
topoScore.processTerms(state.antiAffinityTerms, existingPod, existingPodNode, -1)
|
||||
|
||||
if existingHasAffinityConstraints {
|
||||
// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
|
||||
@ -139,7 +150,7 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pl.processTerm(state, processedTerm, incomingPod, existingPodNode, 1)
|
||||
topoScore.processTerm(processedTerm, incomingPod, existingPodNode, 1)
|
||||
}
|
||||
}
|
||||
// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
|
||||
@ -151,7 +162,7 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod
|
||||
return nil
|
||||
}
|
||||
|
||||
pl.processTerms(state, terms, incomingPod, existingPodNode, 1)
|
||||
topoScore.processTerms(terms, incomingPod, existingPodNode, 1)
|
||||
}
|
||||
if existingHasAntiAffinityConstraints {
|
||||
// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
|
||||
@ -161,7 +172,7 @@ func (pl *InterPodAffinity) processExistingPod(state *preScoreState, existingPod
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pl.processTerms(state, terms, incomingPod, existingPodNode, -1)
|
||||
topoScore.processTerms(terms, incomingPod, existingPodNode, -1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -235,12 +246,18 @@ func (pl *InterPodAffinity) PreScore(
|
||||
podsToProcess = nodeInfo.Pods()
|
||||
}
|
||||
|
||||
topoScore := make(scoreMap)
|
||||
for _, existingPod := range podsToProcess {
|
||||
if err := pl.processExistingPod(state, existingPod, nodeInfo, pod); err != nil {
|
||||
if err := pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore); err != nil {
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(topoScore) > 0 {
|
||||
pl.Lock()
|
||||
state.topologyScore.append(topoScore)
|
||||
pl.Unlock()
|
||||
}
|
||||
}
|
||||
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
|
||||
if err := errCh.ReceiveError(); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user