diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go index 596f1b95df3..9f0c32b093e 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go @@ -163,25 +163,25 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []framework.AffinityTerm) boo // getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node: // (1) Whether it has PodAntiAffinity // (2) Whether any AffinityTerm matches the incoming pod -func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*framework.NodeInfo) topologyToMatchedTermCount { - topoMaps := make([]topologyToMatchedTermCount, len(allNodes)) +func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodes []*framework.NodeInfo) topologyToMatchedTermCount { + topoMaps := make([]topologyToMatchedTermCount, len(nodes)) index := int32(-1) processNode := func(i int) { - nodeInfo := allNodes[i] + nodeInfo := nodes[i] node := nodeInfo.Node() if node == nil { klog.Error("node not found") return } topoMap := make(topologyToMatchedTermCount) - for _, existingPod := range nodeInfo.PodsWithAffinity { + for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity { topoMap.updateWithAntiAffinityTerms(pod, node, existingPod.RequiredAntiAffinityTerms, 1) } if len(topoMap) != 0 { topoMaps[atomic.AddInt32(&index, 1)] = topoMap } } - parallelize.Until(context.Background(), len(allNodes), processNode) + parallelize.Until(context.Background(), len(nodes), processNode) result := make(topologyToMatchedTermCount) for i := 0; i <= int(index); i++ { @@ -241,12 +241,12 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(podInfo *framework.PodInfo, al // PreFilter invoked at the prefilter extension point. func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { var allNodes []*framework.NodeInfo - var havePodsWithAffinityNodes []*framework.NodeInfo + var nodesWithRequiredAntiAffinityPods []*framework.NodeInfo var err error if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil { return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos: %v", err)) } - if havePodsWithAffinityNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList(); err != nil { + if nodesWithRequiredAntiAffinityPods, err = pl.sharedLister.NodeInfos().HavePodsWithRequiredAntiAffinityList(); err != nil { return framework.NewStatus(framework.Error, fmt.Sprintf("failed to list NodeInfos with pods with affinity: %v", err)) } @@ -256,7 +256,7 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework } // existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity - existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes) + existingPodAntiAffinityMap := getTPMapMatchingExistingAntiAffinity(pod, nodesWithRequiredAntiAffinityPods) // incomingPodAffinityMap will be used later for efficient check on incoming pod's affinity // incomingPodAntiAffinityMap will be used later for efficient check on incoming pod's anti-affinity diff --git a/pkg/scheduler/framework/v1alpha1/fake/listers.go b/pkg/scheduler/framework/v1alpha1/fake/listers.go index 2890879628e..863b595cbe6 100644 --- a/pkg/scheduler/framework/v1alpha1/fake/listers.go +++ b/pkg/scheduler/framework/v1alpha1/fake/listers.go @@ -245,6 +245,12 @@ func (nodes NodeInfoLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, e return nodes, nil } +// HavePodsWithRequiredAntiAffinityList is supposed to list nodes with at least one pod with +// required anti-affinity. For the fake lister we just return everything. +func (nodes NodeInfoLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) { + return nodes, nil +} + // NewNodeInfoLister create a new fake NodeInfoLister from a slice of v1.Nodes. func NewNodeInfoLister(nodes []*v1.Node) framework.NodeInfoLister { nodeInfoList := make([]*framework.NodeInfo, len(nodes)) diff --git a/pkg/scheduler/framework/v1alpha1/listers.go b/pkg/scheduler/framework/v1alpha1/listers.go index e6bc9b43db1..91167edc0a0 100644 --- a/pkg/scheduler/framework/v1alpha1/listers.go +++ b/pkg/scheduler/framework/v1alpha1/listers.go @@ -22,6 +22,8 @@ type NodeInfoLister interface { List() ([]*NodeInfo, error) // Returns the list of NodeInfos of nodes with pods with affinity terms. HavePodsWithAffinityList() ([]*NodeInfo, error) + // Returns the list of NodeInfos of nodes with pods with required anti-affinity terms. + HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error) // Returns the NodeInfo of the given node name. Get(nodeName string) (*NodeInfo, error) } diff --git a/pkg/scheduler/framework/v1alpha1/types.go b/pkg/scheduler/framework/v1alpha1/types.go index c878a7c9ffa..b6a3efc6815 100644 --- a/pkg/scheduler/framework/v1alpha1/types.go +++ b/pkg/scheduler/framework/v1alpha1/types.go @@ -196,6 +196,9 @@ type NodeInfo struct { // The subset of pods with affinity. PodsWithAffinity []*PodInfo + // The subset of pods with required anti-affinity. + PodsWithRequiredAntiAffinity []*PodInfo + // Ports allocated on the node. UsedPorts HostPortInfo @@ -457,6 +460,9 @@ func (n *NodeInfo) Clone() *NodeInfo { if len(n.PodsWithAffinity) > 0 { clone.PodsWithAffinity = append([]*PodInfo(nil), n.PodsWithAffinity...) } + if len(n.PodsWithRequiredAntiAffinity) > 0 { + clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...) + } return clone } @@ -486,10 +492,12 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) { n.NonZeroRequested.MilliCPU += non0CPU n.NonZeroRequested.Memory += non0Mem n.Pods = append(n.Pods, podInfo) - affinity := pod.Spec.Affinity - if affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) { + if podWithAffinity(pod) { n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo) } + if podWithRequiredAntiAffinity(pod) { + n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo) + } // Consume ports when pods added. n.updateUsedPorts(podInfo.Pod, true) @@ -497,33 +505,54 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) { n.Generation = nextGeneration() } -// RemovePod subtracts pod information from this NodeInfo. -func (n *NodeInfo) RemovePod(pod *v1.Pod) error { - k1, err := GetPodKey(pod) - if err != nil { - return err - } +func podWithAffinity(p *v1.Pod) bool { + affinity := p.Spec.Affinity + return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) +} - for i := range n.PodsWithAffinity { - k2, err := GetPodKey(n.PodsWithAffinity[i].Pod) +func podWithRequiredAntiAffinity(p *v1.Pod) bool { + affinity := p.Spec.Affinity + return affinity != nil && affinity.PodAntiAffinity != nil && + len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 +} + +func removeFromSlice(s []*PodInfo, k string) []*PodInfo { + for i := range s { + k2, err := GetPodKey(s[i].Pod) if err != nil { klog.Errorf("Cannot get pod key, err: %v", err) continue } - if k1 == k2 { + if k == k2 { // delete the element - n.PodsWithAffinity[i] = n.PodsWithAffinity[len(n.PodsWithAffinity)-1] - n.PodsWithAffinity = n.PodsWithAffinity[:len(n.PodsWithAffinity)-1] + s[i] = s[len(s)-1] + s = s[:len(s)-1] break } } + return s +} + +// RemovePod subtracts pod information from this NodeInfo. +func (n *NodeInfo) RemovePod(pod *v1.Pod) error { + k, err := GetPodKey(pod) + if err != nil { + return err + } + if podWithAffinity(pod) { + n.PodsWithAffinity = removeFromSlice(n.PodsWithAffinity, k) + } + if podWithRequiredAntiAffinity(pod) { + n.PodsWithRequiredAntiAffinity = removeFromSlice(n.PodsWithRequiredAntiAffinity, k) + } + for i := range n.Pods { k2, err := GetPodKey(n.Pods[i].Pod) if err != nil { klog.Errorf("Cannot get pod key, err: %v", err) continue } - if k1 == k2 { + if k == k2 { // delete the element n.Pods[i] = n.Pods[len(n.Pods)-1] n.Pods = n.Pods[:len(n.Pods)-1] @@ -558,6 +587,9 @@ func (n *NodeInfo) resetSlicesIfEmpty() { if len(n.PodsWithAffinity) == 0 { n.PodsWithAffinity = nil } + if len(n.PodsWithRequiredAntiAffinity) == 0 { + n.PodsWithRequiredAntiAffinity = nil + } if len(n.Pods) == 0 { n.Pods = nil } diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index e0f44e78d59..251e41a6e46 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -213,6 +213,10 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error { // status from having pods with affinity to NOT having pods with affinity or the other // way around. updateNodesHavePodsWithAffinity := false + // HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its + // status from having pods with required anti-affinity to NOT having pods with required + // anti-affinity or the other way around. + updateNodesHavePodsWithRequiredAntiAffinity := false // Start from the head of the NodeInfo doubly linked list and update snapshot // of NodeInfos updated after the last snapshot. @@ -239,6 +243,9 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error { if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) { updateNodesHavePodsWithAffinity = true } + if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) { + updateNodesHavePodsWithRequiredAntiAffinity = true + } // We need to preserve the original pointer of the NodeInfo struct since it // is used in the NodeInfoList, which we may not update. *existing = *clone @@ -254,7 +261,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error { updateAllLists = true } - if updateAllLists || updateNodesHavePodsWithAffinity { + if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity { cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists) } @@ -276,6 +283,7 @@ func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error { func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) { snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) + snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) if updateAll { // Take a snapshot of the nodes order in the tree snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) @@ -287,6 +295,9 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda if len(n.PodsWithAffinity) > 0 { snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n) } + if len(n.PodsWithRequiredAntiAffinity) > 0 { + snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, n) + } } else { klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName) } @@ -296,6 +307,9 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda if len(n.PodsWithAffinity) > 0 { snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, n) } + if len(n.PodsWithRequiredAntiAffinity) > 0 { + snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, n) + } } } } diff --git a/pkg/scheduler/internal/cache/snapshot.go b/pkg/scheduler/internal/cache/snapshot.go index a63f67431ee..674de0b276f 100644 --- a/pkg/scheduler/internal/cache/snapshot.go +++ b/pkg/scheduler/internal/cache/snapshot.go @@ -33,7 +33,10 @@ type Snapshot struct { nodeInfoList []*framework.NodeInfo // havePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms. havePodsWithAffinityNodeInfoList []*framework.NodeInfo - generation int64 + // havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring + // required anti-affinity terms. + havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo + generation int64 } var _ framework.SharedLister = &Snapshot{} @@ -50,17 +53,22 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot { nodeInfoMap := createNodeInfoMap(pods, nodes) nodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap)) havePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap)) + havePodsWithRequiredAntiAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap)) for _, v := range nodeInfoMap { nodeInfoList = append(nodeInfoList, v) if len(v.PodsWithAffinity) > 0 { havePodsWithAffinityNodeInfoList = append(havePodsWithAffinityNodeInfoList, v) } + if len(v.PodsWithRequiredAntiAffinity) > 0 { + havePodsWithRequiredAntiAffinityNodeInfoList = append(havePodsWithRequiredAntiAffinityNodeInfoList, v) + } } s := NewEmptySnapshot() s.nodeInfoMap = nodeInfoMap s.nodeInfoList = nodeInfoList s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList + s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList return s } @@ -137,11 +145,17 @@ func (s *Snapshot) List() ([]*framework.NodeInfo, error) { return s.nodeInfoList, nil } -// HavePodsWithAffinityList returns the list of nodes with at least one pods with inter-pod affinity +// HavePodsWithAffinityList returns the list of nodes with at least one pod with inter-pod affinity func (s *Snapshot) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) { return s.havePodsWithAffinityNodeInfoList, nil } +// HavePodsWithRequiredAntiAffinityList returns the list of nodes with at least one pod with +// required inter-pod anti-affinity +func (s *Snapshot) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) { + return s.havePodsWithRequiredAntiAffinityNodeInfoList, nil +} + // Get returns the NodeInfo of the given node name. func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) { if v, ok := s.nodeInfoMap[nodeName]; ok && v.Node() != nil {