Merge pull request #84824 from ahg-g/ahg-nodes-with-affinity-pods

Tracking nodes with pods with affinity
This commit is contained in:
Kubernetes Prow Robot 2019-11-05 22:19:41 -08:00 committed by GitHub
commit f35a92ccf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 48 additions and 9 deletions

View File

@ -338,13 +338,20 @@ func GetPredicateMetadata(pod *v1.Pod, sharedLister schedulerlisters.SharedListe
} }
var allNodes []*schedulernodeinfo.NodeInfo var allNodes []*schedulernodeinfo.NodeInfo
var havePodsWithAffinityNodes []*schedulernodeinfo.NodeInfo
if sharedLister != nil { if sharedLister != nil {
n, err := sharedLister.NodeInfos().List() var err error
allNodes, err = sharedLister.NodeInfos().List()
if err != nil { if err != nil {
klog.Errorf("failed to list NodeInfos: %v", err) klog.Errorf("failed to list NodeInfos: %v", err)
return nil return nil
} }
allNodes = n havePodsWithAffinityNodes, err = sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil {
klog.Errorf("failed to list NodeInfos: %v", err)
return nil
}
} }
// evenPodsSpreadMetadata represents how existing pods match "pod" // evenPodsSpreadMetadata represents how existing pods match "pod"
@ -355,7 +362,7 @@ func GetPredicateMetadata(pod *v1.Pod, sharedLister schedulerlisters.SharedListe
return nil return nil
} }
podAffinityMetadata, err := getPodAffinityMetadata(pod, allNodes) podAffinityMetadata, err := getPodAffinityMetadata(pod, allNodes, havePodsWithAffinityNodes)
if err != nil { if err != nil {
klog.Errorf("Error calculating podAffinityMetadata: %v", err) klog.Errorf("Error calculating podAffinityMetadata: %v", err)
return nil return nil
@ -387,9 +394,9 @@ func getPodFitsResourcesMetedata(pod *v1.Pod) *podFitsResourcesMetadata {
} }
} }
func getPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo) (*podAffinityMetadata, error) { func getPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo, havePodsWithAffinityNodes []*schedulernodeinfo.NodeInfo) (*podAffinityMetadata, error) {
// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity // existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, allNodes) existingPodAntiAffinityMap, err := getTPMapMatchingExistingAntiAffinity(pod, havePodsWithAffinityNodes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -759,9 +766,11 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulernode
errCh.SendErrorWithCancel(err, cancel) errCh.SendErrorWithCancel(err, cancel)
return return
} }
if existingPodTopologyMaps != nil {
appendTopologyPairsMaps(existingPodTopologyMaps) appendTopologyPairsMaps(existingPodTopologyMaps)
} }
} }
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode) workqueue.ParallelizeUntil(ctx, 16, len(allNodes), processNode)
if err := errCh.ReceiveError(); err != nil { if err := errCh.ReceiveError(); err != nil {

View File

@ -106,10 +106,17 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, shar
// pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node. // pm stores (1) all nodes that should be considered and (2) the so-far computed score for each node.
pm := newPodAffinityPriorityMap(nodes) pm := newPodAffinityPriorityMap(nodes)
allNodes, err := sharedLister.NodeInfos().List()
allNodes, err := sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if hasAffinityConstraints || hasAntiAffinityConstraints {
allNodes, err = sharedLister.NodeInfos().List()
if err != nil {
return nil, err
}
}
// convert the topology key based weights to the node name based weights // convert the topology key based weights to the node name based weights
var maxCount, minCount int64 var maxCount, minCount int64

View File

@ -179,7 +179,6 @@ func (g *genericScheduler) snapshot() error {
// for cluster autoscaler integration. // for cluster autoscaler integration.
func (g *genericScheduler) PredicateMetadataProducer() predicates.PredicateMetadataProducer { func (g *genericScheduler) PredicateMetadataProducer() predicates.PredicateMetadataProducer {
return g.predicateMetaProducer return g.predicateMetaProducer
} }
// Schedule tries to schedule the given pod to one of the nodes in the node list. // Schedule tries to schedule the given pod to one of the nodes in the node list.

View File

@ -239,10 +239,14 @@ func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapsh
// Take a snapshot of the nodes order in the tree // Take a snapshot of the nodes order in the tree
nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes) nodeSnapshot.NodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
nodeSnapshot.HavePodsWithAffinityNodeInfoList = make([]*schedulernodeinfo.NodeInfo, 0, cache.nodeTree.numNodes)
for i := 0; i < cache.nodeTree.numNodes; i++ { for i := 0; i < cache.nodeTree.numNodes; i++ {
nodeName := cache.nodeTree.next() nodeName := cache.nodeTree.next()
if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil { if n := nodeSnapshot.NodeInfoMap[nodeName]; n != nil {
nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n) nodeSnapshot.NodeInfoList = append(nodeSnapshot.NodeInfoList, n)
if len(n.PodsWithAffinity()) > 0 {
nodeSnapshot.HavePodsWithAffinityNodeInfoList = append(nodeSnapshot.HavePodsWithAffinityNodeInfoList, n)
}
} else { } else {
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName) klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
} }

View File

@ -262,6 +262,12 @@ func (nodes NodeInfoLister) List() ([]*schedulernodeinfo.NodeInfo, error) {
return nodes, nil return nodes, nil
} }
// HavePodsWithAffinityList is supposed to list nodes with at least one pod with affinity. For the fake lister
// we just return everything.
func (nodes NodeInfoLister) HavePodsWithAffinityList() ([]*schedulernodeinfo.NodeInfo, error) {
return nodes, nil
}
// NewNodeInfoLister create a new fake NodeInfoLister from a slice of v1.Nodes. // NewNodeInfoLister create a new fake NodeInfoLister from a slice of v1.Nodes.
func NewNodeInfoLister(nodes []*v1.Node) schedulerlisters.NodeInfoLister { func NewNodeInfoLister(nodes []*v1.Node) schedulerlisters.NodeInfoLister {
nodeInfoList := make([]*schedulernodeinfo.NodeInfo, len(nodes)) nodeInfoList := make([]*schedulernodeinfo.NodeInfo, len(nodes))

View File

@ -38,6 +38,8 @@ type PodLister interface {
type NodeInfoLister interface { type NodeInfoLister interface {
// Returns the list of NodeInfos. // Returns the list of NodeInfos.
List() ([]*schedulernodeinfo.NodeInfo, error) List() ([]*schedulernodeinfo.NodeInfo, error)
// Returns the list of NodeInfos of nodes with pods with affinity terms.
HavePodsWithAffinityList() ([]*schedulernodeinfo.NodeInfo, error)
// Returns the NodeInfo of the given node name. // Returns the NodeInfo of the given node name.
Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) Get(nodeName string) (*schedulernodeinfo.NodeInfo, error)
} }

View File

@ -32,6 +32,8 @@ type Snapshot struct {
NodeInfoMap map[string]*schedulernodeinfo.NodeInfo NodeInfoMap map[string]*schedulernodeinfo.NodeInfo
// NodeInfoList is the list of nodes as ordered in the cache's nodeTree. // NodeInfoList is the list of nodes as ordered in the cache's nodeTree.
NodeInfoList []*schedulernodeinfo.NodeInfo NodeInfoList []*schedulernodeinfo.NodeInfo
// HavePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms.
HavePodsWithAffinityNodeInfoList []*schedulernodeinfo.NodeInfo
Generation int64 Generation int64
} }
@ -48,13 +50,18 @@ func NewEmptySnapshot() *Snapshot {
func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot { func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(pods, nodes) nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(pods, nodes)
nodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, len(nodes)) nodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, len(nodes))
havePodsWithAffinityNodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, len(nodes))
for _, v := range nodeInfoMap { for _, v := range nodeInfoMap {
nodeInfoList = append(nodeInfoList, v) nodeInfoList = append(nodeInfoList, v)
if len(v.PodsWithAffinity()) > 0 {
havePodsWithAffinityNodeInfoList = append(havePodsWithAffinityNodeInfoList, v)
}
} }
s := NewEmptySnapshot() s := NewEmptySnapshot()
s.NodeInfoMap = nodeInfoMap s.NodeInfoMap = nodeInfoMap
s.NodeInfoList = nodeInfoList s.NodeInfoList = nodeInfoList
s.HavePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList
return s return s
} }
@ -119,6 +126,11 @@ func (n *nodeInfoLister) List() ([]*schedulernodeinfo.NodeInfo, error) {
return n.snapshot.NodeInfoList, nil return n.snapshot.NodeInfoList, nil
} }
// HavePodsWithAffinityList returns the list of nodes with at least one pods with inter-pod affinity
func (n *nodeInfoLister) HavePodsWithAffinityList() ([]*schedulernodeinfo.NodeInfo, error) {
return n.snapshot.HavePodsWithAffinityNodeInfoList, nil
}
// Returns the NodeInfo of the given node name. // Returns the NodeInfo of the given node name.
func (n *nodeInfoLister) Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) { func (n *nodeInfoLister) Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) {
if v, ok := n.snapshot.NodeInfoMap[nodeName]; ok { if v, ok := n.snapshot.NodeInfoMap[nodeName]; ok {