diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go index fcb2c9455a7..e07d6b896e2 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence_cache_test.go @@ -18,13 +18,19 @@ package core import ( "reflect" + "sync" "testing" + "time" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + algorithmpredicates "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/pkg/scheduler/schedulercache" + schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" ) type predicateItemType struct { @@ -649,3 +655,101 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { } } } + +// syncingMockCache delegates method calls to an actual Cache, +// but calls to UpdateNodeNameToInfoMap synchronize with the test. +type syncingMockCache struct { + schedulercache.Cache + cycleStart, cacheInvalidated chan struct{} + once sync.Once +} + +// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it +// synchronizes with the test. +// +// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use +// this point to signal to the test that a scheduling cycle has started. +func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error { + err := c.Cache.UpdateNodeNameToInfoMap(infoMap) + c.once.Do(func() { + c.cycleStart <- struct{}{} + <-c.cacheInvalidated + }) + return err +} + +// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly +// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event +// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired. +func TestEquivalenceCacheInvalidationRace(t *testing.T) { + // Create a predicate that returns false the first time and true on subsequent calls. + podWillFit := false + var callCount int + testPredicate := func(pod *v1.Pod, + meta algorithm.PredicateMetadata, + nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + callCount++ + if !podWillFit { + podWillFit = true + return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil + } + return true, nil, nil + } + + // Set up the mock cache. + cache := schedulercache.New(time.Duration(0), wait.NeverStop) + cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}) + mockCache := &syncingMockCache{ + Cache: cache, + cycleStart: make(chan struct{}), + cacheInvalidated: make(chan struct{}), + } + + fakeGetEquivalencePod := func(pod *v1.Pod) interface{} { return pod } + eCache := NewEquivalenceCache(fakeGetEquivalencePod) + // Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before + // the equivalence cache would be updated. + go func() { + <-mockCache.cycleStart + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"}, + Spec: v1.PodSpec{NodeName: "machine1"}} + if err := cache.AddPod(pod); err != nil { + t.Errorf("Could not add pod to cache: %v", err) + } + eCache.InvalidateAllCachedPredicateItemOfNode("machine1") + mockCache.cacheInvalidated <- struct{}{} + }() + + // Set up the scheduler. + predicates := map[string]algorithm.FitPredicate{"testPredicate": testPredicate} + algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"}) + prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} + pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) + scheduler := NewGenericScheduler( + mockCache, + eCache, + NewSchedulingQueue(), + predicates, + algorithm.EmptyPredicateMetadataProducer, + prioritizers, + algorithm.EmptyPriorityMetadataProducer, + nil, nil, pvcLister, true, false) + + // First scheduling attempt should fail. + nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"})) + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}} + machine, err := scheduler.Schedule(pod, nodeLister) + if machine != "" || err == nil { + t.Error("First scheduling attempt did not fail") + } + + // Second scheduling attempt should succeed because cache was invalidated. + _, err = scheduler.Schedule(pod, nodeLister) + if err != nil { + t.Errorf("Second scheduling attempt failed: %v", err) + } + if callCount != 2 { + t.Errorf("Predicate should have been called twice. Was called %d times.", callCount) + } +} diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 39467962017..39852279532 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -54,8 +54,6 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) -const enableEquivalenceCache = true - type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) type nodeStateManager struct {