Merge pull request #59479 from tossmilestone/avoid-ecahe-update-race

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Avoid race condition when updating equivalence cache

**What this PR does / why we need it**:
Lock the ecache to update the ecache on each predicate running, to avoid race condition.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fix #58507 

**Special notes for your reviewer**:
None

**Release note**:

```release-note
None
```
This commit is contained in:
Kubernetes Submit Queue 2018-02-12 16:38:07 -08:00 committed by GitHub
commit ab2e1cb02a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 19 deletions

View File

@ -76,9 +76,12 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem(
fit bool,
reasons []algorithm.PredicateFailureReason,
equivalenceHash uint64,
needLock bool,
) {
ec.Lock()
defer ec.Unlock()
if needLock {
ec.Lock()
defer ec.Unlock()
}
if _, exist := ec.algorithmCache[nodeName]; !exist {
ec.algorithmCache[nodeName] = newAlgorithmCache()
}
@ -107,10 +110,12 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem(
// based on cached predicate results
func (ec *EquivalenceCache) PredicateWithECache(
podName, nodeName, predicateKey string,
equivalenceHash uint64,
equivalenceHash uint64, needLock bool,
) (bool, []algorithm.PredicateFailureReason, bool) {
ec.RLock()
defer ec.RUnlock()
if needLock {
ec.RLock()
defer ec.RUnlock()
}
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache",
predicateKey, podName, nodeName)
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {

View File

@ -90,6 +90,7 @@ func TestUpdateCachedPredicateItem(t *testing.T) {
test.fit,
test.reasons,
test.equivalenceHash,
true,
)
value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey)
@ -201,6 +202,7 @@ func TestPredicateWithECache(t *testing.T) {
test.cachedItem.fit,
test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate,
true,
)
// if we want to do invalid, invalid the cached item
if test.expectedInvalidPredicateKey {
@ -213,6 +215,7 @@ func TestPredicateWithECache(t *testing.T) {
test.nodeName,
test.predicateKey,
test.equivalenceHashForCalPredicate,
true,
)
// returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash
if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate {
@ -564,6 +567,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
test.cachedItem.fit,
test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate,
true,
)
}
@ -632,6 +636,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
test.cachedItem.fit,
test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate,
true,
)
}

View File

@ -108,7 +108,7 @@ type genericScheduler struct {
// Schedule tries to schedule the given pod to one of node in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a Fiterror error with reasons.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
defer trace.LogIfLong(100 * time.Millisecond)
@ -469,8 +469,11 @@ func podFitsOnNode(
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
if predicate, exist := predicateFuncs[predicateKey]; exist {
if eCacheAvailable {
// Lock ecache here to avoid a race condition against cache invalidation invoked
// in event handlers. This race has existed despite locks in eCache implementation.
ecache.Lock()
// PredicateWithECache will return its cached predicate results.
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivCacheInfo.hash)
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivCacheInfo.hash, false)
}
if !eCacheAvailable || invalid {
@ -488,8 +491,15 @@ func podFitsOnNode(
} else {
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
}
result := predicateResults[predicateKey]
ecache.UpdateCachedPredicateItem(pod.GetName(), info.Node().GetName(), predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
}
}
if eCacheAvailable {
ecache.Unlock()
}
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
@ -503,18 +513,6 @@ func podFitsOnNode(
}
}
// TODO(bsalamat): This way of updating equiv. cache has a race condition against
// cache invalidations invoked in event handlers. This race has existed despite locks
// in eCache implementation. If cache is invalidated after a predicate is executed
// and before we update the cache, the updates should not be written to the cache.
if eCacheAvailable {
nodeName := info.Node().GetName()
for predKey, result := range predicateResults {
// update equivalence cache with newly computed fit & reasons
// TODO(resouer) should we do this in another thread? any race?
ecache.UpdateCachedPredicateItem(pod.GetName(), nodeName, predKey, result.Fit, result.FailReasons, equivCacheInfo.hash)
}
}
return len(failedPredicates) == 0, failedPredicates, nil
}