Merge pull request #63178 from misterikkit/ecache-locking

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

scheduler: clean up and simplify equivalence cache locking

**What this PR does / why we need it**:
This is a cleanup of the locking code for equivalence cache. There is no change to the current logic or locking. This PR has a couple of implications, though.
1. It deletes (unreachable) code that could have been used to cache predicate results that consider nominated pods.
2. Callers should no longer lock/unlock the eCache manually, so coordinating that lock with other synchronization is restricted.


**Special notes for your reviewer**:

**Release note**:
<!--  Write your release note:
1. Enter your extended release note in the below block. If the PR requires additional action from users switching to the new release, include the string "**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #
**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #
action required".
2. If no release note is required, just write "NONE".
-->
```release-note
NONE
```
/sig scheduling
/kind cleanup
This commit is contained in:
Kubernetes Submit Queue 2018-04-30 19:29:49 -07:00 committed by GitHub
commit 12a6236148
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 216 additions and 80 deletions

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
hashutil "k8s.io/kubernetes/pkg/util/hash" hashutil "k8s.io/kubernetes/pkg/util/hash"
"github.com/golang/glog" "github.com/golang/glog"
@ -37,7 +38,7 @@ const maxCacheEntries = 100
// 1. a map of AlgorithmCache with node name as key // 1. a map of AlgorithmCache with node name as key
// 2. function to get equivalence pod // 2. function to get equivalence pod
type EquivalenceCache struct { type EquivalenceCache struct {
sync.RWMutex mu sync.Mutex
algorithmCache map[string]AlgorithmCache algorithmCache map[string]AlgorithmCache
} }
@ -70,18 +71,43 @@ func NewEquivalenceCache() *EquivalenceCache {
} }
} }
// UpdateCachedPredicateItem updates pod predicate for equivalence class // RunPredicate will return a cached predicate result. In case of a cache miss, the predicate will
func (ec *EquivalenceCache) UpdateCachedPredicateItem( // be run and its results cached for the next call.
//
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
func (ec *EquivalenceCache) RunPredicate(
pred algorithm.FitPredicate,
predicateKey string,
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
equivClassInfo *equivalenceClassInfo,
cache schedulercache.Cache,
) (bool, []algorithm.PredicateFailureReason, error) {
ec.mu.Lock()
defer ec.mu.Unlock()
fit, reasons, invalid := ec.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash)
if !invalid {
return fit, reasons, nil
}
fit, reasons, err := pred(pod, meta, nodeInfo)
if err != nil {
return fit, reasons, err
}
// Skip update if NodeInfo is stale.
if cache != nil && cache.IsUpToDate(nodeInfo) {
ec.updateResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, fit, reasons, equivClassInfo.hash)
}
return fit, reasons, nil
}
// updateResult updates the cached result of a predicate.
func (ec *EquivalenceCache) updateResult(
podName, nodeName, predicateKey string, podName, nodeName, predicateKey string,
fit bool, fit bool,
reasons []algorithm.PredicateFailureReason, reasons []algorithm.PredicateFailureReason,
equivalenceHash uint64, equivalenceHash uint64,
needLock bool,
) { ) {
if needLock {
ec.Lock()
defer ec.Unlock()
}
if _, exist := ec.algorithmCache[nodeName]; !exist { if _, exist := ec.algorithmCache[nodeName]; !exist {
ec.algorithmCache[nodeName] = newAlgorithmCache() ec.algorithmCache[nodeName] = newAlgorithmCache()
} }
@ -103,19 +129,14 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem(
glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, podName, nodeName, predicateItem) glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, podName, nodeName, predicateItem)
} }
// PredicateWithECache returns: // lookupResult returns cached predicate results:
// 1. if fit // 1. if pod fit
// 2. reasons if not fit // 2. reasons if pod did not fit
// 3. if this cache is invalid // 3. if cache item is not found
// based on cached predicate results func (ec *EquivalenceCache) lookupResult(
func (ec *EquivalenceCache) PredicateWithECache(
podName, nodeName, predicateKey string, podName, nodeName, predicateKey string,
equivalenceHash uint64, needLock bool, equivalenceHash uint64,
) (bool, []algorithm.PredicateFailureReason, bool) { ) (bool, []algorithm.PredicateFailureReason, bool) {
if needLock {
ec.RLock()
defer ec.RUnlock()
}
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache",
predicateKey, podName, nodeName) predicateKey, podName, nodeName)
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
@ -140,8 +161,8 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predi
if len(predicateKeys) == 0 { if len(predicateKeys) == 0 {
return return
} }
ec.Lock() ec.mu.Lock()
defer ec.Unlock() defer ec.mu.Unlock()
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
for predicateKey := range predicateKeys { for predicateKey := range predicateKeys {
algorithmCache.predicatesCache.Remove(predicateKey) algorithmCache.predicatesCache.Remove(predicateKey)
@ -155,8 +176,8 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKey
if len(predicateKeys) == 0 { if len(predicateKeys) == 0 {
return return
} }
ec.Lock() ec.mu.Lock()
defer ec.Unlock() defer ec.mu.Unlock()
// algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates // algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
for _, algorithmCache := range ec.algorithmCache { for _, algorithmCache := range ec.algorithmCache {
for predicateKey := range predicateKeys { for predicateKey := range predicateKeys {
@ -169,8 +190,8 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKey
// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid // InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) { func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
ec.Lock() ec.mu.Lock()
defer ec.Unlock() defer ec.mu.Unlock()
delete(ec.algorithmCache, nodeName) delete(ec.algorithmCache, nodeName)
glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package core package core
import ( import (
"errors"
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
@ -157,7 +158,154 @@ type predicateItemType struct {
reasons []algorithm.PredicateFailureReason reasons []algorithm.PredicateFailureReason
} }
func TestUpdateCachedPredicateItem(t *testing.T) { // upToDateCache is a fake Cache where IsUpToDate always returns true.
type upToDateCache = schedulertesting.FakeCache
// staleNodeCache is a fake Cache where IsUpToDate always returns false.
type staleNodeCache struct {
schedulertesting.FakeCache
}
func (c *staleNodeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return false }
// mockPredicate provides an algorithm.FitPredicate with pre-set return values.
type mockPredicate struct {
fit bool
reasons []algorithm.PredicateFailureReason
err error
callCount int
}
func (p *mockPredicate) predicate(*v1.Pod, algorithm.PredicateMetadata, *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
p.callCount++
return p.fit, p.reasons, p.err
}
func TestRunPredicate(t *testing.T) {
tests := []struct {
name string
pred mockPredicate
cache schedulercache.Cache
expectFit, expectCacheHit, expectCacheWrite bool
expectedReasons []algorithm.PredicateFailureReason
expectedError string
}{
{
name: "pod fits/cache hit",
pred: mockPredicate{},
cache: &upToDateCache{},
expectFit: true,
expectCacheHit: true,
expectCacheWrite: false,
},
{
name: "pod fits/cache miss",
pred: mockPredicate{fit: true},
cache: &upToDateCache{},
expectFit: true,
expectCacheHit: false,
expectCacheWrite: true,
},
{
name: "pod fits/cache miss/no write",
pred: mockPredicate{fit: true},
cache: &staleNodeCache{},
expectFit: true,
expectCacheHit: false,
expectCacheWrite: false,
},
{
name: "pod doesn't fit/cache miss",
pred: mockPredicate{reasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}},
cache: &upToDateCache{},
expectFit: false,
expectCacheHit: false,
expectCacheWrite: true,
expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate},
},
{
name: "pod doesn't fit/cache hit",
pred: mockPredicate{},
cache: &upToDateCache{},
expectFit: false,
expectCacheHit: true,
expectCacheWrite: false,
expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate},
},
{
name: "predicate error",
pred: mockPredicate{err: errors.New("This is expected")},
cache: &upToDateCache{},
expectFit: false,
expectCacheHit: false,
expectCacheWrite: false,
expectedError: "This is expected",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}})
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
ecache := NewEquivalenceCache()
equivClass := ecache.getEquivalenceClassInfo(pod)
if test.expectCacheHit {
ecache.mu.Lock()
ecache.updateResult(pod.Name, node.Node().Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash)
ecache.mu.Unlock()
}
fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
if err != nil {
if err.Error() != test.expectedError {
t.Errorf("Expected error %v but got %v", test.expectedError, err)
}
} else if len(test.expectedError) > 0 {
t.Errorf("Expected error %v but got nil", test.expectedError)
}
if fit && !test.expectFit {
t.Errorf("pod should not fit")
}
if !fit && test.expectFit {
t.Errorf("pod should fit")
}
if len(reasons) != len(test.expectedReasons) {
t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons)
} else {
for i, reason := range reasons {
if reason != test.expectedReasons[i] {
t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons)
break
}
}
}
if test.expectCacheHit && test.pred.callCount != 0 {
t.Errorf("Predicate should not be called")
}
if !test.expectCacheHit && test.pred.callCount == 0 {
t.Errorf("Predicate should be called")
}
ecache.mu.Lock()
_, _, invalid := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
ecache.mu.Unlock()
if invalid && test.expectCacheWrite {
t.Errorf("Cache write should happen")
}
if !test.expectCacheHit && test.expectCacheWrite && invalid {
t.Errorf("Cache write should happen")
}
if !test.expectCacheHit && !test.expectCacheWrite && !invalid {
t.Errorf("Cache write should not happen")
}
})
}
}
func TestUpdateResult(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
pod string pod string
@ -206,15 +354,16 @@ func TestUpdateCachedPredicateItem(t *testing.T) {
test.equivalenceHash: predicateItem, test.equivalenceHash: predicateItem,
}) })
} }
ecache.UpdateCachedPredicateItem( ecache.mu.Lock()
ecache.updateResult(
test.pod, test.pod,
test.nodeName, test.nodeName,
test.predicateKey, test.predicateKey,
test.fit, test.fit,
test.reasons, test.reasons,
test.equivalenceHash, test.equivalenceHash,
true,
) )
ecache.mu.Unlock()
value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey) value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey)
if !ok { if !ok {
@ -230,7 +379,7 @@ func TestUpdateCachedPredicateItem(t *testing.T) {
} }
} }
func TestPredicateWithECache(t *testing.T) { func TestLookupResult(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
podName string podName string
@ -316,15 +465,16 @@ func TestPredicateWithECache(t *testing.T) {
for _, test := range tests { for _, test := range tests {
ecache := NewEquivalenceCache() ecache := NewEquivalenceCache()
// set cached item to equivalence cache // set cached item to equivalence cache
ecache.UpdateCachedPredicateItem( ecache.mu.Lock()
ecache.updateResult(
test.podName, test.podName,
test.nodeName, test.nodeName,
test.predicateKey, test.predicateKey,
test.cachedItem.fit, test.cachedItem.fit,
test.cachedItem.reasons, test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate, test.equivalenceHashForUpdatePredicate,
true,
) )
ecache.mu.Unlock()
// if we want to do invalid, invalid the cached item // if we want to do invalid, invalid the cached item
if test.expectedInvalidPredicateKey { if test.expectedInvalidPredicateKey {
predicateKeys := sets.NewString() predicateKeys := sets.NewString()
@ -332,12 +482,13 @@ func TestPredicateWithECache(t *testing.T) {
ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys)
} }
// calculate predicate with equivalence cache // calculate predicate with equivalence cache
fit, reasons, invalid := ecache.PredicateWithECache(test.podName, ecache.mu.Lock()
fit, reasons, invalid := ecache.lookupResult(test.podName,
test.nodeName, test.nodeName,
test.predicateKey, test.predicateKey,
test.equivalenceHashForCalPredicate, test.equivalenceHashForCalPredicate,
true,
) )
ecache.mu.Unlock()
// returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash // returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash
if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate { if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate {
if invalid != test.expectedInvalidEquivalenceHash { if invalid != test.expectedInvalidEquivalenceHash {
@ -524,15 +675,16 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
for _, test := range tests { for _, test := range tests {
// set cached item to equivalence cache // set cached item to equivalence cache
ecache.UpdateCachedPredicateItem( ecache.mu.Lock()
ecache.updateResult(
test.podName, test.podName,
test.nodeName, test.nodeName,
testPredicate, testPredicate,
test.cachedItem.fit, test.cachedItem.fit,
test.cachedItem.reasons, test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate, test.equivalenceHashForUpdatePredicate,
true,
) )
ecache.mu.Unlock()
} }
// invalidate cached predicate for all nodes // invalidate cached predicate for all nodes
@ -591,15 +743,16 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
for _, test := range tests { for _, test := range tests {
// set cached item to equivalence cache // set cached item to equivalence cache
ecache.UpdateCachedPredicateItem( ecache.mu.Lock()
ecache.updateResult(
test.podName, test.podName,
test.nodeName, test.nodeName,
testPredicate, testPredicate,
test.cachedItem.fit, test.cachedItem.fit,
test.cachedItem.reasons, test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate, test.equivalenceHashForUpdatePredicate,
true,
) )
ecache.mu.Unlock()
} }
for _, test := range tests { for _, test := range tests {

View File

@ -468,7 +468,6 @@ func podFitsOnNode(
eCacheAvailable bool eCacheAvailable bool
failedPredicates []algorithm.PredicateFailureReason failedPredicates []algorithm.PredicateFailureReason
) )
predicateResults := make(map[string]HostPredicate)
podsAdded := false podsAdded := false
// We run predicates twice in some cases. If the node has greater or equal priority // We run predicates twice in some cases. If the node has greater or equal priority
@ -509,48 +508,11 @@ func podFitsOnNode(
) )
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
if predicate, exist := predicateFuncs[predicateKey]; exist { if predicate, exist := predicateFuncs[predicateKey]; exist {
// Use an in-line function to guarantee invocation of ecache.Unlock() if eCacheAvailable {
// when the in-line function returns. fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivCacheInfo, cache)
func() { } else {
var invalid bool fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if eCacheAvailable { }
// Lock ecache here to avoid a race condition against cache invalidation invoked
// in event handlers. This race has existed despite locks in equivClassCacheimplementation.
ecache.Lock()
defer ecache.Unlock()
// PredicateWithECache will return its cached predicate results.
fit, reasons, invalid = ecache.PredicateWithECache(
pod.GetName(), info.Node().GetName(),
predicateKey, equivCacheInfo.hash, false)
}
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return
}
if eCacheAvailable {
// Store data to update equivClassCacheafter this loop.
if res, exists := predicateResults[predicateKey]; exists {
res.Fit = res.Fit && fit
res.FailReasons = append(res.FailReasons, reasons...)
predicateResults[predicateKey] = res
} else {
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
}
// Skip update if NodeInfo is stale.
if cache != nil && cache.IsUpToDate(info) {
result := predicateResults[predicateKey]
ecache.UpdateCachedPredicateItem(
pod.GetName(), info.Node().GetName(),
predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
}
}
}
}()
if err != nil { if err != nil {
return false, []algorithm.PredicateFailureReason{}, err return false, []algorithm.PredicateFailureReason{}, err
} }