mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Track pods with required anti-affinity
This is a performance optimization that reduces the overhead of inter-pod affinity PreFilter calculaitons. Basically eliminates that overhead when no pods in the cluster use required pod anti-affinity. This offered 20% improvement on 5k clusters for preferred anti-affinity benchmarks.
This commit is contained in:
parent
3b5aedcef4
commit
a8873e1a43
@ -163,25 +163,25 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) boo
|
||||
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
|
||||
// (1) Whether it has PodAntiAffinity
|
||||
// (2) Whether any AffinityTerm matches the incoming pod
|
||||
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount {
|
||||
topoMaps := make([]topologyToMatchedTermCount, len(allNodes))
|
||||
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
|
||||
topoMaps := make([]topologyToMatchedTermCount, len(nodes))
|
||||
index := int32(-1)
|
||||
processNode := func(i int) {
|
||||
nodeInfo := allNodes[i]
|
||||
nodeInfo := nodes[i]
|
||||
node := nodeInfo.Node()
|
||||
if node == nil {
|
||||
klog.Error("node not found")
|
||||
return
|
||||
}
|
||||
topoMap := make(topologyToMatchedTermCount)
|
||||
for _, existingPod := range nodeInfo.PodsWithAffinity {
|
||||
for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
|
||||
topoMap.updateWithAntiAffinityTerms(pod, node, existingPod.RequiredAntiAffinityTerms, 1)
|
||||
}
|
||||
if len(topoMap) != 0 {
|
||||
topoMaps[atomic.AddInt32(&index, 1)] = topoMap
|
||||
}
|
||||
}
|
||||
parallelize.Until(context.Background(), len(allNodes), processNode)
|
||||
parallelize.Until(context.Background(), len(nodes), processNode)
|
||||
|
||||
result := make(topologyToMatchedTermCount)
|
||||
for i := 0; i <= int(index); i++ {
|
||||
@ -241,12 +241,12 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, al
|
||||
// PreFilter invoked at the prefilter extension point.
|
||||
func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
|
||||
var allNodes []*framework.NodeInfo
|
||||
var havePodsWithAffinityNodes []*framework.NodeInfo
|
||||
var nodesWithRequiredAntiAffinityPods []*framework.NodeInfo
|
||||
var err error
|
||||
if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil {
|
||||
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos: %v", err))
|
||||
}
|
||||
if havePodsWithAffinityNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList(); err != nil {
|
||||
if nodesWithRequiredAntiAffinityPods, err = pl.sharedLister.NodeInfos().HavePodsWithRequiredAntiAffinityList(); err != nil {
|
||||
return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos with pods with affinity: %v", err))
|
||||
}
|
||||
|
||||
@ -256,7 +256,7 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
|
||||
}
|
||||
|
||||
// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
|
||||
existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes)
|
||||
existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, nodesWithRequiredAntiAffinityPods)
|
||||
|
||||
// incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity
|
||||
// incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity
|
||||
|
@ -245,6 +245,12 @@ func (nodes NodeInfoLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, e
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
// HavePodsWithRequiredAntiAffinityList is supposed to list nodes with at least one pod with
|
||||
// required anti-affinity. For the fake lister we just return everything.
|
||||
func (nodes NodeInfoLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
// NewNodeInfoLister create a new fake NodeInfoLister from a slice of v1.Nodes.
|
||||
func NewNodeInfoLister(nodes []*v1.Node) framework.NodeInfoLister {
|
||||
nodeInfoList := make([]*framework.NodeInfo, len(nodes))
|
||||
|
@ -22,6 +22,8 @@ type NodeInfoLister interface {
|
||||
List() ([]*NodeInfo, error)
|
||||
// Returns the list of NodeInfos of nodes with pods with affinity terms.
|
||||
HavePodsWithAffinityList() ([]*NodeInfo, error)
|
||||
// Returns the list of NodeInfos of nodes with pods with required anti-affinity terms.
|
||||
HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error)
|
||||
// Returns the NodeInfo of the given node name.
|
||||
Get(nodeName string) (*NodeInfo, error)
|
||||
}
|
||||
|
@ -196,6 +196,9 @@ type NodeInfo struct {
|
||||
// The subset of pods with affinity.
|
||||
PodsWithAffinity []*PodInfo
|
||||
|
||||
// The subset of pods with required anti-affinity.
|
||||
PodsWithRequiredAntiAffinity []*PodInfo
|
||||
|
||||
// Ports allocated on the node.
|
||||
UsedPorts HostPortInfo
|
||||
|
||||
@ -457,6 +460,9 @@ func (n *NodeInfo) Clone() *NodeInfo {
|
||||
if len(n.PodsWithAffinity) > 0 {
|
||||
clone.PodsWithAffinity = append([]*PodInfo(nil), n.PodsWithAffinity...)
|
||||
}
|
||||
if len(n.PodsWithRequiredAntiAffinity) > 0 {
|
||||
clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...)
|
||||
}
|
||||
return clone
|
||||
}
|
||||
|
||||
@ -486,10 +492,12 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) {
|
||||
n.NonZeroRequested.MilliCPU += non0CPU
|
||||
n.NonZeroRequested.Memory += non0Mem
|
||||
n.Pods = append(n.Pods, podInfo)
|
||||
affinity := pod.Spec.Affinity
|
||||
if affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) {
|
||||
if podWithAffinity(pod) {
|
||||
n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
|
||||
}
|
||||
if podWithRequiredAntiAffinity(pod) {
|
||||
n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
|
||||
}
|
||||
|
||||
// Consume ports when pods added.
|
||||
n.updateUsedPorts(podInfo.Pod, true)
|
||||
@ -497,33 +505,54 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) {
|
||||
n.Generation = nextGeneration()
|
||||
}
|
||||
|
||||
// RemovePod subtracts pod information from this NodeInfo.
|
||||
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
|
||||
k1, err := GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func podWithAffinity(p *v1.Pod) bool {
|
||||
affinity := p.Spec.Affinity
|
||||
return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
|
||||
}
|
||||
|
||||
for i := range n.PodsWithAffinity {
|
||||
k2, err := GetPodKey(n.PodsWithAffinity[i].Pod)
|
||||
func podWithRequiredAntiAffinity(p *v1.Pod) bool {
|
||||
affinity := p.Spec.Affinity
|
||||
return affinity != nil && affinity.PodAntiAffinity != nil &&
|
||||
len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0
|
||||
}
|
||||
|
||||
func removeFromSlice(s []*PodInfo, k string) []*PodInfo {
|
||||
for i := range s {
|
||||
k2, err := GetPodKey(s[i].Pod)
|
||||
if err != nil {
|
||||
klog.Errorf("Cannot get pod key, err: %v", err)
|
||||
continue
|
||||
}
|
||||
if k1 == k2 {
|
||||
if k == k2 {
|
||||
// delete the element
|
||||
n.PodsWithAffinity[i] = n.PodsWithAffinity[len(n.PodsWithAffinity)-1]
|
||||
n.PodsWithAffinity = n.PodsWithAffinity[:len(n.PodsWithAffinity)-1]
|
||||
s[i] = s[len(s)-1]
|
||||
s = s[:len(s)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// RemovePod subtracts pod information from this NodeInfo.
|
||||
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
|
||||
k, err := GetPodKey(pod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if podWithAffinity(pod) {
|
||||
n.PodsWithAffinity = removeFromSlice(n.PodsWithAffinity, k)
|
||||
}
|
||||
if podWithRequiredAntiAffinity(pod) {
|
||||
n.PodsWithRequiredAntiAffinity = removeFromSlice(n.PodsWithRequiredAntiAffinity, k)
|
||||
}
|
||||
|
||||
for i := range n.Pods {
|
||||
k2, err := GetPodKey(n.Pods[i].Pod)
|
||||
if err != nil {
|
||||
klog.Errorf("Cannot get pod key, err: %v", err)
|
||||
continue
|
||||
}
|
||||
if k1 == k2 {
|
||||
if k == k2 {
|
||||
// delete the element
|
||||
n.Pods[i] = n.Pods[len(n.Pods)-1]
|
||||
n.Pods = n.Pods[:len(n.Pods)-1]
|
||||
@ -558,6 +587,9 @@ func (n *NodeInfo) resetSlicesIfEmpty() {
|
||||
if len(n.PodsWithAffinity) == 0 {
|
||||
n.PodsWithAffinity = nil
|
||||
}
|
||||
if len(n.PodsWithRequiredAntiAffinity) == 0 {
|
||||
n.PodsWithRequiredAntiAffinity = nil
|
||||
}
|
||||
if len(n.Pods) == 0 {
|
||||
n.Pods = nil
|
||||
}
|
||||
|
16
pkg/scheduler/internal/cache/cache.go
vendored
16
pkg/scheduler/internal/cache/cache.go
vendored
@ -213,6 +213,10 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
|
||||
// status from having pods with affinity to NOT having pods with affinity or the other
|
||||
// way around.
|
||||
updateNodesHavePodsWithAffinity := false
|
||||
// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
|
||||
// status from having pods with required anti-affinity to NOT having pods with required
|
||||
// anti-affinity or the other way around.
|
||||
updateNodesHavePodsWithRequiredAntiAffinity := false
|
||||
|
||||
// Start from the head of the NodeInfo doubly linked list and update snapshot
|
||||
// of NodeInfos updated after the last snapshot.
|
||||
@ -239,6 +243,9 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
|
||||
if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
|
||||
updateNodesHavePodsWithAffinity = true
|
||||
}
|
||||
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
|
||||
updateNodesHavePodsWithRequiredAntiAffinity = true
|
||||
}
|
||||
// We need to preserve the original pointer of the NodeInfo struct since it
|
||||
// is used in the NodeInfoList, which we may not update.
|
||||
*existing = *clone
|
||||
@ -254,7 +261,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
|
||||
updateAllLists = true
|
||||
}
|
||||
|
||||
if updateAllLists || updateNodesHavePodsWithAffinity {
|
||||
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
|
||||
cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
|
||||
}
|
||||
|
||||
@ -276,6 +283,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
|
||||
|
||||
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
|
||||
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
|
||||
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
|
||||
if updateAll {
|
||||
// Take a snapshot of the nodes order in the tree
|
||||
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
|
||||
@ -287,6 +295,9 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
|
||||
if len(n.PodsWithAffinity) > 0 {
|
||||
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
|
||||
}
|
||||
if len(n.PodsWithRequiredAntiAffinity) > 0 {
|
||||
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, n)
|
||||
}
|
||||
} else {
|
||||
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
|
||||
}
|
||||
@ -296,6 +307,9 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
|
||||
if len(n.PodsWithAffinity) > 0 {
|
||||
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n)
|
||||
}
|
||||
if len(n.PodsWithRequiredAntiAffinity) > 0 {
|
||||
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
18
pkg/scheduler/internal/cache/snapshot.go
vendored
18
pkg/scheduler/internal/cache/snapshot.go
vendored
@ -33,7 +33,10 @@ type Snapshot struct {
|
||||
nodeInfoList []*framework.NodeInfo
|
||||
// havePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms.
|
||||
havePodsWithAffinityNodeInfoList []*framework.NodeInfo
|
||||
generation int64
|
||||
// havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring
|
||||
// required anti-affinity terms.
|
||||
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
|
||||
generation int64
|
||||
}
|
||||
|
||||
var _ framework.SharedLister = &Snapshot{}
|
||||
@ -50,17 +53,22 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
|
||||
nodeInfoMap := createNodeInfoMap(pods, nodes)
|
||||
nodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
|
||||
havePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
|
||||
havePodsWithRequiredAntiAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
|
||||
for _, v := range nodeInfoMap {
|
||||
nodeInfoList = append(nodeInfoList, v)
|
||||
if len(v.PodsWithAffinity) > 0 {
|
||||
havePodsWithAffinityNodeInfoList = append(havePodsWithAffinityNodeInfoList, v)
|
||||
}
|
||||
if len(v.PodsWithRequiredAntiAffinity) > 0 {
|
||||
havePodsWithRequiredAntiAffinityNodeInfoList = append(havePodsWithRequiredAntiAffinityNodeInfoList, v)
|
||||
}
|
||||
}
|
||||
|
||||
s := NewEmptySnapshot()
|
||||
s.nodeInfoMap = nodeInfoMap
|
||||
s.nodeInfoList = nodeInfoList
|
||||
s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList
|
||||
s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList
|
||||
|
||||
return s
|
||||
}
|
||||
@ -137,11 +145,17 @@ func (s *Snapshot) List() ([]*framework.NodeInfo, error) {
|
||||
return s.nodeInfoList, nil
|
||||
}
|
||||
|
||||
// HavePodsWithAffinityList returns the list of nodes with at least one pods with inter-pod affinity
|
||||
// HavePodsWithAffinityList returns the list of nodes with at least one pod with inter-pod affinity
|
||||
func (s *Snapshot) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) {
|
||||
return s.havePodsWithAffinityNodeInfoList, nil
|
||||
}
|
||||
|
||||
// HavePodsWithRequiredAntiAffinityList returns the list of nodes with at least one pod with
|
||||
// required inter-pod anti-affinity
|
||||
func (s *Snapshot) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
|
||||
return s.havePodsWithRequiredAntiAffinityNodeInfoList, nil
|
||||
}
|
||||
|
||||
// Get returns the NodeInfo of the given node name.
|
||||
func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
|
||||
if v, ok := s.nodeInfoMap[nodeName]; ok && v.Node() != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user