diff --git a/pkg/scheduler/core/equivalence_cache.go b/pkg/scheduler/core/equivalence_cache.go index 2976a3c654b..acc912cedd8 100644 --- a/pkg/scheduler/core/equivalence_cache.go +++ b/pkg/scheduler/core/equivalence_cache.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" hashutil "k8s.io/kubernetes/pkg/util/hash" "github.com/golang/glog" @@ -70,6 +71,36 @@ func NewEquivalenceCache() *EquivalenceCache { } } +// RunPredicate will return a cached predicate result. In case of a cache miss, the predicate will +// be run and its results cached for the next call. +// +// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale. +func (ec *EquivalenceCache) RunPredicate( + pred algorithm.FitPredicate, + predicateKey string, + pod *v1.Pod, + meta algorithm.PredicateMetadata, + nodeInfo *schedulercache.NodeInfo, + equivClassInfo *equivalenceClassInfo, + cache schedulercache.Cache, +) (bool, []algorithm.PredicateFailureReason, error) { + ec.Lock() + defer ec.Unlock() + fit, reasons, invalid := ec.PredicateWithECache(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash, false) + if !invalid { + return fit, reasons, nil + } + fit, reasons, err := pred(pod, meta, nodeInfo) + if err != nil { + return fit, reasons, err + } + // Skip update if NodeInfo is stale. + if cache != nil && cache.IsUpToDate(nodeInfo) { + ec.UpdateCachedPredicateItem(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, fit, reasons, equivClassInfo.hash, false) + } + return fit, reasons, nil +} + // UpdateCachedPredicateItem updates pod predicate for equivalence class func (ec *EquivalenceCache) UpdateCachedPredicateItem( podName, nodeName, predicateKey string, diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go index 63e4fd780bf..42623e3a7d7 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence_cache_test.go @@ -17,6 +17,7 @@ limitations under the License. package core import ( + "errors" "reflect" "sync" "testing" @@ -157,6 +158,149 @@ type predicateItemType struct { reasons []algorithm.PredicateFailureReason } +// upToDateCache is a fake Cache where IsUpToDate always returns true. +type upToDateCache = schedulertesting.FakeCache + +// staleNodeCache is a fake Cache where IsUpToDate always returns false. +type staleNodeCache struct { + schedulertesting.FakeCache +} + +func (c *staleNodeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return false } + +// mockPredicate provides an algorithm.FitPredicate with pre-set return values. +type mockPredicate struct { + fit bool + reasons []algorithm.PredicateFailureReason + err error + callCount int +} + +func (p *mockPredicate) predicate(*v1.Pod, algorithm.PredicateMetadata, *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + p.callCount++ + return p.fit, p.reasons, p.err +} + +func TestRunPredicate(t *testing.T) { + tests := []struct { + name string + pred mockPredicate + cache schedulercache.Cache + expectFit, expectCacheHit, expectCacheWrite bool + expectedReasons []algorithm.PredicateFailureReason + expectedError string + }{ + { + name: "pod fits/cache hit", + pred: mockPredicate{}, + cache: &upToDateCache{}, + expectFit: true, + expectCacheHit: true, + expectCacheWrite: false, + }, + { + name: "pod fits/cache miss", + pred: mockPredicate{fit: true}, + cache: &upToDateCache{}, + expectFit: true, + expectCacheHit: false, + expectCacheWrite: true, + }, + { + name: "pod fits/cache miss/no write", + pred: mockPredicate{fit: true}, + cache: &staleNodeCache{}, + expectFit: true, + expectCacheHit: false, + expectCacheWrite: false, + }, + { + name: "pod doesn't fit/cache miss", + pred: mockPredicate{reasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}}, + cache: &upToDateCache{}, + expectFit: false, + expectCacheHit: false, + expectCacheWrite: true, + expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}, + }, + { + name: "pod doesn't fit/cache hit", + pred: mockPredicate{}, + cache: &upToDateCache{}, + expectFit: false, + expectCacheHit: true, + expectCacheWrite: false, + expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}, + }, + { + name: "predicate error", + pred: mockPredicate{err: errors.New("This is expected")}, + cache: &upToDateCache{}, + expectFit: false, + expectCacheHit: false, + expectCacheWrite: false, + expectedError: "This is expected", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + node := schedulercache.NewNodeInfo() + node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}) + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}} + meta := algorithm.EmptyPredicateMetadataProducer(nil, nil) + + ecache := NewEquivalenceCache() + equivClass := ecache.getEquivalenceClassInfo(pod) + if test.expectCacheHit { + ecache.UpdateCachedPredicateItem(pod.Name, node.Node().Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash) + } + + fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache) + + if err != nil { + if err.Error() != test.expectedError { + t.Errorf("Expected error %v but got %v", test.expectedError, err) + } + } else if len(test.expectedError) > 0 { + t.Errorf("Expected error %v but got nil", test.expectedError) + } + if fit && !test.expectFit { + t.Errorf("pod should not fit") + } + if !fit && test.expectFit { + t.Errorf("pod should fit") + } + if len(reasons) != len(test.expectedReasons) { + t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons) + } else { + for i, reason := range reasons { + if reason != test.expectedReasons[i] { + t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons) + break + } + } + } + if test.expectCacheHit && test.pred.callCount != 0 { + t.Errorf("Predicate should not be called") + } + if !test.expectCacheHit && test.pred.callCount == 0 { + t.Errorf("Predicate should be called") + } + _, _, invalid := ecache.PredicateWithECache(pod.Name, node.Node().Name, "testPredicate", equivClass.hash) + if invalid && test.expectCacheWrite { + t.Errorf("Cache write should happen") + } + if !test.expectCacheHit && test.expectCacheWrite && invalid { + t.Errorf("Cache write should happen") + } + if !test.expectCacheHit && !test.expectCacheWrite && !invalid { + t.Errorf("Cache write should not happen") + } + }) + } +} + func TestUpdateCachedPredicateItem(t *testing.T) { tests := []struct { name string