From 482dc31937d0b4a16ee58ea3b3ac6f75e614124a Mon Sep 17 00:00:00 2001
From: Harry Zhang
Date: Thu, 1 Feb 2018 22:47:07 -0800
Subject: [PATCH] Ensure equiv hash calculation per schedule

---
 pkg/scheduler/core/equivalence_cache.go      | 19 +++++++---
 pkg/scheduler/core/equivalence_cache_test.go | 25 ++++++------
 pkg/scheduler/core/generic_scheduler.go      | 40 +++++++++++++-------
 3 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/pkg/scheduler/core/equivalence_cache.go b/pkg/scheduler/core/equivalence_cache.go
index 5d9bda7eafe..cd44e66ed9f 100644
--- a/pkg/scheduler/core/equivalence_cache.go
+++ b/pkg/scheduler/core/equivalence_cache.go
@@ -208,15 +208,22 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod,
 	ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
 }
 
-// getHashEquivalencePod returns the hash of equivalence pod.
-// 1. equivalenceHash
-// 2. if equivalence hash is valid
-func (ec *EquivalenceCache) getHashEquivalencePod(pod *v1.Pod) (uint64, bool) {
+// equivalenceClassInfo holds the equivalence hash, which is used when checking the equivalence cache.
+// We will pass this to podFitsOnNode so that the equivalence hash is calculated only once per schedule.
+type equivalenceClassInfo struct {
+	// Equivalence hash.
+	hash uint64
+}
+
+// getEquivalenceClassInfo returns the equivalence class of the given pod.
+func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
 	equivalencePod := ec.getEquivalencePod(pod)
 	if equivalencePod != nil {
 		hash := fnv.New32a()
 		hashutil.DeepHashObject(hash, equivalencePod)
-		return uint64(hash.Sum32()), true
+		return &equivalenceClassInfo{
+			hash: uint64(hash.Sum32()),
+		}
 	}
-	return 0, false
+	return nil
 }
diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go
index 54b903e2fbd..35d7592761e 100644
--- a/pkg/scheduler/core/equivalence_cache_test.go
+++ b/pkg/scheduler/core/equivalence_cache_test.go
@@ -487,19 +487,22 @@ func TestGetHashEquivalencePod(t *testing.T) {
 	for _, test := range tests {
 		for i, podInfo := range test.podInfoList {
 			testPod := podInfo.pod
-			hash, isValid := ecache.getHashEquivalencePod(testPod)
-			if isValid != podInfo.hashIsValid {
+			eclassInfo := ecache.getEquivalenceClassInfo(testPod)
+			if eclassInfo == nil && podInfo.hashIsValid {
 				t.Errorf("Failed: pod %v is expected to have valid hash", testPod)
 			}
-			// NOTE(harry): the first element will be used as target so
-			// this logic can't verify more than two inequivalent pods
-			if i == 0 {
-				targetHash = hash
-				targetPodInfo = podInfo
-			} else {
-				if targetHash != hash {
-					if test.isEquivalent {
-						t.Errorf("Failed: pod: %v is expected to be equivalent to: %v", testPod, targetPodInfo.pod)
+
+			if eclassInfo != nil {
+				// NOTE(harry): the first element will be used as target so
+				// this logic can't verify more than two inequivalent pods
+				if i == 0 {
+					targetHash = eclassInfo.hash
+					targetPodInfo = podInfo
+				} else {
+					if targetHash != eclassInfo.hash {
+						if test.isEquivalent {
+							t.Errorf("Failed: pod: %v is expected to be equivalent to: %v", testPod, targetPodInfo.pod)
+						}
 					}
 				}
 			}
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index da04ff45ad6..017d1505dc5 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -312,9 +312,25 @@ func findNodesThatFit(
 
 		// We can use the same metadata producer for all nodes.
 		meta := metadataProducer(pod, nodeNameToInfo)
+
+		var equivCacheInfo *equivalenceClassInfo
+		if ecache != nil {
+			// getEquivalenceClassInfo will return immediately if no equivalence pod is found
+			equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
+		}
+
 		checkNode := func(i int) {
 			nodeName := nodes[i].Name
-			fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue, alwaysCheckAllPredicates)
+			fits, failedPredicates, err := podFitsOnNode(
+				pod,
+				meta,
+				nodeNameToInfo[nodeName],
+				predicateFuncs,
+				ecache,
+				schedulingQueue,
+				alwaysCheckAllPredicates,
+				equivCacheInfo,
+			)
 			if err != nil {
 				predicateResultLock.Lock()
 				errs[err.Error()]++
@@ -389,6 +405,8 @@ func addNominatedPods(podPriority int32, meta algorithm.PredicateMetadata,
 }
 
 // podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
+// For a given pod, podFitsOnNode checks whether any equivalent pod exists and tries to reuse its
+// cached predicate results where possible.
 // This function is called from two different places: Schedule and Preempt.
 // When it is called from Schedule, we want to test whether the pod is schedulable
 // on the node with all the existing pods on the node plus higher and equal priority
@@ -404,11 +422,11 @@ func podFitsOnNode(
 	ecache *EquivalenceCache,
 	queue SchedulingQueue,
 	alwaysCheckAllPredicates bool,
+	equivCacheInfo *equivalenceClassInfo,
 ) (bool, []algorithm.PredicateFailureReason, error) {
 	var (
-		equivalenceHash  uint64
-		failedPredicates []algorithm.PredicateFailureReason
-		eCacheAvailable  bool
+		eCacheAvailable  bool
+		failedPredicates []algorithm.PredicateFailureReason
 		invalid          bool
 		fit              bool
 		reasons          []algorithm.PredicateFailureReason
@@ -416,10 +434,6 @@ func podFitsOnNode(
 	)
 	predicateResults := make(map[string]HostPredicate)
 
-	if ecache != nil {
-		// getHashEquivalencePod will return immediately if no equivalence pod found
-		equivalenceHash, eCacheAvailable = ecache.getHashEquivalencePod(pod)
-	}
 	podsAdded := false
 	// We run predicates twice in some cases. If the node has greater or equal priority
 	// nominated pods, we run them when those pods are added to meta and nodeInfo.
@@ -450,13 +464,13 @@ func podFitsOnNode(
 		// Bypass eCache if node has any nominated pods.
 		// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
 		// when pods are nominated or their nominations change.
-		eCacheAvailable = eCacheAvailable && !podsAdded
+		eCacheAvailable = equivCacheInfo != nil && !podsAdded
 		for _, predicateKey := range predicates.PredicatesOrdering() {
 			//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
 			if predicate, exist := predicateFuncs[predicateKey]; exist {
 				if eCacheAvailable {
 					// PredicateWithECache will return its cached predicate results.
-					fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
+					fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivCacheInfo.hash)
 				}
 
 				if !eCacheAvailable || invalid {
@@ -498,7 +512,7 @@ func podFitsOnNode(
 		for predKey, result := range predicateResults {
 			// update equivalence cache with newly computed fit & reasons
 			// TODO(resouer) should we do this in another thread? any race?
-			ecache.UpdateCachedPredicateItem(pod.GetName(), nodeName, predKey, result.Fit, result.FailReasons, equivalenceHash)
+			ecache.UpdateCachedPredicateItem(pod.GetName(), nodeName, predKey, result.Fit, result.FailReasons, equivCacheInfo.hash)
 		}
 	}
 	return len(failedPredicates) == 0, failedPredicates, nil
@@ -922,7 +936,7 @@ func selectVictimsOnNode(
 	// that we should check is if the "pod" is failing to schedule due to pod affinity
 	// failure.
 	// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
-	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false); !fits {
+	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
 		if err != nil {
 			glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -936,7 +950,7 @@ func selectVictimsOnNode(
 	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
 	reprievePod := func(p *v1.Pod) bool {
 		addPod(p)
-		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false)
+		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
 		if !fits {
 			removePod(p)
 			victims = append(victims, p)
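
The sketch below is a minimal, self-contained illustration of the pattern this patch applies: hash the pod's equivalence class once per scheduling attempt and hand the precomputed value to every per-node fit check, instead of recomputing it for each node inside the fit function. It is not the scheduler's real API; equivClassInfo, getEquivClassInfo, and checkNodeFit are hypothetical stand-ins for equivalenceClassInfo, getEquivalenceClassInfo, and podFitsOnNode.

package main

import (
	"fmt"
	"hash/fnv"
)

// equivClassInfo mirrors the idea of equivalenceClassInfo: it carries a hash
// computed once per scheduling attempt.
type equivClassInfo struct {
	hash uint64
}

// getEquivClassInfo hashes whatever identifies the pod's equivalence class.
// Here the owning controller's name stands in for the real equivalence pod.
func getEquivClassInfo(ownerName string) *equivClassInfo {
	if ownerName == "" {
		return nil // no equivalence class; callers must tolerate nil
	}
	h := fnv.New32a()
	h.Write([]byte(ownerName))
	return &equivClassInfo{hash: uint64(h.Sum32())}
}

// checkNodeFit plays the role of podFitsOnNode: it receives the precomputed
// class info instead of recomputing the hash for every node it inspects.
func checkNodeFit(node string, info *equivClassInfo) bool {
	if info != nil {
		fmt.Printf("node %s: consult the predicate cache under hash %d\n", node, info.hash)
	} else {
		fmt.Printf("node %s: no equivalence class, run all predicates\n", node)
	}
	return true
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}

	// Computed exactly once per scheduling attempt, outside the per-node loop,
	// which is what moving the hash calculation into findNodesThatFit achieves.
	info := getEquivClassInfo("replicaset-frontend")

	for _, n := range nodes {
		checkNodeFit(n, info)
	}
}

Wrapping the hash in a small struct, rather than returning a bare uint64 plus a validity flag, lets callers pass nil to mean "no equivalence class"; that is how the preemption paths in selectVictimsOnNode opt out of the cache by passing nil for equivCacheInfo.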