diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go
index fcb2c9455a7..e07d6b896e2 100644
--- a/pkg/scheduler/core/equivalence_cache_test.go
+++ b/pkg/scheduler/core/equivalence_cache_test.go
@@ -18,13 +18,19 @@ package core

 import (
 	"reflect"
+	"sync"
 	"testing"
+	"time"

 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
+	algorithmpredicates "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
+	"k8s.io/kubernetes/pkg/scheduler/schedulercache"
+	schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
 )

 type predicateItemType struct {
@@ -649,3 +655,101 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
 		}
 	}
 }
+
+// syncingMockCache delegates method calls to an actual Cache,
+// but calls to UpdateNodeNameToInfoMap synchronize with the test.
+type syncingMockCache struct {
+	schedulercache.Cache
+	cycleStart, cacheInvalidated chan struct{}
+	once                         sync.Once
+}
+
+// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it
+// synchronizes with the test.
+//
+// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use
+// this point to signal to the test that a scheduling cycle has started.
+func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
+	err := c.Cache.UpdateNodeNameToInfoMap(infoMap)
+	c.once.Do(func() {
+		c.cycleStart <- struct{}{}
+		<-c.cacheInvalidated
+	})
+	return err
+}
+
+// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly
+// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event
+// occurs after schedulercache is snapshotted and before the equivalence cache lock is acquired.
+func TestEquivalenceCacheInvalidationRace(t *testing.T) {
+	// Create a predicate that returns false the first time and true on subsequent calls.
+	podWillFit := false
+	var callCount int
+	testPredicate := func(pod *v1.Pod,
+		meta algorithm.PredicateMetadata,
+		nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
+		callCount++
+		if !podWillFit {
+			podWillFit = true
+			return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
+		}
+		return true, nil, nil
+	}
+
+	// Set up the mock cache.
+	cache := schedulercache.New(time.Duration(0), wait.NeverStop)
+	cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
+	mockCache := &syncingMockCache{
+		Cache:            cache,
+		cycleStart:       make(chan struct{}),
+		cacheInvalidated: make(chan struct{}),
+	}
+
+	fakeGetEquivalencePod := func(pod *v1.Pod) interface{} { return pod }
+	eCache := NewEquivalenceCache(fakeGetEquivalencePod)
+	// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
+	// the equivalence cache would be updated.
+	go func() {
+		<-mockCache.cycleStart
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
+			Spec:       v1.PodSpec{NodeName: "machine1"}}
+		if err := cache.AddPod(pod); err != nil {
+			t.Errorf("Could not add pod to cache: %v", err)
+		}
+		eCache.InvalidateAllCachedPredicateItemOfNode("machine1")
+		mockCache.cacheInvalidated <- struct{}{}
+	}()
+
+	// Set up the scheduler.
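+	// The syncingMockCache above makes this first Schedule call block right
+	// after the node-info snapshot is taken, giving the invalidation goroutine
+	// a window to run before the scheduling cycle continues.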
+	predicates := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
+	algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
+	prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
+	pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
+	scheduler := NewGenericScheduler(
+		mockCache,
+		eCache,
+		NewSchedulingQueue(),
+		predicates,
+		algorithm.EmptyPredicateMetadataProducer,
+		prioritizers,
+		algorithm.EmptyPriorityMetadataProducer,
+		nil, nil, pvcLister, true, false)
+
+	// First scheduling attempt should fail.
+	nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
+	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
+	machine, err := scheduler.Schedule(pod, nodeLister)
+	if machine != "" || err == nil {
+		t.Error("First scheduling attempt did not fail")
+	}
+
+	// Second scheduling attempt should succeed because cache was invalidated.
+	_, err = scheduler.Schedule(pod, nodeLister)
+	if err != nil {
+		t.Errorf("Second scheduling attempt failed: %v", err)
+	}
+	if callCount != 2 {
+		t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
+	}
+}
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 3f7567f92b1..5b2f76dae89 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -128,7 +128,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister

 	trace.Step("Computing predicates")
 	startPredicateEvalTime := time.Now()
-	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
+	filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
 	if err != nil {
 		return "", err
 	}
@@ -325,21 +325,11 @@ func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName s

 // Filters the nodes to find the ones that fit based on the given predicate functions
 // Each node is passed through the predicate functions to determine if it is a fit
-func findNodesThatFit(
-	pod *v1.Pod,
-	nodeNameToInfo map[string]*schedulercache.NodeInfo,
-	nodes []*v1.Node,
-	predicateFuncs map[string]algorithm.FitPredicate,
-	extenders []algorithm.SchedulerExtender,
-	metadataProducer algorithm.PredicateMetadataProducer,
-	ecache *EquivalenceCache,
-	schedulingQueue SchedulingQueue,
-	alwaysCheckAllPredicates bool,
-) ([]*v1.Node, FailedPredicateMap, error) {
+func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
 	var filtered []*v1.Node
 	failedPredicateMap := FailedPredicateMap{}

-	if len(predicateFuncs) == 0 {
+	if len(g.predicates) == 0 {
 		filtered = nodes
 	} else {
 		// Create filtered list with enough space to avoid growing it
@@ -350,12 +340,12 @@ func findNodesThatFit(
 		var filteredLen int32

 		// We can use the same metadata producer for all nodes.
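+		// The metadata producer and the node-info snapshot now come from the
+		// genericScheduler receiver rather than from function parameters.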
-		meta := metadataProducer(pod, nodeNameToInfo)
+		meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

 		var equivCacheInfo *equivalenceClassInfo
-		if ecache != nil {
+		if g.equivalenceCache != nil {
 			// getEquivalenceClassInfo will return immediately if no equivalence pod found
-			equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
+			equivCacheInfo = g.equivalenceCache.getEquivalenceClassInfo(pod)
 		}

 		checkNode := func(i int) {
@@ -363,11 +353,12 @@
 			fits, failedPredicates, err := podFitsOnNode(
 				pod,
 				meta,
-				nodeNameToInfo[nodeName],
-				predicateFuncs,
-				ecache,
-				schedulingQueue,
-				alwaysCheckAllPredicates,
+				g.cachedNodeInfoMap[nodeName],
+				g.predicates,
+				g.cache,
+				g.equivalenceCache,
+				g.schedulingQueue,
+				g.alwaysCheckAllPredicates,
 				equivCacheInfo,
 			)
 			if err != nil {
@@ -391,12 +382,12 @@
 		}
 	}

-	if len(filtered) > 0 && len(extenders) != 0 {
-		for _, extender := range extenders {
+	if len(filtered) > 0 && len(g.extenders) != 0 {
+		for _, extender := range g.extenders {
 			if !extender.IsInterested(pod) {
 				continue
 			}
-			filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo)
+			filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
 			if err != nil {
 				if extender.IsIgnorable() {
 					glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@@ -467,6 +458,7 @@ func podFitsOnNode(
 	meta algorithm.PredicateMetadata,
 	info *schedulercache.NodeInfo,
 	predicateFuncs map[string]algorithm.FitPredicate,
+	cache schedulercache.Cache,
 	ecache *EquivalenceCache,
 	queue SchedulingQueue,
 	alwaysCheckAllPredicates bool,
@@ -548,10 +540,13 @@ func podFitsOnNode(
 			} else {
 				predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
 			}
-			result := predicateResults[predicateKey]
-			ecache.UpdateCachedPredicateItem(
-				pod.GetName(), info.Node().GetName(),
-				predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
+			// Skip update if NodeInfo is stale.
+			if cache != nil && cache.IsUpToDate(info) {
+				result := predicateResults[predicateKey]
+				ecache.UpdateCachedPredicateItem(
+					pod.GetName(), info.Node().GetName(),
+					predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
+			}
 		}
 	}()
@@ -976,7 +971,7 @@ func selectVictimsOnNode(
 	// that we should check is if the "pod" is failing to schedule due to pod affinity
 	// failure.
 	// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
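+	// Note that preemption passes nil for both the scheduler cache and the
+	// equivalence cache, so this podFitsOnNode call always recomputes the
+	// predicates and never reads or writes cached results.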
-	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
+	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil); !fits {
 		if err != nil {
 			glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -990,7 +985,7 @@ func selectVictimsOnNode(
 	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
 	reprievePod := func(p *v1.Pod) bool {
 		addPod(p)
-		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
+		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil)
 		if !fits {
 			removePod(p)
 			victims = append(victims, p)
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index 83efdad37db..01c6d331ff5 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -432,16 +432,35 @@ func TestGenericScheduler(t *testing.T) {
 	}
 }

-func TestFindFitAllError(t *testing.T) {
+// makeScheduler makes a simple genericScheduler for testing.
+func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Node) *genericScheduler {
 	algorithmpredicates.SetPredicatesOrdering(order)
-	nodes := []string{"3", "2", "1"}
-	predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate}
-	nodeNameToInfo := map[string]*schedulercache.NodeInfo{
-		"3": schedulercache.NewNodeInfo(),
-		"2": schedulercache.NewNodeInfo(),
-		"1": schedulercache.NewNodeInfo(),
+	cache := schedulercache.New(time.Duration(0), wait.NeverStop)
+	for _, n := range nodes {
+		cache.AddNode(n)
 	}
-	_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
+	prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
+
+	s := NewGenericScheduler(
+		cache,
+		nil,
+		NewSchedulingQueue(),
+		predicates,
+		algorithm.EmptyPredicateMetadataProducer,
+		prioritizers,
+		algorithm.EmptyPriorityMetadataProducer,
+		nil, nil, nil, false, false)
+	cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
+	return s.(*genericScheduler)
+}
+
+func TestFindFitAllError(t *testing.T) {
+	predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate}
+	nodes := makeNodeList([]string{"3", "2", "1"})
+	scheduler := makeScheduler(predicates, nodes)
+
+	_, predicateMap, err := scheduler.findNodesThatFit(&v1.Pod{}, nodes)

 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -452,9 +471,9 @@ func TestFindFitAllError(t *testing.T) {
 	}

 	for _, node := range nodes {
-		failures, found := predicateMap[node]
+		failures, found := predicateMap[node.Name]
 		if !found {
-			t.Errorf("failed to find node: %s in %v", node, predicateMap)
+			t.Errorf("failed to find node: %s in %v", node.Name, predicateMap)
 		}
 		if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate {
 			t.Errorf("unexpected failures: %v", failures)
 		}
 	}
 }

@@ -463,20 +482,13 @@ func TestFindFitSomeError(t *testing.T) {
-	algorithmpredicates.SetPredicatesOrdering(order)
-	nodes := []string{"3", "2", "1"}
 	predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate}
-	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-	nodeNameToInfo := map[string]*schedulercache.NodeInfo{
-		"3": schedulercache.NewNodeInfo(),
-		"2": schedulercache.NewNodeInfo(),
-		"1": schedulercache.NewNodeInfo(pod),
-	}
-	for name := range nodeNameToInfo {
-		nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
-	}
+	nodes := makeNodeList([]string{"3", "2", "1"})
+	scheduler := makeScheduler(predicates, nodes)
+
+	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
+	_, predicateMap, err := scheduler.findNodesThatFit(pod, nodes)

-	_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -486,12 +498,12 @@ func TestFindFitSomeError(t *testing.T) {

 	for _, node := range nodes {
-		if node == pod.Name {
+		if node.Name == pod.Name {
 			continue
 		}
-		failures, found := predicateMap[node]
+		failures, found := predicateMap[node.Name]
 		if !found {
-			t.Errorf("failed to find node: %s in %v", node, predicateMap)
+			t.Errorf("failed to find node: %s in %v", node.Name, predicateMap)
 		}
 		if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate {
 			t.Errorf("unexpected failures: %v", failures)
 		}
diff --git a/pkg/scheduler/schedulercache/BUILD b/pkg/scheduler/schedulercache/BUILD
index 7178d4ddc6d..28ca09987f4 100644
--- a/pkg/scheduler/schedulercache/BUILD
+++ b/pkg/scheduler/schedulercache/BUILD
@@ -43,6 +43,7 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
     ],
 )
diff --git a/pkg/scheduler/schedulercache/cache.go b/pkg/scheduler/schedulercache/cache.go
index fd013a5b19b..483558ed1d8 100644
--- a/pkg/scheduler/schedulercache/cache.go
+++ b/pkg/scheduler/schedulercache/cache.go
@@ -460,6 +460,13 @@ func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDi
 	return pdbs, nil
 }

+func (cache *schedulerCache) IsUpToDate(n *NodeInfo) bool {
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+	node, ok := cache.nodes[n.Node().Name]
+	return ok && n.generation == node.generation
+}
+
 func (cache *schedulerCache) run() {
 	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
 }
diff --git a/pkg/scheduler/schedulercache/cache_test.go b/pkg/scheduler/schedulercache/cache_test.go
index 99b9bea958a..69cd53a8b64 100644
--- a/pkg/scheduler/schedulercache/cache_test.go
+++ b/pkg/scheduler/schedulercache/cache_test.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/kubernetes/pkg/features"
 	priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
@@ -1241,3 +1242,26 @@ func TestPDBOperations(t *testing.T) {
 		}
 	}
 }
+
+func TestIsUpToDate(t *testing.T) {
+	cache := New(time.Duration(0), wait.NeverStop)
+	if err := cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}); err != nil {
+		t.Errorf("Could not add node: %v", err)
+	}
+	s := cache.Snapshot()
+	node := s.Nodes["n1"]
+	if !cache.IsUpToDate(node) {
+		t.Errorf("Node incorrectly marked as stale")
+	}
+	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", UID: "p1"}, Spec: v1.PodSpec{NodeName: "n1"}}
+	if err := cache.AddPod(pod); err != nil {
+		t.Errorf("Could not add pod: %v", err)
+	}
+	if cache.IsUpToDate(node) {
+		t.Errorf("Node incorrectly marked as up to date")
+	}
+	badNode := &NodeInfo{node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n2"}}}
+	if cache.IsUpToDate(badNode) {
+		t.Errorf("Nonexistent node incorrectly marked as up to date")
+	}
+}
diff --git a/pkg/scheduler/schedulercache/interface.go b/pkg/scheduler/schedulercache/interface.go
index 8e4f4909c75..475d40fe6ad 100644
--- a/pkg/scheduler/schedulercache/interface.go
+++ b/pkg/scheduler/schedulercache/interface.go
@@ -122,6 +122,9 @@ type Cache interface {

 	// Snapshot takes a snapshot on current cache
 	Snapshot() *Snapshot
+
+	// IsUpToDate returns true if the given NodeInfo matches the current data in the cache.
+	IsUpToDate(n *NodeInfo) bool
 }

 // Snapshot is a snapshot of cache state
diff --git a/pkg/scheduler/testing/fake_cache.go b/pkg/scheduler/testing/fake_cache.go
index e958a14b2f7..9a78493eed8 100644
--- a/pkg/scheduler/testing/fake_cache.go
+++ b/pkg/scheduler/testing/fake_cache.go
@@ -105,3 +105,6 @@ func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector label
 func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
 	return &schedulercache.Snapshot{}
 }
+
+// IsUpToDate is a fake method for testing
+func (f *FakeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return true }
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go
index 39467962017..39852279532 100644
--- a/test/integration/scheduler/scheduler_test.go
+++ b/test/integration/scheduler/scheduler_test.go
@@ -54,8 +54,6 @@ import (
 	"k8s.io/kubernetes/test/integration/framework"
 )

-const enableEquivalenceCache = true
-
 type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface)

 type nodeStateManager struct {