Merge pull request #63040 from misterikkit/ecache-race

Automatic merge from submit-queue (batch tested with PRs 62432, 62868, 63040). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

scheduler: fix race condition in equivalence cache

**What this PR does / why we need it**:
This adds an equivalence cache test to exercise the race condition observed in https://github.com/kubernetes/kubernetes/issues/62921 and then fixes the race.

The `Cache` interface needed a new method to check whether a `NodeInfo` is stale, and `genericScheduler` needed some plumbing to make the `Cache` object available to `podFitsOnNode()`.
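
For reference, the new method, quoted from the diff below with a short summarizing comment:

```go
// Added to the schedulercache.Cache interface as:
//   IsUpToDate(n *NodeInfo) bool
// and implemented on schedulerCache by comparing generations under the cache lock:
func (cache *schedulerCache) IsUpToDate(n *NodeInfo) bool {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	node, ok := cache.nodes[n.Node().Name]
	return ok && n.generation == node.generation
}
```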

The solution: right before writing to the eCache, check the scheduler cache to see whether the current `NodeInfo` object is out of date, and if it is, skip the write. A stale `NodeInfo` means a cache update has occurred that should also invalidate the corresponding eCache entry. That invalidation either happened before `podFitsOnNode()` acquired the eCache lock (the original bug, which is why we skip the write) or blocks until we release that lock (removing the potentially bad entry).
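
To make that ordering argument concrete, here is a minimal, self-contained Go sketch of the generation-check idea using toy types; it illustrates the pattern only and is not the scheduler's actual code (see the `podFitsOnNode()` and `schedulerCache` hunks below for the real change):

```go
package main

import (
	"fmt"
	"sync"
)

// nodeInfo is a toy stand-in for the scheduler's NodeInfo: a snapshot that
// remembers the generation of the cache entry it was copied from.
type nodeInfo struct {
	name       string
	generation int64
}

// toyCache is a toy stand-in for schedulercache: every write bumps the node's
// generation, so older snapshots become detectably stale.
type toyCache struct {
	mu    sync.Mutex
	nodes map[string]*nodeInfo
}

// snapshot returns a copy of the node entry, like the per-cycle snapshot taken
// at the start of scheduling.
func (c *toyCache) snapshot(name string) *nodeInfo {
	c.mu.Lock()
	defer c.mu.Unlock()
	n := *c.nodes[name]
	return &n
}

// addPod models a cache update (e.g. a pod landing on the node) that should
// also invalidate the equivalence cache for that node.
func (c *toyCache) addPod(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes[name].generation++
}

// isUpToDate mirrors the idea behind the new Cache.IsUpToDate: the snapshot is
// current only if its generation still matches the live entry's generation.
func (c *toyCache) isUpToDate(n *nodeInfo) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	live, ok := c.nodes[n.name]
	return ok && live.generation == n.generation
}

func main() {
	c := &toyCache{nodes: map[string]*nodeInfo{"machine1": {name: "machine1"}}}
	snap := c.snapshot("machine1")

	// The racing event: a pod is added after the snapshot was taken.
	c.addPod("machine1")

	// The guarded write: skip the equivalence-cache update for a stale snapshot.
	if c.isUpToDate(snap) {
		fmt.Println("would write predicate result to eCache")
	} else {
		fmt.Println("snapshot stale; skipping eCache write")
	}
}
```

Running the sketch prints `snapshot stale; skipping eCache write`, which is the branch the fix takes when the race occurs.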

Fixes #62921 

**Special notes for your reviewer**:

**Release note**:

The equivalence cache is still alpha, so no release note.
```release-note
NONE
```
/sig scheduling
/assign bsalamat
/assign resouer
Kubernetes Submit Queue 2018-04-25 12:56:14 -07:00 committed by GitHub
commit 6251402266
9 changed files with 204 additions and 57 deletions

View File

@@ -18,13 +18,19 @@ package core
import (
"reflect"
"sync"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
algorithmpredicates "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
)
type predicateItemType struct {
@@ -649,3 +655,101 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
}
}
}
// syncingMockCache delegates method calls to an actual Cache,
// but calls to UpdateNodeNameToInfoMap synchronize with the test.
type syncingMockCache struct {
schedulercache.Cache
cycleStart, cacheInvalidated chan struct{}
once sync.Once
}
// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it
// synchronizes with the test.
//
// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use
// this point to signal to the test that a scheduling cycle has started.
func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
err := c.Cache.UpdateNodeNameToInfoMap(infoMap)
c.once.Do(func() {
c.cycleStart <- struct{}{}
<-c.cacheInvalidated
})
return err
}
// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly
// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event
// occurs after the schedulercache is snapshotted and before the equivalence cache lock is acquired.
func TestEquivalenceCacheInvalidationRace(t *testing.T) {
// Create a predicate that returns false the first time and true on subsequent calls.
podWillFit := false
var callCount int
testPredicate := func(pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
callCount++
if !podWillFit {
podWillFit = true
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
return true, nil, nil
}
// Set up the mock cache.
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
mockCache := &syncingMockCache{
Cache: cache,
cycleStart: make(chan struct{}),
cacheInvalidated: make(chan struct{}),
}
fakeGetEquivalencePod := func(pod *v1.Pod) interface{} { return pod }
eCache := NewEquivalenceCache(fakeGetEquivalencePod)
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
// the equivalence cache would be updated.
go func() {
<-mockCache.cycleStart
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
Spec: v1.PodSpec{NodeName: "machine1"}}
if err := cache.AddPod(pod); err != nil {
t.Errorf("Could not add pod to cache: %v", err)
}
eCache.InvalidateAllCachedPredicateItemOfNode("machine1")
mockCache.cacheInvalidated <- struct{}{}
}()
// Set up the scheduler.
predicates := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
scheduler := NewGenericScheduler(
mockCache,
eCache,
NewSchedulingQueue(),
predicates,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, pvcLister, true, false)
// First scheduling attempt should fail.
nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
machine, err := scheduler.Schedule(pod, nodeLister)
if machine != "" || err == nil {
t.Error("First scheduling attempt did not fail")
}
// Second scheduling attempt should succeed because cache was invalidated.
_, err = scheduler.Schedule(pod, nodeLister)
if err != nil {
t.Errorf("Second scheduling attempt failed: %v", err)
}
if callCount != 2 {
t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
}
}

View File

@@ -128,7 +128,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
trace.Step("Computing predicates")
startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
if err != nil {
return "", err
}
@@ -325,21 +325,11 @@ func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName s
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func findNodesThatFit(
pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
nodes []*v1.Node,
predicateFuncs map[string]algorithm.FitPredicate,
extenders []algorithm.SchedulerExtender,
metadataProducer algorithm.PredicateMetadataProducer,
ecache *EquivalenceCache,
schedulingQueue SchedulingQueue,
alwaysCheckAllPredicates bool,
) ([]*v1.Node, FailedPredicateMap, error) {
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
if len(predicateFuncs) == 0 {
if len(g.predicates) == 0 {
filtered = nodes
} else {
// Create filtered list with enough space to avoid growing it
@@ -350,12 +340,12 @@ func findNodesThatFit(
var filteredLen int32
// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
var equivCacheInfo *equivalenceClassInfo
if ecache != nil {
if g.equivalenceCache != nil {
// getEquivalenceClassInfo will return immediately if no equivalence pod found
equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
equivCacheInfo = g.equivalenceCache.getEquivalenceClassInfo(pod)
}
checkNode := func(i int) {
@@ -363,11 +353,12 @@ func findNodesThatFit(
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
nodeNameToInfo[nodeName],
predicateFuncs,
ecache,
schedulingQueue,
alwaysCheckAllPredicates,
g.cachedNodeInfoMap[nodeName],
g.predicates,
g.cache,
g.equivalenceCache,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
equivCacheInfo,
)
if err != nil {
@@ -391,12 +382,12 @@ func findNodesThatFit(
}
}
if len(filtered) > 0 && len(extenders) != 0 {
for _, extender := range extenders {
if len(filtered) > 0 && len(g.extenders) != 0 {
for _, extender := range g.extenders {
if !extender.IsInterested(pod) {
continue
}
filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo)
filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
if err != nil {
if extender.IsIgnorable() {
glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@@ -467,6 +458,7 @@ func podFitsOnNode(
meta algorithm.PredicateMetadata,
info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
cache schedulercache.Cache,
ecache *EquivalenceCache,
queue SchedulingQueue,
alwaysCheckAllPredicates bool,
@@ -548,10 +540,13 @@ func podFitsOnNode(
} else {
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
}
result := predicateResults[predicateKey]
ecache.UpdateCachedPredicateItem(
pod.GetName(), info.Node().GetName(),
predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
// Skip update if NodeInfo is stale.
if cache != nil && cache.IsUpToDate(info) {
result := predicateResults[predicateKey]
ecache.UpdateCachedPredicateItem(
pod.GetName(), info.Node().GetName(),
predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
}
}
}
}()
@@ -976,7 +971,7 @@ func selectVictimsOnNode(
// that we should check is if the "pod" is failing to schedule due to pod affinity
// failure.
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil); !fits {
if err != nil {
glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
@@ -990,7 +985,7 @@ func selectVictimsOnNode(
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
reprievePod := func(p *v1.Pod) bool {
addPod(p)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil)
if !fits {
removePod(p)
victims = append(victims, p)

View File

@@ -432,16 +432,35 @@ func TestGenericScheduler(t *testing.T) {
}
}
func TestFindFitAllError(t *testing.T) {
// makeScheduler makes a simple genericScheduler for testing.
func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Node) *genericScheduler {
algorithmpredicates.SetPredicatesOrdering(order)
nodes := []string{"3", "2", "1"}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate}
nodeNameToInfo := map[string]*schedulercache.NodeInfo{
"3": schedulercache.NewNodeInfo(),
"2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(),
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
for _, n := range nodes {
cache.AddNode(n)
}
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
s := NewGenericScheduler(
cache,
nil,
NewSchedulingQueue(),
predicates,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, nil, false, false)
cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
return s.(*genericScheduler)
}
func TestFindFitAllError(t *testing.T) {
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate}
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(predicates, nodes)
_, predicateMap, err := scheduler.findNodesThatFit(&v1.Pod{}, nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -452,9 +471,9 @@ func TestFindFitAllError(t *testing.T) {
}
for _, node := range nodes {
failures, found := predicateMap[node]
failures, found := predicateMap[node.Name]
if !found {
t.Errorf("failed to find node: %s in %v", node, predicateMap)
t.Errorf("failed to find node: %s in %v", node.Name, predicateMap)
}
if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate {
t.Errorf("unexpected failures: %v", failures)
@@ -463,20 +482,13 @@ func TestFindFitAllError(t *testing.T) {
}
func TestFindFitSomeError(t *testing.T) {
algorithmpredicates.SetPredicatesOrdering(order)
nodes := []string{"3", "2", "1"}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
nodeNameToInfo := map[string]*schedulercache.NodeInfo{
"3": schedulercache.NewNodeInfo(),
"2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(pod),
}
for name := range nodeNameToInfo {
nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
}
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(predicates, nodes)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
_, predicateMap, err := scheduler.findNodesThatFit(pod, nodes)
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -486,12 +498,12 @@ func TestFindFitSomeError(t *testing.T) {
}
for _, node := range nodes {
if node == pod.Name {
if node.Name == pod.Name {
continue
}
failures, found := predicateMap[node]
failures, found := predicateMap[node.Name]
if !found {
t.Errorf("failed to find node: %s in %v", node, predicateMap)
t.Errorf("failed to find node: %s in %v", node.Name, predicateMap)
}
if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate {
t.Errorf("unexpected failures: %v", failures)

View File

@@ -43,6 +43,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)

View File

@@ -460,6 +460,13 @@ func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDi
return pdbs, nil
}
func (cache *schedulerCache) IsUpToDate(n *NodeInfo) bool {
cache.mu.Lock()
defer cache.mu.Unlock()
node, ok := cache.nodes[n.Node().Name]
return ok && n.generation == node.generation
}
func (cache *schedulerCache) run() {
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

View File

@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
@@ -1241,3 +1242,26 @@ func TestPDBOperations(t *testing.T) {
}
}
}
func TestIsUpToDate(t *testing.T) {
cache := New(time.Duration(0), wait.NeverStop)
if err := cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}); err != nil {
t.Errorf("Could not add node: %v", err)
}
s := cache.Snapshot()
node := s.Nodes["n1"]
if !cache.IsUpToDate(node) {
t.Errorf("Node incorrectly marked as stale")
}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", UID: "p1"}, Spec: v1.PodSpec{NodeName: "n1"}}
if err := cache.AddPod(pod); err != nil {
t.Errorf("Could not add pod: %v", err)
}
if cache.IsUpToDate(node) {
t.Errorf("Node incorrectly marked as up to date")
}
badNode := &NodeInfo{node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n2"}}}
if cache.IsUpToDate(badNode) {
t.Errorf("Nonexistant node incorrectly marked as up to date")
}
}

View File

@@ -122,6 +122,9 @@ type Cache interface {
// Snapshot takes a snapshot on current cache
Snapshot() *Snapshot
// IsUpToDate returns true if the given NodeInfo matches the current data in the cache.
IsUpToDate(n *NodeInfo) bool
}
// Snapshot is a snapshot of cache state

View File

@@ -105,3 +105,6 @@ func (f *FakeCache) FilteredList(filter schedulercache.PodFilter, selector label
func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
return &schedulercache.Snapshot{}
}
// IsUpToDate is a fake method for testing
func (f *FakeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return true }

View File

@@ -54,8 +54,6 @@ import (
"k8s.io/kubernetes/test/integration/framework"
)
const enableEquivalenceCache = true
type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface)
type nodeStateManager struct {