diff --git a/pkg/scheduler/core/equivalence_cache.go b/pkg/scheduler/core/equivalence_cache.go index 8322936ff8b..da50d2a81f7 100644 --- a/pkg/scheduler/core/equivalence_cache.go +++ b/pkg/scheduler/core/equivalence_cache.go @@ -35,7 +35,7 @@ import ( // 1. a map of AlgorithmCache with node name as key // 2. function to get equivalence pod type EquivalenceCache struct { - mu sync.Mutex + mu sync.RWMutex algorithmCache map[string]AlgorithmCache } @@ -72,9 +72,6 @@ func (ec *EquivalenceCache) RunPredicate( equivClassInfo *equivalenceClassInfo, cache schedulercache.Cache, ) (bool, []algorithm.PredicateFailureReason, error) { - ec.mu.Lock() - defer ec.mu.Unlock() - if nodeInfo == nil || nodeInfo.Node() == nil { // This may happen during tests. return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid") @@ -88,20 +85,32 @@ func (ec *EquivalenceCache) RunPredicate( if err != nil { return fit, reasons, err } - // Skip update if NodeInfo is stale. - if cache != nil && cache.IsUpToDate(nodeInfo) { - ec.updateResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, fit, reasons, equivClassInfo.hash) + if cache != nil { + ec.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClassInfo.hash, cache, nodeInfo) } return fit, reasons, nil } // updateResult updates the cached result of a predicate. func (ec *EquivalenceCache) updateResult( - podName, nodeName, predicateKey string, + podName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64, + cache schedulercache.Cache, + nodeInfo *schedulercache.NodeInfo, ) { + ec.mu.Lock() + defer ec.mu.Unlock() + if nodeInfo == nil || nodeInfo.Node() == nil { + // This may happen during tests. + return + } + // Skip update if NodeInfo is stale. + if !cache.IsUpToDate(nodeInfo) { + return + } + nodeName := nodeInfo.Node().GetName() if _, exist := ec.algorithmCache[nodeName]; !exist { ec.algorithmCache[nodeName] = AlgorithmCache{} } @@ -130,6 +139,8 @@ func (ec *EquivalenceCache) lookupResult( podName, nodeName, predicateKey string, equivalenceHash uint64, ) (bool, []algorithm.PredicateFailureReason, bool) { + ec.mu.RLock() + defer ec.mu.RUnlock() glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, podName, nodeName) if hostPredicate, exist := ec.algorithmCache[nodeName][predicateKey][equivalenceHash]; exist { diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go index 3b33917a14d..5411ffb0567 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence_cache_test.go @@ -253,9 +253,7 @@ func TestRunPredicate(t *testing.T) { ecache := NewEquivalenceCache() equivClass := ecache.getEquivalenceClassInfo(pod) if test.expectCacheHit { - ecache.mu.Lock() - ecache.updateResult(pod.Name, node.Node().Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash) - ecache.mu.Unlock() + ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node) } fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache) @@ -289,9 +287,7 @@ func TestRunPredicate(t *testing.T) { if !test.expectCacheHit && test.pred.callCount == 0 { t.Errorf("Predicate should be called") } - ecache.mu.Lock() _, _, invalid := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash) - ecache.mu.Unlock() if invalid && test.expectCacheWrite { t.Errorf("Cache write should happen") } @@ -316,6 +312,7 @@ func TestUpdateResult(t *testing.T) { equivalenceHash uint64 expectPredicateMap bool expectCacheItem HostPredicate + cache schedulercache.Cache }{ { name: "test 1", @@ -328,6 +325,7 @@ func TestUpdateResult(t *testing.T) { expectCacheItem: HostPredicate{ Fit: true, }, + cache: &upToDateCache{}, }, { name: "test 2", @@ -340,6 +338,7 @@ func TestUpdateResult(t *testing.T) { expectCacheItem: HostPredicate{ Fit: false, }, + cache: &upToDateCache{}, }, } for _, test := range tests { @@ -354,16 +353,18 @@ func TestUpdateResult(t *testing.T) { test.equivalenceHash: predicateItem, } } - ecache.mu.Lock() + + node := schedulercache.NewNodeInfo() + node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}) ecache.updateResult( test.pod, - test.nodeName, test.predicateKey, test.fit, test.reasons, test.equivalenceHash, + test.cache, + node, ) - ecache.mu.Unlock() cachedMapItem, ok := ecache.algorithmCache[test.nodeName][test.predicateKey] if !ok { @@ -390,6 +391,7 @@ func TestLookupResult(t *testing.T) { expectedInvalidPredicateKey bool expectedInvalidEquivalenceHash bool expectedPredicateItem predicateItemType + cache schedulercache.Cache }{ { name: "test 1", @@ -407,6 +409,7 @@ func TestLookupResult(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{}, }, + cache: &upToDateCache{}, }, { name: "test 2", @@ -423,6 +426,7 @@ func TestLookupResult(t *testing.T) { fit: true, reasons: []algorithm.PredicateFailureReason{}, }, + cache: &upToDateCache{}, }, { name: "test 3", @@ -440,6 +444,7 @@ func TestLookupResult(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, }, + cache: &upToDateCache{}, }, { name: "test 4", @@ -458,22 +463,24 @@ func TestLookupResult(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{}, }, + cache: &upToDateCache{}, }, } for _, test := range tests { ecache := NewEquivalenceCache() + node := schedulercache.NewNodeInfo() + node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}) // set cached item to equivalence cache - ecache.mu.Lock() ecache.updateResult( test.podName, - test.nodeName, test.predicateKey, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate, + test.cache, + node, ) - ecache.mu.Unlock() // if we want to do invalid, invalid the cached item if test.expectedInvalidPredicateKey { predicateKeys := sets.NewString() @@ -481,13 +488,11 @@ func TestLookupResult(t *testing.T) { ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) } // calculate predicate with equivalence cache - ecache.mu.Lock() fit, reasons, invalid := ecache.lookupResult(test.podName, test.nodeName, test.predicateKey, test.equivalenceHashForCalPredicate, ) - ecache.mu.Unlock() // returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate { if invalid != test.expectedInvalidEquivalenceHash { @@ -637,6 +642,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { nodeName string equivalenceHashForUpdatePredicate uint64 cachedItem predicateItemType + cache schedulercache.Cache }{ { podName: "testPod", @@ -648,6 +654,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { predicates.ErrPodNotFitsHostPorts, }, }, + cache: &upToDateCache{}, }, { podName: "testPod", @@ -659,6 +666,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { predicates.ErrPodNotFitsHostPorts, }, }, + cache: &upToDateCache{}, }, { podName: "testPod", @@ -667,22 +675,24 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { cachedItem: predicateItemType{ fit: true, }, + cache: &upToDateCache{}, }, } ecache := NewEquivalenceCache() for _, test := range tests { + node := schedulercache.NewNodeInfo() + node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}) // set cached item to equivalence cache - ecache.mu.Lock() ecache.updateResult( test.podName, - test.nodeName, testPredicate, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate, + test.cache, + node, ) - ecache.mu.Unlock() } // invalidate cached predicate for all nodes @@ -708,6 +718,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { nodeName string equivalenceHashForUpdatePredicate uint64 cachedItem predicateItemType + cache schedulercache.Cache }{ { podName: "testPod", @@ -717,6 +728,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, }, + cache: &upToDateCache{}, }, { podName: "testPod", @@ -726,6 +738,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, }, + cache: &upToDateCache{}, }, { podName: "testPod", @@ -734,22 +747,24 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { cachedItem: predicateItemType{ fit: true, }, + cache: &upToDateCache{}, }, } ecache := NewEquivalenceCache() for _, test := range tests { + node := schedulercache.NewNodeInfo() + node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}) // set cached item to equivalence cache - ecache.mu.Lock() ecache.updateResult( test.podName, - test.nodeName, testPredicate, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate, + test.cache, + node, ) - ecache.mu.Unlock() } for _, test := range tests {