Speed up pod affinity predicate function

Wojciech Tyczynski 2016-07-21 16:16:24 +02:00
parent 37d3d390df
commit 4bc410e47a
3 changed files with 215 additions and 127 deletions

View File

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"math/rand"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/golang/glog"
@@ -29,6 +30,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/qos"
 	"k8s.io/kubernetes/pkg/labels"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -70,17 +72,28 @@ type predicateMetadata struct {
 	podBestEffort bool
 	podRequest    *schedulercache.Resource
 	podPorts      map[int]bool
+	matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
 }
 
-func PredicateMetadata(pod *api.Pod) interface{} {
+type matchingPodAntiAffinityTerm struct {
+	term *api.PodAffinityTerm
+	node *api.Node
+}
+
+func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) interface{} {
+	// If we cannot compute metadata, just return nil
 	if pod == nil {
-		// We cannot compute metadata, just return nil
 		return nil
 	}
+	matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeInfoMap)
+	if err != nil {
+		return nil
+	}
 	return &predicateMetadata{
 		podBestEffort: isPodBestEffort(pod),
 		podRequest:    getResourceRequest(pod),
 		podPorts:      getUsedPorts(pod),
+		matchingAntiAffinityTerms: matchingTerms,
 	}
 }
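
The hunk above makes PredicateMetadata precompute, once per scheduling cycle, the anti-affinity terms that existing pods hold against the pod being scheduled; predicates then receive the result as an opaque interface{} and type-assert it, falling back to recomputation when it is absent. A minimal, self-contained sketch of that pattern (fitMetadata, computeMetadata and portFits are illustrative names, not code from this repository):

```go
package main

import "fmt"

// Metadata computed once per scheduling cycle (analogous to predicateMetadata).
type fitMetadata struct {
	usedPorts map[int]bool
}

func computeMetadata() interface{} {
	return &fitMetadata{usedPorts: map[int]bool{8080: true}}
}

// A predicate receives the metadata as an opaque interface{} and type-asserts it,
// recomputing only when the assertion fails (e.g. when nil was passed, as in the tests).
func portFits(port int, meta interface{}) bool {
	var used map[int]bool
	if m, ok := meta.(*fitMetadata); ok {
		used = m.usedPorts // fast path: reuse the precomputed data
	} else {
		used = map[int]bool{8080: true} // fallback: recompute from scratch
	}
	return !used[port]
}

func main() {
	meta := computeMetadata()         // once per cycle, not once per node
	fmt.Println(portFits(9090, meta)) // true
	fmt.Println(portFits(8080, nil))  // false, via the fallback path
}
```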
@@ -793,38 +806,33 @@ func NewPodAffinityPredicate(info NodeInfo, podLister algorithm.PodLister, failu
 	return checker.InterPodAffinityMatches
 }
 
-func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+func (c *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
 	node := nodeInfo.Node()
 	if node == nil {
 		return false, fmt.Errorf("node not found")
 	}
-	allPods, err := checker.podLister.List(labels.Everything())
-	if err != nil {
-		return false, err
+	if !c.satisfiesExistingPodsAntiAffinity(pod, meta, node) {
+		return false, nil
 	}
 
+	// Now check if <pod> requirements will be satisfied on this node.
 	affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
 	if err != nil {
 		return false, err
 	}
-
-	// Check if the current node match the inter-pod affinity scheduling constraints.
-	// Hard inter-pod affinity is not symmetric, check only when affinity.PodAffinity exists.
-	if affinity != nil && affinity.PodAffinity != nil {
-		if !checker.NodeMatchesHardPodAffinity(pod, allPods, node, affinity.PodAffinity) {
-			return false, ErrPodAffinityNotMatch
-		}
+	if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
+		return true, nil
 	}
-
-	// Hard inter-pod anti-affinity is symmetric, we should always check it
-	// (also when affinity or affinity.PodAntiAffinity is nil).
-	var antiAffinity *api.PodAntiAffinity
-	if affinity != nil {
-		antiAffinity = affinity.PodAntiAffinity
-	}
-	if !checker.NodeMatchesHardPodAntiAffinity(pod, allPods, node, antiAffinity) {
-		return false, ErrPodAffinityNotMatch
+	if !c.satisfiesPodsAffinityAntiAffinity(pod, node, affinity) {
+		return false, nil
 	}
 
+	if glog.V(10) {
+		// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+		// not logged. There is visible performance gain from it.
+		glog.Infof("Schedule Pod %+v on Node %+v is allowed, pod (anti)affinity constraints satisfied",
+			podName(pod), node.Name)
+	}
 	return true, nil
 }
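
The `if glog.V(10) { ... }` guard introduced above (and repeated in the new helpers below) exists so the log arguments are not computed at all when verbose logging is off; with a plain `glog.V(10).Infof(...)` call the arguments are still evaluated before the call. A short self-contained sketch of the difference, assuming the same github.com/golang/glog package (output destination and level depend on the usual -v/-logtostderr flags):

```go
package main

import (
	"flag"

	"github.com/golang/glog"
)

func expensiveDump() string {
	// Imagine formatting a large pod object here.
	return "big pod dump"
}

func main() {
	flag.Parse()

	// Unguarded: expensiveDump() runs even when -v < 10, because Go evaluates
	// the arguments before glog.V(10).Infof is invoked.
	glog.V(10).Infof("state: %s", expensiveDump())

	// Guarded (the pattern used in this diff): the argument is only computed
	// when verbose logging is actually enabled.
	if glog.V(10) {
		glog.Infof("state: %s", expensiveDump())
	}
}
```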
@@ -832,21 +840,20 @@ func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta in
 // First return value indicates whether a matching pod exists on a node that matches the topology key,
 // while the second return value indicates whether a matching pod exists anywhere.
 // TODO: Do we really need any pod matching, or all pods matching? I think the latter.
-func (checker *PodAffinityChecker) AnyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinityTerm api.PodAffinityTerm) (bool, bool, error) {
+func (c *PodAffinityChecker) anyPodMatchesPodAffinityTerm(pod *api.Pod, allPods []*api.Pod, node *api.Node, term *api.PodAffinityTerm) (bool, bool, error) {
 	matchingPodExists := false
-	for _, ep := range allPods {
-		epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
-		if err != nil {
-			return false, matchingPodExists, err
-		}
-		match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(ep, pod, &podAffinityTerm)
+	for _, existingPod := range allPods {
+		match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(existingPod, pod, term)
 		if err != nil {
 			return false, matchingPodExists, err
 		}
 		if match {
 			matchingPodExists = true
-			if checker.failureDomains.NodesHaveSameTopologyKey(node, epNode, podAffinityTerm.TopologyKey) {
+			existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName)
+			if err != nil {
+				return false, matchingPodExists, err
+			}
+			if c.failureDomains.NodesHaveSameTopologyKey(node, existingPodNode, term.TopologyKey) {
 				return true, matchingPodExists, nil
 			}
 		}
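
The helper reports a per-topology match only when the candidate node and the existing pod's node fall into the same topology domain for the term's TopologyKey. A simplified, self-contained illustration of that label comparison (this is not the scheduler's NodesHaveSameTopologyKey, which also has special handling for an empty key via the configured failure domains):

```go
package main

import "fmt"

// Two nodes are in the same topology domain for a key when both carry the
// label and the values are equal.
func sameTopologyDomain(labelsA, labelsB map[string]string, key string) bool {
	a, okA := labelsA[key]
	b, okB := labelsB[key]
	return okA && okB && a == b
}

func main() {
	const zoneKey = "failure-domain.beta.kubernetes.io/zone"
	nodeA := map[string]string{zoneKey: "us-east-1a"}
	nodeB := map[string]string{zoneKey: "us-east-1a"}
	nodeC := map[string]string{zoneKey: "us-east-1b"}

	fmt.Println(sameTopologyDomain(nodeA, nodeB, zoneKey)) // true: same zone
	fmt.Println(sameTopologyDomain(nodeA, nodeC, zoneKey)) // false: different zones
}
```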
@@ -880,87 +887,167 @@ func getPodAntiAffinityTerms(podAntiAffinity *api.PodAntiAffinity) (terms []api.
 	return terms
 }
 
-// Checks whether the given node has pods which satisfy all the required pod affinity scheduling rules.
-// If node has pods which satisfy all the required pod affinity scheduling rules then return true.
-func (checker *PodAffinityChecker) NodeMatchesHardPodAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAffinity *api.PodAffinity) bool {
-	for _, podAffinityTerm := range getPodAffinityTerms(podAffinity) {
-		podAffinityTermMatches, matchingPodExists, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAffinityTerm)
-		if err != nil {
-			glog.V(10).Infof("Cannot schedule pod %+v onto node %v, an error ocurred when checking existing pods on the node for PodAffinityTerm %v err: %v",
-				podName(pod), node.Name, podAffinityTerm, err)
-			return false
-		}
-		if !podAffinityTermMatches {
-			// If the requiredDuringScheduling affinity requirement matches a pod's own labels and namespace, and there are no other such pods
-			// anywhere, then disregard the requirement.
-			// This allows rules like "schedule all of the pods of this collection to the same zone" to not block forever
-			// because the first pod of the collection can't be scheduled.
-			match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, pod, &podAffinityTerm)
-			if err != nil || !match || matchingPodExists {
-				glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because none of the existing pods on this node satisfy the PodAffinityTerm %v, err: %+v",
-					podName(pod), node.Name, podAffinityTerm, err)
-				return false
-			}
-		}
-	}
-
-	// all the required pod affinity scheduling rules satisfied
-	glog.V(10).Infof("All the required pod affinity scheduling rules are satisfied for Pod %+v, on node %v", podName(pod), node.Name)
-	return true
-}
-
-// Checks whether the given node has pods which satisfy all the
-// required pod anti-affinity scheduling rules.
-// Also checks whether putting the pod onto the node would break
-// any anti-affinity scheduling rules indicated by existing pods.
-// If node has pods which satisfy all the required pod anti-affinity
-// scheduling rules and scheduling the pod onto the node won't
-// break any existing pods' anti-affinity rules, then return true.
-func (checker *PodAffinityChecker) NodeMatchesHardPodAntiAffinity(pod *api.Pod, allPods []*api.Pod, node *api.Node, podAntiAffinity *api.PodAntiAffinity) bool {
-	// foreach element podAntiAffinityTerm of podAntiAffinityTerms
-	// if the pod matches the term (breaks the anti-affinity),
-	//     don't schedule the pod onto this node.
-	for _, podAntiAffinityTerm := range getPodAntiAffinityTerms(podAntiAffinity) {
-		podAntiAffinityTermMatches, _, err := checker.AnyPodMatchesPodAffinityTerm(pod, allPods, node, podAntiAffinityTerm)
-		if err != nil || podAntiAffinityTermMatches {
-			glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because not all the existing pods on this node satisfy the PodAntiAffinityTerm %v, err: %v",
-				podName(pod), node.Name, podAntiAffinityTerm, err)
-			return false
-		}
-	}
-
-	// Check if scheduling the pod onto this node would break
-	// any anti-affinity rules indicated by the existing pods on the node.
-	// If it would break, system should not schedule pod onto this node.
-	for _, ep := range allPods {
-		epAffinity, err := api.GetAffinityFromPodAnnotations(ep.Annotations)
-		if err != nil {
-			glog.V(10).Infof("Failed to get Affinity from Pod %+v, err: %+v", podName(pod), err)
-			return false
-		}
-		if epAffinity == nil {
-			continue
-		}
-		epNode, err := checker.info.GetNodeInfo(ep.Spec.NodeName)
-		if err != nil {
-			glog.V(10).Infof("Failed to get node from Pod %+v, err: %+v", podName(ep), err)
-			return false
-		}
-		for _, epAntiAffinityTerm := range getPodAntiAffinityTerms(epAffinity.PodAntiAffinity) {
-			match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, ep, &epAntiAffinityTerm)
-			if err != nil {
-				glog.V(10).Infof("Failed to get label selector from anti-affinityterm %+v of existing pod %+v, err: %+v", epAntiAffinityTerm, podName(pod), err)
-				return false
-			}
-			if match && checker.failureDomains.NodesHaveSameTopologyKey(node, epNode, epAntiAffinityTerm.TopologyKey) {
-				glog.V(10).Infof("Cannot schedule Pod %+v, onto node %v because the pod would break the PodAntiAffinityTerm %+v, of existing pod %+v, err: %v",
-					podName(pod), node.Name, epAntiAffinityTerm, podName(ep), err)
-				return false
-			}
-		}
-	}
-
-	// all the required pod anti-affinity scheduling rules are satisfied
-	glog.V(10).Infof("Can schedule Pod %+v, on node %v because all the required pod anti-affinity scheduling rules are satisfied", podName(pod), node.Name)
+func getMatchingAntiAffinityTerms(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) ([]matchingPodAntiAffinityTerm, error) {
+	allNodeNames := make([]string, 0, len(nodeInfoMap))
+	for name := range nodeInfoMap {
+		allNodeNames = append(allNodeNames, name)
+	}
+
+	var lock sync.Mutex
+	var result []matchingPodAntiAffinityTerm
+	var firstError error
+	appendResult := func(toAppend []matchingPodAntiAffinityTerm) {
+		lock.Lock()
+		defer lock.Unlock()
+		result = append(result, toAppend...)
+	}
+	catchError := func(err error) {
+		lock.Lock()
+		defer lock.Unlock()
+		if firstError == nil {
+			firstError = err
+		}
+	}
+
+	processNode := func(i int) {
+		nodeInfo := nodeInfoMap[allNodeNames[i]]
+		node := nodeInfo.Node()
+		if node == nil {
+			catchError(fmt.Errorf("node not found"))
+			return
+		}
+		var nodeResult []matchingPodAntiAffinityTerm
+		for _, existingPod := range nodeInfo.PodsWithAffinity() {
+			affinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
+			if err != nil {
+				catchError(err)
+				return
+			}
+			if affinity == nil {
+				continue
+			}
+			for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
+				match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, existingPod, &term)
+				if err != nil {
+					catchError(err)
+					return
+				}
+				if match {
+					nodeResult = append(nodeResult, matchingPodAntiAffinityTerm{term: &term, node: node})
+				}
+			}
+		}
+		if len(nodeResult) > 0 {
+			appendResult(nodeResult)
+		}
+	}
+	workqueue.Parallelize(16, len(allNodeNames), processNode)
+	return result, firstError
+}
+
+func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *api.Pod, allPods []*api.Pod) ([]matchingPodAntiAffinityTerm, error) {
+	var result []matchingPodAntiAffinityTerm
+	for _, existingPod := range allPods {
+		affinity, err := api.GetAffinityFromPodAnnotations(existingPod.Annotations)
+		if err != nil {
+			return nil, err
+		}
+		if affinity.PodAntiAffinity != nil {
+			existingPodNode, err := c.info.GetNodeInfo(existingPod.Spec.NodeName)
+			if err != nil {
+				return nil, err
+			}
+			for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
+				match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, existingPod, &term)
+				if err != nil {
+					return nil, err
+				}
+				if match {
+					result = append(result, matchingPodAntiAffinityTerm{term: &term, node: existingPodNode})
+				}
+			}
+		}
+	}
+	return result, nil
+}
+
+// Checks if scheduling the pod onto this node would break any anti-affinity
+// rules indicated by the existing pods.
+func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *api.Pod, meta interface{}, node *api.Node) bool {
+	var matchingTerms []matchingPodAntiAffinityTerm
+	if predicateMeta, ok := meta.(*predicateMetadata); ok {
+		matchingTerms = predicateMeta.matchingAntiAffinityTerms
+	} else {
+		allPods, err := c.podLister.List(labels.Everything())
+		if err != nil {
+			glog.V(10).Infof("Failed to get all pods, %+v", err)
+			return false
+		}
+		if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, allPods); err != nil {
+			glog.V(10).Infof("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
+			return false
+		}
+	}
+	for _, term := range matchingTerms {
+		if c.failureDomains.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
+			glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v",
+				podName(pod), node.Name, term.term)
+			return false
+		}
+	}
+	if glog.V(10) {
+		// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+		// not logged. There is visible performance gain from it.
+		glog.Infof("Schedule Pod %+v on Node %+v is allowed, existing pods anti-affinity rules satisfied.",
+			podName(pod), node.Name)
+	}
+	return true
+}
+
+// Checks if scheduling the pod onto this node would break any rules of this pod.
+func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *api.Pod, node *api.Node, affinity *api.Affinity) bool {
+	allPods, err := c.podLister.List(labels.Everything())
+	if err != nil {
+		return false
+	}
+
+	// Check all affinity terms.
+	for _, term := range getPodAffinityTerms(affinity.PodAffinity) {
+		termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term)
+		if err != nil {
+			glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAffinityTerm %v, err: %v",
+				podName(pod), node.Name, term, err)
+			return false
+		}
+		if !termMatches {
+			// If the requirement matches a pod's own labels ane namespace, and there are
+			// no other such pods, then disregard the requirement. This is necessary to
+			// not block forever because the first pod of the collection can't be scheduled.
+			match, err := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, pod, &term)
+			if err != nil || !match || matchingPodExists {
+				glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAffinityTerm %v, err: %v",
+					podName(pod), node.Name, term, err)
+				return false
+			}
+		}
+	}
+
+	// Check all anti-affinity terms.
+	for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
+		termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term)
+		if err != nil || termMatches {
+			glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v, err: %v",
+				podName(pod), node.Name, term, err)
+			return false
+		}
+	}
+
+	if glog.V(10) {
+		// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+		// not logged. There is visible performance gain from it.
+		glog.Infof("Schedule Pod %+v on Node %+v is allowed, pod afinnity/anti-affinity constraints satisfied.",
+			podName(pod), node.Name)
+	}
 	return true
 }
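
getMatchingAntiAffinityTerms fans the per-node scan out over 16 workers with workqueue.Parallelize and merges results under a mutex. A rough, self-contained approximation of that fan-out pattern (the parallelize helper below is illustrative and is not the k8s.io/kubernetes/pkg/util/workqueue implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// parallelize runs `workers` goroutines that pull piece indices from a channel
// until all `pieces` have been processed, then waits for them to finish.
func parallelize(workers, pieces int, doWork func(piece int)) {
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				doWork(piece)
			}
		}()
	}
	wg.Wait()
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3", "node-4"}

	var mu sync.Mutex
	visited := make([]string, 0, len(nodes))

	parallelize(2, len(nodes), func(i int) {
		// Per-node work; shared results are guarded by a mutex, as in the hunk above.
		mu.Lock()
		defer mu.Unlock()
		visited = append(visited, nodes[i])
	})
	fmt.Println(len(visited)) // 4
}
```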
@@ -1026,9 +1113,7 @@ func CheckNodeMemoryPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *
 	}
 
 	var podBestEffort bool
-
-	predicateMeta, ok := meta.(*predicateMetadata)
-	if ok {
+	if predicateMeta, ok := meta.(*predicateMetadata); ok {
 		podBestEffort = predicateMeta.podBestEffort
 	} else {
 		// We couldn't parse metadata - fallback to computing it.

View File

@@ -237,7 +237,7 @@ func TestPodFitsResources(t *testing.T) {
 		node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}}
 		test.nodeInfo.SetNode(&node)
-		fits, err := PodFitsResources(test.pod, PredicateMetadata(test.pod), test.nodeInfo)
+		fits, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if !reflect.DeepEqual(err, test.wErr) {
 			t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
 		}
@@ -290,7 +290,7 @@ func TestPodFitsResources(t *testing.T) {
 		node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}}
 		test.nodeInfo.SetNode(&node)
-		fits, err := PodFitsResources(test.pod, PredicateMetadata(test.pod), test.nodeInfo)
+		fits, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if !reflect.DeepEqual(err, test.wErr) {
 			t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
 		}
@@ -346,7 +346,7 @@ func TestPodFitsHost(t *testing.T) {
 	for _, test := range tests {
 		nodeInfo := schedulercache.NewNodeInfo()
 		nodeInfo.SetNode(test.node)
-		result, err := PodFitsHost(test.pod, PredicateMetadata(test.pod), nodeInfo)
+		result, err := PodFitsHost(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
 		if !reflect.DeepEqual(err, ErrPodNotMatchHostName) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -419,7 +419,7 @@ func TestPodFitsHostPorts(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
-		fits, err := PodFitsHostPorts(test.pod, PredicateMetadata(test.pod), test.nodeInfo)
+		fits, err := PodFitsHostPorts(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if !reflect.DeepEqual(err, ErrPodNotFitsHostPorts) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -504,7 +504,7 @@ func TestDiskConflicts(t *testing.T) {
 	}
 	for _, test := range tests {
-		ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod), test.nodeInfo)
+		ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -556,7 +556,7 @@ func TestAWSDiskConflicts(t *testing.T) {
 	}
 	for _, test := range tests {
-		ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod), test.nodeInfo)
+		ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -614,7 +614,7 @@ func TestRBDDiskConflicts(t *testing.T) {
 	}
 	for _, test := range tests {
-		ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod), test.nodeInfo)
+		ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -1093,7 +1093,7 @@ func TestPodFitsSelector(t *testing.T) {
 		nodeInfo := schedulercache.NewNodeInfo()
 		nodeInfo.SetNode(&node)
-		fits, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod), nodeInfo)
+		fits, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
 		if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -1158,7 +1158,7 @@ func TestNodeLabelPresence(t *testing.T) {
 		nodeInfo.SetNode(&node)
 		labelChecker := NodeLabelChecker{test.labels, test.presence}
-		fits, err := labelChecker.CheckNodeLabelPresence(test.pod, PredicateMetadata(test.pod), nodeInfo)
+		fits, err := labelChecker.CheckNodeLabelPresence(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
 		if !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -1303,7 +1303,7 @@ func TestServiceAffinity(t *testing.T) {
 		serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels}
 		nodeInfo := schedulercache.NewNodeInfo()
 		nodeInfo.SetNode(test.node)
-		fits, err := serviceAffinity.CheckServiceAffinity(test.pod, PredicateMetadata(test.pod), nodeInfo)
+		fits, err := serviceAffinity.CheckServiceAffinity(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
 		if !reflect.DeepEqual(err, ErrServiceAffinityViolated) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -1584,7 +1584,7 @@ func TestEBSVolumeCountConflicts(t *testing.T) {
 	for _, test := range tests {
 		pred := NewMaxPDVolumeCountPredicate(filter, test.maxVols, pvInfo, pvcInfo)
-		fits, err := pred(test.newPod, PredicateMetadata(test.newPod), schedulercache.NewNodeInfo(test.existingPods...))
+		fits, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), schedulercache.NewNodeInfo(test.existingPods...))
 		if err != nil && !reflect.DeepEqual(err, ErrMaxVolumeCountExceeded) {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -1778,7 +1778,7 @@ func TestRunGeneralPredicates(t *testing.T) {
 	}
 	for _, test := range resourceTests {
 		test.nodeInfo.SetNode(test.node)
-		fits, err := GeneralPredicates(test.pod, PredicateMetadata(test.pod), test.nodeInfo)
+		fits, err := GeneralPredicates(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if !reflect.DeepEqual(err, test.wErr) {
 			t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
 		}
@@ -2326,7 +2326,8 @@ func TestInterPodAffinity(t *testing.T) {
 		}
 		nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
 		nodeInfo.SetNode(test.node)
-		fits, err := fit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod), nodeInfo)
+		nodeInfoMap := map[string]*schedulercache.NodeInfo{test.node.Name: nodeInfo}
+		fits, err := fit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
 		if !reflect.DeepEqual(err, ErrPodAffinityNotMatch) && err != nil {
 			t.Errorf("%s: unexpected error %v", test.test, err)
 		}
@@ -2492,7 +2493,8 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
 			}
 			nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
 			nodeInfo.SetNode(&node)
-			fits, err := testFit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod), nodeInfo)
+			nodeInfoMap := map[string]*schedulercache.NodeInfo{node.Name: nodeInfo}
+			fits, err := testFit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
 			if !reflect.DeepEqual(err, ErrPodAffinityNotMatch) && err != nil {
 				t.Errorf("%s: unexpected error %v", test.test, err)
 			}
@@ -2503,7 +2505,8 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
 			if affinity != nil && affinity.NodeAffinity != nil {
 				nodeInfo := schedulercache.NewNodeInfo()
 				nodeInfo.SetNode(&node)
-				fits2, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod), nodeInfo)
+				nodeInfoMap := map[string]*schedulercache.NodeInfo{node.Name: nodeInfo}
+				fits2, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
 				if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil {
 					t.Errorf("unexpected error: %v", err)
 				}
@@ -2789,7 +2792,7 @@ func TestPodToleratesTaints(t *testing.T) {
 	for _, test := range podTolerateTaintsTests {
 		nodeInfo := schedulercache.NewNodeInfo()
 		nodeInfo.SetNode(&test.node)
-		fits, err := PodToleratesNodeTaints(test.pod, PredicateMetadata(test.pod), nodeInfo)
+		fits, err := PodToleratesNodeTaints(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
 		if fits == false && !reflect.DeepEqual(err, ErrTaintsTolerationsNotMatch) {
 			t.Errorf("%s, unexpected error: %v", test.test, err)
 		}
@@ -2895,7 +2898,7 @@ func TestPodSchedulesOnNodeWithMemoryPressureCondition(t *testing.T) {
 	}
 	for _, test := range tests {
-		fits, err := CheckNodeMemoryPressurePredicate(test.pod, PredicateMetadata(test.pod), test.nodeInfo)
+		fits, err := CheckNodeMemoryPressurePredicate(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
 		if fits != test.fits {
 			t.Errorf("%s: expected %v got %v", test.name, test.fits, fits)
 		}
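
The test updates are mechanical: every call site of PredicateMetadata gains a second argument. A hedged fragment (inside the predicates package; identifiers such as testPod, testNode, podsOnNode and fit are placeholders, not names from these tests) showing the two migration modes the hunks use:

```go
// Callers with per-node state build a map so PredicateMetadata can precompute
// the matching anti-affinity terms; everyone else passes nil and the predicates
// fall back to computing what they need on the fly.
nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
nodeInfo.SetNode(testNode)
nodeInfoMap := map[string]*schedulercache.NodeInfo{testNode.Name: nodeInfo}

// Anti-affinity terms are precomputed once here...
fits, err := fit.InterPodAffinityMatches(testPod, PredicateMetadata(testPod, nodeInfoMap), nodeInfo)

// ...while passing nil keeps the old behaviour: predicates recompute on demand.
fits, err = GeneralPredicates(testPod, PredicateMetadata(testPod, nil), nodeInfo)
```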

View File

@@ -152,7 +152,7 @@ func findNodesThatFit(
 		// Create filtered list with enough space to avoid growing it
 		// and allow assigning.
 		filtered = make([]*api.Node, len(nodes))
-		meta := predicates.PredicateMetadata(pod)
+		meta := predicates.PredicateMetadata(pod, nodeNameToInfo)
 		errs := []error{}
 		var predicateResultLock sync.Mutex
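
With this change, findNodesThatFit computes the metadata once per pod and shares it read-only with the goroutines that evaluate predicates for each node. A self-contained sketch of that "compute once, check many nodes in parallel" structure (all names below are illustrative, not the scheduler's types):

```go
package main

import (
	"fmt"
	"sync"
)

type meta struct{ precomputed string }

func computeMeta(pod string) *meta { return &meta{precomputed: "terms for " + pod} }

// fits only reads the shared metadata, so no locking is needed for it.
func fits(pod string, m *meta, node string) bool {
	_ = m.precomputed
	return node != "tainted-node"
}

func main() {
	nodes := []string{"node-1", "tainted-node", "node-2"}
	m := computeMeta("my-pod") // once per pod, not once per node

	var mu sync.Mutex
	var filtered []string
	var wg sync.WaitGroup
	for _, node := range nodes {
		wg.Add(1)
		go func(node string) {
			defer wg.Done()
			if fits("my-pod", m, node) {
				mu.Lock() // the result slice is the only shared mutable state
				filtered = append(filtered, node)
				mu.Unlock()
			}
		}(node)
	}
	wg.Wait()
	fmt.Println(len(filtered)) // 2
}
```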