diff --git a/pkg/scheduler/core/equivalence_cache.go b/pkg/scheduler/core/equivalence_cache.go index 2976a3c654b..1b6183ba99f 100644 --- a/pkg/scheduler/core/equivalence_cache.go +++ b/pkg/scheduler/core/equivalence_cache.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" hashutil "k8s.io/kubernetes/pkg/util/hash" "github.com/golang/glog" @@ -37,7 +38,7 @@ const maxCacheEntries = 100 // 1. a map of AlgorithmCache with node name as key // 2. function to get equivalence pod type EquivalenceCache struct { - sync.RWMutex + mu sync.Mutex algorithmCache map[string]AlgorithmCache } @@ -70,18 +71,43 @@ func NewEquivalenceCache() *EquivalenceCache { } } -// UpdateCachedPredicateItem updates pod predicate for equivalence class -func (ec *EquivalenceCache) UpdateCachedPredicateItem( +// RunPredicate will return a cached predicate result. In case of a cache miss, the predicate will +// be run and its results cached for the next call. +// +// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale. +func (ec *EquivalenceCache) RunPredicate( + pred algorithm.FitPredicate, + predicateKey string, + pod *v1.Pod, + meta algorithm.PredicateMetadata, + nodeInfo *schedulercache.NodeInfo, + equivClassInfo *equivalenceClassInfo, + cache schedulercache.Cache, +) (bool, []algorithm.PredicateFailureReason, error) { + ec.mu.Lock() + defer ec.mu.Unlock() + fit, reasons, invalid := ec.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash) + if !invalid { + return fit, reasons, nil + } + fit, reasons, err := pred(pod, meta, nodeInfo) + if err != nil { + return fit, reasons, err + } + // Skip update if NodeInfo is stale. + if cache != nil && cache.IsUpToDate(nodeInfo) { + ec.updateResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, fit, reasons, equivClassInfo.hash) + } + return fit, reasons, nil +} + +// updateResult updates the cached result of a predicate. +func (ec *EquivalenceCache) updateResult( podName, nodeName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64, - needLock bool, ) { - if needLock { - ec.Lock() - defer ec.Unlock() - } if _, exist := ec.algorithmCache[nodeName]; !exist { ec.algorithmCache[nodeName] = newAlgorithmCache() } @@ -103,19 +129,14 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem( glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, podName, nodeName, predicateItem) } -// PredicateWithECache returns: -// 1. if fit -// 2. reasons if not fit -// 3. if this cache is invalid -// based on cached predicate results -func (ec *EquivalenceCache) PredicateWithECache( +// lookupResult returns cached predicate results: +// 1. if pod fit +// 2. reasons if pod did not fit +// 3. if cache item is not found +func (ec *EquivalenceCache) lookupResult( podName, nodeName, predicateKey string, - equivalenceHash uint64, needLock bool, + equivalenceHash uint64, ) (bool, []algorithm.PredicateFailureReason, bool) { - if needLock { - ec.RLock() - defer ec.RUnlock() - } glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, podName, nodeName) if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { @@ -140,8 +161,8 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predi if len(predicateKeys) == 0 { return } - ec.Lock() - defer ec.Unlock() + ec.mu.Lock() + defer ec.mu.Unlock() if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { for predicateKey := range predicateKeys { algorithmCache.predicatesCache.Remove(predicateKey) @@ -155,8 +176,8 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKey if len(predicateKeys) == 0 { return } - ec.Lock() - defer ec.Unlock() + ec.mu.Lock() + defer ec.mu.Unlock() // algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates for _, algorithmCache := range ec.algorithmCache { for predicateKey := range predicateKeys { @@ -169,8 +190,8 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKey // InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) { - ec.Lock() - defer ec.Unlock() + ec.mu.Lock() + defer ec.mu.Unlock() delete(ec.algorithmCache, nodeName) glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) } diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go index 63e4fd780bf..0129fd2db2a 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence_cache_test.go @@ -17,6 +17,7 @@ limitations under the License. package core import ( + "errors" "reflect" "sync" "testing" @@ -157,7 +158,154 @@ type predicateItemType struct { reasons []algorithm.PredicateFailureReason } -func TestUpdateCachedPredicateItem(t *testing.T) { +// upToDateCache is a fake Cache where IsUpToDate always returns true. +type upToDateCache = schedulertesting.FakeCache + +// staleNodeCache is a fake Cache where IsUpToDate always returns false. +type staleNodeCache struct { + schedulertesting.FakeCache +} + +func (c *staleNodeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return false } + +// mockPredicate provides an algorithm.FitPredicate with pre-set return values. +type mockPredicate struct { + fit bool + reasons []algorithm.PredicateFailureReason + err error + callCount int +} + +func (p *mockPredicate) predicate(*v1.Pod, algorithm.PredicateMetadata, *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + p.callCount++ + return p.fit, p.reasons, p.err +} + +func TestRunPredicate(t *testing.T) { + tests := []struct { + name string + pred mockPredicate + cache schedulercache.Cache + expectFit, expectCacheHit, expectCacheWrite bool + expectedReasons []algorithm.PredicateFailureReason + expectedError string + }{ + { + name: "pod fits/cache hit", + pred: mockPredicate{}, + cache: &upToDateCache{}, + expectFit: true, + expectCacheHit: true, + expectCacheWrite: false, + }, + { + name: "pod fits/cache miss", + pred: mockPredicate{fit: true}, + cache: &upToDateCache{}, + expectFit: true, + expectCacheHit: false, + expectCacheWrite: true, + }, + { + name: "pod fits/cache miss/no write", + pred: mockPredicate{fit: true}, + cache: &staleNodeCache{}, + expectFit: true, + expectCacheHit: false, + expectCacheWrite: false, + }, + { + name: "pod doesn't fit/cache miss", + pred: mockPredicate{reasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}}, + cache: &upToDateCache{}, + expectFit: false, + expectCacheHit: false, + expectCacheWrite: true, + expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}, + }, + { + name: "pod doesn't fit/cache hit", + pred: mockPredicate{}, + cache: &upToDateCache{}, + expectFit: false, + expectCacheHit: true, + expectCacheWrite: false, + expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}, + }, + { + name: "predicate error", + pred: mockPredicate{err: errors.New("This is expected")}, + cache: &upToDateCache{}, + expectFit: false, + expectCacheHit: false, + expectCacheWrite: false, + expectedError: "This is expected", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + node := schedulercache.NewNodeInfo() + node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}) + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}} + meta := algorithm.EmptyPredicateMetadataProducer(nil, nil) + + ecache := NewEquivalenceCache() + equivClass := ecache.getEquivalenceClassInfo(pod) + if test.expectCacheHit { + ecache.mu.Lock() + ecache.updateResult(pod.Name, node.Node().Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash) + ecache.mu.Unlock() + } + + fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache) + + if err != nil { + if err.Error() != test.expectedError { + t.Errorf("Expected error %v but got %v", test.expectedError, err) + } + } else if len(test.expectedError) > 0 { + t.Errorf("Expected error %v but got nil", test.expectedError) + } + if fit && !test.expectFit { + t.Errorf("pod should not fit") + } + if !fit && test.expectFit { + t.Errorf("pod should fit") + } + if len(reasons) != len(test.expectedReasons) { + t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons) + } else { + for i, reason := range reasons { + if reason != test.expectedReasons[i] { + t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons) + break + } + } + } + if test.expectCacheHit && test.pred.callCount != 0 { + t.Errorf("Predicate should not be called") + } + if !test.expectCacheHit && test.pred.callCount == 0 { + t.Errorf("Predicate should be called") + } + ecache.mu.Lock() + _, _, invalid := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash) + ecache.mu.Unlock() + if invalid && test.expectCacheWrite { + t.Errorf("Cache write should happen") + } + if !test.expectCacheHit && test.expectCacheWrite && invalid { + t.Errorf("Cache write should happen") + } + if !test.expectCacheHit && !test.expectCacheWrite && !invalid { + t.Errorf("Cache write should not happen") + } + }) + } +} + +func TestUpdateResult(t *testing.T) { tests := []struct { name string pod string @@ -206,15 +354,16 @@ func TestUpdateCachedPredicateItem(t *testing.T) { test.equivalenceHash: predicateItem, }) } - ecache.UpdateCachedPredicateItem( + ecache.mu.Lock() + ecache.updateResult( test.pod, test.nodeName, test.predicateKey, test.fit, test.reasons, test.equivalenceHash, - true, ) + ecache.mu.Unlock() value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey) if !ok { @@ -230,7 +379,7 @@ func TestUpdateCachedPredicateItem(t *testing.T) { } } -func TestPredicateWithECache(t *testing.T) { +func TestLookupResult(t *testing.T) { tests := []struct { name string podName string @@ -316,15 +465,16 @@ func TestPredicateWithECache(t *testing.T) { for _, test := range tests { ecache := NewEquivalenceCache() // set cached item to equivalence cache - ecache.UpdateCachedPredicateItem( + ecache.mu.Lock() + ecache.updateResult( test.podName, test.nodeName, test.predicateKey, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate, - true, ) + ecache.mu.Unlock() // if we want to do invalid, invalid the cached item if test.expectedInvalidPredicateKey { predicateKeys := sets.NewString() @@ -332,12 +482,13 @@ func TestPredicateWithECache(t *testing.T) { ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) } // calculate predicate with equivalence cache - fit, reasons, invalid := ecache.PredicateWithECache(test.podName, + ecache.mu.Lock() + fit, reasons, invalid := ecache.lookupResult(test.podName, test.nodeName, test.predicateKey, test.equivalenceHashForCalPredicate, - true, ) + ecache.mu.Unlock() // returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate { if invalid != test.expectedInvalidEquivalenceHash { @@ -524,15 +675,16 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { for _, test := range tests { // set cached item to equivalence cache - ecache.UpdateCachedPredicateItem( + ecache.mu.Lock() + ecache.updateResult( test.podName, test.nodeName, testPredicate, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate, - true, ) + ecache.mu.Unlock() } // invalidate cached predicate for all nodes @@ -591,15 +743,16 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { for _, test := range tests { // set cached item to equivalence cache - ecache.UpdateCachedPredicateItem( + ecache.mu.Lock() + ecache.updateResult( test.podName, test.nodeName, testPredicate, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate, - true, ) + ecache.mu.Unlock() } for _, test := range tests { diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index c222b80d314..960054b0d14 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -468,7 +468,6 @@ func podFitsOnNode( eCacheAvailable bool failedPredicates []algorithm.PredicateFailureReason ) - predicateResults := make(map[string]HostPredicate) podsAdded := false // We run predicates twice in some cases. If the node has greater or equal priority @@ -509,48 +508,11 @@ func podFitsOnNode( ) //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric if predicate, exist := predicateFuncs[predicateKey]; exist { - // Use an in-line function to guarantee invocation of ecache.Unlock() - // when the in-line function returns. - func() { - var invalid bool - if eCacheAvailable { - // Lock ecache here to avoid a race condition against cache invalidation invoked - // in event handlers. This race has existed despite locks in equivClassCacheimplementation. - ecache.Lock() - defer ecache.Unlock() - // PredicateWithECache will return its cached predicate results. - fit, reasons, invalid = ecache.PredicateWithECache( - pod.GetName(), info.Node().GetName(), - predicateKey, equivCacheInfo.hash, false) - } - - if !eCacheAvailable || invalid { - // we need to execute predicate functions since equivalence cache does not work - fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse) - if err != nil { - return - } - - if eCacheAvailable { - // Store data to update equivClassCacheafter this loop. - if res, exists := predicateResults[predicateKey]; exists { - res.Fit = res.Fit && fit - res.FailReasons = append(res.FailReasons, reasons...) - predicateResults[predicateKey] = res - } else { - predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons} - } - // Skip update if NodeInfo is stale. - if cache != nil && cache.IsUpToDate(info) { - result := predicateResults[predicateKey] - ecache.UpdateCachedPredicateItem( - pod.GetName(), info.Node().GetName(), - predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false) - } - } - } - }() - + if eCacheAvailable { + fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivCacheInfo, cache) + } else { + fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse) + } if err != nil { return false, []algorithm.PredicateFailureReason{}, err }