From 02d657827c8bf6ed2062b905a2660c5f2c021f02 Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Fri, 20 Apr 2018 19:34:34 -0700 Subject: [PATCH 1/3] Test race condition in equivalence cache. Add a unit test that invalidates equivalence cache during a scheduling cycle. This exercises the bug described in https://github.com/kubernetes/kubernetes/issues/62921 --- pkg/scheduler/core/equivalence_cache_test.go | 104 +++++++++++++++++++ test/integration/scheduler/scheduler_test.go | 2 - 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go index fcb2c9455a7..e07d6b896e2 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence_cache_test.go @@ -18,13 +18,19 @@ package core import ( "reflect" + "sync" "testing" + "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + algorithmpredicates "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" + schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" ) type predicateItemType struct { @@ -649,3 +655,101 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { } } } + +// syncingMockCache delegates method calls to an actual Cache, +// but calls to UpdateNodeNameToInfoMap synchronize with the test. +type syncingMockCache struct { + schedulercache.Cache + cycleStart, cacheInvalidated chan struct{} + once sync.Once +} + +// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it +// synchronizes with the test. +// +// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use +// this point to signal to the test that a scheduling cycle has started. +func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error { + err := c.Cache.UpdateNodeNameToInfoMap(infoMap) + c.once.Do(func() { + c.cycleStart <- struct{}{} + <-c.cacheInvalidated + }) + return err +} + +// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly +// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event +// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired. +func TestEquivalenceCacheInvalidationRace(t *testing.T) { + // Create a predicate that returns false the first time and true on subsequent calls. + podWillFit := false + var callCount int + testPredicate := func(pod *v1.Pod, + meta algorithm.PredicateMetadata, + nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + callCount++ + if !podWillFit { + podWillFit = true + return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil + } + return true, nil, nil + } + + // Set up the mock cache. 
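+	// The syncingMockCache wraps the real cache so the test can pause the
+	// scheduler right after it snapshots node info (in UpdateNodeNameToInfoMap
+	// above) and invalidate the equivalence cache before predicates run.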
+ cache := schedulercache.New(time.Duration(0), wait.NeverStop) + cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}) + mockCache := &syncingMockCache{ + Cache: cache, + cycleStart: make(chan struct{}), + cacheInvalidated: make(chan struct{}), + } + + fakeGetEquivalencePod := func(pod *v1.Pod) interface{} { return pod } + eCache := NewEquivalenceCache(fakeGetEquivalencePod) + // Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before + // the equivalence cache would be updated. + go func() { + <-mockCache.cycleStart + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"}, + Spec: v1.PodSpec{NodeName: "machine1"}} + if err := cache.AddPod(pod); err != nil { + t.Errorf("Could not add pod to cache: %v", err) + } + eCache.InvalidateAllCachedPredicateItemOfNode("machine1") + mockCache.cacheInvalidated <- struct{}{} + }() + + // Set up the scheduler. + predicates := map[string]algorithm.FitPredicate{"testPredicate": testPredicate} + algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"}) + prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} + pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) + scheduler := NewGenericScheduler( + mockCache, + eCache, + NewSchedulingQueue(), + predicates, + algorithm.EmptyPredicateMetadataProducer, + prioritizers, + algorithm.EmptyPriorityMetadataProducer, + nil, nil, pvcLister, true, false) + + // First scheduling attempt should fail. + nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"})) + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}} + machine, err := scheduler.Schedule(pod, nodeLister) + if machine != "" || err == nil { + t.Error("First scheduling attempt did not fail") + } + + // Second scheduling attempt should succeed because cache was invalidated. + _, err = scheduler.Schedule(pod, nodeLister) + if err != nil { + t.Errorf("Second scheduling attempt failed: %v", err) + } + if callCount != 2 { + t.Errorf("Predicate should have been called twice. Was called %d times.", callCount) + } +} diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 39467962017..39852279532 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -54,8 +54,6 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) -const enableEquivalenceCache = true - type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) type nodeStateManager struct { From ca7bfc02ee085467e97b123dc924c016daa2b690 Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Mon, 23 Apr 2018 15:19:12 -0700 Subject: [PATCH 2/3] Add IsUpTodate() to Cache interface. This allows scheduler implementations to check if a NodeInfo object matches the current state of the cache. Useful if the NodeInfo in question came from a Snapshot() for example. 
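
For example, a caller that evaluated predicates against a NodeInfo taken from
a Snapshot() can guard its equivalence cache writes roughly like this (the
next patch in this series does exactly that in podFitsOnNode):

    // Skip the equivalence cache update if the NodeInfo went stale
    // while the predicates were being evaluated.
    if cache != nil && cache.IsUpToDate(info) {
            ecache.UpdateCachedPredicateItem(...)
    }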
---
 pkg/scheduler/schedulercache/BUILD         |  1 +
 pkg/scheduler/schedulercache/cache.go      |  7 +++++++
 pkg/scheduler/schedulercache/cache_test.go | 24 ++++++++++++++++++++++
 pkg/scheduler/schedulercache/interface.go  |  3 +++
 pkg/scheduler/testing/fake_cache.go        |  3 +++
 5 files changed, 38 insertions(+)

diff --git a/pkg/scheduler/schedulercache/BUILD b/pkg/scheduler/schedulercache/BUILD
index 7178d4ddc6d..28ca09987f4 100644
--- a/pkg/scheduler/schedulercache/BUILD
+++ b/pkg/scheduler/schedulercache/BUILD
@@ -43,6 +43,7 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
     ],
 )
diff --git a/pkg/scheduler/schedulercache/cache.go b/pkg/scheduler/schedulercache/cache.go
index fd013a5b19b..483558ed1d8 100644
--- a/pkg/scheduler/schedulercache/cache.go
+++ b/pkg/scheduler/schedulercache/cache.go
@@ -460,6 +460,13 @@ func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDi
 	return pdbs, nil
 }
 
+func (cache *schedulerCache) IsUpToDate(n *NodeInfo) bool {
+	cache.mu.Lock()
+	defer cache.mu.Unlock()
+	node, ok := cache.nodes[n.Node().Name]
+	return ok && n.generation == node.generation
+}
+
 func (cache *schedulerCache) run() {
 	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
 }
diff --git a/pkg/scheduler/schedulercache/cache_test.go b/pkg/scheduler/schedulercache/cache_test.go
index 99b9bea958a..69cd53a8b64 100644
--- a/pkg/scheduler/schedulercache/cache_test.go
+++ b/pkg/scheduler/schedulercache/cache_test.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/kubernetes/pkg/features"
 	priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
@@ -1241,3 +1242,26 @@ func TestPDBOperations(t *testing.T) {
 		}
 	}
 }
+
+func TestIsUpToDate(t *testing.T) {
+	cache := New(time.Duration(0), wait.NeverStop)
+	if err := cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}); err != nil {
+		t.Errorf("Could not add node: %v", err)
+	}
+	s := cache.Snapshot()
+	node := s.Nodes["n1"]
+	if !cache.IsUpToDate(node) {
+		t.Errorf("Node incorrectly marked as stale")
+	}
+	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", UID: "p1"}, Spec: v1.PodSpec{NodeName: "n1"}}
+	if err := cache.AddPod(pod); err != nil {
+		t.Errorf("Could not add pod: %v", err)
+	}
+	if cache.IsUpToDate(node) {
+		t.Errorf("Node incorrectly marked as up to date")
+	}
+	badNode := &NodeInfo{node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n2"}}}
+	if cache.IsUpToDate(badNode) {
+		t.Errorf("Nonexistent node incorrectly marked as up to date")
+	}
+}
diff --git a/pkg/scheduler/schedulercache/interface.go b/pkg/scheduler/schedulercache/interface.go
index 8e4f4909c75..475d40fe6ad 100644
--- a/pkg/scheduler/schedulercache/interface.go
+++ b/pkg/scheduler/schedulercache/interface.go
@@ -122,6 +122,9 @@ type Cache interface {
 
 	// Snapshot takes a snapshot on current cache
 	Snapshot() *Snapshot
+
+	// IsUpToDate returns true if the given NodeInfo matches the current data in the cache.
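+	// This is typically used to check whether a NodeInfo obtained from an
+	// earlier Snapshot() has since gone stale.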
+	IsUpToDate(n *NodeInfo) bool
 }
 
 // Snapshot is a snapshot of cache state
diff --git a/pkg/scheduler/testing/fake_cache.go b/pkg/scheduler/testing/fake_cache.go
index e958a14b2f7..9a78493eed8 100644
--- a/pkg/scheduler/testing/fake_cache.go
+++ b/pkg/scheduler/testing/fake_cache.go
@@ -105,3 +105,6 @@ func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector label
 func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
 	return &schedulercache.Snapshot{}
 }
+
+// IsUpToDate is a fake method for testing
+func (f *FakeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return true }

From dacc1a8d52319cf380d15312bd44c73f54f6e9a9 Mon Sep 17 00:00:00 2001
From: Jonathan Basseri
Date: Mon, 23 Apr 2018 14:24:00 -0700
Subject: [PATCH 3/3] Check for old NodeInfo when updating equiv. cache.

Because the scheduler takes a snapshot of cache data at the start of each
scheduling cycle, updates to the equivalence cache should be skipped if
there was a cache update during the cycle.

If the current NodeInfo becomes stale while we evaluate predicates, we will
not write any results into the equivalence cache. We will still use the
results for the current scheduling cycle, though.
---
 pkg/scheduler/core/generic_scheduler.go      | 55 ++++++++---------
 pkg/scheduler/core/generic_scheduler_test.go | 62 ++++++++++++--------
 2 files changed, 62 insertions(+), 55 deletions(-)

diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index 3f7567f92b1..5b2f76dae89 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -128,7 +128,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 
 	trace.Step("Computing predicates")
 	startPredicateEvalTime := time.Now()
-	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
+	filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
 	if err != nil {
 		return "", err
 	}
@@ -325,21 +325,11 @@ func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName s
 
 // Filters the nodes to find the ones that fit based on the given predicate functions
 // Each node is passed through the predicate functions to determine if it is a fit
-func findNodesThatFit(
-	pod *v1.Pod,
-	nodeNameToInfo map[string]*schedulercache.NodeInfo,
-	nodes []*v1.Node,
-	predicateFuncs map[string]algorithm.FitPredicate,
-	extenders []algorithm.SchedulerExtender,
-	metadataProducer algorithm.PredicateMetadataProducer,
-	ecache *EquivalenceCache,
-	schedulingQueue SchedulingQueue,
-	alwaysCheckAllPredicates bool,
-) ([]*v1.Node, FailedPredicateMap, error) {
+func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
 	var filtered []*v1.Node
 	failedPredicateMap := FailedPredicateMap{}
 
-	if len(predicateFuncs) == 0 {
+	if len(g.predicates) == 0 {
 		filtered = nodes
 	} else {
 		// Create filtered list with enough space to avoid growing it
@@ -350,12 +340,12 @@ func findNodesThatFit(
 		var filteredLen int32
 
 		// We can use the same metadata producer for all nodes.
- meta := metadataProducer(pod, nodeNameToInfo) + meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap) var equivCacheInfo *equivalenceClassInfo - if ecache != nil { + if g.equivalenceCache != nil { // getEquivalenceClassInfo will return immediately if no equivalence pod found - equivCacheInfo = ecache.getEquivalenceClassInfo(pod) + equivCacheInfo = g.equivalenceCache.getEquivalenceClassInfo(pod) } checkNode := func(i int) { @@ -363,11 +353,12 @@ func findNodesThatFit( fits, failedPredicates, err := podFitsOnNode( pod, meta, - nodeNameToInfo[nodeName], - predicateFuncs, - ecache, - schedulingQueue, - alwaysCheckAllPredicates, + g.cachedNodeInfoMap[nodeName], + g.predicates, + g.cache, + g.equivalenceCache, + g.schedulingQueue, + g.alwaysCheckAllPredicates, equivCacheInfo, ) if err != nil { @@ -391,12 +382,12 @@ func findNodesThatFit( } } - if len(filtered) > 0 && len(extenders) != 0 { - for _, extender := range extenders { + if len(filtered) > 0 && len(g.extenders) != 0 { + for _, extender := range g.extenders { if !extender.IsInterested(pod) { continue } - filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo) + filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap) if err != nil { if extender.IsIgnorable() { glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set", @@ -467,6 +458,7 @@ func podFitsOnNode( meta algorithm.PredicateMetadata, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, + cache schedulercache.Cache, ecache *EquivalenceCache, queue SchedulingQueue, alwaysCheckAllPredicates bool, @@ -548,10 +540,13 @@ func podFitsOnNode( } else { predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons} } - result := predicateResults[predicateKey] - ecache.UpdateCachedPredicateItem( - pod.GetName(), info.Node().GetName(), - predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false) + // Skip update if NodeInfo is stale. + if cache != nil && cache.IsUpToDate(info) { + result := predicateResults[predicateKey] + ecache.UpdateCachedPredicateItem( + pod.GetName(), info.Node().GetName(), + predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false) + } } } }() @@ -976,7 +971,7 @@ func selectVictimsOnNode( // that we should check is if the "pod" is failing to schedule due to pod affinity // failure. // TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance. 
- if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits { + if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil); !fits { if err != nil { glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) } @@ -990,7 +985,7 @@ func selectVictimsOnNode( violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs) reprievePod := func(p *v1.Pod) bool { addPod(p) - fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil) + fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil) if !fits { removePod(p) victims = append(victims, p) diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 83efdad37db..01c6d331ff5 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -432,16 +432,35 @@ func TestGenericScheduler(t *testing.T) { } } -func TestFindFitAllError(t *testing.T) { +// makeScheduler makes a simple genericScheduler for testing. +func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Node) *genericScheduler { algorithmpredicates.SetPredicatesOrdering(order) - nodes := []string{"3", "2", "1"} - predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate} - nodeNameToInfo := map[string]*schedulercache.NodeInfo{ - "3": schedulercache.NewNodeInfo(), - "2": schedulercache.NewNodeInfo(), - "1": schedulercache.NewNodeInfo(), + cache := schedulercache.New(time.Duration(0), wait.NeverStop) + for _, n := range nodes { + cache.AddNode(n) } - _, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false) + prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} + + s := NewGenericScheduler( + cache, + nil, + NewSchedulingQueue(), + predicates, + algorithm.EmptyPredicateMetadataProducer, + prioritizers, + algorithm.EmptyPriorityMetadataProducer, + nil, nil, nil, false, false) + cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap) + return s.(*genericScheduler) + +} + +func TestFindFitAllError(t *testing.T) { + predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate} + nodes := makeNodeList([]string{"3", "2", "1"}) + scheduler := makeScheduler(predicates, nodes) + + _, predicateMap, err := scheduler.findNodesThatFit(&v1.Pod{}, nodes) if err != nil { t.Errorf("unexpected error: %v", err) @@ -452,9 +471,9 @@ func TestFindFitAllError(t *testing.T) { } for _, node := range nodes { - failures, found := predicateMap[node] + failures, found := predicateMap[node.Name] if !found { - t.Errorf("failed to find node: %s in %v", node, predicateMap) + t.Errorf("failed to find node: %s in %v", node.Name, predicateMap) } if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate { t.Errorf("unexpected failures: %v", failures) @@ -463,20 +482,13 @@ func TestFindFitAllError(t *testing.T) { } func TestFindFitSomeError(t *testing.T) { - algorithmpredicates.SetPredicatesOrdering(order) - nodes := []string{"3", "2", "1"} predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate} - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} - nodeNameToInfo := 
map[string]*schedulercache.NodeInfo{ - "3": schedulercache.NewNodeInfo(), - "2": schedulercache.NewNodeInfo(), - "1": schedulercache.NewNodeInfo(pod), - } - for name := range nodeNameToInfo { - nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) - } + nodes := makeNodeList([]string{"3", "2", "1"}) + scheduler := makeScheduler(predicates, nodes) + + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} + _, predicateMap, err := scheduler.findNodesThatFit(pod, nodes) - _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -486,12 +498,12 @@ func TestFindFitSomeError(t *testing.T) { } for _, node := range nodes { - if node == pod.Name { + if node.Name == pod.Name { continue } - failures, found := predicateMap[node] + failures, found := predicateMap[node.Name] if !found { - t.Errorf("failed to find node: %s in %v", node, predicateMap) + t.Errorf("failed to find node: %s in %v", node.Name, predicateMap) } if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate { t.Errorf("unexpected failures: %v", failures)