Merge pull request #59245 from resouer/equiv-node

Automatic merge from submit-queue (batch tested with PRs 59394, 58769, 59423, 59363, 59245). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Ensure equiv hash calculation is per schedule

**What this PR does / why we need it**:

Currently, the equivalence hash is calculated not just once per schedule, but once per node. This is a potential cause of slow integration tests; see #58881

We should ensure this calculation happens only once per scheduling attempt of a specific pod, no matter how many nodes we have. A sketch of the pattern follows.
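For illustration, a minimal sketch of the pattern this PR applies (hypothetical names, not the scheduler's actual API): an expensive hash that used to be computed inside the per-node loop is hoisted so it runs once per scheduling attempt.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// computeEquivHash stands in for the scheduler's equivalence-hash computation.
func computeEquivHash(podSpec string) uint64 {
	h := fnv.New32a()
	fmt.Fprint(h, podSpec)
	return uint64(h.Sum32())
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}
	podSpec := "nginx:app=web"

	// Before: the hash was recomputed for every node.
	for range nodes {
		_ = computeEquivHash(podSpec) // O(nodes) hash computations per pod
	}

	// After: compute once per scheduling attempt, reuse for every node.
	hash := computeEquivHash(podSpec)
	for _, n := range nodes {
		fmt.Printf("checking %s with precomputed hash %d\n", n, hash)
	}
}
```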

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #58989

**Special notes for your reviewer**:

**Release note**:

```release-note
Ensure equiv hash calculation is per schedule
```
Kubernetes Submit Queue 2018-02-06 21:34:48 -08:00 committed by GitHub
commit 7223729d51
3 changed files with 54 additions and 30 deletions


```diff
@@ -208,15 +208,22 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod,
 	ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
 }
 
-// getHashEquivalencePod returns the hash of equivalence pod.
-// 1. equivalenceHash
-// 2. if equivalence hash is valid
-func (ec *EquivalenceCache) getHashEquivalencePod(pod *v1.Pod) (uint64, bool) {
+// equivalenceClassInfo holds equivalence hash which is used for checking equivalence cache.
+// We will pass this to podFitsOnNode to ensure equivalence hash is only calculated per schedule.
+type equivalenceClassInfo struct {
+	// Equivalence hash.
+	hash uint64
+}
+
+// getEquivalenceClassInfo returns the equivalence class of given pod.
+func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
 	equivalencePod := ec.getEquivalencePod(pod)
 	if equivalencePod != nil {
 		hash := fnv.New32a()
 		hashutil.DeepHashObject(hash, equivalencePod)
-		return uint64(hash.Sum32()), true
+		return &equivalenceClassInfo{
+			hash: uint64(hash.Sum32()),
+		}
 	}
-	return 0, false
+	return nil
 }
```
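The equivalence hash itself is an FNV-32a digest of the pod's "equivalence pod" (the subset of fields that determine scheduling equivalence). A stdlib-only sketch of that flow, with a simplified struct and `fmt.Fprintf` standing in for `hashutil.DeepHashObject` (the field names here are hypothetical, for illustration only):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// equivalencePod is a simplified stand-in for the subset of pod fields
// that determine scheduling equivalence (e.g. owner reference, resources).
type equivalencePod struct {
	OwnerUID string
	CPUMilli int64
	MemBytes int64
}

func hashOf(ep *equivalencePod) uint64 {
	h := fnv.New32a()
	// DeepHashObject in Kubernetes writes a deep, deterministic dump of the
	// object into the hasher; %#v is a rough stdlib approximation.
	fmt.Fprintf(h, "%#v", ep)
	return uint64(h.Sum32())
}

func main() {
	a := &equivalencePod{OwnerUID: "rs-1234", CPUMilli: 100, MemBytes: 1 << 28}
	b := &equivalencePod{OwnerUID: "rs-1234", CPUMilli: 100, MemBytes: 1 << 28}
	fmt.Println(hashOf(a) == hashOf(b)) // true: same equivalence class
}
```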


```diff
@@ -487,19 +487,22 @@ func TestGetHashEquivalencePod(t *testing.T) {
 	for _, test := range tests {
 		for i, podInfo := range test.podInfoList {
 			testPod := podInfo.pod
-			hash, isValid := ecache.getHashEquivalencePod(testPod)
-			if isValid != podInfo.hashIsValid {
+			eclassInfo := ecache.getEquivalenceClassInfo(testPod)
+			if eclassInfo == nil && podInfo.hashIsValid {
 				t.Errorf("Failed: pod %v is expected to have valid hash", testPod)
 			}
-			// NOTE(harry): the first element will be used as target so
-			// this logic can't verify more than two inequivalent pods
-			if i == 0 {
-				targetHash = hash
-				targetPodInfo = podInfo
-			} else {
-				if targetHash != hash {
-					if test.isEquivalent {
-						t.Errorf("Failed: pod: %v is expected to be equivalent to: %v", testPod, targetPodInfo.pod)
+
+			if eclassInfo != nil {
+				// NOTE(harry): the first element will be used as target so
+				// this logic can't verify more than two inequivalent pods
+				if i == 0 {
+					targetHash = eclassInfo.hash
+					targetPodInfo = podInfo
+				} else {
+					if targetHash != eclassInfo.hash {
+						if test.isEquivalent {
+							t.Errorf("Failed: pod: %v is expected to be equivalent to: %v", testPod, targetPodInfo.pod)
+						}
 					}
 				}
 			}
```


```diff
@@ -312,9 +312,25 @@ func findNodesThatFit(
 	// We can use the same metadata producer for all nodes.
 	meta := metadataProducer(pod, nodeNameToInfo)
 
+	var equivCacheInfo *equivalenceClassInfo
+	if ecache != nil {
+		// getEquivalenceClassInfo will return immediately if no equivalence pod found
+		equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
+	}
+
 	checkNode := func(i int) {
 		nodeName := nodes[i].Name
-		fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue, alwaysCheckAllPredicates)
+		fits, failedPredicates, err := podFitsOnNode(
+			pod,
+			meta,
+			nodeNameToInfo[nodeName],
+			predicateFuncs,
+			ecache,
+			schedulingQueue,
+			alwaysCheckAllPredicates,
+			equivCacheInfo,
+		)
 		if err != nil {
 			predicateResultLock.Lock()
 			errs[err.Error()]++
```
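Since `checkNode` runs concurrently across nodes, computing `equivCacheInfo` before the workers start also keeps the per-node path lock-free: the closures only read the captured value. A minimal sketch of that capture pattern, with plain goroutines standing in for the scheduler's parallelizer (names hypothetical):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}

	// Computed once, before any workers start; read-only afterwards,
	// so no lock is needed inside the per-node closure.
	equivHash := uint64(0xdeadbeef)

	var wg sync.WaitGroup
	for _, node := range nodes {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			fmt.Printf("node %s sees hash %d\n", n, equivHash)
		}(node)
	}
	wg.Wait()
}
```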
```diff
@@ -389,6 +405,8 @@ func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
 }
 
 // podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
+// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
+// predicate results as possible.
 // This function is called from two different places: Schedule and Preempt.
 // When it is called from Schedule, we want to test whether the pod is schedulable
 // on the node with all the existing pods on the node plus higher and equal priority
@@ -404,11 +422,11 @@ func podFitsOnNode(
 	ecache *EquivalenceCache,
 	queue SchedulingQueue,
 	alwaysCheckAllPredicates bool,
+	equivCacheInfo *equivalenceClassInfo,
 ) (bool, []algorithm.PredicateFailureReason, error) {
 	var (
-		equivalenceHash  uint64
-		failedPredicates []algorithm.PredicateFailureReason
 		eCacheAvailable  bool
+		failedPredicates []algorithm.PredicateFailureReason
 		invalid          bool
 		fit              bool
 		reasons          []algorithm.PredicateFailureReason
@@ -416,10 +434,6 @@ func podFitsOnNode(
 	)
 	predicateResults := make(map[string]HostPredicate)
 
-	if ecache != nil {
-		// getHashEquivalencePod will return immediately if no equivalence pod found
-		equivalenceHash, eCacheAvailable = ecache.getHashEquivalencePod(pod)
-	}
 	podsAdded := false
 	// We run predicates twice in some cases. If the node has greater or equal priority
 	// nominated pods, we run them when those pods are added to meta and nodeInfo.
@@ -450,13 +464,13 @@ func podFitsOnNode(
 		// Bypass eCache if node has any nominated pods.
 		// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
 		// when pods are nominated or their nominations change.
-		eCacheAvailable = eCacheAvailable && !podsAdded
+		eCacheAvailable = equivCacheInfo != nil && !podsAdded
 		for _, predicateKey := range predicates.PredicatesOrdering() {
-			//TODO (yastij) : compute average predicate restrictiveness to export it as promethus metric
+			//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
 			if predicate, exist := predicateFuncs[predicateKey]; exist {
 				if eCacheAvailable {
 					// PredicateWithECache will return its cached predicate results.
-					fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
+					fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivCacheInfo.hash)
 				}
 
 				if !eCacheAvailable || invalid {
@@ -498,7 +512,7 @@ func podFitsOnNode(
 		for predKey, result := range predicateResults {
 			// update equivalence cache with newly computed fit & reasons
 			// TODO(resouer) should we do this in another thread? any race?
-			ecache.UpdateCachedPredicateItem(pod.GetName(), nodeName, predKey, result.Fit, result.FailReasons, equivalenceHash)
+			ecache.UpdateCachedPredicateItem(pod.GetName(), nodeName, predKey, result.Fit, result.FailReasons, equivCacheInfo.hash)
 		}
 	}
 	return len(failedPredicates) == 0, failedPredicates, nil
@@ -922,7 +936,7 @@ func selectVictimsOnNode(
 	// that we should check is if the "pod" is failing to schedule due to pod affinity
 	// failure.
 	// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
-	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false); !fits {
+	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
 		if err != nil {
 			glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -936,7 +950,7 @@ func selectVictimsOnNode(
 	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
 	reprievePod := func(p *v1.Pod) bool {
 		addPod(p)
-		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false)
+		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
 		if !fits {
 			removePod(p)
 			victims = append(victims, p)
```
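Note that both preemption call sites in `selectVictimsOnNode` pass `nil` for the equivalence cache and for `equivCacheInfo`, so cached predicate results are never consulted or updated while simulating preemption.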